WIP - count with header in translations blob

tychovrahe/fw_translations/mpu
grdddj 8 months ago
parent 1108016fbb
commit 359605dd4e

@ -13,10 +13,6 @@
// const VALUE2: &'static str = "World!";
// }
/// The first entry is the language identifier
pub const LANGUAGE_INDEX: usize = 0;
const TRANSLATIONS_OFFSET: usize = 1;
#[rustfmt::skip]
#[allow(non_snake_case)]
pub struct TranslationsGeneral {
@ -840,7 +836,7 @@ impl TranslationsGeneral {
}
pub fn get_position(&self, key: &str) -> Option<usize> {
self.get_info(key).map(|(_, pos)| pos + TRANSLATIONS_OFFSET)
self.get_info(key).map(|(_, pos)| pos)
}
fn get_info(&self, key: &str) -> Option<(&'static str, usize)> {

@ -39,10 +39,6 @@ en_keys = get_all_json_keys(en_data)
if cs_keys != en_keys:
raise ValueError("cs.json and en.json have different keys")
%>\
/// The first entry is the language identifier
pub const LANGUAGE_INDEX: usize = 0;
const TRANSLATIONS_OFFSET: usize = 1;
#[rustfmt::skip]
#[allow(non_snake_case)]
pub struct TranslationsGeneral {
@ -58,7 +54,7 @@ impl TranslationsGeneral {
}
pub fn get_position(&self, key: &str) -> Option<usize> {
self.get_info(key).map(|(_, pos)| pos + TRANSLATIONS_OFFSET)
self.get_info(key).map(|(_, pos)| pos)
}
fn get_info(&self, key: &str) -> Option<(&'static str, usize)> {

@ -6,7 +6,6 @@ mod general;
mod micropython;
use en::EN_TRANSLATIONS;
use general::LANGUAGE_INDEX;
use crate::trezorhal::translations::translations_get;
@ -15,6 +14,7 @@ use core::str;
// Translation strings are delimited by a null byte
const DELIMITER_BYTE: u8 = 0x00;
const TERMINATE_BYTE: u8 = 0xFF;
const HEADER_LEN: usize = 256;
/// Translation function for Rust.
pub fn tr(key: &str) -> &'static str {
@ -23,7 +23,14 @@ pub fn tr(key: &str) -> &'static str {
/// Get the language name.
///
/// Reads the fixed-width language field of the translations header, which
/// follows the 4-byte magic and the 16-byte version field. The field is
/// padded with `DELIMITER_BYTE`; a name occupying the whole field has no
/// padding and is returned in full.
///
/// Returns `None` when the blob is too short to contain the field or when
/// the name is not valid UTF-8.
fn get_language_name() -> Option<&'static str> {
    // TODO: create a parser struct for this
    const MAGIC_LEN: usize = 4;
    const VERSION_LEN: usize = 16;
    const LANGUAGE_LEN: usize = 32;
    const LANGUAGE_START: usize = MAGIC_LEN + VERSION_LEN;

    // Checked slicing: a short or empty blob yields `None` instead of a panic.
    let language = translations_get().get(LANGUAGE_START..LANGUAGE_START + LANGUAGE_LEN)?;
    // The name ends at the first padding byte, or at the end of the field.
    let name_len = language
        .iter()
        .position(|&byte| byte == DELIMITER_BYTE)
        .unwrap_or(language.len());
    str::from_utf8(&language[..name_len]).ok()
}
/// Try to find the translation in flash (for a non-english language).
@ -55,15 +62,15 @@ fn get_translation_by_index(index: usize) -> Option<&'static str> {
let mut current_index = 0;
let mut chunk_start = 0;
let data_buffer = translations_get();
let translations_data = &translations_get()[HEADER_LEN..];
for (i, &byte) in data_buffer.iter().enumerate() {
for (i, &byte) in translations_data.iter().enumerate() {
if byte == TERMINATE_BYTE {
return None;
}
if byte == DELIMITER_BYTE {
if current_index == index {
return str::from_utf8(&data_buffer[chunk_start..i]).ok();
return str::from_utf8(&translations_data[chunk_start..i]).ok();
}
chunk_start = i + 1;
current_index += 1;

@ -1,65 +1,131 @@
from micropython import const
from typing import TYPE_CHECKING
from trezor.wire import DataError
if TYPE_CHECKING:
from trezor.messages import ChangeLanguage, Success, TranslationDataAck
from trezor.messages import ChangeLanguage, Success
_CHUNK_SIZE = const(1024)
_DELIMITER = b"\x00"
_HEADER_SIZE = const(256)
_FILL_BYTE = b"\x00"
class TranslationsHeader:
    """Parsed form of the fixed-size header that starts a translations blob.

    Fields, in the order `from_bytes` reads them:
      - magic (must equal `MAGIC`)
      - version (`VERSION_LEN` bytes, padded with `_FILL_BYTE`)
      - language (`LANG_LEN` bytes, padded with `_FILL_BYTE`)
      - data length (uint16, little-endian)
      - number of items (uint16, little-endian)
      - data hash (`DATA_HASH_LEN` bytes)
    Everything after the hash must be zero bytes.
    """

    MAGIC = b"TRTR"
    LANG_LEN = 32
    VERSION_LEN = 16
    DATA_HASH_LEN = 32

    def __init__(
        self,
        language: str,
        version: str,
        data_length: int,
        items_num: int,
        data_hash: bytes,
    ):
        self.language = language
        self.version = version
        self.data_length = data_length
        self.items_num = items_num
        self.data_hash = data_hash

    @classmethod
    def from_bytes(cls, data: bytes) -> "TranslationsHeader":
        """Parse and validate a raw header.

        Raises DataError when `data` is not exactly `_HEADER_SIZE` bytes long,
        the magic does not match, a text field cannot be decoded, the buffer
        runs out early, or the trailing padding contains a non-zero byte.
        """
        from trezor.utils import BufferReader

        from apps.common import readers

        if len(data) != _HEADER_SIZE:
            raise DataError("Invalid header length")

        try:
            r = BufferReader(data)

            magic = r.read(len(cls.MAGIC))
            if magic != cls.MAGIC:
                raise DataError("Invalid header magic")

            version = r.read(cls.VERSION_LEN).rstrip(_FILL_BYTE).decode()
            language = r.read(cls.LANG_LEN).rstrip(_FILL_BYTE).decode()
            data_length = readers.read_uint16_le(r)
            items_num = readers.read_uint16_le(r)
            data_hash = r.read(cls.DATA_HASH_LEN)

            # Rest must be empty bytes
            for b in r.read():
                if b != 0:
                    raise DataError("Invalid header data")

            return cls(
                language=language,
                version=version,
                data_length=data_length,
                items_num=items_num,
                data_hash=data_hash,
            )
        except (EOFError, UnicodeError):
            # The header comes from the host, so undecodable text fields are
            # reported as bad data rather than surfacing as an internal error.
            raise DataError("Invalid header data")
async def change_language(msg: ChangeLanguage) -> Success:
import storage.device as storage_device
import storage.translations as storage_translations
from trezor import wire
from trezor import translations
from trezor.messages import Success
language = msg.language # local_cache_attribute
data_length = msg.data_length # local_cache_attribute
if len(language) > storage_device.LANGUAGE_MAXLENGTH:
raise wire.DataError("Language identifier too long")
# When the data is empty, revert the language to the default (English)
if data_length == 0:
await _require_confirm_change_language("")
translations.wipe()
return Success(message="Language reverted to default")
if _DELIMITER.decode() in language:
raise wire.DataError(f"Language name contains delimiter '{_DELIMITER}'")
if data_length > translations.DATA_MAXLENGTH:
raise DataError("Translations too long")
if data_length < _HEADER_SIZE:
raise DataError("Translations too short")
if data_length > storage_translations.TRANSLATIONS_MAXLENGTH:
raise wire.DataError("Translations too long")
data_left = data_length
offset = 0
# When the data is empty, revert the language to the default (English)
if data_length == 0:
language = ""
# Getting and parsing the header
header_data = await get_data_chunk(_HEADER_SIZE, offset)
header = TranslationsHeader.from_bytes(header_data)
if header.data_length + _HEADER_SIZE != data_length:
raise DataError("Invalid header data length")
# TODO: verify the hash of the data (get all of them and hash them)
# TODO: verify the header signature (signature of sha256(header))
# Confirm with user and wipe old data
await _require_confirm_change_language(header.language)
translations.wipe()
await _require_confirm_change_language(language)
storage_translations.wipe()
# Write the header
translations.write(header_data, offset)
offset += len(header_data)
data_left -= len(header_data)
# Requesting the data in chunks and saving them
if data_length > 0:
offset = 0
data_left = data_length
# Store the language name as the first item
# (Done so that we can get the language name even after device/storage is wiped)
language_entry = language.encode() + _DELIMITER
storage_translations.write(language_entry, offset)
offset += len(language_entry)
while data_left > 0:
resp = await send_request_chunk(data_left)
data_left -= len(resp.data_chunk)
storage_translations.write(resp.data_chunk, offset)
offset += len(resp.data_chunk)
storage_device.set_language(language)
while data_left > 0:
data_chunk = await get_data_chunk(data_left, offset)
translations.write(data_chunk, offset)
data_left -= len(data_chunk)
offset += len(data_chunk)
return Success(message="Language changed")
async def send_request_chunk(data_left: int) -> TranslationDataAck:
async def get_data_chunk(data_left: int, offset: int) -> bytes:
from trezor.messages import TranslationDataAck, TranslationDataRequest
from trezor.wire.context import call
req = TranslationDataRequest()
req.data_length = min(data_left, _CHUNK_SIZE)
return await call(req, TranslationDataAck)
data_length = min(data_left, _CHUNK_SIZE)
req = TranslationDataRequest(data_length=data_length, data_offset=offset)
res = await call(req, TranslationDataAck)
return res.data_chunk
async def _require_confirm_change_language(language: str) -> None:

@ -204,38 +204,27 @@ def label(client: "TrezorClient", label: str) -> str:
@cli.command()
@click.option("-l", "--language")
@click.option(
"-f", "--file", type=click.File("r"), help="Language JSON file with translations."
)
@click.option("-u", "--url", help="Link to translation JSON file.")
@click.option("-r", "--remove", is_flag=True, help="Switch back to english.")
@with_client
def language(
client: "TrezorClient", language: str, file: TextIO, url: str, remove: bool
) -> str:
def language(client: "TrezorClient", file: TextIO, url: str, remove: bool) -> str:
"""Set new language with translations."""
if file and url:
raise click.ClickException("Please provide only one of -f or -u")
if remove:
language_data = b""
language = ""
else:
if file:
language_data = translations.blob_from_file(file)
lang_guess = file.name.split("/")[-1].split(".")[0]
elif url:
language_data = translations.blob_from_url(url)
lang_guess = url.split("/")[-1].split(".")[0]
else:
raise click.ClickException("Please provide either -f or -u")
if not language:
click.echo(f"Language not specified, guessing {lang_guess}")
language = lang_guess
return device.change_language(
client, language=language, language_data=language_data
)
return device.change_language(client, language_data=language_data)
@cli.command()

@ -67,19 +67,16 @@ def apply_settings(
@session
def change_language(
client: "TrezorClient",
language: str,
language_data: bytes,
) -> "MessageType":
msg = messages.ChangeLanguage(
language=language,
data_length=len(language_data),
)
msg = messages.ChangeLanguage(data_length=len(language_data))
response = client.call(msg)
while not isinstance(response, messages.Success):
assert isinstance(response, messages.TranslationDataRequest)
data_length = response.data_length
language_data, chunk = language_data[data_length:], language_data[:data_length]
data_offset = response.data_offset
chunk = language_data[data_offset : data_offset + data_length]
response = client.call(messages.TranslationDataAck(data_chunk=chunk))
assert isinstance(response, messages.Success)

@ -1,26 +1,54 @@
import json
from typing import Dict, List, TextIO, Tuple
import struct
from hashlib import sha256
from typing import Any, Dict, List, TextIO, Tuple
import requests
DELIMITER = b"\x00"
MAGIC = b"TRTR"
TranslationData = Dict[str, Dict[str, str]]
HeaderData = Dict[str, str]
def blob_from_file(file: TextIO) -> bytes:
data: Dict[str, Dict[str, str]] = json.load(file)
return _blob_from_data(data)
data = json.load(file)
return _blob_from_dict(data)
def blob_from_url(url: str) -> bytes:
r = requests.get(url)
r.raise_for_status()
data: Dict[str, Dict[str, str]] = r.json()
return _blob_from_data(data)
data = r.json()
return _blob_from_dict(data)
def _blob_from_dict(data: Dict[str, Any]) -> bytes:
    """Build the translations blob from a loaded JSON document.

    The document must provide a "header" entry and a "translations" entry;
    both are handed to `_blob_from_data` unchanged.
    """
    return _blob_from_data(data["header"], data["translations"])
def _blob_from_data(header: HeaderData, translations: TranslationData) -> bytes:
data_blob, items_num = _create_data_blob(translations)
header_blob = _create_header_blob(
magic=MAGIC,
lang=header["language"],
version=header["version"],
data_length=len(data_blob),
items_num=items_num,
data_hash=sha256(data_blob).digest(),
)
assert len(header_blob) == 256
def _blob_from_data(data: Dict[str, Dict[str, str]]) -> bytes:
return header_blob + data_blob
def _create_data_blob(translations: TranslationData) -> Tuple[bytes, int]:
items_to_write: List[Tuple[str, str]] = []
for section_name, section in data.items():
for section_name, section in translations.items():
for k, v in section.items():
if DELIMITER.decode() in v:
raise ValueError(f"Delimiter '{DELIMITER}' found in {k}")
@ -28,11 +56,49 @@ def _blob_from_data(data: Dict[str, Dict[str, str]]) -> bytes:
items_to_write.append((name, v))
# Sorting alphabetically according to the key
# TODO: maintain a stable order in future versions - write new entries
# to the end
items_to_write.sort(key=lambda x: x[0])
buffer_blob = b""
data_blob = b""
for _key, value in items_to_write:
buffer_blob += value.encode()
buffer_blob += DELIMITER
data_blob += value.encode()
data_blob += DELIMITER
# TODO: try to apply some compression of the data_blob
return data_blob, len(items_to_write)
def _create_header_blob(
magic: bytes,
lang: str,
version: str,
data_length: int,
items_num: int,
data_hash: bytes,
) -> bytes:
header = b""
# Magic (4 bytes)
header += struct.pack("4s", magic)
# Version (16 bytes)
header += struct.pack("16s", version.encode())
# Language name (32 bytes)
header += struct.pack("32s", lang.encode())
# Data length (2 bytes)
header += struct.pack("H", data_length)
# Items amount (2 bytes)
header += struct.pack("H", items_num)
# Data hash (32 bytes)
header += struct.pack("32s", data_hash)
# Fill rest with zeros
while not len(header) == 256:
header += struct.pack("B", 0)
return buffer_blob
return header

@ -154,15 +154,13 @@ def _raw_client(request: pytest.FixtureRequest) -> Client:
def _set_language(client: Client, lang: str) -> Client:
if lang == "en":
with client:
change_language(client, language="", language_data=b"")
change_language(client, language_data=b"")
elif lang == "cs":
with client, open(CS_JSON, "r") as f:
language_data = translations.blob_from_file(f)
change_language(client, language="cs", language_data=language_data)
change_language(client, language_data=translations.blob_from_file(f))
elif lang == "fr":
with client, open(FR_JSON, "r") as f:
language_data = translations.blob_from_file(f)
change_language(client, language="fr", language_data=language_data)
change_language(client, language_data=translations.blob_from_file(f))
else:
raise RuntimeError(f"Unknown language: {lang}")

@ -32,9 +32,7 @@ TRANSLATIONS = CORE / "embed" / "rust" / "src" / "ui" / "translations"
CS_JSON = TRANSLATIONS / "cs.json"
FR_JSON = TRANSLATIONS / "fr.json"
MOCK_LANG_DATA = "abc*def*".encode()
MAX_LANGUAGE_LENGTH = 32
MAX_DATA_LENGTH = 32 * 1024 - (MAX_LANGUAGE_LENGTH + 1)
MAX_DATA_LENGTH = 32 * 1024
@contextmanager
@ -56,63 +54,40 @@ def _set_english_return_back(client: Client) -> Generator[Client, None, None]:
def _set_full_czech(client: Client):
with client, open(CS_JSON, "r") as f:
language_data = translations.blob_from_file(f)
device.change_language(client, language="cs", language_data=language_data)
device.change_language(client, language_data=translations.blob_from_file(f))
def _set_full_french(client: Client):
with client, open(FR_JSON, "r") as f:
language_data = translations.blob_from_file(f)
device.change_language(client, language="fr", language_data=language_data)
device.change_language(client, language_data=translations.blob_from_file(f))
def _set_default_english(client: Client):
with client:
device.change_language(client, language="", language_data=b"")
def test_change_language(client: Client):
with _set_english_return_back(client) as client:
assert client.features.language == "en-US"
# Setting cs language
with client:
device.change_language(client, language="cs", language_data=MOCK_LANG_DATA)
assert client.features.language == "cs"
# Setting the default language via empty data
with client:
device.change_language(client, language="", language_data=b"")
assert client.features.language == "en-US"
# Max length is accepted
with client:
device.change_language(
client, language="cs", language_data=b"a" * MAX_DATA_LENGTH
)
assert client.features.language == "cs"
device.change_language(client, language_data=b"")
def test_change_language_errors(client: Client):
with _set_english_return_back(client) as client:
assert client.features.language == "en-US"
# Language name too long
# TODO: invalid header
# TODO: invalid data hash
# TODO: invalid signature
# TODO: invalid data-length
# Translations too short
with pytest.raises(
exceptions.TrezorFailure, match="Language identifier too long"
exceptions.TrezorFailure, match="Translations too short"
), client:
device.change_language(
client, language=10 * "abcd", language_data=MOCK_LANG_DATA
)
device.change_language(client, language_data=10 * b"a")
assert client.features.language == "en-US"
# Translations too long
with pytest.raises(
exceptions.TrezorFailure, match="Translations too long"
), client:
device.change_language(
client, language="cs", language_data=(MAX_DATA_LENGTH + 1) * b"a"
)
device.change_language(client, language_data=(MAX_DATA_LENGTH + 1) * b"a")
assert client.features.language == "en-US"

Loading…
Cancel
Save