WIP - dashboard with all recent PR branches

grdddj/ci_report_resolver
grdddj 11 months ago
parent 86c7862c09
commit 5f8bdfb2f4

@ -0,0 +1,3 @@
*.json
*.log
*.out

@ -0,0 +1,76 @@
# Deployed by:
# uvicorn app:app --reload --host 0.0.0.0 --port 8002
from __future__ import annotations

import time
from datetime import datetime
from pathlib import Path

from fastapi import FastAPI, HTTPException, Request
from fastapi.templating import Jinja2Templates
from starlette.responses import RedirectResponse

from cli import do_update_pulls
from common_all import get_logger
from github import load_cache_file
from gitlab import get_latest_infos_for_branch

HERE = Path(__file__).parent

log_file = HERE / "app.log"
logger = get_logger(__name__, log_file)

app = FastAPI()
templates = Jinja2Templates(directory="templates", trim_blocks=True, lstrip_blocks=True)

LAST_UPDATE_TS = 0
UPDATE_ALLOWED_EVERY_S = 30


@app.get("/branch/{branch_name:path}")
async def get_branch_info(branch_name: str):
    try:
        logger.info(f"Branch: {branch_name}")
        info = get_latest_infos_for_branch(branch_name, find_status=True)
        return {"info": info}
    except Exception as e:
        logger.exception(f"Error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.get("/dashboard")
async def get_dashboard(request: Request):
    try:
        logger.info("get_dashboard")
        cached_info = load_cache_file()
        last_update = cached_info["metadata"]["last_update"]
        branches_dict = cached_info["branches"]
        branches_list = sorted(
            branches_dict.values(),
            key=lambda branch_info: branch_info["pull_request_number"],
            reverse=True,
        )
        branches_list = [branch for branch in branches_list if branch["job_infos"]]
        for branch in branches_list:
            branch[
                "pr_link"
            ] = f"https://github.com/trezor/trezor-firmware/pull/{branch['pull_request_number']}"
        return templates.TemplateResponse(  # type: ignore
            "dashboard.html",
            {"request": request, "branches": branches_list, "last_update": last_update},
        )
    except Exception as e:
        logger.exception(f"Error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.get("/update")
async def update_dashboard():
    logger.info("update_dashboard")
    global LAST_UPDATE_TS
    if time.time() - LAST_UPDATE_TS > UPDATE_ALLOWED_EVERY_S:
        do_update_pulls()
        LAST_UPDATE_TS = time.time()
    else:
        time.sleep(5)
    return RedirectResponse(url="/dashboard")

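For quick manual testing, the three endpoints above can be exercised against a locally running instance - a minimal sketch, assuming the host/port from the uvicorn command at the top of app.py and a placeholder branch name:

import requests

BASE = "http://localhost:8002"  # port taken from the uvicorn command above

# HTML dashboard rendered from the cached GitHub/GitLab data
print(requests.get(f"{BASE}/dashboard").text)

# Live GitLab job info for a single branch (path-encoded; the name is a placeholder)
print(requests.get(f"{BASE}/branch/some/branch-name").json())

# Trigger a cache refresh (throttled to once per UPDATE_ALLOWED_EVERY_S seconds)
print(requests.get(f"{BASE}/update", allow_redirects=False).status_code)
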
@ -0,0 +1,50 @@
from __future__ import annotations

import click

from github import update_cache, yield_recently_updated_gh_pr_branches
from gitlab import get_latest_infos_for_branch


@click.group()
def cli():
    pass


@cli.command(name="branch")
@click.argument("branch", default="master")
@click.option("--no-status", is_flag=True, default=False)
def get_branch(branch: str, no_status: bool):
    print(f"Getting links for branch: {branch}")
    tests_info = get_latest_infos_for_branch(branch, not no_status)
    for name, info in tests_info.items():
        print(
            f"{name}\n - LINK: {info.link}\n - STATUS: {info.status}\n - DIFF SCREENS: {info.diff_screens}"
        )


def do_update_pulls():
    new_branch_infos = list(yield_recently_updated_gh_pr_branches())
    print(80 * "*")
    print(f"Found {len(new_branch_infos)} new branches")
    for branch in new_branch_infos:
        print(f"Getting links for branch: {branch}")
        try:
            tests_info = get_latest_infos_for_branch(branch.name, True)
            branch.job_infos = tests_info
        except Exception as e:
            print(f"Failed to get links for branch: {branch.name}")
            print(e)

    branch_dict = {branch.name: branch for branch in new_branch_infos}
    update_cache(branch_dict)


@cli.command(name="pulls")
def update_pulls():
    do_update_pulls()


if __name__ == "__main__":
    cli()

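The click group above is normally driven from a shell (python cli.py branch <name> --no-status, python cli.py pulls); for a quick programmatic check it can also be invoked through click's built-in test runner - a small sketch:

from click.testing import CliRunner

from cli import cli

runner = CliRunner()
# Equivalent to `python cli.py branch master --no-status`: print job links without fetching statuses
result = runner.invoke(cli, ["branch", "master", "--no-status"])
print(result.exit_code)
print(result.output)
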
@ -0,0 +1,37 @@
from __future__ import annotations

import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any

AnyDict = dict[Any, Any]


@dataclass
class BranchInfo:
    name: str
    pull_request_number: int
    pull_request_name: str
    last_commit_sha: str
    last_commit_timestamp: int
    last_commit_datetime: str
    job_infos: dict[str, JobInfo]


@dataclass
class JobInfo:
    name: str
    link: str
    status: str | None = None
    diff_screens: int | None = None


def get_logger(name: str, log_file_path: str | Path) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    log_handler = logging.FileHandler(log_file_path)
    log_formatter = logging.Formatter("%(asctime)s %(message)s")
    log_handler.setFormatter(log_formatter)
    logger.addHandler(log_handler)
    return logger

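For orientation: the two dataclasses nest, with BranchInfo.job_infos mapping a job name to its JobInfo. A minimal sketch with purely illustrative values (the branch name, PR number and job name are made up):

from common_all import BranchInfo, JobInfo

job = JobInfo(
    name="example ui job",  # illustrative job name
    link="https://example.invalid/report.html",  # illustrative link
    status="OK",
    diff_screens=3,
)
branch = BranchInfo(
    name="example/branch",  # illustrative branch name
    pull_request_number=1234,  # illustrative PR number
    pull_request_name="Example PR title",
    last_commit_sha="0123abcd",
    last_commit_timestamp=1700000000,
    last_commit_datetime="2023-11-14T22:13:20",
    job_infos={job.name: job},
)
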
@ -0,0 +1,97 @@
from __future__ import annotations

import json
import os
from dataclasses import asdict
from datetime import datetime
from pathlib import Path
from typing import Iterator

import requests

from common_all import AnyDict, BranchInfo

HERE = Path(__file__).parent

GITHUB_PR_API = "https://api.github.com/repos/trezor/trezor-firmware/pulls"

GH_TOKEN = os.getenv("GH_TOKEN")
GH_HEADERS = {"Authorization": f"token {GH_TOKEN}"} if GH_TOKEN else {}


def load_cache_file() -> AnyDict:
    return json.loads(CACHE_FILE.read_text())


def load_branches_cache() -> dict[str, BranchInfo]:
    # Use .get() so a freshly created (empty) cache file does not raise KeyError
    cache_dict = load_cache_file().get("branches", {})
    return {key: BranchInfo(**value) for key, value in cache_dict.items()}


def update_cache(cache_dict: dict[str, BranchInfo]) -> None:
    CACHE.update(cache_dict)
    json_writable_cache_dict = {key: asdict(value) for key, value in CACHE.items()}
    content = {
        "branches": json_writable_cache_dict,
        "metadata": {
            "last_update_timestamp": int(datetime.now().timestamp()),
            "last_update": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        },
    }
    CACHE_FILE.write_text(json.dumps(content, indent=2))


CACHE_FILE = HERE / "github_cache.json"
if not CACHE_FILE.exists():
    CACHE_FILE.write_text("{}")

CACHE: dict[str, BranchInfo] = load_branches_cache()


def get_commit_ts(commit_hash: str) -> int:
    res = requests.get(
        f"https://api.github.com/repos/trezor/trezor-firmware/commits/{commit_hash}",
        headers=GH_HEADERS,
    )
    res.raise_for_status()
    # Keep the UTC offset ("+00:00") so .timestamp() is not skewed by the local timezone
    return int(
        datetime.fromisoformat(
            res.json()["commit"]["committer"]["date"].replace("Z", "+00:00")
        ).timestamp()
    )


def get_all_gh_pulls() -> list[AnyDict]:
    res = requests.get(GITHUB_PR_API, headers=GH_HEADERS)
    res.raise_for_status()
    return res.json()


def yield_recently_updated_gh_pr_branches() -> Iterator[BranchInfo]:
    for pr in get_all_gh_pulls():
        last_commit_sha = pr["head"]["sha"]
        branch_name = pr["head"]["ref"]
        print(f"Getting branch {branch_name}")
        # Skip when we already have this commit in cache
        if branch_name in CACHE:
            cache_info = CACHE[branch_name]
            if cache_info.last_commit_sha == last_commit_sha:
                print(f"Skipping, commit did not change - {last_commit_sha}")
                continue
        # A head branch named "master" typically comes from a fork - we do not have UI tests for it
        if branch_name == "master":
            print("Ignoring a fork")
            continue
        last_commit_timestamp = get_commit_ts(last_commit_sha)
        last_commit_datetime = datetime.fromtimestamp(last_commit_timestamp).isoformat()
        yield BranchInfo(
            name=branch_name,
            pull_request_number=pr["number"],
            pull_request_name=pr["title"],
            last_commit_sha=last_commit_sha,
            last_commit_timestamp=last_commit_timestamp,
            last_commit_datetime=last_commit_datetime,
            job_infos={},
        )

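One caveat with get_all_gh_pulls(): the GitHub /pulls endpoint is paginated and returns only the first page (30 PRs by default), so the dashboard only ever sees the most recently created PRs. If that ever becomes limiting, the standard REST pagination parameters could be passed - a sketch, not part of this commit:

import requests

from github import GH_HEADERS, GITHUB_PR_API

# Sketch only: request up to 100 open PRs in one page via standard GitHub pagination parameters
res = requests.get(GITHUB_PR_API, headers=GH_HEADERS, params={"state": "open", "per_page": 100})
res.raise_for_status()
print(len(res.json()), "open PRs fetched")
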
@ -1,19 +1,36 @@
 from __future__ import annotations
-import requests
+import json
+from functools import lru_cache
+from pathlib import Path
+from typing import Callable, Iterator
 import click
+import requests
+from common_all import AnyDict, JobInfo
+HERE = Path(__file__).parent
 BRANCHES_API_TEMPLATE = "https://gitlab.com/satoshilabs/trezor/trezor-firmware/-/pipelines.json?scope=branches&page={}"
 GRAPHQL_API = "https://gitlab.com/api/graphql"
+SCREEN_AMOUNT_CACHE_FILE = HERE / "gitlab_cache.json"
+if not SCREEN_AMOUNT_CACHE_FILE.exists():
+    SCREEN_AMOUNT_CACHE_FILE.write_text("{}")
+BRANCH_CACHE: dict[str, int] = json.loads(SCREEN_AMOUNT_CACHE_FILE.read_text())
-def get_gitlab_branches(page: int) -> list[dict]:
+def update_branch_cache(link: str, amount: int) -> None:
+    BRANCH_CACHE[link] = amount
+    SCREEN_AMOUNT_CACHE_FILE.write_text(json.dumps(BRANCH_CACHE, indent=2))
+@lru_cache(maxsize=32)
+def get_gitlab_branches(page: int) -> list[AnyDict]:
     return requests.get(BRANCHES_API_TEMPLATE.format(page)).json()["pipelines"]
-def get_branch_obj(branch_name: str) -> dict:
+def get_branch_obj(branch_name: str) -> AnyDict:
     # Trying first 10 pages of branches
     for page in range(1, 11):
         if page > 1:
@ -24,12 +41,7 @@ def get_branch_obj(branch_name: str) -> dict:
     raise ValueError(f"Branch {branch_name} not found")
-def get_last_pipeline_id(branch_name: str) -> int:
-    branch_obj = get_branch_obj(branch_name)
-    return branch_obj["id"]
-def get_pipeline_jobs_info(pipeline_iid: int) -> dict:
+def get_pipeline_jobs_info(pipeline_iid: int) -> AnyDict:
     query = {
"query": "fragment CiNeeds on JobNeedUnion {\n ...CiBuildNeedFields\n ...CiJobNeedFields\n}\n\nfragment CiBuildNeedFields on CiBuildNeed {\n id\n name\n}\n\nfragment CiJobNeedFields on CiJob {\n id\n name\n}\n\nfragment LinkedPipelineData on Pipeline {\n __typename\n id\n iid\n path\n cancelable\n retryable\n userPermissions {\n updatePipeline\n }\n status: detailedStatus {\n __typename\n id\n group\n label\n icon\n }\n sourceJob {\n __typename\n id\n name\n retried\n }\n project {\n __typename\n id\n name\n fullPath\n }\n}\n\nquery getPipelineDetails($projectPath: ID!, $iid: ID!) {\n project(fullPath: $projectPath) {\n __typename\n id\n pipeline(iid: $iid) {\n __typename\n id\n iid\n complete\n usesNeeds\n userPermissions {\n updatePipeline\n }\n downstream {\n __typename\n nodes {\n ...LinkedPipelineData\n }\n }\n upstream {\n ...LinkedPipelineData\n }\n stages {\n __typename\n nodes {\n __typename\n id\n name\n status: detailedStatus {\n __typename\n id\n action {\n __typename\n id\n icon\n path\n title\n }\n }\n groups {\n __typename\n nodes {\n __typename\n id\n status: detailedStatus {\n __typename\n id\n label\n group\n icon\n }\n name\n size\n jobs {\n __typename\n nodes {\n __typename\n id\n name\n kind\n scheduledAt\n needs {\n __typename\n nodes {\n __typename\n id\n name\n }\n }\n previousStageJobsOrNeeds {\n __typename\n nodes {\n ...CiNeeds\n }\n }\n status: detailedStatus {\n __typename\n id\n icon\n tooltip\n hasDetails\n detailsPath\n group\n label\n action {\n __typename\n id\n buttonTitle\n icon\n path\n title\n }\n }\n }\n }\n }\n }\n }\n }\n }\n }\n}\n",
"variables": {
@ -58,7 +70,7 @@ def get_jobs_of_interests() -> list[tuple[str, Callable[[str], str]]]:
     ]
-def yield_pipeline_jobs(pipeline_iid: int) -> Iterator[dict]:
+def yield_pipeline_jobs(pipeline_iid: int) -> Iterator[AnyDict]:
     jobs_info = get_pipeline_jobs_info(pipeline_iid)
     stages = jobs_info["data"]["project"]["pipeline"]["stages"]["nodes"]
     for stage in stages:
@ -69,29 +81,47 @@ def yield_pipeline_jobs(pipeline_iid: int) -> Iterator[dict]:
             yield job
-def get_latest_links_for_branch(branch_name: str) -> dict[str, str]:
-    branch_obj = get_branch_obj(branch_name)
-    pipeline_iid = branch_obj["iid"]
+def get_status_from_link(job: AnyDict, link: str) -> tuple[str, int]:
+    if job["status"]["label"] == "skipped":
+        return "SKIPPED", 0
-    links: dict[str, str] = {}
+    if link in BRANCH_CACHE:
+        return "OK", BRANCH_CACHE[link]
-    for job in yield_pipeline_jobs(pipeline_iid):
-        for job_of_interest, func in get_jobs_of_interests():
-            if job["name"] == job_of_interest:
-                job_id = job["id"].split("/")[-1]
-                links[job["name"]] = func(job_id)
+    res = requests.get(link)
+    status = res.status_code
+    if status == 200:
+        row_identifier = 'bgcolor="red"'
+        diff_screens = res.text.count(row_identifier)
+        update_branch_cache(link, diff_screens)
+        return "OK", diff_screens
+    else:
+        return "NOT YET AVAILABLE", 0
-    return links
+def get_job_info(job: AnyDict, link: str, find_status: bool = True) -> JobInfo:
+    if find_status:
+        status, diff_screens = get_status_from_link(job, link)
+    else:
+        status, diff_screens = None, None
-@click.command()
-@click.argument("branch", default="master")
-def main(branch: str):
-    print(f"Getting links for branch: {branch}")
-    links = get_latest_links_for_branch(branch)
-    for name, link in links.items():
-        print(f"{name}: {link}")
+    return JobInfo(
+        name=job["name"], link=link, status=status, diff_screens=diff_screens
+    )
+def get_latest_infos_for_branch(
+    branch_name: str, find_status: bool
+) -> dict[str, JobInfo]:
+    branch_obj = get_branch_obj(branch_name)
+    pipeline_iid = branch_obj["iid"]
+    def yield_key_value() -> Iterator[tuple[str, JobInfo]]:
+        for job in yield_pipeline_jobs(pipeline_iid):
+            for job_of_interest, link_func in get_jobs_of_interests():
+                if job["name"] == job_of_interest:
+                    job_id = job["id"].split("/")[-1]
+                    link = link_func(job_id)
+                    yield job["name"], get_job_info(job, link, find_status)
-if __name__ == "__main__":
-    main()
+    return dict(yield_key_value())

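A note on job["id"].split("/")[-1] in yield_key_value(): GitLab's GraphQL API returns global IDs of the form gid://gitlab/Ci::Build/<numeric id>, and only the trailing numeric part is needed to build the job links. A minimal illustration (the numeric value is made up):

# GitLab GraphQL global ID -> plain numeric job id (example value is made up)
gid = "gid://gitlab/Ci::Build/4871523937"
job_id = gid.split("/")[-1]
assert job_id == "4871523937"
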
@ -0,0 +1,51 @@
<!DOCTYPE html>
<html>

<head>
    <title>Dashboard</title>
    <style>
        table,
        th,
        td {
            border: 1px solid black;
        }
    </style>
</head>

<body>
    <h1>UI diff dashboard</h1>
    <h3>Last Update: {{ last_update }}</h3>
    <form action="/update" method="get">
        <button type="submit">Update</button>
    </form>
    <p id="confirmation-message" style="display: none;color: red">Please wait a moment, the page will refresh. Updates
        are allowed only every 30 seconds.</p>
    <script>
        document.querySelector('form').addEventListener('submit', function () {
            document.getElementById('confirmation-message').style.display = 'block';
        });
    </script>
    <hr>
    {% for branch in branches %}
    <p><b>PR:</b> <a href="{{ branch['pr_link'] }}" target="_blank">{{ branch["pull_request_name"] }}</a></p>
    <p><b>Branch:</b> {{ branch["name"] }}</p>
    <p><b>Last commit:</b> {{ branch["last_commit_datetime"] }}</p>
    <table>
        <tr>
            <th>Test</th>
            <th>Diff screens</th>
        </tr>
        {% for job in branch["job_infos"].values() %}
        <tr style="{% if job.diff_screens > 0 %}background-color: red;{% endif %}">
            <td><a href="{{ job['link'] }}" target="_blank">{{ job["name"] }}</a></td>
            <td>{{ job["diff_screens"] }}</td>
        </tr>
        {% endfor %}
    </table>
    <br>
    <hr>
    {% endfor %}
</body>

</html>