diff --git a/storage/tests/Makefile b/storage/tests/Makefile new file mode 100644 index 000000000..16cc25552 --- /dev/null +++ b/storage/tests/Makefile @@ -0,0 +1,12 @@ +.PHONY: tests + +build: + $(MAKE) -C c + $(MAKE) -C c0 + +## tests commands: +tests: + pytest -k "not hypothesis" + +tests_all: + pytest diff --git a/storage/tests/README.md b/storage/tests/README.md new file mode 100644 index 000000000..3fd1eacc8 --- /dev/null +++ b/storage/tests/README.md @@ -0,0 +1,12 @@ +# Trezor Storage tests + +This repository contains all the necessary files to properly test Trezor's internal storage, which is implemented in the [trezor-storage](https://github.com/trezor/trezor-storage) repository. + +The CI is available on the internal GitLab. + +This repository consists of: + +- `c`: The actual C version is implemented in [trezor-storage](https://github.com/trezor/trezor-storage), however we need some other accompanying files to build it on PC. +- `c0`: This is the older version of Trezor storage. It is used to test upgrades from the older format to the newer one. +- `python`: Python version. Serves as a reference implementation and is implemented purely for the goal of properly testing the C version. +- `tests`: Most of the tests run the two implementations against each other. Uses Pytest and [hypothesis](https://hypothesis.works) for random tests. diff --git a/storage/tests/c/Makefile b/storage/tests/c/Makefile new file mode 100644 index 000000000..a1fc96c2e --- /dev/null +++ b/storage/tests/c/Makefile @@ -0,0 +1,25 @@ +CC = gcc +CFLAGS = -Wall -Wshadow -Wextra -Wpedantic -Werror -fPIC -DTREZOR_STORAGE_TEST +LIBS = +INC = -I ../../../crypto -I ../../../storage -I . +OBJ = flash.o common.o +OBJ += ../../../storage/storage.o ../../../storage/norcow.o +OBJ += ../../../crypto/pbkdf2.o +OBJ += ../../../crypto/rand.o +OBJ += ../../../crypto/chacha20poly1305/rfc7539.o +OBJ += ../../../crypto/chacha20poly1305/chacha20poly1305.o +OBJ += ../../../crypto/chacha20poly1305/poly1305-donna.o +OBJ += ../../../crypto/chacha20poly1305/chacha_merged.o +OBJ += ../../../crypto/hmac.o +OBJ += ../../../crypto/sha2.o +OBJ += ../../../crypto/memzero.o +OUT = libtrezor-storage.so + +$(OUT): $(OBJ) + $(CC) $(CFLAGS) $(LIBS) $(OBJ) -shared -o $(OUT) + +%.o: %.c %.h + $(CC) $(CFLAGS) $(INC) -c $< -o $@ + +clean: + rm -f $(OUT) $(OBJ) diff --git a/storage/tests/c/common.c b/storage/tests/c/common.c new file mode 100644 index 000000000..faa109c07 --- /dev/null +++ b/storage/tests/c/common.c @@ -0,0 +1,57 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include +#include +#include + +#include "common.h" + +void __shutdown(void) +{ + printf("SHUTDOWN\n"); + exit(3); +} + +void __fatal_error(const char *expr, const char *msg, const char *file, int line, const char *func) +{ + printf("\nFATAL ERROR:\n"); + if (expr) { + printf("expr: %s\n", expr); + } + if (msg) { + printf("msg : %s\n", msg); + } + if (file) { + printf("file: %s:%d\n", file, line); + } + if (func) { + printf("func: %s\n", func); + } + __shutdown(); +} + +void error_shutdown(const char *line1, const char *line2, const char *line3, const char *line4) { + // For testing do not treat pin_fails_check_max as a fatal error. + (void) line1; + (void) line2; + (void) line3; + (void) line4; + return; +} diff --git a/storage/tests/c/common.h b/storage/tests/c/common.h new file mode 100644 index 000000000..e58a1b007 --- /dev/null +++ b/storage/tests/c/common.h @@ -0,0 +1,32 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __TREZORHAL_COMMON_H__ +#define __TREZORHAL_COMMON_H__ + +#include "secbool.h" + +void __fatal_error(const char *expr, const char *msg, const char *file, int line, const char *func); +void error_shutdown(const char *line1, const char *line2, const char *line3, const char *line4); + +#define ensure(expr, msg) (((expr) == sectrue) ? (void)0 : __fatal_error(#expr, msg, __FILE__, __LINE__, __func__)) + +#define hal_delay(ms) (void)ms; + +#endif diff --git a/storage/tests/c/flash.c b/storage/tests/c/flash.c new file mode 100644 index 000000000..6282e3997 --- /dev/null +++ b/storage/tests/c/flash.c @@ -0,0 +1,129 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include +#include +#include + +#include "common.h" +#include "flash.h" + +static const uint32_t FLASH_SECTOR_TABLE[FLASH_SECTOR_COUNT + 1] = { + [ 0] = 0x08000000, // - 0x08003FFF | 16 KiB + [ 1] = 0x08004000, // - 0x08007FFF | 16 KiB + [ 2] = 0x08008000, // - 0x0800BFFF | 16 KiB + [ 3] = 0x0800C000, // - 0x0800FFFF | 16 KiB + [ 4] = 0x08010000, // - 0x0801FFFF | 64 KiB + [ 5] = 0x08020000, // - 0x0803FFFF | 128 KiB + [ 6] = 0x08040000, // - 0x0805FFFF | 128 KiB + [ 7] = 0x08060000, // - 0x0807FFFF | 128 KiB + [ 8] = 0x08080000, // - 0x0809FFFF | 128 KiB + [ 9] = 0x080A0000, // - 0x080BFFFF | 128 KiB + [10] = 0x080C0000, // - 0x080DFFFF | 128 KiB + [11] = 0x080E0000, // - 0x080FFFFF | 128 KiB + [12] = 0x08100000, // - 0x08103FFF | 16 KiB + [13] = 0x08104000, // - 0x08107FFF | 16 KiB + [14] = 0x08108000, // - 0x0810BFFF | 16 KiB + [15] = 0x0810C000, // - 0x0810FFFF | 16 KiB + [16] = 0x08110000, // - 0x0811FFFF | 64 KiB + [17] = 0x08120000, // - 0x0813FFFF | 128 KiB + [18] = 0x08140000, // - 0x0815FFFF | 128 KiB + [19] = 0x08160000, // - 0x0817FFFF | 128 KiB + [20] = 0x08180000, // - 0x0819FFFF | 128 KiB + [21] = 0x081A0000, // - 0x081BFFFF | 128 KiB + [22] = 0x081C0000, // - 0x081DFFFF | 128 KiB + [23] = 0x081E0000, // - 0x081FFFFF | 128 KiB + [24] = 0x08200000, // last element - not a valid sector +}; +const uint32_t FLASH_SIZE = 0x200000; +uint8_t *FLASH_BUFFER = NULL; + +void flash_init(void) +{ + assert(FLASH_SIZE == FLASH_SECTOR_TABLE[FLASH_SECTOR_COUNT] - FLASH_SECTOR_TABLE[0]); +} + +secbool flash_unlock_write(void) +{ + return sectrue; +} + +secbool flash_lock_write(void) +{ + return sectrue; +} + +const void *flash_get_address(uint8_t sector, uint32_t offset, uint32_t size) +{ + if (sector >= FLASH_SECTOR_COUNT) { + return NULL; + } + const uint32_t addr = FLASH_SECTOR_TABLE[sector] + offset; + const uint32_t next = FLASH_SECTOR_TABLE[sector + 1]; + if (addr + size > next) { + return NULL; + } + return FLASH_BUFFER + addr - FLASH_SECTOR_TABLE[0]; +} + +secbool flash_erase_sectors(const uint8_t *sectors, int len, void (*progress)(int pos, int len)) +{ + if (progress) { + progress(0, len); + } + for (int i = 0; i < len; i++) { + const uint8_t sector = sectors[i]; + const uint32_t offset = FLASH_SECTOR_TABLE[sector] - FLASH_SECTOR_TABLE[0]; + const uint32_t size = FLASH_SECTOR_TABLE[sector + 1] - FLASH_SECTOR_TABLE[sector]; + memset(FLASH_BUFFER + offset, 0xFF, size); + if (progress) { + progress(i + 1, len); + } + } + return sectrue; +} + +secbool flash_write_byte(uint8_t sector, uint32_t offset, uint8_t data) +{ + uint8_t *flash = (uint8_t *)flash_get_address(sector, offset, 1); + if (!flash) { + return secfalse; + } + if ((flash[0] & data) != data) { + return secfalse; // we cannot change zeroes to ones + } + flash[0] = data; + return sectrue; +} + +secbool flash_write_word(uint8_t sector, uint32_t offset, uint32_t data) +{ + if (offset % 4) { // we write only at 4-byte boundary + return secfalse; + } + uint32_t *flash = (uint32_t *)flash_get_address(sector, offset, sizeof(data)); + if (!flash) { + return secfalse; + } + if ((flash[0] & data) != data) { + return secfalse; // we cannot change zeroes to ones + } + flash[0] = data; + return sectrue; +} diff --git a/storage/tests/c/flash.h b/storage/tests/c/flash.h new file mode 100644 index 000000000..102c9e190 --- /dev/null +++ b/storage/tests/c/flash.h @@ -0,0 +1,41 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef FLASH_H +#define FLASH_H + +#include +#include +#include "secbool.h" + +#define FLASH_SECTOR_COUNT 24 + +void flash_init(void); + +secbool __wur flash_unlock_write(void); +secbool __wur flash_lock_write(void); + +const void *flash_get_address(uint8_t sector, uint32_t offset, uint32_t size); + +secbool __wur flash_erase_sectors(const uint8_t *sectors, int len, void (*progress)(int pos, int len)); +static inline secbool flash_erase(uint8_t sector) { return flash_erase_sectors(§or, 1, NULL); } +secbool __wur flash_write_byte(uint8_t sector, uint32_t offset, uint8_t data); +secbool __wur flash_write_word(uint8_t sector, uint32_t offset, uint32_t data); + +#endif diff --git a/storage/tests/c/libtrezor-storage.so b/storage/tests/c/libtrezor-storage.so new file mode 100755 index 000000000..83aa07284 Binary files /dev/null and b/storage/tests/c/libtrezor-storage.so differ diff --git a/storage/tests/c/norcow_config.h b/storage/tests/c/norcow_config.h new file mode 100644 index 000000000..04e278c76 --- /dev/null +++ b/storage/tests/c/norcow_config.h @@ -0,0 +1,43 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __NORCOW_CONFIG_H__ +#define __NORCOW_CONFIG_H__ + +#include "flash.h" + +#define NORCOW_SECTOR_COUNT 2 +#define NORCOW_SECTOR_SIZE (64*1024) +#define NORCOW_SECTORS {4, 16} + +/* + * The length of the sector header in bytes. The header is preserved between sector erasures. + */ +#if TREZOR_MODEL == 1 +#define NORCOW_HEADER_LEN (0x100) +#else +#define NORCOW_HEADER_LEN 0 +#endif + +/* + * Current storage version. + */ +#define NORCOW_VERSION ((uint32_t)0x00000001) + +#endif diff --git a/storage/tests/c/secbool.h b/storage/tests/c/secbool.h new file mode 100644 index 000000000..76dfb38dc --- /dev/null +++ b/storage/tests/c/secbool.h @@ -0,0 +1,33 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef TREZORHAL_SECBOOL_H +#define TREZORHAL_SECBOOL_H + +#include + +typedef uint32_t secbool; +#define sectrue 0xAAAAAAAAU +#define secfalse 0x00000000U + +#ifndef __wur +#define __wur __attribute__ ((warn_unused_result)) +#endif + +#endif diff --git a/storage/tests/c/storage.py b/storage/tests/c/storage.py new file mode 100644 index 000000000..f9a1a6fa5 --- /dev/null +++ b/storage/tests/c/storage.py @@ -0,0 +1,72 @@ +import ctypes as c +import os + +sectrue = -1431655766 # 0xAAAAAAAAA +fname = os.path.join(os.path.dirname(__file__), "libtrezor-storage.so") + + +class Storage: + def __init__(self) -> None: + self.lib = c.cdll.LoadLibrary(fname) + self.flash_size = c.cast(self.lib.FLASH_SIZE, c.POINTER(c.c_uint32))[0] + self.flash_buffer = c.create_string_buffer(self.flash_size) + c.cast(self.lib.FLASH_BUFFER, c.POINTER(c.c_void_p))[0] = c.addressof(self.flash_buffer) + + def init(self, salt: bytes) -> None: + self.lib.storage_init(0, salt, c.c_uint16(len(salt))) + + def wipe(self) -> None: + self.lib.storage_wipe() + + def unlock(self, pin: int) -> bool: + return sectrue == self.lib.storage_unlock(c.c_uint32(pin)) + + def lock(self) -> None: + self.lib.storage_lock() + + def has_pin(self) -> bool: + return sectrue == self.lib.storage_has_pin() + + def get_pin_rem(self) -> int: + return self.lib.storage_get_pin_rem() + + def change_pin(self, oldpin: int, newpin: int) -> bool: + return sectrue == self.lib.storage_change_pin(c.c_uint32(oldpin), c.c_uint32(newpin)) + + def get(self, key: int) -> bytes: + val_len = c.c_uint16() + if sectrue != self.lib.storage_get(c.c_uint16(key), None, 0, c.byref(val_len)): + raise RuntimeError("Failed to find key in storage.") + s = c.create_string_buffer(val_len.value) + if sectrue != self.lib.storage_get(c.c_uint16(key), s, val_len, c.byref(val_len)): + raise RuntimeError("Failed to get value from storage.") + return s.raw + + def set(self, key: int, val: bytes) -> None: + if sectrue != self.lib.storage_set(c.c_uint16(key), val, c.c_uint16(len(val))): + raise RuntimeError("Failed to set value in storage.") + + def set_counter(self, key: int, count: int) -> bool: + return sectrue == self.lib.storage_set_counter(c.c_uint16(key), c.c_uint32(count)) + + def next_counter(self, key: int) -> int: + count = c.c_uint32() + if sectrue == self.lib.storage_next_counter(c.c_uint16(key), c.byref(count)): + return count.value + else: + return None + + def delete(self, key: int) -> bool: + return sectrue == self.lib.storage_delete(c.c_uint16(key)) + + def _dump(self) -> bytes: + # return just sectors 4 and 16 of the whole flash + return [self.flash_buffer[0x010000:0x010000 + 0x10000], self.flash_buffer[0x110000:0x110000 + 0x10000]] + + def _get_flash_buffer(self) -> bytes: + return bytes(self.flash_buffer) + + def _set_flash_buffer(self, buf: bytes) -> None: + if len(buf) != self.flash_size: + raise RuntimeError("Failed to set flash buffer due to length mismatch.") + self.flash_buffer.value = buf diff --git a/storage/tests/c0/Makefile b/storage/tests/c0/Makefile new file mode 100644 index 000000000..854ffa693 --- /dev/null +++ b/storage/tests/c0/Makefile @@ -0,0 +1,14 @@ +CC=gcc +CFLAGS=-Wall -fPIC +LIBS= +OBJ=storage.o norcow.o flash.o +OUT=libtrezor-storage0.so + +$(OUT): $(OBJ) + $(CC) $(CFLAGS) $(LIBS) $(OBJ) -shared -o $(OUT) + +%.o: %.c + $(CC) $(CFLAGS) -c $< -o $@ + +clean: + rm -f $(OUT) $(OBJ) diff --git a/storage/tests/c0/common.h b/storage/tests/c0/common.h new file mode 100644 index 000000000..6f2b178c8 --- /dev/null +++ b/storage/tests/c0/common.h @@ -0,0 +1,29 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __TREZORHAL_COMMON_H__ +#define __TREZORHAL_COMMON_H__ + +#include "secbool.h" + +#define ensure(expr, msg) (((expr) == sectrue) ? (void)0 : (void)1) + +#define hal_delay(ms) (void)ms; + +#endif diff --git a/storage/tests/c0/flash.c b/storage/tests/c0/flash.c new file mode 100644 index 000000000..3a0a143f2 --- /dev/null +++ b/storage/tests/c0/flash.c @@ -0,0 +1,130 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include +#include +#include + +#include "common.h" +#include "flash.h" + +static const uint32_t FLASH_SECTOR_TABLE[FLASH_SECTOR_COUNT + 1] = { + [ 0] = 0x08000000, // - 0x08003FFF | 16 KiB + [ 1] = 0x08004000, // - 0x08007FFF | 16 KiB + [ 2] = 0x08008000, // - 0x0800BFFF | 16 KiB + [ 3] = 0x0800C000, // - 0x0800FFFF | 16 KiB + [ 4] = 0x08010000, // - 0x0801FFFF | 64 KiB + [ 5] = 0x08020000, // - 0x0803FFFF | 128 KiB + [ 6] = 0x08040000, // - 0x0805FFFF | 128 KiB + [ 7] = 0x08060000, // - 0x0807FFFF | 128 KiB + [ 8] = 0x08080000, // - 0x0809FFFF | 128 KiB + [ 9] = 0x080A0000, // - 0x080BFFFF | 128 KiB + [10] = 0x080C0000, // - 0x080DFFFF | 128 KiB + [11] = 0x080E0000, // - 0x080FFFFF | 128 KiB + [12] = 0x08100000, // - 0x08103FFF | 16 KiB + [13] = 0x08104000, // - 0x08107FFF | 16 KiB + [14] = 0x08108000, // - 0x0810BFFF | 16 KiB + [15] = 0x0810C000, // - 0x0810FFFF | 16 KiB + [16] = 0x08110000, // - 0x0811FFFF | 64 KiB + [17] = 0x08120000, // - 0x0813FFFF | 128 KiB + [18] = 0x08140000, // - 0x0815FFFF | 128 KiB + [19] = 0x08160000, // - 0x0817FFFF | 128 KiB + [20] = 0x08180000, // - 0x0819FFFF | 128 KiB + [21] = 0x081A0000, // - 0x081BFFFF | 128 KiB + [22] = 0x081C0000, // - 0x081DFFFF | 128 KiB + [23] = 0x081E0000, // - 0x081FFFFF | 128 KiB + [24] = 0x08200000, // last element - not a valid sector +}; + +const uint32_t FLASH_SIZE = 0x200000; +uint8_t *FLASH_BUFFER = NULL; + +void flash_init(void) +{ + assert(FLASH_SIZE == FLASH_SECTOR_TABLE[FLASH_SECTOR_COUNT] - FLASH_SECTOR_TABLE[0]); +} + +secbool flash_unlock(void) +{ + return sectrue; +} + +secbool flash_lock(void) +{ + return sectrue; +} + +const void *flash_get_address(uint8_t sector, uint32_t offset, uint32_t size) +{ + if (sector >= FLASH_SECTOR_COUNT) { + return NULL; + } + const uint32_t addr = FLASH_SECTOR_TABLE[sector] + offset; + const uint32_t next = FLASH_SECTOR_TABLE[sector + 1]; + if (addr + size > next) { + return NULL; + } + return FLASH_BUFFER + addr - FLASH_SECTOR_TABLE[0]; +} + +secbool flash_erase_sectors(const uint8_t *sectors, int len, void (*progress)(int pos, int len)) +{ + if (progress) { + progress(0, len); + } + for (int i = 0; i < len; i++) { + const uint8_t sector = sectors[i]; + const uint32_t offset = FLASH_SECTOR_TABLE[sector] - FLASH_SECTOR_TABLE[0]; + const uint32_t size = FLASH_SECTOR_TABLE[sector + 1] - FLASH_SECTOR_TABLE[sector]; + memset(FLASH_BUFFER + offset, 0xFF, size); + if (progress) { + progress(i + 1, len); + } + } + return sectrue; +} + +secbool flash_write_byte(uint8_t sector, uint32_t offset, uint8_t data) +{ + uint8_t *flash = (uint8_t *)flash_get_address(sector, offset, 1); + if (!flash) { + return secfalse; + } + if ((flash[0] & data) != data) { + return secfalse; // we cannot change zeroes to ones + } + flash[0] = data; + return sectrue; +} + +secbool flash_write_word(uint8_t sector, uint32_t offset, uint32_t data) +{ + if (offset % 4) { // we write only at 4-byte boundary + return secfalse; + } + uint32_t *flash = (uint32_t *)flash_get_address(sector, offset, sizeof(data)); + if (!flash) { + return secfalse; + } + if ((flash[0] & data) != data) { + return secfalse; // we cannot change zeroes to ones + } + flash[0] = data; + return sectrue; +} diff --git a/storage/tests/c0/flash.h b/storage/tests/c0/flash.h new file mode 100644 index 000000000..436dceb42 --- /dev/null +++ b/storage/tests/c0/flash.h @@ -0,0 +1,41 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef FLASH_H +#define FLASH_H + +#include +#include +#include "secbool.h" + +#define FLASH_SECTOR_COUNT 24 + +void flash_init(void); + +secbool __wur flash_unlock(void); +secbool __wur flash_lock(void); + +const void *flash_get_address(uint8_t sector, uint32_t offset, uint32_t size); + +secbool __wur flash_erase_sectors(const uint8_t *sectors, int len, void (*progress)(int pos, int len)); +static inline secbool flash_erase_sector(uint8_t sector) { return flash_erase_sectors(§or, 1, NULL); } +secbool __wur flash_write_byte(uint8_t sector, uint32_t offset, uint8_t data); +secbool __wur flash_write_word(uint8_t sector, uint32_t offset, uint32_t data); + +#endif diff --git a/storage/tests/c0/libtrezor-storage0.so b/storage/tests/c0/libtrezor-storage0.so new file mode 100755 index 000000000..e03da87a7 Binary files /dev/null and b/storage/tests/c0/libtrezor-storage0.so differ diff --git a/storage/tests/c0/norcow.c b/storage/tests/c0/norcow.c new file mode 100644 index 000000000..ed54be3b2 --- /dev/null +++ b/storage/tests/c0/norcow.c @@ -0,0 +1,305 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include + +#include "norcow.h" +#include "flash.h" +#include "common.h" + +// NRCW = 4e524357 +#define NORCOW_MAGIC ((uint32_t)0x5743524e) +#define NORCOW_MAGIC_LEN (sizeof(uint32_t)) + +static const uint8_t norcow_sectors[NORCOW_SECTOR_COUNT] = NORCOW_SECTORS; +static uint8_t norcow_active_sector = 0; +static uint32_t norcow_active_offset = NORCOW_MAGIC_LEN; + +/* + * Returns pointer to sector, starting with offset + * Fails when there is not enough space for data of given size + */ +static const void *norcow_ptr(uint8_t sector, uint32_t offset, uint32_t size) +{ + ensure(sectrue * (sector <= NORCOW_SECTOR_COUNT), "invalid sector"); + return flash_get_address(norcow_sectors[sector], offset, size); +} + +/* + * Writes data to given sector, starting from offset + */ +static secbool norcow_write(uint8_t sector, uint32_t offset, uint32_t prefix, const uint8_t *data, uint16_t len) +{ + if (sector >= NORCOW_SECTOR_COUNT) { + return secfalse; + } + ensure(flash_unlock(), NULL); + + // write prefix + ensure(flash_write_word(norcow_sectors[sector], offset, prefix), NULL); + + if (len > 0) { + offset += sizeof(uint32_t); + // write data + for (uint16_t i = 0; i < len; i++, offset++) { + ensure(flash_write_byte(norcow_sectors[sector], offset, data[i]), NULL); + } + // pad with zeroes + for (; offset % 4; offset++) { + ensure(flash_write_byte(norcow_sectors[sector], offset, 0x00), NULL); + } + } + ensure(flash_lock(), NULL); + return sectrue; +} + +/* + * Erases sector (and sets a magic) + */ +static void norcow_erase(uint8_t sector, secbool set_magic) +{ + ensure(sectrue * (sector <= NORCOW_SECTOR_COUNT), "invalid sector"); + ensure(flash_erase_sector(norcow_sectors[sector]), "erase failed"); + if (sectrue == set_magic) { + ensure(norcow_write(sector, 0, NORCOW_MAGIC, NULL, 0), "set magic failed"); + } +} + +#define ALIGN4(X) (X) = ((X) + 3) & ~3 + +/* + * Reads one item starting from offset + */ +static secbool read_item(uint8_t sector, uint32_t offset, uint16_t *key, const void **val, uint16_t *len, uint32_t *pos) +{ + *pos = offset; + + const void *k = norcow_ptr(sector, *pos, 2); + if (k == NULL) return secfalse; + *pos += 2; + memcpy(key, k, sizeof(uint16_t)); + if (*key == 0xFFFF) { + return secfalse; + } + + const void *l = norcow_ptr(sector, *pos, 2); + if (l == NULL) return secfalse; + *pos += 2; + memcpy(len, l, sizeof(uint16_t)); + + *val = norcow_ptr(sector, *pos, *len); + if (*val == NULL) return secfalse; + *pos += *len; + ALIGN4(*pos); + return sectrue; +} + +/* + * Writes one item starting from offset + */ +static secbool write_item(uint8_t sector, uint32_t offset, uint16_t key, const void *val, uint16_t len, uint32_t *pos) +{ + uint32_t prefix = (len << 16) | key; + *pos = offset + sizeof(uint32_t) + len; + ALIGN4(*pos); + return norcow_write(sector, offset, prefix, val, len); +} + +/* + * Finds item in given sector + */ +static secbool find_item(uint8_t sector, uint16_t key, const void **val, uint16_t *len) +{ + *val = 0; + *len = 0; + uint32_t offset = NORCOW_MAGIC_LEN; + for (;;) { + uint16_t k, l; + const void *v; + uint32_t pos; + if (sectrue != read_item(sector, offset, &k, &v, &l, &pos)) { + break; + } + if (key == k) { + *val = v; + *len = l; + } + offset = pos; + } + return sectrue * (*val != NULL); +} + +/* + * Finds first unused offset in given sector + */ +static uint32_t find_free_offset(uint8_t sector) +{ + uint32_t offset = NORCOW_MAGIC_LEN; + for (;;) { + uint16_t key, len; + const void *val; + uint32_t pos; + if (sectrue != read_item(sector, offset, &key, &val, &len, &pos)) { + break; + } + offset = pos; + } + return offset; +} + +/* + * Compacts active sector and sets new active sector + */ +static void compact() +{ + uint8_t norcow_next_sector = (norcow_active_sector + 1) % NORCOW_SECTOR_COUNT; + norcow_erase(norcow_next_sector, sectrue); + + uint32_t offset = NORCOW_MAGIC_LEN, offsetw = NORCOW_MAGIC_LEN; + + for (;;) { + // read item + uint16_t k, l; + const void *v; + uint32_t pos; + secbool r = read_item(norcow_active_sector, offset, &k, &v, &l, &pos); + if (sectrue != r) { + break; + } + offset = pos; + + // check if not already saved + const void *v2; + uint16_t l2; + r = find_item(norcow_next_sector, k, &v2, &l2); + if (sectrue == r) { + continue; + } + + // scan for latest instance + uint32_t offsetr = offset; + for (;;) { + uint16_t k2; + uint32_t posr; + r = read_item(norcow_active_sector, offsetr, &k2, &v2, &l2, &posr); + if (sectrue != r) { + break; + } + if (k == k2) { + v = v2; + l = l2; + } + offsetr = posr; + } + + // copy the last item + uint32_t posw; + ensure(write_item(norcow_next_sector, offsetw, k, v, l, &posw), "compaction write failed"); + offsetw = posw; + } + + norcow_erase(norcow_active_sector, secfalse); + norcow_active_sector = norcow_next_sector; + norcow_active_offset = find_free_offset(norcow_active_sector); +} + +/* + * Initializes storage + */ +void norcow_init(void) +{ + flash_init(); + secbool found = secfalse; + // detect active sector - starts with magic + for (uint8_t i = 0; i < NORCOW_SECTOR_COUNT; i++) { + const uint32_t *magic = norcow_ptr(i, 0, NORCOW_MAGIC_LEN); + if (magic != NULL && *magic == NORCOW_MAGIC) { + found = sectrue; + norcow_active_sector = i; + break; + } + } + // no active sectors found - let's erase + if (sectrue == found) { + norcow_active_offset = find_free_offset(norcow_active_sector); + } else { + norcow_wipe(); + } +} + +/* + * Wipe the storage + */ +void norcow_wipe(void) +{ + norcow_erase(0, sectrue); + for (uint8_t i = 1; i < NORCOW_SECTOR_COUNT; i++) { + norcow_erase(i, secfalse); + } + norcow_active_sector = 0; + norcow_active_offset = NORCOW_MAGIC_LEN; +} + +/* + * Looks for the given key, returns status of the operation + */ +secbool norcow_get(uint16_t key, const void **val, uint16_t *len) +{ + return find_item(norcow_active_sector, key, val, len); +} + +/* + * Sets the given key, returns status of the operation + */ +secbool norcow_set(uint16_t key, const void *val, uint16_t len) +{ + // check whether there is enough free space + // and compact if full + if (norcow_active_offset + sizeof(uint32_t) + len > NORCOW_SECTOR_SIZE) { + compact(); + } + // write item + uint32_t pos; + secbool r = write_item(norcow_active_sector, norcow_active_offset, key, val, len, &pos); + if (sectrue == r) { + norcow_active_offset = pos; + } + return r; +} + +/* + * Update a word in flash at the given pointer. The pointer must point + * into the NORCOW area. + */ +secbool norcow_update(uint16_t key, uint16_t offset, uint32_t value) +{ + const void *ptr; + uint16_t len; + if (sectrue != find_item(norcow_active_sector, key, &ptr, &len)) { + return secfalse; + } + if ((offset & 3) != 0 || offset >= len) { + return secfalse; + } + uint32_t sector_offset = (const uint8_t*) ptr - (const uint8_t *)norcow_ptr(norcow_active_sector, 0, NORCOW_SECTOR_SIZE) + offset; + ensure(flash_unlock(), NULL); + ensure(flash_write_word(norcow_sectors[norcow_active_sector], sector_offset, value), NULL); + ensure(flash_lock(), NULL); + return sectrue; +} diff --git a/storage/tests/c0/norcow.h b/storage/tests/c0/norcow.h new file mode 100644 index 000000000..00bab4d0a --- /dev/null +++ b/storage/tests/c0/norcow.h @@ -0,0 +1,58 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __NORCOW_H__ +#define __NORCOW_H__ + +#include +#include "secbool.h" + +/* + * Storage parameters + */ + +#include "norcow_config.h" + +/* + * Initialize storage + */ +void norcow_init(void); + +/* + * Wipe the storage + */ +void norcow_wipe(void); + +/* + * Looks for the given key, returns status of the operation + */ +secbool norcow_get(uint16_t key, const void **val, uint16_t *len); + +/* + * Sets the given key, returns status of the operation + */ +secbool norcow_set(uint16_t key, const void *val, uint16_t len); + +/* + * Update a word in flash in the given key at the given offset. + * Note that you can only change bits from 1 to 0. + */ +secbool norcow_update(uint16_t key, uint16_t offset, uint32_t value); + +#endif diff --git a/storage/tests/c0/norcow_config.h b/storage/tests/c0/norcow_config.h new file mode 100644 index 000000000..ff7b1eb0b --- /dev/null +++ b/storage/tests/c0/norcow_config.h @@ -0,0 +1,29 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __NORCOW_CONFIG_H__ +#define __NORCOW_CONFIG_H__ + +#include "flash.h" + +#define NORCOW_SECTOR_COUNT 2 +#define NORCOW_SECTOR_SIZE (64*1024) +#define NORCOW_SECTORS {4, 16} + +#endif diff --git a/storage/tests/c0/secbool.h b/storage/tests/c0/secbool.h new file mode 100644 index 000000000..76dfb38dc --- /dev/null +++ b/storage/tests/c0/secbool.h @@ -0,0 +1,33 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef TREZORHAL_SECBOOL_H +#define TREZORHAL_SECBOOL_H + +#include + +typedef uint32_t secbool; +#define sectrue 0xAAAAAAAAU +#define secfalse 0x00000000U + +#ifndef __wur +#define __wur __attribute__ ((warn_unused_result)) +#endif + +#endif diff --git a/storage/tests/c0/storage.c b/storage/tests/c0/storage.c new file mode 100644 index 000000000..5e0f8343b --- /dev/null +++ b/storage/tests/c0/storage.c @@ -0,0 +1,238 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include + +#include "common.h" +#include "norcow.h" +#include "storage.h" + +// Norcow storage key of configured PIN. +#define PIN_KEY 0x0000 + +// Maximum PIN length. +#define PIN_MAXLEN 32 + +// Byte-length of flash section containing fail counters. +#define PIN_FAIL_KEY 0x0001 +#define PIN_FAIL_SECTOR_SIZE 32 + +// Maximum number of failed unlock attempts. +#define PIN_MAX_TRIES 15 + +static secbool initialized = secfalse; +static secbool unlocked = secfalse; +static PIN_UI_WAIT_CALLBACK ui_callback = NULL; + +void storage_init(PIN_UI_WAIT_CALLBACK callback) +{ + initialized = secfalse; + unlocked = secfalse; + norcow_init(); + initialized = sectrue; + ui_callback = callback; +} + +static secbool pin_fails_reset(uint16_t ofs) +{ + return norcow_update(PIN_FAIL_KEY, ofs, 0); +} + +static secbool pin_fails_increase(const uint32_t *ptr, uint16_t ofs) +{ + uint32_t ctr = *ptr; + ctr = ctr << 1; + + if (sectrue != norcow_update(PIN_FAIL_KEY, ofs, ctr)) { + return secfalse; + } + + uint32_t check = *ptr; + if (ctr != check) { + return secfalse; + } + return sectrue; +} + +static void pin_fails_check_max(uint32_t ctr) +{ + if (~ctr >= (1 << PIN_MAX_TRIES)) { + norcow_wipe(); + ensure(secfalse, "pin_fails_check_max"); + } +} + +static secbool pin_cmp(const uint32_t pin) +{ + const void *spin = NULL; + uint16_t spinlen = 0; + norcow_get(PIN_KEY, &spin, &spinlen); + if (NULL != spin && spinlen == sizeof(uint32_t)) { + return sectrue * (pin == *(const uint32_t*)spin); + } else { + return sectrue * (1 == pin); + } +} + +static secbool pin_get_fails(const uint32_t **pinfail, uint32_t *pofs) +{ + const void *vpinfail; + uint16_t pinfaillen; + unsigned int ofs; + // The PIN_FAIL_KEY points to an area of words, initialized to + // 0xffffffff (meaning no pin failures). The first non-zero word + // in this area is the current pin failure counter. If PIN_FAIL_KEY + // has no configuration or is empty, the pin failure counter is 0. + // We rely on the fact that flash allows to clear bits and we clear one + // bit to indicate pin failure. On success, the word is set to 0, + // indicating that the next word is the pin failure counter. + + // Find the current pin failure counter + if (secfalse != norcow_get(PIN_FAIL_KEY, &vpinfail, &pinfaillen)) { + *pinfail = vpinfail; + for (ofs = 0; ofs < pinfaillen / sizeof(uint32_t); ofs++) { + if (((const uint32_t *) vpinfail)[ofs]) { + *pinfail = vpinfail; + *pofs = ofs; + return sectrue; + } + } + } + + // No pin failure section, or all entries used -> create a new one. + uint32_t pinarea[PIN_FAIL_SECTOR_SIZE]; + memset(pinarea, 0xff, sizeof(pinarea)); + if (sectrue != norcow_set(PIN_FAIL_KEY, pinarea, sizeof(pinarea))) { + return secfalse; + } + if (sectrue != norcow_get(PIN_FAIL_KEY, &vpinfail, &pinfaillen)) { + return secfalse; + } + *pinfail = vpinfail; + *pofs = 0; + return sectrue; +} + +secbool storage_check_pin(const uint32_t pin) +{ + const uint32_t *pinfail = NULL; + uint32_t ofs; + uint32_t ctr; + + // Get the pin failure counter + if (pin_get_fails(&pinfail, &ofs) != sectrue) { + return secfalse; + } + + // Read current failure counter + ctr = pinfail[ofs]; + // Wipe storage if too many failures + pin_fails_check_max(ctr); + + // Sleep for ~ctr seconds before checking the PIN. + uint32_t progress; + for (uint32_t wait = ~ctr; wait > 0; wait--) { + for (int i = 0; i < 10; i++) { + if (ui_callback) { + if ((~ctr) > 1000000) { // precise enough + progress = (~ctr - wait) / ((~ctr) / 1000); + } else { + progress = ((~ctr - wait) * 10 + i) * 100 / (~ctr); + } + ui_callback(wait, progress); + } + hal_delay(100); + } + } + // Show last frame if we were waiting + if ((~ctr > 0) && ui_callback) { + ui_callback(0, 1000); + } + + // First, we increase PIN fail counter in storage, even before checking the + // PIN. If the PIN is correct, we reset the counter afterwards. If not, we + // check if this is the last allowed attempt. + if (sectrue != pin_fails_increase(pinfail + ofs, ofs * sizeof(uint32_t))) { + return secfalse; + } + if (sectrue != pin_cmp(pin)) { + // Wipe storage if too many failures + pin_fails_check_max(ctr << 1); + return secfalse; + } + // Finally set the counter to 0 to indicate success. + return pin_fails_reset(ofs * sizeof(uint32_t)); +} + +secbool storage_unlock(const uint32_t pin) +{ + unlocked = secfalse; + if (sectrue == initialized && sectrue == storage_check_pin(pin)) { + unlocked = sectrue; + } + return unlocked; +} + +secbool storage_get(const uint16_t key, const void **val, uint16_t *len) +{ + const uint8_t app = key >> 8; + // APP == 0 is reserved for PIN related values + if (sectrue != initialized || app == 0) { + return secfalse; + } + // top bit of APP set indicates the value can be read from unlocked device + if (sectrue != unlocked && ((app & 0x80) == 0)) { + return secfalse; + } + return norcow_get(key, val, len); +} + +secbool storage_set(const uint16_t key, const void *val, uint16_t len) +{ + const uint8_t app = key >> 8; + // APP == 0 is reserved for PIN related values + if (sectrue != initialized || sectrue != unlocked || app == 0) { + return secfalse; + } + return norcow_set(key, val, len); +} + +secbool storage_has_pin(void) +{ + if (sectrue != initialized) { + return secfalse; + } + return sectrue == pin_cmp(1) ? secfalse : sectrue; +} + +secbool storage_change_pin(const uint32_t oldpin, const uint32_t newpin) +{ + if (sectrue != initialized || sectrue != unlocked) { + return secfalse; + } + if (sectrue != storage_check_pin(oldpin)) { + return secfalse; + } + return norcow_set(PIN_KEY, &newpin, sizeof(uint32_t)); +} + +void storage_wipe(void) +{ + norcow_wipe(); +} diff --git a/storage/tests/c0/storage.h b/storage/tests/c0/storage.h new file mode 100644 index 000000000..797528175 --- /dev/null +++ b/storage/tests/c0/storage.h @@ -0,0 +1,38 @@ +/* + * This file is part of the TREZOR project, https://trezor.io/ + * + * Copyright (c) SatoshiLabs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __STORAGE_H__ +#define __STORAGE_H__ + +#include +#include +#include "secbool.h" + +typedef void (*PIN_UI_WAIT_CALLBACK)(uint32_t wait, uint32_t progress); + +void storage_init(PIN_UI_WAIT_CALLBACK callback); +void storage_wipe(void); +secbool storage_check_pin(const uint32_t pin); +secbool storage_unlock(const uint32_t pin); +secbool storage_has_pin(void); +secbool storage_change_pin(const uint32_t oldpin, const uint32_t newpin); +secbool storage_get(const uint16_t key, const void **val, uint16_t *len); +secbool storage_set(const uint16_t key, const void *val, uint16_t len); + +#endif diff --git a/storage/tests/c0/storage.py b/storage/tests/c0/storage.py new file mode 100644 index 000000000..64c9bf8bb --- /dev/null +++ b/storage/tests/c0/storage.py @@ -0,0 +1,54 @@ +import ctypes as c +import os + +sectrue = -1431655766 # 0xAAAAAAAAA +fname = os.path.join(os.path.dirname(__file__), "libtrezor-storage0.so") + +class Storage: + + def __init__(self) -> None: + self.lib = c.cdll.LoadLibrary(fname) + self.flash_size = c.cast(self.lib.FLASH_SIZE, c.POINTER(c.c_uint32))[0] + self.flash_buffer = c.create_string_buffer(self.flash_size) + c.cast(self.lib.FLASH_BUFFER, c.POINTER(c.c_void_p))[0] = c.addressof(self.flash_buffer) + + def init(self) -> None: + self.lib.storage_init(0) + + def wipe(self) -> None: + self.lib.storage_wipe() + + def check_pin(self, pin: int) -> bool: + return sectrue == self.lib.storage_check_pin(c.c_uint32(pin)) + + def unlock(self, pin: int) -> bool: + return sectrue == self.lib.storage_unlock(c.c_uint32(pin)) + + def has_pin(self) -> bool: + return sectrue == self.lib.storage_has_pin() + + def change_pin(self, oldpin: int, newpin: int) -> bool: + return sectrue == self.lib.storage_change_pin(c.c_uint32(oldpin), c.c_uint32(newpin)) + + def get(self, key: int) -> bytes: + val_ptr = c.c_void_p() + val_len = c.c_uint16() + if sectrue != self.lib.storage_get(c.c_uint16(key), c.byref(val_ptr), c.byref(val_len)): + raise RuntimeError("Failed to find key in storage.") + return c.string_at(val_ptr, size=val_len.value) + + def set(self, key: int, val: bytes) -> None: + if sectrue != self.lib.storage_set(c.c_uint16(key), val, c.c_uint16(len(val))): + raise RuntimeError("Failed to set value in storage.") + + def _dump(self) -> bytes: + # return just sectors 4 and 16 of the whole flash + return [self.flash_buffer[0x010000:0x010000 + 0x10000], self.flash_buffer[0x110000:0x110000 + 0x10000]] + + def _get_flash_buffer(self) -> bytes: + return bytes(self.flash_buffer) + + def _set_flash_buffer(self, buf: bytes) -> None: + if len(buf) != self.flash_size: + raise RuntimeError("Failed to set flash buffer due to length mismatch.") + self.flash_buffer = buf diff --git a/storage/tests/python/__init__.py b/storage/tests/python/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/storage/tests/python/src/__init__.py b/storage/tests/python/src/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/storage/tests/python/src/consts.py b/storage/tests/python/src/consts.py new file mode 100644 index 000000000..1785c3623 --- /dev/null +++ b/storage/tests/python/src/consts.py @@ -0,0 +1,152 @@ +# ----- PIN and encryption related ----- # + +# App ID where PIN log is stored. +PIN_APP_ID = 0x00 + +# Storage key of the combined salt, EDEK, ESEK and PIN verification code entry. +EDEK_ESEK_PVC_KEY = (PIN_APP_ID << 8) | 0x02 + +# Storage key of the PIN set flag. +PIN_NOT_SET_KEY = (PIN_APP_ID << 8) | 0x03 + +# Norcow storage key of the storage version. +VERSION_KEY = (PIN_APP_ID << 8) | 0x04 + +# Norcow storage key of the storage authentication tag. +SAT_KEY = (PIN_APP_ID << 8) | 0x05 + +# The PIN value corresponding to an empty PIN. +PIN_EMPTY = 1 + +# Maximum number of failed unlock attempts. +PIN_MAX_TRIES = 16 + +# The total number of iterations to use in PBKDF2. +PIN_ITER_COUNT = 20000 + +# The length of the data encryption key in bytes. +DEK_SIZE = 32 + +# The length of the storage authentication key in bytes. +SAK_SIZE = 16 + +# The length of the storage authentication tag in bytes. +SAT_SIZE = 16 + +# The length of the random salt in bytes. +PIN_SALT_SIZE = 4 +PIN_HARDWARE_SALT_SIZE = 32 + +# The length of the PIN verification code in bytes. +PVC_SIZE = 8 + +# The length of KEK in bytes. +KEK_SIZE = 32 + +# The length of KEIV in bytes. +KEIV_SIZE = 12 + +# Size of counter. 4B integer and 8B tail. +COUNTER_TAIL = 12 +COUNTER_TAIL_SIZE = 8 +COUNTER_MAX_TAIL = 64 + +# ----- PIN logs ----- # + +# Storage key of the PIN entry log and PIN success log. +PIN_LOG_KEY = (PIN_APP_ID << 8) | 0x01 + +# Length of items in the PIN entry log +PIN_LOG_GUARD_KEY_SIZE = 4 + +# Values used for the guard key integrity check. +GUARD_KEY_MODULUS = 6311 +GUARD_KEY_REMAINDER = 15 +GUARD_KEY_RANDOM_MAX = (0xFFFFFFFF // GUARD_KEY_MODULUS) + 1 + +# Length of both success log and entry log +PIN_LOG_SIZE = 64 + +# Used for in guard bits operations. +LOW_MASK = 0x55555555 + +# Log initialized to all FFs. +ALL_FF_LOG = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF + +# ----- Bytes ----- + +# If the top bit of APP is set, then the value is not encrypted. +FLAG_PUBLIC = 0x80 + +# If the top two bits of APP are set, then the value is not encrypted and it +# can be written even when the storage is locked. +FLAG_WRITE = 0xC0 + +# Length of word in bytes. +WORD_SIZE = 4 + +# Boolean values are stored as a simple 0/1 int. +TRUE_BYTE = b"\x01" +FALSE_BYTE = b"\x00" + +# ----- Crypto ----- # + +# The length of the Poly1305 MAC in bytes. +POLY1305_MAC_SIZE = 16 + +# The length of the ChaCha20 IV (aka nonce) in bytes as per RFC 7539. +CHACHA_IV_SIZE = 12 + +# ----- Norcow ----- # + +NORCOW_SECTOR_COUNT = 2 +NORCOW_SECTOR_SIZE = 64 * 1024 + +# Magic flag at the beggining of an active sector. +NORCOW_MAGIC = b"NRC2" + +# Norcow version, set in the storage header, but also as an encrypted item. +NORCOW_VERSION = b"\x01\x00\x00\x00" + +# Norcow magic combined with the version, which is stored as its negation. +NORCOW_MAGIC_AND_VERSION = NORCOW_MAGIC + bytes( + [ + ~NORCOW_VERSION[0] & 0xFF, + ~NORCOW_VERSION[1] & 0xFF, + ~NORCOW_VERSION[2] & 0xFF, + ~NORCOW_VERSION[3] & 0xFF, + ] +) + +# Signalizes free storage. +NORCOW_KEY_FREE = 0xFFFF + + +# |-----------|-------------------| +# | Private | APP = 0 | +# | Protected | 1 <= APP <= 127 | +# | Public | 128 <= APP <= 255 | + + +def is_app_public(app: int): + if app & FLAG_PUBLIC: + return True + return False + + +def is_app_protected(app: int): + if is_app_public(app): + return False + if is_app_private(app): + return False + return True + + +def is_app_private(app: int): + return app == PIN_APP_ID + + +def is_app_lock_writable(app: int): + if app & FLAG_WRITE == FLAG_WRITE: + return True + return False diff --git a/storage/tests/python/src/crypto.py b/storage/tests/python/src/crypto.py new file mode 100644 index 000000000..037d82a0e --- /dev/null +++ b/storage/tests/python/src/crypto.py @@ -0,0 +1,112 @@ +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, hmac +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +from . import consts, prng + + +def derive_kek_keiv(salt: bytes, pin: int) -> (bytes, bytes): + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=consts.KEK_SIZE + consts.KEIV_SIZE, + salt=bytes(salt), + iterations=10000, + backend=default_backend(), + ) + pbkdf_output = kdf.derive(pin.to_bytes(4, "little")) + # the first 256b is Key Encryption Key + kek = pbkdf_output[: consts.KEK_SIZE] + # following with 96b of Initialization Vector + keiv = pbkdf_output[consts.KEK_SIZE :] + + return kek, keiv + + +def chacha_poly_encrypt( + key: bytes, iv: bytes, data: bytes, additional_data: bytes = None +) -> (bytes, bytes): + chacha = ChaCha20Poly1305(key) + chacha_output = chacha.encrypt(iv, bytes(data), additional_data) + # cipher text and 128b authentication tag + return chacha_output[: len(data)], chacha_output[len(data) :] + + +def chacha_poly_decrypt( + key: bytes, app_key: int, iv: bytes, data: bytes, additional_data: bytes = None +) -> bytes: + chacha = ChaCha20Poly1305(key) + chacha_output = chacha.decrypt(bytes(iv), bytes(data), additional_data) + return chacha_output + + +def decrypt_edek_esak( + pin: int, salt: bytes, edek_esak: bytes, pvc: bytes +) -> (bytes, bytes): + """ + Decrypts EDEK, ESAK to DEK, SAK and checks PIN in the process. + Raises: + InvalidPinError: if PIN is invalid + """ + kek, keiv = derive_kek_keiv(salt, pin) + + algorithm = algorithms.ChaCha20(kek, (1).to_bytes(4, "little") + keiv) + cipher = Cipher(algorithm, mode=None, backend=default_backend()) + decryptor = cipher.decryptor() + dek_sak = decryptor.update(bytes(edek_esak)) + dek = dek_sak[: consts.DEK_SIZE] + sak = dek_sak[consts.DEK_SIZE :] + + if not validate_pin(kek, keiv, dek_sak, pvc): + raise InvalidPinError("Invalid PIN") + + return dek, sak + + +def validate_pin(kek: bytes, keiv: bytes, dek_sak: bytes, pvc: bytes) -> bool: + """ + This a little bit hackish. We do not store the whole + authentication tag so we can't decrypt using ChaCha20Poly1305 + because it obviously checks the tag first and fails. + So we are using the sole ChaCha20 cipher to decipher and then encrypt + again with Chacha20Poly1305 here to get the tag and compare it to PVC. + """ + _, tag = chacha_poly_encrypt(kek, keiv, dek_sak) + prng.random32() + prng.random32() + return tag[: consts.PVC_SIZE] == pvc + + +def calculate_hmacs(sak: bytes, keys: bytes) -> bytes: + """ + This calculates HMAC-SHA-256(SAK, (XOR_i) HMAC-SHA-256(SAK, KEY_i)). + In other words, it does HMAC for every KEY and XORs it all together. + One more final HMAC is then performed on the result. + """ + hmacs = _hmac(sak, keys[0]) + for key in keys[1:]: + hmacs = _xor(hmacs, _hmac(sak, key)) + return _final_hmac(sak, hmacs) + + +def init_hmacs(sak: bytes) -> bytes: + return _final_hmac(sak, b"\x00" * hashes.SHA256.digest_size) + + +def _final_hmac(sak: bytes, data: bytes) -> bytes: + return _hmac(sak, data)[: consts.SAT_SIZE] + + +def _hmac(key: bytes, data: bytes) -> bytes: + h = hmac.HMAC(key, hashes.SHA256(), backend=default_backend()) + h.update(data) + return h.finalize() + + +def _xor(first: bytes, second: bytes) -> bytes: + return bytes(a ^ b for a, b in zip(first, second)) + + +class InvalidPinError(ValueError): + pass diff --git a/storage/tests/python/src/helpers.py b/storage/tests/python/src/helpers.py new file mode 100644 index 000000000..83ec34e2e --- /dev/null +++ b/storage/tests/python/src/helpers.py @@ -0,0 +1,45 @@ +import sys + +from . import consts + + +def expand_to_log_size(value: int) -> int: + result = 0 + for i in range(0, consts.PIN_LOG_SIZE, 4): + result = result | (value << i * 8) + return result + + +def to_int_by_words(array: bytes) -> int: + """ + Converts array of bytes into an int by reading word size + of bytes then converted to int using the system's endianness. + """ + assert len(array) % consts.WORD_SIZE == 0 + n = 0 + for i in range(0, len(array), consts.WORD_SIZE): + n = (n << (consts.WORD_SIZE * 8)) + int.from_bytes( + array[i : i + consts.WORD_SIZE], sys.byteorder + ) + return n + + +def to_bytes_by_words(n: int, length: int) -> bytes: + """ + Converting int back to bytes by words. + """ + mask = (1 << (consts.WORD_SIZE * 8)) - 1 + array = bytes() + for i in reversed(range(0, length, consts.WORD_SIZE)): + array = array + ((n >> (i * 8)) & mask).to_bytes( + consts.WORD_SIZE, sys.byteorder + ) + return array + + +def int_to_word(n: int) -> bytes: + return n.to_bytes(consts.WORD_SIZE, sys.byteorder) + + +def word_to_int(b: bytes) -> int: + return int.from_bytes(b, sys.byteorder) diff --git a/storage/tests/python/src/norcow.py b/storage/tests/python/src/norcow.py new file mode 100644 index 000000000..3a8baca33 --- /dev/null +++ b/storage/tests/python/src/norcow.py @@ -0,0 +1,174 @@ +import sys +from struct import pack + +from . import consts + + +def align4_int(i: int): + return (4 - i) % 4 + + +def align4_data(data): + return data + b"\x00" * align4_int(len(data)) + + +class Norcow: + def __init__(self): + self.sectors = None + + def init(self): + if self.sectors: + for sector in range(consts.NORCOW_SECTOR_COUNT): + if self.sectors[sector][:8] == consts.NORCOW_MAGIC_AND_VERSION: + self.active_sector = sector + self.active_offset = len(consts.NORCOW_MAGIC_AND_VERSION) + break + else: + self.wipe() + + def wipe(self, sector: int = 0): + self.sectors = [ + bytearray([0xFF] * consts.NORCOW_SECTOR_SIZE) + for _ in range(consts.NORCOW_SECTOR_COUNT) + ] + self.sectors[sector][:8] = consts.NORCOW_MAGIC_AND_VERSION + self.active_sector = sector + self.active_offset = len(consts.NORCOW_MAGIC_AND_VERSION) + + def get(self, key: int) -> bytes: + value, _ = self._find_item(key) + return value + + def set(self, key: int, val: bytes): + if key == consts.NORCOW_KEY_FREE: + raise RuntimeError("Norcow: key 0xFFFF is not allowed") + + found_value, pos = self._find_item(key) + if found_value is not False: + if self._is_updatable(found_value, val): + self._write(pos, key, val) + return + else: + self._delete_old(pos, found_value) + + if self.active_offset + 4 + len(val) > consts.NORCOW_SECTOR_SIZE: + self._compact() + + self._append(key, val) + + def delete(self, key: int): + if key == consts.NORCOW_KEY_FREE: + raise RuntimeError("Norcow: key 0xFFFF is not allowed") + + found_value, pos = self._find_item(key) + if found_value is False: + return False + self._delete_old(pos, found_value) + return True + + def replace(self, key: int, new_value: bytes) -> bool: + old_value, offset = self._find_item(key) + if not old_value: + raise RuntimeError("Norcow: key not found") + if len(old_value) != len(new_value): + raise RuntimeError( + "Norcow: replace works only with items of the same length" + ) + self._write(offset, key, new_value) + + def _is_updatable(self, old: bytes, new: bytes) -> bool: + """ + Item is updatable if the new value is the same or + it changes 1 to 0 only (the flash memory does not + allow to flip 0 to 1 unless you wipe it). + """ + if len(old) != len(new): + return False + if old == new: + return True + for a, b in zip(old, new): + if a & b != b: + return False + return True + + def _delete_old(self, pos: int, value: bytes): + wiped_data = b"\x00" * len(value) + self._write(pos, 0x0000, wiped_data) + + def _append(self, key: int, value: bytes): + self.active_offset += self._write(self.active_offset, key, value) + + def _write(self, pos: int, key: int, new_value: bytes) -> int: + data = pack(" consts.NORCOW_SECTOR_SIZE: + raise RuntimeError("Norcow: item too big") + self.sectors[self.active_sector][pos : pos + len(data)] = data + return len(data) + + def _find_item(self, key: int) -> (bytes, int): + offset = len(consts.NORCOW_MAGIC_AND_VERSION) + value = False + pos = offset + while True: + try: + k, v = self._read_item(offset) + if k == key: + value = v + pos = offset + except ValueError: + break + offset = offset + self._norcow_item_length(v) + return value, pos + + def _get_all_keys(self) -> (bytes, int): + offset = len(consts.NORCOW_MAGIC_AND_VERSION) + keys = set() + while True: + try: + k, v = self._read_item(offset) + keys.add(k) + except ValueError: + break + offset = offset + self._norcow_item_length(v) + return keys + + def _norcow_item_length(self, data: bytes) -> int: + # APP_ID, KEY_ID, LENGTH, DATA, ALIGNMENT + return 1 + 1 + 2 + len(data) + align4_int(len(data)) + + def _read_item(self, offset: int) -> (int, bytes): + key = self.sectors[self.active_sector][offset : offset + 2] + key = int.from_bytes(key, sys.byteorder) + if key == consts.NORCOW_KEY_FREE: + raise ValueError("Norcow: no data on this offset") + length = self.sectors[self.active_sector][offset + 2 : offset + 4] + length = int.from_bytes(length, sys.byteorder) + value = self.sectors[self.active_sector][offset + 4 : offset + 4 + length] + return key, value + + def _compact(self): + offset = len(consts.NORCOW_MAGIC_AND_VERSION) + data = list() + while True: + try: + k, v = self._read_item(offset) + if k != 0x00: + data.append((k, v)) + except ValueError: + break + offset = offset + self._norcow_item_length(v) + sector = self.active_sector + self.wipe((sector + 1) % consts.NORCOW_SECTOR_COUNT) + for key, value in data: + self._append(key, value) + + def _set_sectors(self, data): + if list(map(len, data)) != [ + consts.NORCOW_SECTOR_SIZE, + consts.NORCOW_SECTOR_SIZE, + ]: + raise RuntimeError("Norcow: set_sectors called with invalid data length") + self.sectors = [bytearray(sector) for sector in data] + + def _dump(self): + return [bytes(sector) for sector in self.sectors] diff --git a/storage/tests/python/src/pin_log.py b/storage/tests/python/src/pin_log.py new file mode 100644 index 000000000..0416baaeb --- /dev/null +++ b/storage/tests/python/src/pin_log.py @@ -0,0 +1,123 @@ +from . import consts, helpers, prng + + +class PinLog: + def __init__(self, norcow): + self.norcow = norcow + + def init(self): + guard_key = self._generate_guard_key() + guard_mask, guard = self.derive_guard_mask_and_value(guard_key) + + pin_success_log = (~guard_mask & consts.ALL_FF_LOG) | guard + pin_entry_log = (~guard_mask & consts.ALL_FF_LOG) | guard + self._write_log(guard_key, pin_success_log, pin_entry_log) + + def derive_guard_mask_and_value(self, guard_key: int) -> (int, int): + if guard_key > 0xFFFFFFFF: + raise ValueError("Invalid guard key") + + guard_mask = ((guard_key & consts.LOW_MASK) << 1) | ( + (~guard_key & 0xFFFFFFFF) & consts.LOW_MASK + ) + guard = (((guard_key & consts.LOW_MASK) << 1) & guard_key) | ( + ((~guard_key & 0xFFFFFFFF) & consts.LOW_MASK) & (guard_key >> 1) + ) + return helpers.expand_to_log_size(guard_mask), helpers.expand_to_log_size(guard) + + def write_attempt(self): + guard_key, pin_success_log, pin_entry_log = self._get_logs() + guard_mask, guard = self.derive_guard_mask_and_value(guard_key) + assert (pin_entry_log & guard_mask) == guard + + clean_pin_entry_log = self.remove_guard_bits(guard_mask, pin_entry_log) + clean_pin_entry_log = clean_pin_entry_log >> 2 # set 11 to 00 + pin_entry_log = ( + clean_pin_entry_log & (~guard_mask & consts.ALL_FF_LOG) + ) | guard + + self._write_log(guard_key, pin_success_log, pin_entry_log) + + def write_success(self): + guard_key, pin_success_log, pin_entry_log = self._get_logs() + pin_success_log = pin_entry_log + + self._write_log(guard_key, pin_success_log, pin_entry_log) + + def get_failures_count(self) -> int: + guard_key, pin_succes_log, pin_entry_log = self._get_logs() + guard_mask, _ = self.derive_guard_mask_and_value(guard_key) + + pin_succes_log = self.remove_guard_bits(guard_mask, pin_succes_log) + pin_entry_log = self.remove_guard_bits(guard_mask, pin_entry_log) + + # divide by two because bits are doubled after remove_guard_bits() + return bin(pin_succes_log - pin_entry_log).count("1") // 2 + + def remove_guard_bits(self, guard_mask: int, log: int) -> int: + """ + Removes all guard bits and replaces each guard bit + with its neighbour value. + Example: 0g0gg1 -> 000011 + """ + log = log & (~guard_mask & consts.ALL_FF_LOG) + log = ((log >> 1) | log) & helpers.expand_to_log_size(consts.LOW_MASK) + log = log | (log << 1) + return log + + def _generate_guard_key(self) -> int: + while True: + r = prng.random_uniform(consts.GUARD_KEY_RANDOM_MAX) + r = (r * consts.GUARD_KEY_MODULUS + consts.GUARD_KEY_REMAINDER) & 0xFFFFFFFF + if self._check_guard_key(r): + return r + + def _check_guard_key(self, guard_key: int) -> bool: + """ + Checks if guard_key is congruent to 15 modulo 6311 and + some other conditions, see the docs. + """ + count = (guard_key & 0x22222222) + ((guard_key >> 2) & 0x22222222) + count = count + (count >> 4) + + zero_runs = ~guard_key & 0xFFFFFFFF + zero_runs = zero_runs & (zero_runs >> 2) + zero_runs = zero_runs & (zero_runs >> 1) + zero_runs = zero_runs & (zero_runs >> 1) + one_runs = guard_key + one_runs = one_runs & (one_runs >> 2) + one_runs = one_runs & (one_runs >> 1) + one_runs = one_runs & (one_runs >> 1) + + return ( + ((count & 0x0E0E0E0E) == 0x04040404) + & (one_runs == 0) + & (zero_runs == 0) + & (guard_key % consts.GUARD_KEY_MODULUS == consts.GUARD_KEY_REMAINDER) + ) + + def _get_logs(self) -> (int, int, int): + pin_log = self.norcow.get(consts.PIN_LOG_KEY) + guard_key = pin_log[: consts.PIN_LOG_GUARD_KEY_SIZE] + guard_key = helpers.word_to_int(guard_key) + guard_mask, guard = self.derive_guard_mask_and_value(guard_key) + pin_entry_log = pin_log[consts.PIN_LOG_GUARD_KEY_SIZE + consts.PIN_LOG_SIZE :] + pin_entry_log = helpers.to_int_by_words(pin_entry_log) + pin_success_log = pin_log[ + consts.PIN_LOG_GUARD_KEY_SIZE : consts.PIN_LOG_GUARD_KEY_SIZE + + consts.PIN_LOG_SIZE + ] + pin_success_log = helpers.to_int_by_words(pin_success_log) + + return guard_key, pin_success_log, pin_entry_log + + def _write_log(self, guard_key: int, pin_success_log: int, pin_entry_log: int): + pin_log = ( + helpers.int_to_word(guard_key) + + helpers.to_bytes_by_words(pin_success_log, consts.PIN_LOG_SIZE) + + helpers.to_bytes_by_words(pin_entry_log, consts.PIN_LOG_SIZE) + ) + try: + self.norcow.replace(consts.PIN_LOG_KEY, pin_log) + except RuntimeError: + self.norcow.set(consts.PIN_LOG_KEY, pin_log) diff --git a/storage/tests/python/src/prng.py b/storage/tests/python/src/prng.py new file mode 100644 index 000000000..6461bfccc --- /dev/null +++ b/storage/tests/python/src/prng.py @@ -0,0 +1,34 @@ +import sys + +seed = 0 + + +def random_buffer(length: int) -> bytes: + length = length + if length % 4 != 0: + raise ValueError("Use only for whole words (multiples of 4 bytes)") + b = bytearray(length) + for i in range(length): + if i % 4 == 0: + rand = random32().to_bytes(4, sys.byteorder) + b[i] = rand[i % 4] + return bytes(b) + + +def random_reseed(reseed: int = 0): + global seed + seed = reseed + + +def random32(): + global seed + seed = (1664525 * seed + 1013904223) & 0xFFFFFFFF + return seed + + +def random_uniform(n: int): + max = 0xFFFFFFFF - (0xFFFFFFFF % n) + while True: + x = random32() + if x < max: + return x // (max // n) diff --git a/storage/tests/python/src/storage.py b/storage/tests/python/src/storage.py new file mode 100644 index 000000000..e223d287f --- /dev/null +++ b/storage/tests/python/src/storage.py @@ -0,0 +1,237 @@ +import hashlib +import sys + +from . import consts, crypto, helpers, prng +from .norcow import Norcow +from .pin_log import PinLog + + +class Storage: + def __init__(self): + self.initialized = False + self.unlocked = False + self.dek = None + self.sak = None + self.nc = Norcow() + self.pin_log = PinLog(self.nc) + + def init(self, hardware_salt: bytes = b""): + """ + Initializes storage. Normally we would check if EDEK is already present, + but we simplify things in the python version and suppose we are starting + a new storage each time. + """ + self.nc.init() + self.initialized = True + self.hw_salt_hash = hashlib.sha256(hardware_salt).digest() + + edek_esak_pvc = self.nc.get(consts.EDEK_ESEK_PVC_KEY) + if not edek_esak_pvc: + self._init_pin() + + def _init_pin(self): + """ + Initalizes PIN counters, generates random + Data Encryption Key and Storage Authentication Key + """ + self.dek = prng.random_buffer(consts.DEK_SIZE) + self.sak = prng.random_buffer(consts.SAK_SIZE) + + self.nc.set(consts.SAT_KEY, crypto.init_hmacs(self.sak)) + self._set_encrypt(consts.VERSION_KEY, b"\x01\x00\x00\x00") + self.pin_log.init() + self._set_pin(consts.PIN_EMPTY) + self.unlocked = False + + def _set_pin(self, pin: int): + random_salt = prng.random_buffer(consts.PIN_SALT_SIZE) + salt = self.hw_salt_hash + random_salt + kek, keiv = crypto.derive_kek_keiv(salt, pin) + + # Encrypted Data Encryption Key and Encrypted Storage Authentication Key + edek_esak, tag = crypto.chacha_poly_encrypt(kek, keiv, self.dek + self.sak) + # Pin Verification Code + pvc = tag[: consts.PVC_SIZE] + + self.nc.set(consts.EDEK_ESEK_PVC_KEY, random_salt + edek_esak + pvc) + if pin == consts.PIN_EMPTY: + self._set_bool(consts.PIN_NOT_SET_KEY, True) + else: + self._set_bool(consts.PIN_NOT_SET_KEY, False) + + def wipe(self): + self.nc.wipe() + self._init_pin() + + def check_pin(self, pin: int) -> bool: + self.pin_log.write_attempt() + + data = self.nc.get(consts.EDEK_ESEK_PVC_KEY) + salt = self.hw_salt_hash + data[: consts.PIN_SALT_SIZE] + edek_esak = data[consts.PIN_SALT_SIZE : -consts.PVC_SIZE] + pvc = data[-consts.PVC_SIZE :] + + try: + dek, sak = crypto.decrypt_edek_esak(pin, salt, edek_esak, pvc) + self.pin_log.write_success() + self.dek = dek + self.sak = sak + return True + except crypto.InvalidPinError: + fails = self.pin_log.get_failures_count() + if fails >= consts.PIN_MAX_TRIES: + self.wipe() + return False + + def lock(self) -> None: + self.unlocked = False + + def unlock(self, pin: int) -> bool: + if not self.initialized or not self.check_pin(pin): + return False + + version = self._decrypt(consts.VERSION_KEY) + if version != consts.NORCOW_VERSION: + return False + + self.unlocked = True + return True + + def has_pin(self) -> bool: + val = self.nc.get(consts.PIN_NOT_SET_KEY) + return val != consts.TRUE_BYTE + + def get_pin_rem(self) -> int: + return consts.PIN_MAX_TRIES - self.pin_log.get_failures_count() + + def change_pin(self, oldpin: int, newpin: int) -> bool: + if not self.initialized or not self.unlocked: + return False + if not self.check_pin(oldpin): + return False + self._set_pin(newpin) + return True + + def get(self, key: int) -> bytes: + app = key >> 8 + if not self.initialized or consts.is_app_private(app): + raise RuntimeError("Storage not initialized or app is private") + if not self.unlocked and not consts.is_app_public(app): + # public fields can be read from an unlocked device + raise RuntimeError("Storage locked") + if consts.is_app_public(app): + return self.nc.get(key) + return self._get_encrypted(key) + + def set(self, key: int, val: bytes) -> bool: + app = key >> 8 + self._check_lock(app) + if consts.is_app_public(app): + return self.nc.set(key, val) + return self._set_encrypt(key, val) + + def set_counter(self, key: int, val: int): + app = key >> 8 + if not consts.is_app_public(app): + raise RuntimeError("Counter can be set only for public items") + counter = val.to_bytes(4, sys.byteorder) + bytearray( + b"\xFF" * consts.COUNTER_TAIL_SIZE + ) + self.set(key, counter) + + def next_counter(self, key: int) -> int: + app = key >> 8 + self._check_lock(app) + + current = self.get(key) + if current is False: + self.set_counter(key, 0) + return 0 + + base = int.from_bytes(current[:4], sys.byteorder) + tail = helpers.to_int_by_words(current[4:]) + tail_count = "{0:064b}".format(tail).count("0") + increased_count = base + tail_count + 1 + + if tail_count == consts.COUNTER_MAX_TAIL: + self.set_counter(key, increased_count) + return increased_count + + self.set( + key, + current[:4] + + helpers.to_bytes_by_words(tail >> 1, consts.COUNTER_TAIL_SIZE), + ) + return increased_count + + def delete(self, key: int) -> bool: + app = key >> 8 + self._check_lock(app) + ret = self.nc.delete(key) + if consts.is_app_protected(app): + sat = self._calculate_authentication_tag() + self.nc.set(consts.SAT_KEY, sat) + return ret + + def _check_lock(self, app: int): + if not self.initialized or consts.is_app_private(app): + raise RuntimeError("Storage not initialized or app is private") + if not self.unlocked and not consts.is_app_lock_writable(app): + raise RuntimeError("Storage locked and app is not public-writable") + + def _get_encrypted(self, key: int) -> bytes: + if not consts.is_app_protected(key): + raise RuntimeError("Only protected values are encrypted") + sat = self.nc.get(consts.SAT_KEY) + if not sat: + raise RuntimeError("SAT not found") + if sat != self._calculate_authentication_tag(): + raise RuntimeError("Storage authentication tag mismatch") + return self._decrypt(key) + + def _decrypt(self, key: int) -> bytes: + data = self.nc.get(key) + iv = data[: consts.CHACHA_IV_SIZE] + # cipher text with MAC + tag = data[ + consts.CHACHA_IV_SIZE : consts.CHACHA_IV_SIZE + consts.POLY1305_MAC_SIZE + ] + ciphertext = data[consts.CHACHA_IV_SIZE + consts.POLY1305_MAC_SIZE :] + return crypto.chacha_poly_decrypt( + self.dek, key, iv, ciphertext + tag, key.to_bytes(2, sys.byteorder) + ) + + def _set_encrypt(self, key: int, val: bytes): + # In C, data are preallocated beforehand for encrypted values, + # to match the behaviour we do the same. + preallocate = b"\xFF" * ( + consts.CHACHA_IV_SIZE + len(val) + consts.POLY1305_MAC_SIZE + ) + self.nc.set(key, preallocate) + if consts.is_app_protected(key >> 8): + sat = self._calculate_authentication_tag() + self.nc.set(consts.SAT_KEY, sat) + + iv = prng.random_buffer(consts.CHACHA_IV_SIZE) + cipher_text, tag = crypto.chacha_poly_encrypt( + self.dek, iv, val, key.to_bytes(2, sys.byteorder) + ) + return self.nc.replace(key, iv + tag + cipher_text) + + def _calculate_authentication_tag(self) -> bytes: + keys = [] + for key in self.nc._get_all_keys(): + if consts.is_app_protected(key >> 8): + keys.append(key.to_bytes(2, sys.byteorder)) + if not keys: + return crypto.init_hmacs(self.sak) + return crypto.calculate_hmacs(self.sak, keys) + + def _set_bool(self, key: int, val: bool) -> bool: + if val: + return self.nc.set(key, consts.TRUE_BYTE) + # False is stored as an empty value + return self.nc.set(key, consts.FALSE_BYTE) + + def _dump(self) -> bytes: + return self.nc._dump() diff --git a/storage/tests/python/tests/__init__.py b/storage/tests/python/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/storage/tests/python/tests/common.py b/storage/tests/python/tests/common.py new file mode 100644 index 000000000..e1497054a --- /dev/null +++ b/storage/tests/python/tests/common.py @@ -0,0 +1,2 @@ +def all_ff_bytes(data: bytes): + return all(i == 0xFF for i in data) diff --git a/storage/tests/python/tests/conftest.py b/storage/tests/python/tests/conftest.py new file mode 100644 index 000000000..9c908e2d1 --- /dev/null +++ b/storage/tests/python/tests/conftest.py @@ -0,0 +1,8 @@ +from ..src import prng + + +def pytest_runtest_teardown(item): + """ + Called after each test ran to reset the PRNG + """ + prng.seed = 0 diff --git a/storage/tests/python/tests/test_helpers.py b/storage/tests/python/tests/test_helpers.py new file mode 100644 index 000000000..4e5a58f71 --- /dev/null +++ b/storage/tests/python/tests/test_helpers.py @@ -0,0 +1,13 @@ +from ..src import consts, helpers + + +def test_read_bytes_by_words(): + array = b"\x04\x03\x02\x01\x08\x07\x06\x05" + n = helpers.to_int_by_words(array) + assert n == 0x0102030405060708 + assert array == helpers.to_bytes_by_words(n, consts.PIN_LOG_SIZE)[56:] + + array = b"\xFF\xFF\xFF\x01\x01\x05\x09\x01" + n = helpers.to_int_by_words(array) + assert n == 0x01FFFFFF01090501 + assert array == helpers.to_bytes_by_words(n, consts.PIN_LOG_SIZE)[56:] diff --git a/storage/tests/python/tests/test_norcow.py b/storage/tests/python/tests/test_norcow.py new file mode 100644 index 000000000..78a6819c7 --- /dev/null +++ b/storage/tests/python/tests/test_norcow.py @@ -0,0 +1,155 @@ +import pytest + +from . import common +from ..src import consts, norcow + + +def test_norcow_set(): + n = norcow.Norcow() + n.init() + n.set(0x0001, b"123") + data = n._dump()[0][:256] + assert data[:8] == b"NRC2\xFE\xFF\xFF\xFF" + assert data[8:10] == b"\x01\x00" # app + key + assert data[10:12] == b"\x03\x00" # length + assert data[12:15] == b"123" # data + assert common.all_ff_bytes(data[16:]) + + n.wipe() + n.set(0x0901, b"hello") + data = n._dump()[0][:256] + assert data[:8] == b"NRC2\xFE\xFF\xFF\xFF" + assert data[8:10] == b"\x01\x09" # app + key + assert data[10:12] == b"\x05\x00" # length + assert data[12:17] == b"hello" # data + assert data[17:20] == b"\x00\x00\x00" # alignment + assert common.all_ff_bytes(data[20:]) + + offset = 20 + n.set(0x0102, b"world!") + data = n._dump()[0][:256] + assert data[offset : offset + 2] == b"\x02\x01" # app + key + assert data[offset + 2 : offset + 4] == b"\x06\x00" # length + assert data[offset + 4 : offset + 10] == b"world!" # data + assert data[offset + 10 : offset + 12] == b"\x00\x00" # alignment + assert common.all_ff_bytes(data[offset + 12 :]) + + +def test_norcow_read_item(): + n = norcow.Norcow() + n.init() + n.set(0x0001, b"123") + n.set(0x0002, b"456") + n.set(0x0101, b"789") + key, value = n._read_item(16) + assert key == 0x0002 + assert value == b"456" + key, value = n._read_item(24) + assert key == 0x0101 + assert value == b"789" + + with pytest.raises(ValueError) as e: + key, value = n._read_item(204) + assert "no data" in str(e) + + +def test_norcow_get_item(): + n = norcow.Norcow() + n.init() + n.set(0x0001, b"123") + n.set(0x0002, b"456") + n.set(0x0101, b"789") + value = n.get(0x0001) + assert value == b"123" + assert ( + n._dump()[0][:40].hex() + == "4e524332feffffff010003003132330002000300343536000101030037383900ffffffffffffffff" + ) + + # replacing item with the same value (update) + n.set(0x0101, b"789") + value = n.get(0x0101) + assert value == b"789" + assert ( + n._dump()[0][:40].hex() + == "4e524332feffffff010003003132330002000300343536000101030037383900ffffffffffffffff" + ) + + # replacing item with value with less 1 bits than before (update) + n.set(0x0101, b"788") + value = n.get(0x0101) + assert value == b"788" + assert ( + n._dump()[0][:40].hex() + == "4e524332feffffff010003003132330002000300343536000101030037383800ffffffffffffffff" + ) + + # replacing item with value with more 1 bits than before (wipe and new entry) + n.set(0x0101, b"787") + value = n.get(0x0101) + assert value == b"787" + assert ( + n._dump()[0][:44].hex() + == "4e524332feffffff0100030031323300020003003435360000000300000000000101030037383700ffffffff" + ) + + n.set(0x0002, b"world") + n.set(0x0002, b"earth") + value = n.get(0x0002) + assert value == b"earth" + + +def test_norcow_replace_item(): + n = norcow.Norcow() + n.init() + n.set(0x0001, b"123") + n.set(0x0002, b"456") + n.set(0x0101, b"789") + value = n.get(0x0002) + assert value == b"456" + + n.replace(0x0001, b"000") + value = n.get(0x0001) + assert value == b"000" + + n.replace(0x0002, b"111") + value = n.get(0x0002) + assert value == b"111" + value = n.get(0x0001) + assert value == b"000" + value = n.get(0x0101) + assert value == b"789" + + with pytest.raises(RuntimeError) as e: + n.replace(0x0001, b"00000") + assert "same length" in str(e) + + +def test_norcow_compact(): + n = norcow.Norcow() + n.init() + n.set(0x0101, b"ahoj") + n.set(0x0101, b"a" * (consts.NORCOW_SECTOR_SIZE - 100)) + n.set(0x0101, b"hello") + + n.set(0x0103, b"123456789x") + n.set(0x0104, b"123456789x") + n.set(0x0105, b"123456789x") + n.set(0x0106, b"123456789x") + mem = n._dump() + assert mem[0][:8] == consts.NORCOW_MAGIC_AND_VERSION + assert mem[0][200:300] == b"\x00" * 100 + + # compact is triggered + n.set(0x0107, b"123456789x") + mem = n._dump() + # assert the other sector is active + assert mem[1][:8] == consts.NORCOW_MAGIC_AND_VERSION + # assert the deleted item was not copied + assert mem[0][200:300] == b"\xff" * 100 + + n.set(0x0108, b"123456789x") + n.set(0x0109, b"123456789x") + + assert n.get(0x0101) == b"hello" + assert n.get(0x0103) == b"123456789x" diff --git a/storage/tests/python/tests/test_pin.py b/storage/tests/python/tests/test_pin.py new file mode 100644 index 000000000..a6f517922 --- /dev/null +++ b/storage/tests/python/tests/test_pin.py @@ -0,0 +1,28 @@ +from ..src.storage import Storage + + +def test_set_pin_success(): + s = Storage() + hw_salt = b"\x00\x00\x00\x00\x00\x00" + s.init(hw_salt) + s._set_pin(1) + assert s.unlock(1) + + s = Storage() + s.init(hw_salt) + s._set_pin(229922) + assert s.unlock(229922) + + +def test_set_pin_failure(): + s = Storage() + hw_salt = b"\x00\x00\x00\x00\x00\x00" + s.init(hw_salt) + s._set_pin(1) + assert s.unlock(1) + assert not s.unlock(1234) + + s = Storage() + s.init(hw_salt) + s._set_pin(229922) + assert not s.unlock(1122992211) diff --git a/storage/tests/python/tests/test_pin_log.py b/storage/tests/python/tests/test_pin_log.py new file mode 100644 index 000000000..5ef72c521 --- /dev/null +++ b/storage/tests/python/tests/test_pin_log.py @@ -0,0 +1,12 @@ +from ..src import pin_log + + +def test_generate_guard_key(): + p = pin_log.PinLog(None) + + assert p._generate_guard_key() == 2267428717 + assert p._generate_guard_key() == 1653399972 + assert p._check_guard_key(2267428717) + assert p._check_guard_key(1653399972) + assert p._check_guard_key(3706993061) + assert p._check_guard_key(3593237286) diff --git a/storage/tests/python/tests/test_prng.py b/storage/tests/python/tests/test_prng.py new file mode 100644 index 000000000..5d2ae8e43 --- /dev/null +++ b/storage/tests/python/tests/test_prng.py @@ -0,0 +1,11 @@ +from ..src import prng + + +def test_prng(): + buf = prng.random_buffer(4) + assert buf == b"\x5f\xf3\x6e\x3c" + buf = prng.random_buffer(4) + assert buf == b"\x32\x29\x50\x47" + + buf = prng.random_buffer(8) + assert buf == b"\xe9\xf6\xcc\xd1\x34\x53\xf9\xaa" diff --git a/storage/tests/test.py b/storage/tests/test.py new file mode 100755 index 000000000..982766ba4 --- /dev/null +++ b/storage/tests/test.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 + +from c.storage import Storage as StorageC +from c0.storage import Storage as StorageC0 +from python.src.storage import Storage as StoragePy + +from hashlib import sha256 + + +def hash(data): + return sha256(data).hexdigest()[:16] + + +# Strings for testing ChaCha20 encryption. +test_strings = [b"Short string.", b"", b"Although ChaCha20 is a stream cipher, it operates on blocks of 64 bytes. This string is over 152 bytes in length so that we test multi-block encryption.", b"This string is exactly 64 bytes long, that is exactly one block."] + +# Unique device ID for testing. +uid = b"\x67\xce\x6a\xe8\xf7\x9b\x73\x96\x83\x88\x21\x5e" + +sc = StorageC() +sp = StoragePy() +a = [] + +for s in [sc, sp]: + print(s.__class__) + s.init(uid) + assert s.unlock(3) == False + assert s.unlock(1) == True + s.set(0xbeef, b"hello") + s.set(0x03fe, b"world!") + s.set(0xbeef, b"satoshi") + s.set(0xbeef, b"Satoshi") + for value in test_strings: + s.set(0x0301, value) + assert s.get(0x0301) == value + d = s._dump() + print(d[0][:512].hex()) + h = [hash(x) for x in d] + print(h) + a.append(h[0]) + a.append(h[1]) + print() + +print("-------------") +print("Equals:", a[0] == a[2] and a[1] == a[3]) diff --git a/storage/tests/tests/__init__.py b/storage/tests/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/storage/tests/tests/common.py b/storage/tests/tests/common.py new file mode 100644 index 000000000..c66ea8091 --- /dev/null +++ b/storage/tests/tests/common.py @@ -0,0 +1,23 @@ +from c.storage import Storage as StorageC +from python.src import prng +from python.src.storage import Storage as StoragePy + +test_uid = b"\x67\xce\x6a\xe8\xf7\x9b\x73\x96\x83\x88\x21\x5e" + + +def init( + unlock: bool = False, reseed: int = 0, uid: int = test_uid +) -> (StorageC, StoragePy): + sc = StorageC() + sp = StoragePy() + sc.lib.random_reseed(reseed) + prng.random_reseed(reseed) + for s in (sc, sp): + s.init(uid) + if unlock: + assert s.unlock(1) + return sc, sp + + +def memory_equals(sc, sp) -> bool: + return sc._dump() == sp._dump() diff --git a/storage/tests/tests/storage_model.py b/storage/tests/tests/storage_model.py new file mode 100644 index 000000000..06eedc460 --- /dev/null +++ b/storage/tests/tests/storage_model.py @@ -0,0 +1,68 @@ +# Logical storage model used for testing. + + +class StorageModel: + _EMPTY_PIN = 1 + _PIN_MAX_TRIES = 16 + + def __init__(self) -> None: + self.wipe() + + def init(self, salt: bytes) -> None: + self.unlocked = False + + def wipe(self) -> None: + self.unlocked = False + self.pin = 1 + self.pin_rem = self._PIN_MAX_TRIES + self.dict = {} + + def lock(self) -> None: + self.unlocked = False + + def unlock(self, pin: int) -> bool: + if pin == self.pin: + self.pin_rem = self._PIN_MAX_TRIES + self.unlocked = True + return True + else: + self.pin_rem -= 1 + if self.pin_rem <= 0: + self.wipe() + return False + + def has_pin(self) -> bool: + return self.pin != self._EMPTY_PIN + + def get_pin_rem(self) -> int: + return self.pin_rem + + def change_pin(self, oldpin: int, newpin: int) -> bool: + if self.unlocked and self.unlock(oldpin): + self.pin = newpin + return True + else: + return False + + def get(self, key: int) -> bytes: + if (key & 0x8000 != 0 or self.unlocked) and self.dict.get(key) is not None: + return self.dict[key] + raise RuntimeError("Failed to find key in storage.") + + def set(self, key: int, val: bytes) -> None: + if self.unlocked: + self.dict[key] = val + else: + raise RuntimeError("Failed to set value in storage.") + + def delete(self, key: int) -> bool: + if not self.unlocked: + return False + try: + self.dict.pop(key) + except KeyError: + return False + return True + + def __iter__(self): + return iter(self.dict.items()) diff --git a/storage/tests/tests/test_compact.py b/storage/tests/tests/test_compact.py new file mode 100644 index 000000000..671b5d659 --- /dev/null +++ b/storage/tests/tests/test_compact.py @@ -0,0 +1,32 @@ +import pytest + +from python.src import consts + +from . import common + + +def test_compact(): + sc, sp = common.init(unlock=True) + for s in (sc, sp): + s.set(0xBEEF, b"hello") + s.set(0xBEEF, b"asdasdasdasd") + s.set(0xBEEF, b"fsdasdasdasdasdsadasdsadasdasd") + s.set(0x0101, b"a" * (consts.NORCOW_SECTOR_SIZE - 600)) + s.set(0x03FE, b"world!") + s.set(0x04FE, b"world!xfffffffffffffffffffffffffffff") + s.set(0x05FE, b"world!affffffffffffffffffffffffffffff") + s.set(0x0101, b"s") + s.set(0x06FE, b"world!aaaaaaaaaaaaaaaaaaaaaaaaab") + s.set(0x07FE, b"worxxxxxxxxxxxxxxxxxx") + s.set(0x09EE, b"worxxxxxxxxxxxxxxxxxx") + assert common.memory_equals(sc, sp) + + sc, sp = common.init(unlock=True) + for s in (sc, sp): + s.set(0xBEEF, b"asdasdasdasd") + s.set(0xBEEF, b"fsdasdasdasdasdsadasdsadasdasd") + s.set(0x8101, b"a" * (consts.NORCOW_SECTOR_SIZE - 1000)) + with pytest.raises(RuntimeError): + s.set(0x0101, b"a" * (consts.NORCOW_SECTOR_SIZE - 100)) + s.set(0x0101, b"hello") + assert common.memory_equals(sc, sp) diff --git a/storage/tests/tests/test_pin.py b/storage/tests/tests/test_pin.py new file mode 100644 index 000000000..915deb4b3 --- /dev/null +++ b/storage/tests/tests/test_pin.py @@ -0,0 +1,66 @@ +import pytest + +from python.src import consts + +from . import common + + +def test_init_pin(): + sc, sp = common.init(uid=b"\x00\x00\x00\x00\x00\x00") + assert common.memory_equals(sc, sp) + + sc, sp = common.init(uid=b"\x22\x00\xDD\x00\x00\xBE") + assert common.memory_equals(sc, sp) + + +def test_change_pin(): + sc, sp = common.init(unlock=True) + for s in (sc, sp): + assert s.change_pin(1, 2221) + # invalid PIN + assert not s.change_pin(99991, 1) + assert s.unlock(2221) + assert s.change_pin(2221, 999991) + assert s.change_pin(999991, 991) + assert s.unlock(991) + assert not s.unlock(99991) + + assert common.memory_equals(sc, sp) + + +def test_has_pin(): + sc, sp = common.init() + for s in (sc, sp): + assert not s.has_pin() + assert s.unlock(1) + assert not s.has_pin() + assert s.change_pin(1, 221) + assert s.has_pin() + assert s.change_pin(221, 1) + assert not s.has_pin() + + +def test_wipe_after_max_pin(): + sc, sp = common.init(unlock=True) + for s in (sc, sp): + assert s.change_pin(1, 2221) + assert s.unlock(2221) + s.set(0x0202, b"Hello") + + # try an invalid PIN MAX - 1 times + for i in range(consts.PIN_MAX_TRIES - 1): + assert not s.unlock(99991) + # this should pass + assert s.unlock(2221) + assert s.get(0x0202) == b"Hello" + + # try an invalid PIN MAX times, the storage should get wiped + for i in range(consts.PIN_MAX_TRIES): + assert not s.unlock(99991) + assert i == consts.PIN_MAX_TRIES - 1 + # this should return False and raise an exception, the storage is wiped + assert not s.unlock(2221) + with pytest.raises(RuntimeError): + assert s.get(0x0202) == b"Hello" + + assert common.memory_equals(sc, sp) diff --git a/storage/tests/tests/test_random.py b/storage/tests/tests/test_random.py new file mode 100644 index 000000000..01f4f71c6 --- /dev/null +++ b/storage/tests/tests/test_random.py @@ -0,0 +1,83 @@ +import hypothesis.strategies as st +from hypothesis import assume, settings +from hypothesis.stateful import Bundle, RuleBasedStateMachine, invariant, rule + +from . import common +from .storage_model import StorageModel + + +class StorageComparison(RuleBasedStateMachine): + def __init__(self): + super(StorageComparison, self).__init__() + self.sc, self.sp = common.init(unlock=True) + self.sm = StorageModel() + self.sm.init(b"") + self.sm.unlock(1) + self.storages = (self.sc, self.sp, self.sm) + + keys = Bundle("keys") + values = Bundle("values") + pins = Bundle("pins") + + @rule(target=keys, app=st.integers(1, 0xFF), key=st.integers(0, 0xFF)) + def k(self, app, key): + return (app << 8) | key + + @rule(target=values, v=st.binary(min_size=0, max_size=10000)) + def v(self, v): + return v + + @rule(target=pins, p=st.integers(1, 3)) + def p(self, p): + return p + + @rule(k=keys, v=values) + def set(self, k, v): + assume(k != 0xFFFF) + for s in self.storages: + s.set(k, v) + + @rule(k=keys) + def delete(self, k): + assume(k != 0xFFFF) + assert len(set(s.delete(k) for s in self.storages)) == 1 + + @rule(p=pins) + def check_pin(self, p): + assert len(set(s.unlock(p) for s in self.storages)) == 1 + self.ensure_unlocked() + + @rule(oldpin=pins, newpin=pins) + def change_pin(self, oldpin, newpin): + assert len(set(s.change_pin(oldpin, newpin) for s in self.storages)) == 1 + self.ensure_unlocked() + + @rule() + def lock(self): + for s in self.storages: + s.lock() + self.ensure_unlocked() + + @invariant() + def values_agree(self): + for k, v in self.sm: + assert self.sc.get(k) == v + + @invariant() + def dumps_agree(self): + assert self.sc._dump() == self.sp._dump() + + @invariant() + def pin_counters_agree(self): + assert len(set(s.get_pin_rem() for s in self.storages)) == 1 + + def ensure_unlocked(self): + if not self.sm.unlocked: + for s in self.storages: + assert s.unlock(self.sm.pin) + + +TestStorageComparison = StorageComparison.TestCase +TestStorageComparison.settings = settings( + deadline=2000, max_examples=30, stateful_step_count=50 +) diff --git a/storage/tests/tests/test_random_upgrade.py b/storage/tests/tests/test_random_upgrade.py new file mode 100644 index 000000000..4aaa1a507 --- /dev/null +++ b/storage/tests/tests/test_random_upgrade.py @@ -0,0 +1,73 @@ +import hypothesis.strategies as st +from hypothesis import assume, settings +from hypothesis.stateful import Bundle, RuleBasedStateMachine, invariant, rule + +from c0.storage import Storage as StorageC0 +from c.storage import Storage as StorageC + +from . import common +from .storage_model import StorageModel + + +class StorageUpgrade(RuleBasedStateMachine): + def __init__(self): + super(StorageUpgrade, self).__init__() + self.sc = StorageC0() + self.sc.init() + self.sm = StorageModel() + self.sm.init(common.test_uid) + self.storages = (self.sc, self.sm) + self.ensure_unlocked() + + keys = Bundle("keys") + values = Bundle("values") + pins = Bundle("pins") + + @rule(target=keys, app=st.integers(1, 0xFF), key=st.integers(0, 0xFF)) + def k(self, app, key): + return (app << 8) | key + + @rule(target=values, v=st.binary(min_size=0, max_size=10000)) + def v(self, v): + return v + + @rule(target=pins, p=st.integers(1, 3)) + def p(self, p): + return p + + @rule(k=keys, v=values) + def set(self, k, v): + assume(k != 0xFFFF) + for s in self.storages: + s.set(k, v) + + @rule(p=pins) + def check_pin(self, p): + assert self.sm.unlock(p) == self.sc.check_pin(p) + self.ensure_unlocked() + + @rule(oldpin=pins, newpin=pins) + def change_pin(self, oldpin, newpin): + assert self.sm.change_pin(oldpin, newpin) == self.sc.change_pin(oldpin, newpin) + self.ensure_unlocked() + + @invariant() + def check_upgrade(self): + sc1 = StorageC() + sc1._set_flash_buffer(self.sc._get_flash_buffer()) + sc1.init(common.test_uid) + assert self.sm.get_pin_rem() == sc1.get_pin_rem() + assert sc1.unlock(self.sm.pin) + for k, v in self.sm: + assert sc1.get(k) == v + + def ensure_unlocked(self): + if not self.sm.unlocked: + for s in self.storages: + assert s.unlock(self.sm.pin) + + +TestStorageUpgrade = StorageUpgrade.TestCase +TestStorageUpgrade.settings = settings( + deadline=None, max_examples=30, stateful_step_count=50 +) diff --git a/storage/tests/tests/test_set_get.py b/storage/tests/tests/test_set_get.py new file mode 100644 index 000000000..727bc20a2 --- /dev/null +++ b/storage/tests/tests/test_set_get.py @@ -0,0 +1,178 @@ +import pytest + +from . import common + +# Strings for testing ChaCha20 encryption. +chacha_strings = [ + b"Short string.", + b"", + b"Although ChaCha20 is a stream cipher, it operates on blocks of 64 bytes. This string is over 152 bytes in length so that we test multi-block encryption.", + b"This string is exactly 64 bytes long, that is exactly one block.", +] + + +def test_set_get(): + sc, sp = common.init(unlock=True) + for s in (sc, sp): + s.set(0xBEEF, b"Hello") + s.set(0xCAFE, b"world! ") + s.set(0xDEAD, b"How\n") + s.set(0xAAAA, b"are") + s.set(0x0901, b"you?") + s.set(0x0902, b"Lorem") + s.set(0x0903, b"ipsum") + s.set(0xDEAD, b"A\n") + s.set(0xDEAD, b"AAAAAAAAAAA") + s.set(0x2200, b"BBBB") + assert common.memory_equals(sc, sp) + + for s in (sc, sp): + s.change_pin(1, 2221) + s.change_pin(2221, 991) + s.set(0xAAAA, b"something else") + assert common.memory_equals(sc, sp) + + # check data are not changed by gets + datasc = sc._dump() + datasp = sp._dump() + + for s in (sc, sp): + assert s.get(0xAAAA) == b"something else" + assert s.get(0x0901) == b"you?" + assert s.get(0x0902) == b"Lorem" + assert s.get(0x0903) == b"ipsum" + assert s.get(0xDEAD) == b"AAAAAAAAAAA" + assert s.get(0x2200) == b"BBBB" + + assert datasc == sc._dump() + assert datasp == sp._dump() + + # test locked storage + for s in (sc, sp): + s.lock() + with pytest.raises(RuntimeError): + s.set(0xAAAA, b"test public") + with pytest.raises(RuntimeError): + s.set(0x0901, b"test protected") + with pytest.raises(RuntimeError): + s.get(0x0901) + assert s.get(0xAAAA) == b"something else" + + # check that storage functions after unlock + for s in (sc, sp): + s.unlock(991) + s.set(0xAAAA, b"public") + s.set(0x0902, b"protected") + assert s.get(0xAAAA) == b"public" + assert s.get(0x0902) == b"protected" + + # test delete + for s in (sc, sp): + assert s.delete(0x0902) + assert common.memory_equals(sc, sp) + + for s in (sc, sp): + assert not s.delete(0x7777) + assert not s.delete(0x0902) + assert common.memory_equals(sc, sp) + + +def test_invalid_key(): + for s in common.init(unlock=True): + with pytest.raises(RuntimeError): + s.set(0xFFFF, b"Hello") + + +def test_chacha_strings(): + sc, sp = common.init(unlock=True) + for s in (sc, sp): + for i, string in enumerate(chacha_strings): + s.set(0x0301 + i, string) + assert common.memory_equals(sc, sp) + + for s in (sc, sp): + for i, string in enumerate(chacha_strings): + assert s.get(0x0301 + i) == string + + +def test_set_repeated(): + test_strings = [[0x0501, b""], [0x0502, b"test"], [0x8501, b""], [0x8502, b"test"]] + sc, sp = common.init(unlock=True) + for s in (sc, sp): + for key, val in test_strings: + s.set(key, val) + s.set(key, val) + assert common.memory_equals(sc, sp) + + for s in (sc, sp): + for key, val in test_strings: + s.set(key, val) + assert common.memory_equals(sc, sp) + + for key, val in test_strings: + for s in (sc, sp): + assert s.delete(key) + assert common.memory_equals(sc, sp) + + +def test_set_similar(): + sc, sp = common.init(unlock=True) + for s in (sc, sp): + s.set(0xBEEF, b"Satoshi") + s.set(0xBEEF, b"satoshi") + assert common.memory_equals(sc, sp) + + for s in (sc, sp): + s.wipe() + s.unlock(1) + s.set(0xBEEF, b"satoshi") + s.set(0xBEEF, b"Satoshi") + assert common.memory_equals(sc, sp) + + for s in (sc, sp): + s.wipe() + s.unlock(1) + s.set(0xBEEF, b"satoshi") + s.set(0xBEEF, b"Satoshi") + s.set(0xBEEF, b"Satoshi") + s.set(0xBEEF, b"SatosHi") + s.set(0xBEEF, b"satoshi") + s.set(0xBEEF, b"satoshi\x00") + assert common.memory_equals(sc, sp) + + +def test_set_locked(): + sc, sp = common.init() + for s in (sc, sp): + with pytest.raises(RuntimeError): + s.set(0x0303, b"test") + with pytest.raises(RuntimeError): + s.set(0x8003, b"test") + assert common.memory_equals(sc, sp) + + for s in (sc, sp): + s.set(0xC001, b"Ahoj") + s.set(0xC003, b"test") + assert common.memory_equals(sc, sp) + + for s in (sc, sp): + s.get(0xC001) == b"Ahoj" + s.get(0xC003) == b"test" + + +def test_counter(): + sc, sp = common.init(unlock=True) + for i in range(0, 200): + for s in (sc, sp): + assert i == s.next_counter(0xC001) + assert common.memory_equals(sc, sp) + + for s in (sc, sp): + s.lock() + s.set_counter(0xC001, 500) + assert common.memory_equals(sc, sp) + + for i in range(501, 700): + for s in (sc, sp): + assert i == s.next_counter(0xC001) + assert common.memory_equals(sc, sp) diff --git a/storage/tests/tests/test_upgrade.py b/storage/tests/tests/test_upgrade.py new file mode 100644 index 000000000..a089438ca --- /dev/null +++ b/storage/tests/tests/test_upgrade.py @@ -0,0 +1,75 @@ +from c0.storage import Storage as StorageC0 +from c.storage import Storage as StorageC +from python.src.storage import Storage as StoragePy + +from . import common + +# Strings for testing ChaCha20 encryption. +chacha_strings = [ + b"Short string.", + b"", + b"Although ChaCha20 is a stream cipher, it operates on blocks of 64 bytes. This string is over 152 bytes in length so that we test multi-block encryption.", + b"This string is exactly 64 bytes long, that is exactly one block.", +] + + +def set_values(s): + s.set(0xBEEF, b"Hello") + s.set(0xCAFE, b"world! ") + s.set(0xDEAD, b"How\n") + s.change_pin(1, 1222) + s.set(0xAAAA, b"are") + s.set(0x0901, b"you?") + s.set(0x0902, b"Lorem") + s.set(0x0903, b"ipsum") + s.change_pin(1222, 199) + s.set(0xDEAD, b"A\n") + s.set(0xDEAD, b"AAAAAAAAAAA") + s.set(0x2200, b"BBBB") + for i, string in enumerate(chacha_strings): + s.set(0x0301 + i, string) + + +def check_values(s): + assert s.unlock(199) + assert s.get(0xAAAA) == b"are" + assert s.get(0x0901) == b"you?" + assert s.get(0x0902) == b"Lorem" + assert s.get(0x0903) == b"ipsum" + assert s.get(0xDEAD) == b"AAAAAAAAAAA" + assert s.get(0x2200) == b"BBBB" + for i, string in enumerate(chacha_strings): + assert s.get(0x0301 + i) == string + + +def test_upgrade(): + sc0 = StorageC0() + sc0.init() + assert sc0.unlock(1) + set_values(sc0) + for _ in range(10): + assert not sc0.unlock(3) + + sc1 = StorageC() + sc1._set_flash_buffer(sc0._get_flash_buffer()) + sc1.init(common.test_uid) + assert sc1.get_pin_rem() == 6 + check_values(sc1) + + +def test_python_set_sectors(): + sp0 = StoragePy() + sp0.init(common.test_uid) + assert sp0.unlock(1) + set_values(sp0) + for _ in range(10): + assert not sp0.unlock(3) + assert sp0.get_pin_rem() == 6 + + sp1 = StoragePy() + sp1.nc._set_sectors(sp0._dump()) + sp1.init(common.test_uid) + common.memory_equals(sp0, sp1) + + assert sp1.get_pin_rem() == 6 + check_values(sp1)