feat(ci): create pyright tool and include it in CI

[no changelog]
pull/2150/head
grdddj 2 years ago committed by Jiří Musil
parent d48aa2637f
commit 3d505874aa

@ -113,8 +113,10 @@ mypy: ## deprecated; use "make typecheck"
@echo "mypy is deprecated; use 'make typecheck'" @echo "mypy is deprecated; use 'make typecheck'"
make typecheck make typecheck
typecheck: typecheck: pyright
pyright
pyright:
python ./../tools/pyright_tool.py --dir core
clippy: clippy:
cd embed/rust ; cargo clippy cd embed/rust ; cargo clippy

@ -10,5 +10,6 @@
"stubPath": "mocks/generated", "stubPath": "mocks/generated",
"typeCheckingMode": "basic", "typeCheckingMode": "basic",
"pythonVersion": "3.10", "pythonVersion": "3.10",
"enableTypeIgnoreComments": false,
"reportMissingModuleSource": false "reportMissingModuleSource": false
} }

@ -176,7 +176,7 @@ async def handle_DoPreauthorized(
if handler is None: if handler is None:
return wire.unexpected_message() return wire.unexpected_message()
return await handler(ctx, req, authorization.get()) # type: ignore return await handler(ctx, req, authorization.get()) # type: ignore [Expected 2 positional arguments]
async def handle_CancelAuthorization( async def handle_CancelAuthorization(

@ -776,7 +776,7 @@ class Bitcoin:
# The nHashType is the 8 least significant bits of the sighash type. # The nHashType is the 8 least significant bits of the sighash type.
# Some coins set the 24 most significant bits of the sighash type to # Some coins set the 24 most significant bits of the sighash type to
# the fork ID value. # the fork ID value.
return self.get_hash_type(txi) & 0xFF # type: ignore return self.get_hash_type(txi) & 0xFF # type: ignore [int-into-enum]
def write_tx_input_derived( def write_tx_input_derived(
self, self,

@ -199,55 +199,55 @@ class UiConfirmNonDefaultLocktime(UiConfirm):
) )
def confirm_output(output: TxOutput, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[None]: # type: ignore def confirm_output(output: TxOutput, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[None]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmOutput(output, coin, amount_unit)) return (yield UiConfirmOutput(output, coin, amount_unit))
def confirm_decred_sstx_submission(output: TxOutput, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[None]: # type: ignore def confirm_decred_sstx_submission(output: TxOutput, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[None]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmDecredSSTXSubmission(output, coin, amount_unit)) return (yield UiConfirmDecredSSTXSubmission(output, coin, amount_unit))
def confirm_payment_request(payment_req: TxAckPaymentRequest, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[Any]: # type: ignore def confirm_payment_request(payment_req: TxAckPaymentRequest, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[Any]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmPaymentRequest(payment_req, coin, amount_unit)) return (yield UiConfirmPaymentRequest(payment_req, coin, amount_unit))
def confirm_replacement(description: str, txid: bytes) -> Awaitable[Any]: # type: ignore def confirm_replacement(description: str, txid: bytes) -> Awaitable[Any]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmReplacement(description, txid)) return (yield UiConfirmReplacement(description, txid))
def confirm_modify_output(txo: TxOutput, orig_txo: TxOutput, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[Any]: # type: ignore def confirm_modify_output(txo: TxOutput, orig_txo: TxOutput, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[Any]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmModifyOutput(txo, orig_txo, coin, amount_unit)) return (yield UiConfirmModifyOutput(txo, orig_txo, coin, amount_unit))
def confirm_modify_fee(user_fee_change: int, total_fee_new: int, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[Any]: # type: ignore def confirm_modify_fee(user_fee_change: int, total_fee_new: int, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[Any]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmModifyFee(user_fee_change, total_fee_new, coin, amount_unit)) return (yield UiConfirmModifyFee(user_fee_change, total_fee_new, coin, amount_unit))
def confirm_total(spending: int, fee: int, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[None]: # type: ignore def confirm_total(spending: int, fee: int, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[None]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmTotal(spending, fee, coin, amount_unit)) return (yield UiConfirmTotal(spending, fee, coin, amount_unit))
def confirm_joint_total(spending: int, total: int, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[Any]: # type: ignore def confirm_joint_total(spending: int, total: int, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[Any]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmJointTotal(spending, total, coin, amount_unit)) return (yield UiConfirmJointTotal(spending, total, coin, amount_unit))
def confirm_feeoverthreshold(fee: int, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[Any]: # type: ignore def confirm_feeoverthreshold(fee: int, coin: CoinInfo, amount_unit: AmountUnit) -> Awaitable[Any]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmFeeOverThreshold(fee, coin, amount_unit)) return (yield UiConfirmFeeOverThreshold(fee, coin, amount_unit))
def confirm_change_count_over_threshold(change_count: int) -> Awaitable[Any]: # type: ignore def confirm_change_count_over_threshold(change_count: int) -> Awaitable[Any]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmChangeCountOverThreshold(change_count)) return (yield UiConfirmChangeCountOverThreshold(change_count))
def confirm_foreign_address(address_n: list) -> Awaitable[Any]: # type: ignore def confirm_foreign_address(address_n: list) -> Awaitable[Any]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmForeignAddress(address_n)) return (yield UiConfirmForeignAddress(address_n))
def confirm_nondefault_locktime(lock_time: int, lock_time_disabled: bool) -> Awaitable[Any]: # type: ignore def confirm_nondefault_locktime(lock_time: int, lock_time_disabled: bool) -> Awaitable[Any]: # type: ignore [awaitable-is-generator]
return (yield UiConfirmNonDefaultLocktime(lock_time, lock_time_disabled)) return (yield UiConfirmNonDefaultLocktime(lock_time, lock_time_disabled))
def request_tx_meta(tx_req: TxRequest, coin: CoinInfo, tx_hash: bytes | None = None) -> Awaitable[PrevTx]: # type: ignore def request_tx_meta(tx_req: TxRequest, coin: CoinInfo, tx_hash: bytes | None = None) -> Awaitable[PrevTx]: # type: ignore [awaitable-is-generator]
assert tx_req.details is not None assert tx_req.details is not None
tx_req.request_type = RequestType.TXMETA tx_req.request_type = RequestType.TXMETA
tx_req.details.tx_hash = tx_hash tx_req.details.tx_hash = tx_hash
@ -258,7 +258,7 @@ def request_tx_meta(tx_req: TxRequest, coin: CoinInfo, tx_hash: bytes | None = N
def request_tx_extra_data( def request_tx_extra_data(
tx_req: TxRequest, offset: int, size: int, tx_hash: bytes | None = None tx_req: TxRequest, offset: int, size: int, tx_hash: bytes | None = None
) -> Awaitable[bytearray]: # type: ignore ) -> Awaitable[bytearray]: # type: ignore [awaitable-is-generator]
assert tx_req.details is not None assert tx_req.details is not None
tx_req.request_type = RequestType.TXEXTRADATA tx_req.request_type = RequestType.TXEXTRADATA
tx_req.details.extra_data_offset = offset tx_req.details.extra_data_offset = offset
@ -269,7 +269,7 @@ def request_tx_extra_data(
return ack.tx.extra_data_chunk return ack.tx.extra_data_chunk
def request_tx_input(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes | None = None) -> Awaitable[TxInput]: # type: ignore def request_tx_input(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes | None = None) -> Awaitable[TxInput]: # type: ignore [awaitable-is-generator]
assert tx_req.details is not None assert tx_req.details is not None
if tx_hash: if tx_hash:
tx_req.request_type = RequestType.TXORIGINPUT tx_req.request_type = RequestType.TXORIGINPUT
@ -282,7 +282,7 @@ def request_tx_input(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes |
return sanitize_tx_input(ack.tx.input, coin) return sanitize_tx_input(ack.tx.input, coin)
def request_tx_prev_input(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes | None = None) -> Awaitable[PrevInput]: # type: ignore def request_tx_prev_input(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes | None = None) -> Awaitable[PrevInput]: # type: ignore [awaitable-is-generator]
assert tx_req.details is not None assert tx_req.details is not None
tx_req.request_type = RequestType.TXINPUT tx_req.request_type = RequestType.TXINPUT
tx_req.details.request_index = i tx_req.details.request_index = i
@ -292,7 +292,7 @@ def request_tx_prev_input(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: by
return sanitize_tx_prev_input(ack.tx.input, coin) return sanitize_tx_prev_input(ack.tx.input, coin)
def request_tx_output(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes | None = None) -> Awaitable[TxOutput]: # type: ignore def request_tx_output(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes | None = None) -> Awaitable[TxOutput]: # type: ignore [awaitable-is-generator]
assert tx_req.details is not None assert tx_req.details is not None
if tx_hash: if tx_hash:
tx_req.request_type = RequestType.TXORIGOUTPUT tx_req.request_type = RequestType.TXORIGOUTPUT
@ -305,7 +305,7 @@ def request_tx_output(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes
return sanitize_tx_output(ack.tx.output, coin) return sanitize_tx_output(ack.tx.output, coin)
def request_tx_prev_output(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes | None = None) -> Awaitable[PrevOutput]: # type: ignore def request_tx_prev_output(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: bytes | None = None) -> Awaitable[PrevOutput]: # type: ignore [awaitable-is-generator]
assert tx_req.details is not None assert tx_req.details is not None
tx_req.request_type = RequestType.TXOUTPUT tx_req.request_type = RequestType.TXOUTPUT
tx_req.details.request_index = i tx_req.details.request_index = i
@ -316,7 +316,7 @@ def request_tx_prev_output(tx_req: TxRequest, i: int, coin: CoinInfo, tx_hash: b
return ack.tx.output return ack.tx.output
def request_payment_req(tx_req: TxRequest, i: int) -> Awaitable[TxAckPaymentRequest]: # type: ignore def request_payment_req(tx_req: TxRequest, i: int) -> Awaitable[TxAckPaymentRequest]: # type: ignore [awaitable-is-generator]
assert tx_req.details is not None assert tx_req.details is not None
tx_req.request_type = RequestType.TXPAYMENTREQ tx_req.request_type = RequestType.TXPAYMENTREQ
tx_req.details.request_index = i tx_req.details.request_index = i
@ -325,7 +325,7 @@ def request_payment_req(tx_req: TxRequest, i: int) -> Awaitable[TxAckPaymentRequ
return sanitize_payment_req(ack) return sanitize_payment_req(ack)
def request_tx_finish(tx_req: TxRequest) -> Awaitable[None]: # type: ignore def request_tx_finish(tx_req: TxRequest) -> Awaitable[None]: # type: ignore [awaitable-is-generator]
tx_req.request_type = RequestType.TXFINISHED tx_req.request_type = RequestType.TXFINISHED
yield None, tx_req yield None, tx_req
_clear_tx_request(tx_req) _clear_tx_request(tx_req)
@ -344,7 +344,7 @@ def _clear_tx_request(tx_req: TxRequest) -> None:
tx_req.serialized.signature_index = None tx_req.serialized.signature_index = None
# typechecker thinks serialized_tx is `bytes`, which is immutable # typechecker thinks serialized_tx is `bytes`, which is immutable
# we know that it is `bytearray` in reality # we know that it is `bytearray` in reality
tx_req.serialized.serialized_tx[:] = bytes() # type: ignore tx_req.serialized.serialized_tx[:] = bytes() # type: ignore ["__setitem__" method not defined on type "bytes"]
# Data sanitizers # Data sanitizers

@ -282,7 +282,7 @@ def get_address_bytes_unsafe(address: str) -> bytes:
def _get_address_type(address: bytes) -> CardanoAddressType: def _get_address_type(address: bytes) -> CardanoAddressType:
return address[0] >> 4 # type: ignore return address[0] >> 4 # type: ignore [int-into-enum]
def _validate_shelley_address( def _validate_shelley_address(

@ -265,7 +265,7 @@ class PathSchema:
# Which in practice it is, the only non-Collection is Interval. # Which in practice it is, the only non-Collection is Interval.
# But we're not going to introduce an additional type requirement # But we're not going to introduce an additional type requirement
# for the sake of __repr__ that doesn't exist in production anyway # for the sake of __repr__ that doesn't exist in production anyway
collection: Collection[int] = component # type: ignore collection: Collection[int] = component # type: ignore [Expression of type "Container[int]" cannot be assigned to declared type "Collection[int]"]
component_str = ",".join(str(unharden(i)) for i in collection) component_str = ",".join(str(unharden(i)) for i in collection)
if len(collection) > 1: if len(collection) > 1:
component_str = "[" + component_str + "]" component_str = "[" + component_str + "]"

@ -106,8 +106,7 @@ async def verify_user_pin(
raise RuntimeError raise RuntimeError
while retry: while retry:
# request_pin_on_device possibly unbound pin = await request_pin_on_device( # type: ignore ["request_pin_on_device" is possibly unbound]
pin = await request_pin_on_device( # type: ignore
ctx, "Wrong PIN, enter again", config.get_pin_rem(), allow_cancel ctx, "Wrong PIN, enter again", config.get_pin_rem(), allow_cancel
) )
if config.unlock(pin, salt): if config.unlock(pin, salt):

@ -11,7 +11,7 @@ def read_setting() -> SafetyCheckLevel:
""" """
temporary_safety_check_level = storage.cache.get(APP_COMMON_SAFETY_CHECKS_TEMPORARY) temporary_safety_check_level = storage.cache.get(APP_COMMON_SAFETY_CHECKS_TEMPORARY)
if temporary_safety_check_level: if temporary_safety_check_level:
return int.from_bytes(temporary_safety_check_level, "big") # type: ignore return int.from_bytes(temporary_safety_check_level, "big") # type: ignore [int-into-enum]
else: else:
stored = storage.device.safety_check_level() stored = storage.device.safety_check_level()
if stored == SAFETY_CHECK_LEVEL_STRICT: if stored == SAFETY_CHECK_LEVEL_STRICT:

@ -214,8 +214,8 @@ if __debug__:
return Success() return Success()
def boot() -> None: def boot() -> None:
workflow_handlers.register(MessageType.DebugLinkDecision, dispatch_DebugLinkDecision) # type: ignore workflow_handlers.register(MessageType.DebugLinkDecision, dispatch_DebugLinkDecision) # type: ignore [Argument of type "(ctx: Context, msg: DebugLinkDecision) -> Coroutine[Any, Any, None]" cannot be assigned to parameter "handler" of type "Handler[Msg@register]" in function "register"]
workflow_handlers.register(MessageType.DebugLinkGetState, dispatch_DebugLinkGetState) # type: ignore workflow_handlers.register(MessageType.DebugLinkGetState, dispatch_DebugLinkGetState) # type: ignore [Argument of type "(ctx: Context, msg: DebugLinkGetState) -> Coroutine[Any, Any, DebugLinkState | None]" cannot be assigned to parameter "handler" of type "Handler[Msg@register]" in function "register"]
workflow_handlers.register( workflow_handlers.register(
MessageType.DebugLinkReseedRandom, dispatch_DebugLinkReseedRandom MessageType.DebugLinkReseedRandom, dispatch_DebugLinkReseedRandom
) )

@ -86,7 +86,7 @@ async def _operations(ctx: Context, w: Writer, num_operations: int) -> None:
writers.write_uint32(w, num_operations) writers.write_uint32(w, num_operations)
for _ in range(num_operations): for _ in range(num_operations):
op = await ctx.call_any(StellarTxOpRequest(), *consts.op_wire_types) op = await ctx.call_any(StellarTxOpRequest(), *consts.op_wire_types)
await process_operation(ctx, w, op) # type: ignore await process_operation(ctx, w, op) # type: ignore [Argument of type "MessageType" cannot be assigned to parameter "op" of type "StellarMessageType" in function "process_operation"]
async def _memo(ctx: Context, w: Writer, msg: StellarSignTx) -> None: async def _memo(ctx: Context, w: Writer, msg: StellarSignTx) -> None:

@ -134,7 +134,7 @@ def get_backup_type() -> BackupType:
): ):
# Invalid backup type # Invalid backup type
raise RuntimeError raise RuntimeError
return backup_type # type: ignore return backup_type # type: ignore [int-into-enum]
def is_passphrase_enabled() -> bool: def is_passphrase_enabled() -> bool:
@ -310,7 +310,7 @@ def safety_check_level() -> StorageSafetyCheckLevel:
if level not in (SAFETY_CHECK_LEVEL_STRICT, SAFETY_CHECK_LEVEL_PROMPT): if level not in (SAFETY_CHECK_LEVEL_STRICT, SAFETY_CHECK_LEVEL_PROMPT):
return _DEFAULT_SAFETY_CHECK_LEVEL return _DEFAULT_SAFETY_CHECK_LEVEL
else: else:
return level # type: ignore return level # type: ignore [int-into-enum]
# do not use this function directly, see apps.common.safety_checks instead # do not use this function directly, see apps.common.safety_checks instead

@ -65,11 +65,11 @@ def exception(name: str, exc: BaseException) -> None:
name, name,
DEBUG, DEBUG,
"ui.Result: %s", "ui.Result: %s",
exc.value, # type: ignore[attr-defined] # noqa: F821 exc.value, # type: ignore[Cannot access member "value" for type "BaseException"]
) )
elif exc.__class__.__name__ == "Cancelled": elif exc.__class__.__name__ == "Cancelled":
_log(name, DEBUG, "ui.Cancelled") _log(name, DEBUG, "ui.Cancelled")
else: else:
_log(name, ERROR, "exception:") _log(name, ERROR, "exception:")
# since mypy 0.770 we cannot override sys, so print_exception is unknown # since mypy 0.770 we cannot override sys, so print_exception is unknown
sys.print_exception(exc) # type: ignore sys.print_exception(exc) # type: ignore ["print_exception" is not a known member of module]

@ -147,7 +147,7 @@ def run() -> None:
# timeout occurred, run the first scheduled task # timeout occurred, run the first scheduled task
if _queue: if _queue:
_queue.pop(task_entry) _queue.pop(task_entry)
_step(task_entry[1], task_entry[2]) # type: ignore _step(task_entry[1], task_entry[2]) # type: ignore [Argument of type "int" cannot be assigned to parameter "task" of type "Task" in function "_step"]
# error: Argument 1 to "_step" has incompatible type "int"; expected "Coroutine[Any, Any, Any]" # error: Argument 1 to "_step" has incompatible type "int"; expected "Coroutine[Any, Any, Any]"
# rationale: We use untyped lists here, because that is what the C API supports. # rationale: We use untyped lists here, because that is what the C API supports.
@ -323,7 +323,7 @@ class race(Syscall):
# child is a layout -- type-wise, it is an Awaitable, but # child is a layout -- type-wise, it is an Awaitable, but
# implementation-wise it is an Iterable and we know that its __iter__ # implementation-wise it is an Iterable and we know that its __iter__
# will return a Generator. # will return a Generator.
child_task = child.__iter__() # type: ignore child_task = child.__iter__() # type: ignore [Cannot access member "__iter__" for type "Awaitable[Unknown]";;Cannot access member "__iter__" for type "Coroutine[Unknown, Unknown, Unknown]"]
schedule(child_task, None, None, finalizer) schedule(child_task, None, None, finalizer)
scheduled.append(child_task) scheduled.append(child_task)
@ -347,7 +347,7 @@ class race(Syscall):
self.exit(task) self.exit(task)
schedule(self.callback, result) schedule(self.callback, result)
def __iter__(self) -> Task: # type: ignore def __iter__(self) -> Task: # type: ignore [awaitable-is-generator]
try: try:
return (yield self) return (yield self)
except: # noqa: E722 except: # noqa: E722
@ -411,7 +411,7 @@ class chan:
self.putters: list[tuple[Task | None, Any]] = [] self.putters: list[tuple[Task | None, Any]] = []
self.takers: list[Task] = [] self.takers: list[Task] = []
def put(self, value: Any) -> Awaitable[None]: # type: ignore def put(self, value: Any) -> Awaitable[None]: # type: ignore [awaitable-is-generator]
put = chan.Put(self, value) put = chan.Put(self, value)
try: try:
return (yield put) return (yield put)
@ -421,7 +421,7 @@ class chan:
self.putters.remove(entry) self.putters.remove(entry)
raise raise
def take(self) -> Awaitable[Any]: # type: ignore def take(self) -> Awaitable[Any]: # type: ignore [awaitable-is-generator]
take = chan.Take(self) take = chan.Take(self)
try: try:
return (yield take) return (yield take)
@ -521,7 +521,7 @@ class spawn(Syscall):
if self.finalizer_callback is not None: if self.finalizer_callback is not None:
self.finalizer_callback(self) self.finalizer_callback(self)
def __iter__(self) -> Task: # type: ignore def __iter__(self) -> Task: # type: ignore [awaitable-is-generator]
if self.finished: if self.finished:
# exit immediately if we already have a return value # exit immediately if we already have a return value
if isinstance(self.return_value, BaseException): if isinstance(self.return_value, BaseException):

@ -2,8 +2,8 @@ try:
from trezorio import fatfs, sdcard from trezorio import fatfs, sdcard
HAVE_SDCARD = True HAVE_SDCARD = True
is_present = sdcard.is_present # type: ignore is_present = sdcard.is_present # type: ignore [obscured-by-same-name]
capacity = sdcard.capacity # type: ignore capacity = sdcard.capacity # type: ignore [obscured-by-same-name]
except Exception: except Exception:
HAVE_SDCARD = False HAVE_SDCARD = False

@ -45,7 +45,7 @@ if __debug__:
display.refresh() display.refresh()
else: else:
refresh = display.refresh # type: ignore refresh = display.refresh # type: ignore [obscured-by-same-name]
# in both debug and production, emulator needs to draw the screen explicitly # in both debug and production, emulator needs to draw the screen explicitly
@ -115,7 +115,7 @@ async def click() -> Pos:
ev, *pos = await touch ev, *pos = await touch
if ev == io.TOUCH_END: if ev == io.TOUCH_END:
break break
return pos # type: ignore return pos # type: ignore [Expression of type "list[Unknown]" cannot be assigned to return type "Pos"]
def backlight_fade(val: int, delay: int = 14000, step: int = 15) -> None: def backlight_fade(val: int, delay: int = 14000, step: int = 15) -> None:
@ -360,7 +360,7 @@ class Layout(Component):
if TYPE_CHECKING: if TYPE_CHECKING:
def __await__(self) -> Generator: def __await__(self) -> Generator:
return self.__iter__() # type: ignore return self.__iter__() # type: ignore [Expression of type "Coroutine[Any, Any, Any]" cannot be assigned to return type "Generator[Unknown, Unknown, Unknown]"]
else: else:
__await__ = __iter__ __await__ = __iter__
@ -427,7 +427,7 @@ class Layout(Component):
refresh() refresh()
backlight_fade(self.BACKLIGHT_LEVEL) backlight_fade(self.BACKLIGHT_LEVEL)
def handle_rendering(self) -> loop.Task: # type: ignore def handle_rendering(self) -> loop.Task: # type: ignore [awaitable-is-generator]
"""Task that is rendering the layout in a busy loop.""" """Task that is rendering the layout in a busy loop."""
self._before_render() self._before_render()
sleep = self.RENDER_SLEEP sleep = self.RENDER_SLEEP
@ -440,6 +440,6 @@ class Layout(Component):
self.dispatch(RENDER, 0, 0) self.dispatch(RENDER, 0, 0)
def wait_until_layout_is_running() -> Awaitable[None]: # type: ignore def wait_until_layout_is_running() -> Awaitable[None]: # type: ignore [awaitable-is-generator]
while not layout_chan.takers: while not layout_chan.takers:
yield yield

@ -105,9 +105,9 @@ class Swipe(ui.Component):
raise ui.Result(swipe) raise ui.Result(swipe)
def __await__(self) -> Generator: def __await__(self) -> Generator:
return self.__iter__() # type: ignore return self.__iter__() # type: ignore [Expression of type "Task" cannot be assigned to return type "Generator[Unknown, Unknown, Unknown]"]
def __iter__(self) -> loop.Task: # type: ignore def __iter__(self) -> loop.Task: # type: ignore [awaitable-is-generator]
try: try:
touch = loop.wait(io.TOUCH) touch = loop.wait(io.TOUCH)
while True: while True:

@ -26,7 +26,7 @@ class _RustLayout(ui.Layout):
def create_tasks(self) -> tuple[loop.Task, ...]: def create_tasks(self) -> tuple[loop.Task, ...]:
return self.handle_input_and_rendering(), self.handle_timers() return self.handle_input_and_rendering(), self.handle_timers()
def handle_input_and_rendering(self) -> loop.Task: # type: ignore def handle_input_and_rendering(self) -> loop.Task: # type: ignore [awaitable-is-generator]
button = loop.wait(io.BUTTON) button = loop.wait(io.BUTTON)
ui.display.clear() ui.display.clear()
self.layout.paint() self.layout.paint()
@ -41,7 +41,7 @@ class _RustLayout(ui.Layout):
if msg is not None: if msg is not None:
raise ui.Result(msg) raise ui.Result(msg)
def handle_timers(self) -> loop.Task: # type: ignore def handle_timers(self) -> loop.Task: # type: ignore [awaitable-is-generator]
while True: while True:
# Using `yield` instead of `await` to avoid allocations. # Using `yield` instead of `await` to avoid allocations.
token = yield self.timer token = yield self.timer

@ -27,7 +27,7 @@ class _RustLayout(ui.Layout):
def create_tasks(self) -> tuple[loop.Task, ...]: def create_tasks(self) -> tuple[loop.Task, ...]:
return self.handle_input_and_rendering(), self.handle_timers() return self.handle_input_and_rendering(), self.handle_timers()
def handle_input_and_rendering(self) -> loop.Task: # type: ignore def handle_input_and_rendering(self) -> loop.Task: # type: ignore [awaitable-is-generator]
touch = loop.wait(io.TOUCH) touch = loop.wait(io.TOUCH)
ui.display.clear() ui.display.clear()
self.layout.paint() self.layout.paint()
@ -44,7 +44,7 @@ class _RustLayout(ui.Layout):
if msg is not None: if msg is not None:
raise ui.Result(msg) raise ui.Result(msg)
def handle_timers(self) -> loop.Task: # type: ignore def handle_timers(self) -> loop.Task: # type: ignore [awaitable-is-generator]
while True: while True:
# Using `yield` instead of `await` to avoid allocations. # Using `yield` instead of `await` to avoid allocations.
token = yield self.timer token = yield self.timer

@ -16,6 +16,6 @@ class Popup(ui.Layout):
def create_tasks(self) -> tuple[loop.Task, ...]: def create_tasks(self) -> tuple[loop.Task, ...]:
return self.handle_input(), self.handle_rendering(), self.handle_timeout() return self.handle_input(), self.handle_rendering(), self.handle_timeout()
def handle_timeout(self) -> loop.Task: # type: ignore def handle_timeout(self) -> loop.Task: # type: ignore [awaitable-is-generator]
yield loop.sleep(self.time_ms) yield loop.sleep(self.time_ms)
raise ui.Result(None) raise ui.Result(None)

@ -153,7 +153,7 @@ if TYPE_CHECKING:
class HashContextInitable(HashContext, Protocol): class HashContextInitable(HashContext, Protocol):
def __init__( # pylint: disable=super-init-not-called def __init__( # pylint: disable=super-init-not-called
self, __data: bytes = None self, __data: bytes | None = None
) -> None: ) -> None:
... ...

@ -24,7 +24,7 @@ INVALID_TYPE = const(-1)
# use it at the same time, thus we check this at runtime in debug builds. # use it at the same time, thus we check this at runtime in debug builds.
if __debug__: if __debug__:
class BufferLock: # type: ignore class BufferLock: # type: ignore [Class declaration "BufferLock" is obscured by a declaration of the same name]
def __init__(self) -> None: def __init__(self) -> None:
self.in_use = False self.in_use = False

@ -49,13 +49,13 @@ style:
isort --apply --recursive $(STYLE_TARGETS) --skip-glob "$(EXCLUDE_TARGETS)/*" isort --apply --recursive $(STYLE_TARGETS) --skip-glob "$(EXCLUDE_TARGETS)/*"
autoflake -i --remove-all-unused-imports -r $(STYLE_TARGETS) --exclude "$(EXCLUDE_TARGETS)" autoflake -i --remove-all-unused-imports -r $(STYLE_TARGETS) --exclude "$(EXCLUDE_TARGETS)"
flake8 flake8
pyright make pyright
style_check: style_check:
black --check $(STYLE_TARGETS) black --check $(STYLE_TARGETS)
isort --check-only --recursive $(STYLE_TARGETS) --skip-glob "$(EXCLUDE_TARGETS)/*" isort --check-only --recursive $(STYLE_TARGETS) --skip-glob "$(EXCLUDE_TARGETS)/*"
flake8 flake8
pyright make pyright
style_quick_check: style_quick_check:
black --check $(STYLE_TARGETS) black --check $(STYLE_TARGETS)
@ -67,4 +67,4 @@ test:
pytest tests pytest tests
pyright: pyright:
pyright -p pyrightconfig.json python ./../tools/pyright_tool.py --dir python

@ -6,6 +6,7 @@
], ],
"pythonVersion": "3.6", "pythonVersion": "3.6",
"typeCheckingMode": "basic", "typeCheckingMode": "basic",
"enableTypeIgnoreComments": false,
"reportMissingImports": false, "reportMissingImports": false,
"reportUntypedFunctionDecorator": true, "reportUntypedFunctionDecorator": true,
"reportUntypedClassDecorator": true, "reportUntypedClassDecorator": true,

@ -81,7 +81,7 @@ class TrezorConnection:
# It is alright to return just the class object instead of instance, # It is alright to return just the class object instead of instance,
# as the ScriptUI class object itself is the implementation of TrezorClientUI # as the ScriptUI class object itself is the implementation of TrezorClientUI
# (ScriptUI is just a set of staticmethods) # (ScriptUI is just a set of staticmethods)
return ScriptUI # type: ignore [Expression of type "Type[ScriptUI]" cannot be assigned to return type "TrezorClientUI"] return ScriptUI
else: else:
return ClickUI(passphrase_on_host=self.passphrase_on_host) return ClickUI(passphrase_on_host=self.passphrase_on_host)
@ -149,4 +149,4 @@ def with_client(func: "Callable[Concatenate[TrezorClient, P], R]") -> "Callable[
# the return type of @click.pass_obj is improperly specified and pyright doesn't # the return type of @click.pass_obj is improperly specified and pyright doesn't
# understand that it converts f(obj, *args, **kwargs) to f(*args, **kwargs) # understand that it converts f(obj, *args, **kwargs) to f(*args, **kwargs)
return trezorctl_command_with_client # type: ignore return trezorctl_command_with_client # type: ignore [cannot be assigned to return type]

@ -78,7 +78,7 @@ def xpub_deserialize(xpubstr: str) -> Tuple[str, messages.HDNodeType]:
fingerprint=data.fingerprint, fingerprint=data.fingerprint,
child_num=data.child_num, child_num=data.child_num,
chain_code=data.chain_code, chain_code=data.chain_code,
public_key=public_key, # type: ignore ["Unknown | None" cannot be assigned to parameter "public_key"] public_key=public_key, # type: ignore [Argument of type "Unknown | None" cannot be assigned to parameter "public_key" of type "bytes"]
private_key=private_key, private_key=private_key,
) )

@ -139,7 +139,7 @@ def _erc20_contract(token_address: str, to_address: str, amount: int) -> str:
"outputs": [{"name": "", "type": "bool"}], "outputs": [{"name": "", "type": "bool"}],
} }
] ]
contract = _get_web3().eth.contract(address=token_address, abi=min_abi) # type: ignore ["str" cannot be assigned to type "Address | ChecksumAddress | ENS"] contract = _get_web3().eth.contract(address=token_address, abi=min_abi)
return contract.encodeABI("transfer", [to_address, amount]) return contract.encodeABI("transfer", [to_address, amount])

@ -125,7 +125,7 @@ class TrezorctlGroup(click.Group):
command, subcommand = cmd_name.split("-", maxsplit=1) command, subcommand = cmd_name.split("-", maxsplit=1)
# get_command can return None and the following line will fail. # get_command can return None and the following line will fail.
# We don't care, we ignore the exception anyway. # We don't care, we ignore the exception anyway.
return super().get_command(ctx, command).get_command(ctx, subcommand) # type: ignore ["get_command" is not a known member of "None"] return super().get_command(ctx, command).get_command(ctx, subcommand) # type: ignore ["get_command" is not a known member of "None";;Cannot access member "get_command" for type "Command"]
except Exception: except Exception:
pass pass

@ -280,7 +280,7 @@ class DebugLink:
class NullDebugLink(DebugLink): class NullDebugLink(DebugLink):
def __init__(self) -> None: def __init__(self) -> None:
# Ignoring type error as self.transport will not be touched while using NullDebugLink # Ignoring type error as self.transport will not be touched while using NullDebugLink
super().__init__(None) # type: ignore ["None" cannot be assigned to parameter of type "Transport"] super().__init__(None) # type: ignore [Argument of type "None" cannot be assigned to parameter "transport"]
def open(self) -> None: def open(self) -> None:
pass pass

@ -181,9 +181,9 @@ class Field:
class _MessageTypeMeta(type): class _MessageTypeMeta(type):
def __init__(cls, name: str, bases: tuple, d: dict) -> None: def __init__(cls, name: str, bases: tuple, d: dict) -> None:
super().__init__(name, bases, d) # type: ignore [Expected 1 positional] super().__init__(name, bases, d) # type: ignore [Expected 1 positional argument]
if name != "MessageType": if name != "MessageType":
cls.__init__ = MessageType.__init__ # type: ignore [Cannot assign member "__init__" for type "_MessageTypeMeta"] cls.__init__ = MessageType.__init__ # type: ignore ["__init__" is obscured by a declaration of the same name;;Cannot assign member "__init__" for type "_MessageTypeMeta"]
class MessageType(metaclass=_MessageTypeMeta): class MessageType(metaclass=_MessageTypeMeta):

@ -158,7 +158,7 @@ if __name__ == "__main__":
if QT_VERSION_STR >= "5": if QT_VERSION_STR >= "5":
ok.clicked.connect(clicked) ok.clicked.connect(clicked)
elif QT_VERSION_STR >= "4": elif QT_VERSION_STR >= "4":
QObject.connect(ok, SIGNAL("clicked()"), clicked) # type: ignore [SIGNAL is not unbound] QObject.connect(ok, SIGNAL("clicked()"), clicked) # type: ignore ["QObject" is possibly unbound;;"SIGNAL" is possibly unbound]
else: else:
raise RuntimeError("Unsupported Qt version") raise RuntimeError("Unsupported Qt version")

@ -0,0 +1,653 @@
#!/usr/bin/env python3
"""
Wrapper around pyright type checking to allow for easy ignore of specific error messages.
Thanks to it the `# type: ignore` does not affect the whole line,
so other problems at the same line cannot be masked by it.
Features:
- ignores specific pyright errors based on substring or regex
- reports empty `# type: ignore`s (without ignore reason in `[]`)
- reports unused `# type: ignore`s (for example after pyright is updated)
- allows for ignoring some errors in the whole file - see `FILE_SPECIFIC_IGNORES` variable
- allows for error aliases - see `ALIASES` variable
Usage:
- there are multiple options how to ignore/silence a pyright error:
1 - "# type: ignore [<error_substring>]"
- put it as a comment to the line we want to ignore
- "# type: ignore [<error1>;;<error2>;;...]" if there are more than one errors on that line
- also regex patterns are valid substrings
2 - "# pyright: off" / "# pyright: on"
- all errors in block of code between these marks will be ignored
3 - FILE_SPECIFIC_IGNORES
- ignore specific rules (defined by pyright) or error substrings in the whole file
4 - ALIASES
- create an alias for a common error and use is with option 1 - "# type: ignore [<error_alias>]"
Running the script:
- see all script argument by calling `python pyright_tool.py --help`
- only directories with existing `pyrightconfig.json` can be tested - see `--dir` flag
Simplified program flow (as it happens in PyrightTool.run()):
- extract and validate pyright config data from pyrightconfig.json
- collect all the pyright errors by actually running the pyright itself
- extract type-ignore information for all the files pyright was analyzing
- loop through all the pyright errors and try to match them against all the type-ignore rules
- if there are some unmatched errors, report them and exit with nonzero value
- also report unused ignores and other inconsistencies
"""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Final, TypedDict
class RangeDetail(TypedDict):
line: int
character: int
class Range(TypedDict):
start: RangeDetail
end: RangeDetail
class Error(TypedDict):
file: str
severity: str
message: str
range: Range
rule: str
Errors = list[Error]
class Summary(TypedDict):
filesAnalyzed: int
errorCount: int
warningCount: int
informationCount: int
timeInSec: float
class PyrightResults(TypedDict):
version: str
time: str
generalDiagnostics: Errors
summary: Summary
@dataclass
class IgnoreStatement:
substring: str
already_used: bool = False
@dataclass
class LineIgnore:
line_no: int
ignore_statements: list[IgnoreStatement]
LineIgnores = list[LineIgnore]
FileIgnores = dict[str, LineIgnores]
@dataclass
class FileSpecificIgnore:
rule: str = ""
substring: str = ""
already_used: bool = False
def __post_init__(self) -> None:
if self.rule and self.substring:
raise ValueError("Only one of rule|substring should be set")
FileSpecificIgnores = dict[str, list[FileSpecificIgnore]]
@dataclass
class PyrightOffIgnore:
start_line: int
end_line: int
already_used: bool = False
PyrightOffIgnores = list[PyrightOffIgnore]
FilePyrightOffIgnores = dict[str, PyrightOffIgnores]
parser = argparse.ArgumentParser()
parser.add_argument(
"--dev", action="store_true", help="Creating the error file and not deleting it"
)
parser.add_argument(
"--test",
action="store_true",
help="Reusing existing error file and not deleting it",
)
parser.add_argument("--log", action="store_true", help="Log details")
parser.add_argument(
"--dir",
help="Directory which to test, relative to the repository root. When empty, taking the directory of this file.",
default="",
)
args = parser.parse_args()
if args.dev:
should_generate_error_file = True
should_delete_error_file = False
print("Running in dev mode, creating the file and not deleting it")
elif args.test:
should_generate_error_file = False
should_delete_error_file = False
print("Running in test mode, will reuse existing error file")
else:
should_generate_error_file = True
should_delete_error_file = True
SHOULD_GENERATE_ERROR_FILE = should_generate_error_file
SHOULD_DELETE_ERROR_FILE = should_delete_error_file
SHOULD_LOG = args.log
if args.dir:
# Need to change the os directory to find all the files correctly
# Repository root + the wanted directory.
HERE = Path(__file__).resolve().parent.parent / args.dir
if not HERE.is_dir():
raise RuntimeError(f"Could not find directory {args.dir} under {HERE}")
os.chdir(HERE)
else:
# Directory of this file
HERE = Path(__file__).resolve().parent
# TODO: move into a JSON or other config file
# Files need to have a relative location to the directory being tested
# Example (when checking `python` directory):
# "tools/helloworld.py": [
# FileSpecificIgnore(rule="reportMissingParameterType"),
# FileSpecificIgnore(substring="cannot be assigned to parameter"),
# ],
FILE_SPECIFIC_IGNORES: FileSpecificIgnores = {}
# Allowing for more readable ignore of common problems, with an easy-to-understand alias
ALIASES: dict[str, str] = {
"awaitable-is-generator": 'Return type of generator function must be "Generator" or "Iterable"',
"obscured-by-same-name": "is obscured by a declaration of the same name",
"int-into-enum": 'Expression of type "int.*" cannot be assigned to return type ".*"',
}
class PyrightTool:
ON_PATTERN: Final = "# pyright: on"
OFF_PATTERN: Final = "# pyright: off"
IGNORE_PATTERN: Final = "# type: ignore"
IGNORE_DELIMITER: Final = ";;"
original_pyright_results: PyrightResults
all_files_to_check: set[str]
all_pyright_ignores: FileIgnores
pyright_off_ignores: FilePyrightOffIgnores
real_errors: Errors
unused_ignores: list[str]
inconsistencies: list[str] = []
def __init__(
self,
pyright_config_file: str | Path,
*,
file_specific_ignores: FileSpecificIgnores | None = None,
aliases: dict[str, str] | None = None,
error_file: str | Path = "temp_error_file.json",
should_generate_error_file: bool = True,
should_delete_error_file: bool = True,
should_log: bool = False,
) -> None:
self.pyright_config_file = pyright_config_file
self.file_specific_ignores = file_specific_ignores or {}
self.aliases = aliases or {}
self.error_file = error_file
self.should_generate_error_file = should_generate_error_file
self.should_delete_error_file = should_delete_error_file
self.should_log = should_log
self.count_of_ignored_errors = 0
self.check_input_correctness()
def check_input_correctness(self) -> None:
"""Verify the input data structures are correct."""
# Checking for correct file_specific_ignores structure
for file, ignores in self.file_specific_ignores.items():
for ignore in ignores:
if not isinstance(ignore, FileSpecificIgnore):
raise RuntimeError(
"All items of file_specific_ignores must be FileSpecificIgnore classes. "
f"Got {ignore} - type {type(ignore)}"
)
# Also putting substrings at the beginning of ignore-lists, so they are matched before rules
# (Not to leave them potentially unused when error would be matched by a rule instead)
self.file_specific_ignores[file].sort(
key=lambda x: x.substring, reverse=True
)
# Checking for correct aliases (dict[str, str] type)
for alias, full_substring in self.aliases.items():
if not isinstance(alias, str) or not isinstance(full_substring, str):
raise RuntimeError(
"All alias keys and values must be strings. "
f"Got {alias} (type {type(alias)}), {full_substring} (type {type(full_substring)}"
)
def run(self) -> None:
"""Main function, putting together all logic and evaluating result."""
self.pyright_config_data = self.get_and_validate_pyright_config_data()
self.original_pyright_results = self.get_original_pyright_results()
self.all_files_to_check = self.get_all_files_to_check()
self.all_pyright_ignores = self.get_all_pyright_ignores()
self.pyright_off_ignores = self.get_pyright_off_ignores()
self.real_errors = self.get_all_real_errors()
self.unused_ignores = self.get_unused_ignores()
self.evaluate_final_result()
def evaluate_final_result(self) -> None:
"""Reporting results to the user/CI (printing stuff, deciding exit value)."""
print(
f"\nIgnored {self.count_of_ignored_errors} custom-defined errors "
f"from {len(self.all_pyright_ignores)} files."
)
if self.unused_ignores:
print("\nWARNING: there are unused ignores!")
for unused_ignore in self.unused_ignores:
print(unused_ignore)
if self.inconsistencies:
print("\nWARNING: there are inconsistencies!")
for inconsistency in self.inconsistencies:
print(inconsistency)
if not self.real_errors:
print("\nSUCCESS: Everything is fine!")
if self.unused_ignores or self.inconsistencies:
print("But we have unused ignores or inconsistencies!")
sys.exit(2)
else:
sys.exit(0)
else:
print("\nERROR: We have issues!\n")
for error in self.real_errors:
print(self.get_human_readable_error_string(error))
print(f"Found {len(self.real_errors)} issues above")
if self.unused_ignores or self.inconsistencies:
print("And we have unused ignores or inconsistencies!")
sys.exit(1)
def get_and_validate_pyright_config_data(self) -> dict[str, Any]:
"""Verify that pyrightconfig exists and has correct data."""
if not os.path.isfile(self.pyright_config_file):
raise RuntimeError(
f"Pyright config file under {self.pyright_config_file} does not exist! "
"Tool relies on its existence, please create it."
)
try:
config_data = json.loads(open(self.pyright_config_file, "r").read())
except json.decoder.JSONDecodeError as err:
raise RuntimeError(
f"Pyright config under {self.pyright_config_file} does not contain valid JSON! Err: {err}"
) from None
# enableTypeIgnoreComments MUST be set to False, otherwise the "type: ignore"s
# will affect the original pyright result - and we need it to grab all the errors
# so we can handle them on our own
if (
"enableTypeIgnoreComments" not in config_data
or config_data["enableTypeIgnoreComments"]
):
raise RuntimeError(
f"Please set '\"enableTypeIgnoreComments\": true' in {self.pyright_config_file}. "
"Otherwise the tool will not work as expected."
)
return config_data
def get_original_pyright_results(self) -> PyrightResults:
"""Extract all information from pyright.
`pyright --outputjson` will return all the results in
nice JSON format with `generalDiagnostics` array storing
all the errors - schema described in PyrightResults
"""
if self.should_generate_error_file:
cmd = f"pyright -p {self.pyright_config_file} --outputjson > {self.error_file}"
exit_code = subprocess.call(cmd, shell=True)
# Checking if there was no non-type-checking error when running the above command
# Exit code 0 = all fine, no type-checking issues in pyright
# Exit code 1 = pyright has found some type-checking issues (expected)
# All other exit codes mean something non-type-related got wrong (or pyright was not found)
# https://github.com/microsoft/pyright/blob/main/docs/command-line.md#pyright-exit-codes
if exit_code not in (0, 1):
raise RuntimeError(
f"Running '{cmd}' produced a non-expected exit code (see output above)."
)
if not os.path.isfile(self.error_file):
raise RuntimeError(
f"Pyright error file under {self.error_file} was not generated by running '{cmd}'."
)
try:
pyright_results: PyrightResults = json.loads(
open(self.error_file, "r").read()
)
except FileNotFoundError:
raise RuntimeError(
f"Error file under {self.error_file} does not exist!"
) from None
except json.decoder.JSONDecodeError as err:
raise RuntimeError(
f"Error file under {self.error_file} does not contain valid JSON! Err: {err}"
) from None
if self.should_delete_error_file:
os.remove(self.error_file)
return pyright_results
def get_all_real_errors(self) -> Errors:
"""Analyze all pyright errors and discard all that should be ignored.
Ignores can be different:
- as per "# type: ignore [<error_substring>]" comment
- as per "file_specific_ignores"
- as per "# pyright: off" mark
"""
real_errors: Errors = []
for error in self.original_pyright_results["generalDiagnostics"]:
# Special handling of cycle import issues, which have different format
if "range" not in error:
error["range"] = {"start": {"line": 0}}
error["rule"] = "cycleImport"
real_errors.append(error)
continue
file_path = error["file"]
error_message = error["message"]
line_no = error["range"]["start"]["line"]
# Checking for "# type: ignore [<error_substring>]" comment
if self.should_ignore_per_inline_substring(
file_path, error_message, line_no
):
self.count_of_ignored_errors += 1
self.log_ignore(error, "error substring matched")
continue
# Checking in file_specific_ignores
if self.should_ignore_file_specific_error(file_path, error):
self.count_of_ignored_errors += 1
self.log_ignore(error, "file specific error")
continue
# Checking for "# pyright: off" mark
if self.is_line_in_pyright_off_block(file_path, line_no):
self.count_of_ignored_errors += 1
self.log_ignore(error, "pyright disabled for this line")
continue
real_errors.append(error)
return real_errors
def get_all_files_to_check(self) -> set[str]:
"""Get all files to be analyzed by pyright, based on its config."""
all_files: set[str] = set()
if "include" in self.pyright_config_data:
for dir_or_file in self.pyright_config_data["include"]:
for file in self.get_all_py_files_recursively(dir_or_file):
all_files.add(file)
else:
# "include" is missing, we should analyze all files in current dir
for file in self.get_all_py_files_recursively("."):
all_files.add(file)
if "exclude" in self.pyright_config_data:
for dir_or_file in self.pyright_config_data["exclude"]:
for file in self.get_all_py_files_recursively(dir_or_file):
if file in all_files:
all_files.remove(file)
return all_files
@staticmethod
def get_all_py_files_recursively(dir_or_file: str) -> set[str]:
"""Return all python files in certain directory (or the file itself)."""
if os.path.isfile(dir_or_file):
return set(str(HERE / dir_or_file))
all_files: set[str] = set()
for root, _, files in os.walk(dir_or_file):
for file in files:
if file.endswith(".py"):
all_files.add(str(HERE / os.path.join(root, file)))
return all_files
def get_all_pyright_ignores(self) -> FileIgnores:
"""Get ignore information from all the files to be analyzed."""
file_ignores: FileIgnores = {}
for file in self.all_files_to_check:
ignores = self.get_inline_type_ignores_from_file(file)
if ignores:
file_ignores[file] = ignores
return file_ignores
def get_pyright_off_ignores(self) -> FilePyrightOffIgnores:
"""Get ignore information based on `# pyright: on/off` marks."""
pyright_off_ignores: FilePyrightOffIgnores = {}
for file in self.all_files_to_check:
ignores = self.find_pyright_off_from_file(file)
if ignores:
pyright_off_ignores[file] = ignores
return pyright_off_ignores
def get_unused_ignores(self) -> list[str]:
"""Evaluate if there are no ignores not matched by pyright errors."""
unused_ignores: list[str] = []
# type: ignore
for file, file_ignores in self.all_pyright_ignores.items():
for line_ignore in file_ignores:
for ignore_statement in line_ignore.ignore_statements:
if not ignore_statement.already_used:
unused_ignores.append(
f"File {file}:{line_ignore.line_no + 1} has unused ignore. "
f"Substring: {ignore_statement.substring}"
)
# Pyright: off
for file, file_ignores in self.pyright_off_ignores.items():
for off_ignore in file_ignores:
if not off_ignore.already_used:
unused_ignores.append(
f"File {file} has unused # pyright: off ignore between lines "
f"{off_ignore.start_line + 1} and {off_ignore.end_line + 1}."
)
# File-specific
for file, file_ignores in self.file_specific_ignores.items():
for ignore_object in file_ignores:
if not ignore_object.already_used:
if ignore_object.substring:
unused_ignores.append(
f"File {file} has unused specific ignore substring. "
f"Substring: {ignore_object.substring}"
)
elif ignore_object.rule:
unused_ignores.append(
f"File {file} has unused specific ignore rule. "
f"Rule: {ignore_object.rule}"
)
return unused_ignores
def should_ignore_per_inline_substring(
self, file: str, error_message: str, line_no: int
) -> bool:
"""Check if line should be ignored based on inline substring/regex."""
if file not in self.all_pyright_ignores:
return False
for ignore_index, ignore in enumerate(self.all_pyright_ignores[file]):
if line_no == ignore.line_no:
for substring_index, ignore_statement in enumerate(
ignore.ignore_statements
):
# Supporting both text substrings and regex patterns
if ignore_statement.substring in error_message or re.search(
ignore_statement.substring, error_message
):
# Marking this ignore to be used (so we can identify unused ignores)
self.all_pyright_ignores[file][ignore_index].ignore_statements[
substring_index
].already_used = True
return True
return False
def should_ignore_file_specific_error(self, file: str, error: Error) -> bool:
"""Check if line should be ignored based on file-specific ignores."""
if file not in self.file_specific_ignores:
return False
for ignore_object in self.file_specific_ignores[file]:
if ignore_object.rule:
if error["rule"] == ignore_object.rule:
ignore_object.already_used = True
return True
elif ignore_object.substring:
# Supporting both text substrings and regex patterns
if ignore_object.substring in error["message"] or re.search(
ignore_object.substring, error["message"]
):
ignore_object.already_used = True
return True
return False
def is_line_in_pyright_off_block(self, file: str, line_no: int) -> bool:
"""Check if line should be ignored based on `# pyright: off` mark."""
if file not in self.pyright_off_ignores:
return False
for off_ignore in self.pyright_off_ignores[file]:
if off_ignore.start_line < line_no < off_ignore.end_line:
off_ignore.already_used = True
return True
return False
def find_pyright_off_from_file(self, file: str) -> PyrightOffIgnores:
"""Get sections in file to be ignored based on `# pyright: off`."""
pyright_off_ignores: PyrightOffIgnores = []
with open(file, "r") as f:
pyright_off = False
start_line = 0
index = 0
for index, line in enumerate(f):
if self.OFF_PATTERN in line and not pyright_off:
start_line = index
pyright_off = True
elif self.ON_PATTERN in line and pyright_off:
pyright_off_ignores.append(PyrightOffIgnore(start_line, index))
pyright_off = False
if pyright_off:
pyright_off_ignores.append(PyrightOffIgnore(start_line, index))
return pyright_off_ignores
def get_inline_type_ignores_from_file(self, file: str) -> LineIgnores:
"""Get all type ignore lines and statements from a certain file."""
ignores: LineIgnores = []
with open(file, "r") as f:
for index, line in enumerate(f):
if self.IGNORE_PATTERN in line:
ignore_statements = self.get_ignore_statements(line)
if not ignore_statements:
self.inconsistencies.append(
f"There is an empty `{self.IGNORE_PATTERN}` in {file}:{index+1}"
)
else:
ignores.append(LineIgnore(index, ignore_statements))
return ignores
def get_ignore_statements(self, line: str) -> list[IgnoreStatement]:
"""Extract error substrings to be ignored from a certain line."""
# Extracting content of [error_substring(s)] after the ignore comment
ignore_part = line.split(self.IGNORE_PATTERN, maxsplit=2)[1]
ignore_content = re.search(r"\[(.*)\]", ignore_part)
# We should not be using empty `# type: ignore` without content in []
# Notifying the parent function that we should do something about it
if not ignore_content:
return []
# There might be more than one substring
statement_substrings = ignore_content.group(1).split(self.IGNORE_DELIMITER)
# When finding aliases, replacing them with a real substring
statement_substrings = [self.aliases.get(ss, ss) for ss in statement_substrings]
return [IgnoreStatement(substr) for substr in statement_substrings]
def log_ignore(self, error: Error, reason: str) -> None:
"""Print the action of ignoring certain error into the console."""
if self.should_log:
err = self.get_human_readable_error_string(error)
print(f"\nError ignored. Reason: {reason}.\nErr: {err}")
@staticmethod
def get_human_readable_error_string(error: Error) -> str:
"""Transform error object to a string readable by human."""
file = error["file"]
message = error["message"]
rule = error["rule"]
line = error["range"]["start"]["line"]
# Need to add +1 to the line, as it is zero-based index
return f"{file}:{line + 1}: - error: {message} ({rule})\n"
if __name__ == "__main__":
tool = PyrightTool(
pyright_config_file=HERE / "pyrightconfig.json",
file_specific_ignores={
str(HERE / k): v for k, v in FILE_SPECIFIC_IGNORES.items()
},
aliases=ALIASES,
error_file="errors_for_pyright_temp.json",
should_generate_error_file=SHOULD_GENERATE_ERROR_FILE,
should_delete_error_file=SHOULD_DELETE_ERROR_FILE,
should_log=SHOULD_LOG,
)
tool.run()
Loading…
Cancel
Save