chore(core): add prodtest provisioning script

[no changelog]
pull/3654/head
matejcik 7 months ago committed by Andrew Kozlik
parent d7c1221293
commit a9afaee8a9

@ -0,0 +1,208 @@
from __future__ import annotations
import os
import secrets
from dataclasses import dataclass
from typing import Any
from typing_extensions import Self
import click
import requests
import serial
SERVER_TOKEN = ""
SERVER_URL = "http://localhost:8000/provision"
@dataclass
class ProvisioningResult:
device_cert: bytes
fido_privkey: bytes
fido_cert: bytes
production: bool
@classmethod
def from_json(cls, json: dict[str, Any]) -> Self:
return cls(
device_cert=bytes.fromhex(json["device_cert"]),
fido_privkey=bytes.fromhex(json["fido_privkey"]),
fido_cert=bytes.fromhex(json["fido_cert"]),
production=json["production"],
)
def write(self, connection: Connection) -> None:
connection.command("CERTDEV WRITE", self.device_cert)
cert_dev = connection.command("CERTDEV READ")
if cert_dev != self.device_cert:
print("Device certificate mismatch")
print("Expected:", self.device_cert)
print("Got: ", cert_dev)
assert cert_dev == self.device_cert
connection.command("CERTFIDO WRITE", self.fido_cert)
cert_fido = connection.command("CERTFIDO READ")
assert cert_fido == self.fido_cert
connection.command("KEYFIDO WRITE", self.fido_privkey)
key_fido = connection.command("KEYFIDO READ")
assert key_fido is not None
assert key_fido in self.fido_cert
@dataclass
class DeviceInfo:
optiga_id: bytes
cpu_id: bytes
device_cert: bytes
@classmethod
def read(cls, connection: Connection) -> Self:
cpu_id = connection.command("CPUID READ")
optiga_id = connection.command("OPTIGAID READ")
cert_bytes = connection.command("CERTINF READ")
assert optiga_id is not None
assert cpu_id is not None
assert cert_bytes is not None
return cls(optiga_id, cpu_id, cert_bytes)
class ProdtestException(Exception):
def __init__(self, text: str) -> None:
super().__init__(text)
self.text = text
class Connection:
def __init__(self, path: str = "/dev/ttyACM0") -> None:
self.connection = serial.Serial(path, 115200, timeout=5)
def readline(self) -> bytes:
line = self.connection.readline().strip()
line_str = line.decode()
if len(line_str) > 100:
line_str = line_str[:100] + "..."
print("<<<", line_str)
return line
def writeline(self, data: bytes) -> None:
data_str = data.decode()
if len(data_str) > 100:
print(">>>", data_str[:100] + "...")
else:
print(">>>", data_str)
for byte in data:
if byte < 32 or byte > 126:
print("!!!", byte, "is not printable")
continue
self.connection.write(bytes([byte]))
echo = self.connection.read(1)
assert echo[0] == byte
self.connection.write(b"\r")
assert self.connection.read(2) == b"\r\n"
# self.connection.write(data + b"\r")
# echo = self.connection.read(len(data) + 2)
# print(len(echo), len(data) + 2)
# assert echo[:-2] == data
# assert echo[-2:] == b"\r\n"
def command(self, cmd: str, *args: Any) -> bytes | None:
cmd_line = cmd
for arg in args:
if isinstance(arg, bytes):
cmd_line += " " + arg.hex()
else:
cmd_line += " " + str(arg)
self.writeline(cmd_line.encode())
res = self.readline()
if res.startswith(b"ERROR"):
error_text = res[len(b"ERROR ") :].decode()
raise ProdtestException(error_text)
if not res.startswith(b"OK"):
raise ProdtestException("Unexpected response: " + res.decode())
res_arg = res[len(b"OK ") :]
if not res_arg:
return None
try:
return bytes.fromhex(res_arg.decode())
except ValueError:
return res_arg
def provision_request(
device: DeviceInfo, url: str, verify: bool = True
) -> ProvisioningResult:
request = {
"tester_id": SERVER_TOKEN,
"run_id": secrets.token_hex(16),
"optiga_id": device.optiga_id.hex(),
"cpu_id": device.cpu_id.hex(),
"cert": device.device_cert.hex(),
"model": "T2B1",
}
resp = requests.post(url, json=request, verify=verify)
if resp.status_code == 400:
print("Server returned error:", resp.text)
resp.raise_for_status()
resp_json = resp.json()
return ProvisioningResult.from_json(resp_json)
@click.group()
def cli() -> None:
pass
@cli.command()
def identify() -> None:
connection = Connection()
connection.command("PING")
DeviceInfo.read(connection)
@cli.command()
@click.option("--wipe", is_flag=True, help="Wipe the device")
def lock(wipe) -> None:
connection = Connection()
connection.command("PING")
connection.command("LOCK")
if wipe:
connection.command("WIPE")
@cli.command()
@click.option("-u", "--url", default=SERVER_URL, help="Server URL")
@click.option("-d", "--device", default="/dev/ttyACM0", help="Device path")
@click.option(
"--no-verify", is_flag=True, help="Disable server certificate verification"
)
@click.option(
"--lock/--no-lock", default=True, help="Lock the device after provisioning"
)
def provision(url, device, no_verify, lock) -> None:
global SERVER_TOKEN
SERVER_TOKEN = os.environ.get("SERVER_TOKEN")
if SERVER_TOKEN is None:
raise click.ClickException("SERVER_TOKEN environment variable is not set")
connection = Connection(device)
# test the connection
connection.command("PING")
# grab CPUID, OPTIGAID and device certificate
device = DeviceInfo.read(connection)
# call the provisioning server
result = provision_request(device, url, not no_verify)
# write provisioning result to the device
result.write(connection)
if lock:
connection.command("LOCK")
connection.command("WIPE")
if __name__ == "__main__":
cli()
Loading…
Cancel
Save