mirror of
https://github.com/trezor/trezor-firmware.git
synced 2025-01-17 10:51:00 +00:00
common/tools: factor out marketcap data download
This commit is contained in:
parent
993d3b6e3f
commit
9849d84a5e
@ -7,9 +7,9 @@ import sys
|
||||
import time
|
||||
|
||||
import click
|
||||
import requests
|
||||
|
||||
import coin_info
|
||||
import marketcap
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -20,11 +20,6 @@ WALLETS = coin_info.load_json("wallets.json")
|
||||
OVERRIDES = coin_info.load_json("coins_details.override.json")
|
||||
VERSIONS = coin_info.latest_releases()
|
||||
|
||||
COINMAKETCAP_CACHE = os.path.join(os.path.dirname(__file__), "coinmarketcap.json")
|
||||
COINMARKETCAP_API_BASE = "https://pro-api.coinmarketcap.com/v1/"
|
||||
|
||||
MARKET_CAPS = {}
|
||||
|
||||
# automatic wallet entries
|
||||
WALLET_TREZOR = {"Trezor": "https://wallet.trezor.io"}
|
||||
WALLET_ETH_TREZOR = {"Trezor Beta": "https://beta-wallet.trezor.io/next/"}
|
||||
@ -46,88 +41,9 @@ TREZOR_KNOWN_URLS = (
|
||||
)
|
||||
|
||||
|
||||
def coinmarketcap_call(endpoint, api_key, params=None):
|
||||
url = COINMARKETCAP_API_BASE + endpoint
|
||||
r = requests.get(url, params=params, headers={"X-CMC_PRO_API_KEY": api_key})
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
|
||||
def coinmarketcap_init(api_key, refresh=None):
|
||||
global MARKET_CAPS
|
||||
|
||||
force_refresh = refresh is True
|
||||
disable_refresh = refresh is False
|
||||
try:
|
||||
try:
|
||||
mtime = os.path.getmtime(COINMAKETCAP_CACHE)
|
||||
except FileNotFoundError:
|
||||
mtime = 0
|
||||
cache_is_fresh = mtime > time.time() - 3600
|
||||
if disable_refresh or (cache_is_fresh and not force_refresh):
|
||||
print("Using cached market cap data")
|
||||
with open(COINMAKETCAP_CACHE) as f:
|
||||
coinmarketcap_data = json.load(f)
|
||||
else:
|
||||
print("Fetching market cap data")
|
||||
coinmarketcap_data = coinmarketcap_call(
|
||||
"cryptocurrency/listings/latest",
|
||||
api_key,
|
||||
params={"limit": 5000, "convert": "USD"},
|
||||
)
|
||||
by_id = {str(coin["id"]): coin for coin in coinmarketcap_data["data"]}
|
||||
all_ids = list(by_id.keys())
|
||||
while all_ids:
|
||||
first_100 = all_ids[:100]
|
||||
all_ids = all_ids[100:]
|
||||
time.sleep(1)
|
||||
print("Fetching metadata, {} coins remaining...".format(len(all_ids)))
|
||||
metadata = coinmarketcap_call(
|
||||
"cryptocurrency/info", api_key, params={"id": ",".join(first_100)}
|
||||
)
|
||||
for coin_id, meta in metadata["data"].items():
|
||||
by_id[coin_id]["meta"] = meta
|
||||
|
||||
with open(COINMAKETCAP_CACHE, "w") as f:
|
||||
json.dump(coinmarketcap_data, f)
|
||||
except Exception as e:
|
||||
raise RuntimeError("market cap data unavailable") from e
|
||||
|
||||
coin_data = {}
|
||||
for coin in coinmarketcap_data["data"]:
|
||||
slug = coin["slug"]
|
||||
platform = coin["meta"]["platform"]
|
||||
market_cap = coin["quote"]["USD"]["market_cap"]
|
||||
if market_cap is not None:
|
||||
coin_data[slug] = int(market_cap)
|
||||
if platform is not None and platform["name"] == "Ethereum":
|
||||
address = platform["token_address"].lower()
|
||||
coin_data[address] = int(market_cap)
|
||||
|
||||
MARKET_CAPS = coin_data
|
||||
|
||||
return coin_data
|
||||
|
||||
|
||||
def marketcap(coin):
|
||||
cap = None
|
||||
if coin["type"] == "erc20":
|
||||
address = coin["address"].lower()
|
||||
return MARKET_CAPS.get(address)
|
||||
|
||||
if "coinmarketcap_alias" in coin:
|
||||
cap = MARKET_CAPS.get(coin["coinmarketcap_alias"])
|
||||
if cap is None:
|
||||
slug = coin["name"].replace(" ", "-").lower()
|
||||
cap = MARKET_CAPS.get(slug)
|
||||
if cap is None:
|
||||
cap = MARKET_CAPS.get(coin["shortcut"].lower())
|
||||
return cap
|
||||
|
||||
|
||||
def update_marketcaps(coins):
|
||||
for coin in coins.values():
|
||||
coin["marketcap_usd"] = marketcap(coin) or 0
|
||||
coin["marketcap_usd"] = marketcap.marketcap(coin) or 0
|
||||
|
||||
|
||||
def summary(coins, api_key):
|
||||
@ -149,7 +65,7 @@ def summary(coins, api_key):
|
||||
|
||||
total_marketcap = None
|
||||
try:
|
||||
ret = coinmarketcap_call("global-metrics/quotes/latest", api_key)
|
||||
ret = marketcap.call("global-metrics/quotes/latest", api_key)
|
||||
total_marketcap = int(ret["data"]["quote"]["USD"]["total_market_cap"])
|
||||
except Exception:
|
||||
pass
|
||||
@ -398,7 +314,7 @@ def main(refresh, api_key, verbose):
|
||||
handler.setLevel(log_level)
|
||||
root.addHandler(handler)
|
||||
|
||||
coinmarketcap_init(api_key, refresh=refresh)
|
||||
marketcap.init(api_key, refresh=refresh)
|
||||
|
||||
defs = coin_info.coin_info()
|
||||
support_info = coin_info.support_info(defs)
|
||||
|
91
common/tools/marketcap.py
Normal file
91
common/tools/marketcap.py
Normal file
@ -0,0 +1,91 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Fetch market capitalization data."""
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
COINMAKETCAP_CACHE = os.path.join(os.path.dirname(__file__), "coinmarketcap.json")
|
||||
COINMARKETCAP_API_BASE = "https://pro-api.coinmarketcap.com/v1/"
|
||||
|
||||
MARKET_CAPS = {}
|
||||
|
||||
|
||||
def call(endpoint, api_key, params=None):
|
||||
url = COINMARKETCAP_API_BASE + endpoint
|
||||
r = requests.get(url, params=params, headers={"X-CMC_PRO_API_KEY": api_key})
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
|
||||
def init(api_key, refresh=None):
|
||||
global MARKET_CAPS
|
||||
|
||||
force_refresh = refresh is True
|
||||
disable_refresh = refresh is False
|
||||
try:
|
||||
try:
|
||||
mtime = os.path.getmtime(COINMAKETCAP_CACHE)
|
||||
except FileNotFoundError:
|
||||
mtime = 0
|
||||
cache_is_fresh = mtime > time.time() - 3600
|
||||
if disable_refresh or (cache_is_fresh and not force_refresh):
|
||||
print("Using cached market cap data")
|
||||
with open(COINMAKETCAP_CACHE) as f:
|
||||
coinmarketcap_data = json.load(f)
|
||||
else:
|
||||
print("Fetching market cap data")
|
||||
coinmarketcap_data = call(
|
||||
"cryptocurrency/listings/latest",
|
||||
api_key,
|
||||
params={"limit": 5000, "convert": "USD"},
|
||||
)
|
||||
by_id = {str(coin["id"]): coin for coin in coinmarketcap_data["data"]}
|
||||
all_ids = list(by_id.keys())
|
||||
while all_ids:
|
||||
first_100 = all_ids[:100]
|
||||
all_ids = all_ids[100:]
|
||||
time.sleep(1)
|
||||
print("Fetching metadata, {} coins remaining...".format(len(all_ids)))
|
||||
metadata = call(
|
||||
"cryptocurrency/info", api_key, params={"id": ",".join(first_100)}
|
||||
)
|
||||
for coin_id, meta in metadata["data"].items():
|
||||
by_id[coin_id]["meta"] = meta
|
||||
|
||||
with open(COINMAKETCAP_CACHE, "w") as f:
|
||||
json.dump(coinmarketcap_data, f)
|
||||
except Exception as e:
|
||||
raise RuntimeError("market cap data unavailable") from e
|
||||
|
||||
coin_data = {}
|
||||
for coin in coinmarketcap_data["data"]:
|
||||
slug = coin["slug"]
|
||||
platform = coin["meta"]["platform"]
|
||||
market_cap = coin["quote"]["USD"]["market_cap"]
|
||||
if market_cap is not None:
|
||||
coin_data[slug] = int(market_cap)
|
||||
if platform is not None and platform["name"] == "Ethereum":
|
||||
address = platform["token_address"].lower()
|
||||
coin_data[address] = int(market_cap)
|
||||
|
||||
MARKET_CAPS = coin_data
|
||||
|
||||
return coin_data
|
||||
|
||||
|
||||
def marketcap(coin):
|
||||
cap = None
|
||||
if coin["type"] == "erc20":
|
||||
address = coin["address"].lower()
|
||||
return MARKET_CAPS.get(address)
|
||||
|
||||
if "coinmarketcap_alias" in coin:
|
||||
cap = MARKET_CAPS.get(coin["coinmarketcap_alias"])
|
||||
if cap is None:
|
||||
slug = coin["name"].replace(" ", "-").lower()
|
||||
cap = MARKET_CAPS.get(slug)
|
||||
if cap is None:
|
||||
cap = MARKET_CAPS.get(coin["shortcut"].lower())
|
||||
return cap
|
Loading…
Reference in New Issue
Block a user