1
0
mirror of https://github.com/trezor/trezor-firmware.git synced 2025-02-02 02:41:28 +00:00
trezor-firmware/core/tools/provision_device.py

210 lines
6.3 KiB
Python
Raw Normal View History

from __future__ import annotations
import os
import secrets
from dataclasses import dataclass
from typing import Any
from typing_extensions import Self
import click
import requests
import serial
import shlex
# Tester token sent to the provisioning server as "tester_id". The `provision`
# command overwrites it at runtime from the SERVER_TOKEN environment variable.
SERVER_TOKEN = ""
# Default value of the `--url` option of the `provision` command.
SERVER_URL = "http://localhost:8000/provision"
@dataclass
class ProvisioningResult:
    """Certificates and key material returned by the provisioning server.

    The three binary fields are decoded from hex strings in the server's JSON
    response; ``production`` is taken from the response unchanged.
    """

    device_cert: bytes
    fido_privkey: bytes
    fido_cert: bytes
    production: bool

    @classmethod
    def from_json(cls, json: dict[str, Any]) -> Self:
        """Build a result from the decoded JSON body of the server response.

        Raises:
            KeyError: if one of the expected fields is missing.
            ValueError: if a binary field is not a valid hex string.
        """
        return cls(
            device_cert=bytes.fromhex(json["device_cert"]),
            fido_privkey=bytes.fromhex(json["fido_privkey"]),
            fido_cert=bytes.fromhex(json["fido_cert"]),
            production=json["production"],
        )

    def write(self, connection: Connection) -> None:
        """Write the certificates and FIDO key to the device and verify each one.

        Every write is followed by the matching read command and the value read
        back is compared against what was sent.

        Raises:
            AssertionError: if a read-back value does not match. The error is
                raised explicitly instead of via ``assert`` so the verification
                still runs under ``python -O``, which strips ``assert``
                statements.
        """
        connection.command("optiga-certdev-write", self.device_cert)
        cert_dev = connection.command("optiga-certdev-read")
        if cert_dev != self.device_cert:
            print("Device certificate mismatch")
            print("Expected:", self.device_cert)
            print("Got: ", cert_dev)
            raise AssertionError("Device certificate mismatch")

        connection.command("optiga-certfido-write", self.fido_cert)
        cert_fido = connection.command("optiga-certfido-read")
        if cert_fido != self.fido_cert:
            raise AssertionError("FIDO certificate mismatch")

        connection.command("optiga-keyfido-write", self.fido_privkey)
        key_fido = connection.command("optiga-keyfido-read")
        if key_fido is None:
            raise AssertionError("FIDO key read returned no data")
        # The value read back must occur inside the FIDO certificate.
        # NOTE(review): presumably the device returns the public key here --
        # confirm against the prodtest command reference.
        if key_fido not in self.fido_cert:
            raise AssertionError(
                "FIDO key read back from the device is not part of the FIDO certificate"
            )
@dataclass
class DeviceInfo:
    """Identifiers and certificate read from a device over a ``Connection``."""

    optiga_id: bytes  # payload of the "optiga-id-read" command
    cpu_id: bytes  # payload of the "get-cpuid" command
    device_cert: bytes  # payload of the "optiga-certinf-read" command

    @classmethod
    def read(cls, connection: Connection) -> Self:
        """Query the device for its CPU ID, Optiga ID and certificate.

        Raises:
            AssertionError: if any of the three commands succeeds without
                returning a payload. The error is raised explicitly instead of
                via ``assert`` so the check is not stripped under
                ``python -O``.
        """
        cpu_id = connection.command("get-cpuid")
        optiga_id = connection.command("optiga-id-read")
        cert_bytes = connection.command("optiga-certinf-read")

        if optiga_id is None:
            raise AssertionError("optiga-id-read returned no data")
        if cpu_id is None:
            raise AssertionError("get-cpuid returned no data")
        if cert_bytes is None:
            raise AssertionError("optiga-certinf-read returned no data")

        return cls(optiga_id, cpu_id, cert_bytes)
class ProdtestException(Exception):
    """Raised when the device reports an error or sends an unexpected reply."""

    def __init__(self, text: str) -> None:
        # Keep the message on the instance too, so handlers can read it
        # directly instead of going through ``args``.
        self.text = text
        super().__init__(text)
class Connection:
    """Line-oriented command channel to a device over a serial port.

    Every line sent or received is echoed to stdout (prefixed ``>>>`` and
    ``<<<`` respectively), truncated to 100 characters.
    """

    def __init__(self, path: str = "/dev/ttyACM0") -> None:
        """Open the serial port at *path* at 115200 baud with ``timeout=5``."""
        self.connection = serial.Serial(path, 115200, timeout=5)

    def readline(self) -> bytes:
        """Read one line from the device, echo it and return it stripped."""
        line = self.connection.readline().strip()
        # The decoded text is only used for the echo, so bytes that are not
        # valid UTF-8 (e.g. line noise) must not abort the read.
        line_str = line.decode(errors="replace")
        if len(line_str) > 100:
            line_str = line_str[:100] + "..."
        print("<<<", line_str)
        return line

    def writeline(self, data: bytes) -> None:
        """Echo *data* and send it to the device, terminated with ``\\r``.

        Bytes outside the printable ASCII range (32..126) are reported on
        stdout and not sent.
        """
        data_str = data.decode(errors="replace")  # for the echo only
        if len(data_str) > 100:
            print(">>>", data_str[:100] + "...")
        else:
            print(">>>", data_str)
        for byte in data:
            if byte < 32 or byte > 126:
                print("!!!", byte, "is not printable")
                continue
            # NOTE(review): one write per byte looks deliberate (pacing for the
            # device?) -- kept as is; confirm before batching.
            self.connection.write(bytes([byte]))
        self.connection.write(b"\r")

    def command(self, cmd: str, *args: Any) -> bytes | None:
        """Send *cmd* with *args* and return the payload of the ``OK`` reply.

        ``bytes`` arguments are sent hex-encoded, everything else via ``str()``.
        Reply lines starting with ``#`` are skipped.

        Returns:
            ``None`` when the ``OK`` reply has no payload; the decoded bytes
            when the payload is valid hex; the raw payload otherwise.

        Raises:
            ProdtestException: on an ``ERROR`` reply (carrying the error
                description if present, otherwise the error code), on any
                other unexpected line, or when nothing was read.
        """
        cmd_line = " ".join(
            [cmd, *(arg.hex() if isinstance(arg, bytes) else str(arg) for arg in args)]
        )
        self.writeline(cmd_line.encode())

        while True:
            res = self.readline()
            if res.startswith(b"ERROR"):
                raise ProdtestException(self._error_text(res))
            if res.startswith(b"OK"):
                res_arg = res[len(b"OK ") :]
                if not res_arg:
                    return None
                try:
                    return bytes.fromhex(res_arg.decode())
                except ValueError:
                    # Not hex (or not even UTF-8): hand back the raw payload.
                    return res_arg
            if not res:
                # An empty read is what a serial timeout looks like here.
                raise ProdtestException(
                    "Unexpected response: <empty line or read timeout>"
                )
            if not res.startswith(b"#"):
                raise ProdtestException(
                    "Unexpected response: " + res.decode(errors="replace")
                )

    @staticmethod
    def _error_text(res: bytes) -> str:
        """Extract a human-readable message from an ``ERROR ...`` reply line."""
        error_args = res[len(b"ERROR ") :].decode(errors="replace")
        try:
            parts = shlex.split(error_args)
        except ValueError:
            # Malformed quoting in the reply: report it verbatim.
            return error_args
        if not parts:
            # Bare "ERROR" without code or description.
            return "Unknown error"
        # parts[0] is the error code, parts[1] (if any) the description.
        return parts[1] if len(parts) > 1 else parts[0]
def provision_request(
    device: DeviceInfo, url: str, model: str, verify: bool = True
) -> ProvisioningResult:
    """Ask the provisioning server for certificates for *device*.

    Args:
        device: identifiers and certificate read from the device.
        url: full URL of the provisioning endpoint; the request is POSTed to
            it as given.
        model: device model, forwarded to the server as ``"model"``.
        verify: passed to ``requests.post`` as ``verify``.

    Returns:
        The server response parsed into a ``ProvisioningResult``.

    Raises:
        requests.HTTPError: if the server answers with an error status. For a
            400 response the body is printed first.
    """
    request = {
        "tester_id": SERVER_TOKEN,
        # fresh random identifier for every request
        "run_id": secrets.token_hex(16),
        "optiga_id": device.optiga_id.hex(),
        "cpu_id": device.cpu_id.hex(),
        "cert": device.device_cert.hex(),
        "model": model,
    }
    # Do not append a path here: SERVER_URL, the default of the `--url`
    # option, already ends in "/provision", so appending it again would send
    # the request to ".../provision/provision".
    resp = requests.post(url, json=request, verify=verify)
    if resp.status_code == 400:
        print("Server returned error:", resp.text)
    resp.raise_for_status()
    return ProvisioningResult.from_json(resp.json())
# Root click command group; the subcommands in this file are registered on it
# with `@cli.command()`. Documented with a comment rather than a docstring
# because click displays a command's docstring as its `--help` text.
@click.group()
def cli() -> None:
    pass
@cli.command()
@click.option("-d", "--device", default="/dev/ttyACM0", help="Device path")
def identify(device) -> None:
    # Ping the device, then read its CPU ID, Optiga ID and certificate.
    # Nothing is returned or printed here; the values are visible because
    # Connection echoes every line it exchanges with the device.
    # (A comment, not a docstring: click would display a docstring as help text.)
    link = Connection(device)
    link.command("ping")
    DeviceInfo.read(link)
@cli.command()
@click.option("-d", "--device", default="/dev/ttyACM0", help="Device path")
@click.option("--wipe", is_flag=True, help="Wipe the device")
def lock(device, wipe) -> None:
    # Ping the device and send "optiga-lock"; with --wipe, follow up with
    # "prodtest-wipe". A failing command raises and stops the sequence.
    # (A comment, not a docstring: click would display a docstring as help text.)
    link = Connection(device)
    steps = ["ping", "optiga-lock"]
    if wipe:
        steps.append("prodtest-wipe")
    for step in steps:
        link.command(step)
@cli.command()
@click.option("-u", "--url", default=SERVER_URL, help="Server URL")
@click.option("-d", "--device", default="/dev/ttyACM0", help="Device path")
@click.option("-m", "--model", help="Device model")
@click.option(
    "--no-verify", is_flag=True, help="Disable server certificate verification"
)
@click.option(
    "--lock/--no-lock", default=True, help="Lock the device after provisioning"
)
def provision(url, device, model, no_verify, lock) -> None:
    """Provision a device with certificates from the provisioning server.

    Reads the device identity, requests certificates from the server, writes
    them to the device and, unless --no-lock is given, locks the device.
    The SERVER_TOKEN environment variable must be set.
    """
    global SERVER_TOKEN

    # Only publish the token once we know it is present, so a failed run does
    # not leave the module-level value set to None.
    token = os.environ.get("SERVER_TOKEN")
    if token is None:
        raise click.ClickException("SERVER_TOKEN environment variable is not set")
    SERVER_TOKEN = token

    connection = Connection(device)

    # test the connection
    connection.command("ping")

    # grab CPUID, OPTIGAID and device certificate
    # (separate name: `device` is the serial port path passed in by click)
    device_info = DeviceInfo.read(connection)

    # call the provisioning server
    result = provision_request(device_info, url, model, not no_verify)

    # write provisioning result to the device
    result.write(connection)

    if lock:
        connection.command("optiga-lock")
        # NOTE(review): the reviewed copy of this file had lost its
        # indentation, so it is not certain whether "prodtest-wipe" belongs
        # inside this `if`. It is kept conditional here (the non-destructive
        # reading, matching the separate `lock --wipe` command) -- confirm
        # against the upstream file.
        connection.command("prodtest-wipe")
if __name__ == "__main__":
cli()