====== Cryptoview: Restoring coins overview ======
{{tag>dev python cryptoview}}
Okay, so, I desperately need to restore my **cryptoview** project now. This was formerly based on **wxPython** and I would really like to translate this into **pyqt5** in the process. I just hope this won't take me too long...
====== ======
===== Preparing application skeleton =====
* As usual starting with a minimal nervproj app component: """Module for CryptoView class definition"""
import logging
from PyQt5.QtCore import Qt
import PyQt5.QtWidgets as qwd
from PyQt5 import QtCore
from nvp.nvp_context import NVPContext
from nvh.gui.dock_widget import DockWidget
from nvh.gui.app_base import AppBase
from nvh.gui.crypto.coin_overview_panel import CoinOverviewPanel
logger = logging.getLogger(__name__)
class CryptoView(AppBase):
    """Application component building the CryptoView main window."""

    def __init__(self, ctx: NVPContext):
        """Initialise the base application under the "cryptoview" name."""
        AppBase.__init__(self, ctx, "cryptoview")

    def process_command(self, cmd):
        """Run the application if *cmd* is ``'run'``.

        Returns True when the command was handled here, False otherwise.
        """
        handled = cmd == 'run'
        if handled:
            self.run_app()
        return handled

    def build_app(self):
        """Extend the base build step with a dockable "Overview" panel."""
        super().build_app()
        window = self.get_main_window()
        # window.setWindowTitle("CryptoView")
        # window.setFixedSize(800, 600)

        # Dock hosting the coin overview, allowed in every dock area:
        overview_dock = DockWidget("Overview")
        overview_dock.setWidget(CoinOverviewPanel())
        overview_dock.setAllowedAreas(Qt.AllDockWidgetAreas)
        window.addDockWidget(Qt.RightDockWidgetArea, overview_dock)

        dock_options = window.GroupedDragging | window.AllowTabbedDocks | window.AllowNestedDocks
        window.setDockOptions(dock_options)
        window.setTabPosition(Qt.AllDockWidgetAreas, qwd.QTabWidget.North)

        # Override the size hint reported by the window instance:
        def _size_hint():
            return QtCore.QSize(800, 600)

        setattr(window, "sizeHint", _size_hint)
if __name__ == "__main__":
    # Build the context that will own the component:
    nvp_ctx = NVPContext()

    # Register the application component under the "app" name:
    app_comp = nvp_ctx.register_component("app", CryptoView(nvp_ctx))

    # Declare the 'run' entry on the "main" parser:
    subparsers = {'run': None}
    nvp_ctx.define_subparsers("main", subparsers)

    psr = nvp_ctx.get_parser('main.run')
    # psr.add_argument("message", type=str,
    #                  help="Simple message that we should send")

    app_comp.run()
* Also prepared a minimal "CoinOverviewPanel" class: """CoinOverviewPanel widget module"""
import logging
from PyQt5 import QtCore
from PyQt5.QtCore import Qt
import PyQt5.QtWidgets as qwd
logger = logging.getLogger(__name__)
class CoinOverviewPanel(qwd.QWidget):
    """Placeholder overview widget showing a single centered label."""

    def __init__(self):
        """Build the widget: one centered label inside a vertical layout."""
        super().__init__()

        # Centered placeholder text:
        caption = qwd.QLabel(self)
        caption.setText("This is the CoinOverviewPanel")
        caption.setAlignment(Qt.AlignCenter)

        vbox = qwd.QVBoxLayout()
        vbox.setContentsMargins(5, 5, 5, 5)
        vbox.setSpacing(5)
        vbox.addWidget(caption)
        self.setLayout(vbox)

        # Override the size hint reported by this instance:
        def _size_hint():
            return QtCore.QSize(360, 300)

        setattr(self, "sizeHint", _size_hint)
* Then we add the python environment and the script on the NervHome nvp_config.json file: "cryptoview_env": {
"packages": [
"requests",
"jstyleson",
"xxhash",
"PyQt5",
"psycopg2",
"numpy"
]
}
* And: "cryptoview": {
"custom_python_env": "cryptoview_env",
"cmd": "${PYTHON} ${PROJECT_ROOT_DIR}/nvh/app/crypto_view.py run",
"python_path": ["${PROJECT_ROOT_DIR}", "${NVP_ROOT_DIR}"]
}
* And next we try to run the app: $ nvp run cryptoview
* And here is the initial (rather empty but working) view that we get:
{{ blog:2022:0514:cryptoview_skeleton.png }}
===== Retrieving the monitored coins =====
* So I then updated the CoinOverviewPanel to collect all the monitored coins from coingecko, and build a **CoinPanel** for each of them:class CoinOverviewPanel(qwd.QWidget):
"""CoinOverviewPanel class"""
def __init__(self):
"""Constructor of the CoinOverviewPanel class"""
super().__init__()
# Create a CoinGecko() instance:
ctx = NVPContext.get()
self.cgk = Coingecko(ctx)
self.build_panel()
setattr(self, "sizeHint", lambda: QtCore.QSize(360, 300))
def build_panel(self):
    """Build the grid of coin panels for this widget.

    One CoinPanel is created for each monitored coin returned by
    ``self.cgk.get_monitored_coins()`` and placed in a grid filled row by
    row. A coin that cannot be retrieved is logged and skipped; its grid
    cell is left empty because the cell position follows the coin index.
    """
    ncols = 5  # number of panels per grid row

    coins = self.cgk.get_monitored_coins()
    # logger.info("List of monitored coins: %s", coins)

    lyt = qwd.QGridLayout()
    lyt.setContentsMargins(5, 5, 5, 5)
    lyt.setSpacing(5)

    for i, coin_id in enumerate(coins):
        coin = self.cgk.get_coin(coin_id)
        if coin is None:
            logger.error("Cannot retrieve coin for: %s", coin_id)
            continue

        row, col = divmod(i, ncols)
        lyt.addWidget(CoinPanel(coin), row, col)

    self.setLayout(lyt)
* And my initial **CoinPanel** class looks like this: """CoinPanel widget module"""
import logging
from PyQt5 import QtCore
from PyQt5.QtCore import Qt
import PyQt5.QtWidgets as qwd
logger = logging.getLogger(__name__)
class CoinPanel(qwd.QWidget):
    """Widget displaying the upper-cased symbol of one coin."""

    def __init__(self, coin):
        """Keep a reference to *coin*, cache its symbol and build the widget."""
        super().__init__()
        self.coin = coin
        self.symbol = coin.symbol().upper()
        self.build_panel()
        # setattr(self, "sizeHint", lambda: QtCore.QSize(360, 300))

    def build_panel(self):
        """Create the layout: a single centered label with the coin symbol."""
        symbol_label = qwd.QLabel(self)
        symbol_label.setText(self.symbol)
        symbol_label.setAlignment(Qt.AlignCenter)

        # Tight horizontal layout, no margins and no spacing:
        hbox = qwd.QHBoxLayout()
        hbox.setContentsMargins(0, 0, 0, 0)
        hbox.setSpacing(0)
        hbox.addWidget(symbol_label)
        self.setLayout(hbox)
* And now I get the symbols of all the coins shown as expected:
{{ blog:2022:0514:cryptoview_coin_symbols.png }}
===== Initial support for multithreading =====
* To be able to update the coins prices/data I need support for running tasks on multiple threads in the context of the PyQt5 application.
* I found the following links with some good explanations on how to handle multithreading with pyqt5:
* https://realpython.com/python-pyqt-qthread/
* https://www.pythonguis.com/tutorials/multithreading-pyqt-applications-qthreadpool/
* So I implemented a **QTaskManager** component as follows: """Module for QTaskManager class definition"""
# cf. https://www.pythonguis.com/tutorials/multithreading-pyqt-applications-qthreadpool/
# cf. https://realpython.com/python-pyqt-qthread/
import sys
import traceback
import logging
# from PyQt5.QtGui import *
# from PyQt5.QtWidgets import
from PyQt5.QtCore import QObject, QRunnable, pyqtSignal, QThreadPool
from nvp.nvp_context import NVPContext
from nvp.nvp_component import NVPComponent
logger = logging.getLogger(__name__)
def create_component(ctx: NVPContext):
    """Build and return the QTaskManager component for the given context."""
    manager = QTaskManager(ctx)
    return manager
class TaskCancelledException(Exception):
    """Exception raised by a task's cancellation check to abort the task."""
class QTaskSignals(QObject):
    """Holder of the Qt signals emitted by a task."""

    # finished = pyqtSignal()

    # Each signal carries a single arbitrary Python object as payload.
    error = pyqtSignal(object)
    result = pyqtSignal(object)
    progress = pyqtSignal(object)
class QTask(QRunnable):
    """Runnable wrapping a callable together with its arguments and signals.

    The wrapped callable always receives two extra keyword arguments,
    ``check_cancel`` and ``report_progress``, bound to the methods of the
    same name on this task.
    """

    def __init__(self, manager, tid, name, func, *args, **kwargs):
        """Store the callable, its arguments and the owning manager.

        manager: object providing ``get_current_index(name)``.
        tid: index assigned to this task by the manager.
        name: task name (may be None).
        func: callable executed by ``run()``.
        args/kwargs: arguments forwarded to *func*.
        """
        super().__init__()

        # Store constructor arguments (re-used for processing)
        self.func = func
        self.manager = manager
        self.task_id = tid
        self.name = name
        self.args = args
        self.kwargs = kwargs
        self.signals = QTaskSignals()

        # Give the callable a way to cooperate with cancellation/progress:
        self.kwargs['check_cancel'] = self.check_cancel
        self.kwargs['report_progress'] = self.report_progress

    def get_name(self):
        """Retrieve the name for this task"""
        return self.name

    def get_id(self):
        """Retrieve the id for this task"""
        return self.task_id

    def report_progress(self, value):
        """Emit *value* on the ``progress`` signal of this task."""
        self.signals.progress.emit(value)

    def check_cancel(self):
        """Raise TaskCancelledException if this task has been superseded.

        A task counts as cancelled when its id is lower than the manager's
        current index for its name. Returns False otherwise; a task without
        a name can never be cancelled.
        """
        if self.name is None:
            # Cannot cancel a task without name:
            return False

        if self.task_id < self.manager.get_current_index(self.name):
            raise TaskCancelledException(f"Cancelling task {self.name} [{self.task_id}]")

        return False

    def run(self):
        """Call the stored function and report the outcome through signals.

        On success the return value is emitted on ``result``. A cancellation
        is only logged. Any other exception is printed and emitted on
        ``error`` as a dict with "exctype", "value" and "traceback" keys.
        """
        try:
            result = self.func(*self.args, **self.kwargs)
        except TaskCancelledException:
            logger.info("Cancelled task %s [%d]", self.name, self.task_id)
        except:  # pylint: disable=bare-except
            # Deliberately broad: nothing is allowed to escape from run().
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit({"exctype": exctype, "value": value, "traceback": traceback.format_exc()})
        else:
            self.signals.result.emit(result)  # Return the result of the processing
        # finally:
        #     self.signals.finished.emit(result)  # Done
class QTaskManager(NVPComponent):
    """Component running callables as QTask objects on a QThreadPool.

    Tasks are tracked by name: adding a task under a name increments that
    name's index, so earlier tasks with the same name see themselves as
    cancelled the next time they call ``check_cancel``.
    """

    def __init__(self, ctx: NVPContext):
        """Create the thread pool and the per-name task index table."""
        NVPComponent.__init__(self, ctx)

        self.tpool = QThreadPool()
        self.unnamed_index = 0
        self.task_indices = {}
        logger.info("Running QThreadPool with at max %d worker threads.", self.tpool.maxThreadCount())

    def cancel_task(self, name):
        """Explicitly cancel a task by name.

        Increments the current index stored for *name* and returns the new
        value.
        """
        tid = self.task_indices.get(name, 0) + 1
        self.task_indices[name] = tid
        return tid

    def get_current_index(self, task_name):
        """Get the current index for a given task name (0 if unknown)."""
        return self.task_indices.get(task_name, 0)

    def add_task(self, func, name=None, on_done=None, on_error=None, on_progress=None):
        """Add a task to the pool and start it.

        func: callable to execute; it receives ``check_cancel`` and
            ``report_progress`` keyword arguments from the QTask wrapper.
        name: task name; an "unnamed_<n>" name is generated when None.
        on_done: optional slot connected to the task ``result`` signal.
        on_error: optional slot connected to the task ``error`` signal;
            when None the error is logged instead.
        on_progress: optional slot connected to the task ``progress``
            signal.
        """
        if name is None:
            self.unnamed_index += 1
            name = f"unnamed_{self.unnamed_index}"

        # check if we already have a task with that name and cancel it if any:
        # returning a new id for this task:
        tid = self.cancel_task(name)

        task = QTask(self, tid, name, func)
        if on_error is not None:
            task.signals.error.connect(on_error)
        else:
            task.signals.error.connect(lambda err: logger.error(
                "An exception occured in task %s [%d]: %s", name, tid, self.pretty_print(err)))

        if on_done is not None:
            task.signals.result.connect(on_done)

        if on_progress is not None:
            task.signals.progress.connect(on_progress)

        # Finally we start the task:
        self.tpool.start(task)
* And then I added a quick test in the CoinOverviewPanel class I created above to confirm it is working fine: def __init__(self):
"""Constructor of the CoinOverviewPanel class"""
super().__init__()
# Create a CoinGecko() instance:
ctx = NVPContext.get()
self.cgk = Coingecko(ctx)
self.coin_panels = {}
self.build_panel()
setattr(self, "sizeHint", lambda: QtCore.QSize(360, 300))
# Let's try a simple task here:
logger.info("The main thread id is %s", threading.get_ident())
tman: QTaskManager = ctx.get_component("qtasks")
tman.add_task(self.handle_large_task, on_done=self.task_completed)
def handle_large_task(self, **kwargs):
    """Test job: log the current thread id, sleep 5 seconds, return a sample dict."""
    worker_id = threading.get_ident()
    logger.info("Executing large task in thread %s", worker_id)
    time.sleep(5)
    payload = {"key1": 42, "key2": True}
    return payload
def task_completed(self, result):
    """Completion callback: log the current thread id and the task result."""
    current_id = threading.get_ident()
    logger.info("Executing task completed callback in thread %s", current_id)
    logger.info("The result value is: %s", result)
* And indeed, when running the cryptoview app I got the following results: $ nvp run cryptoview
2022/05/15 16:20:29 [nvh.gui.crypto.coin_overview_panel] INFO: The main thread id is 45516
2022/05/15 16:20:29 [nvh.gui.qtask_manager] INFO: Running QThreadPool with at max 8 worker threads.
2022/05/15 16:20:30 [nvh.gui.crypto.coin_overview_panel] INFO: Executing large task in thread 55992
2022/05/15 16:20:35 [nvh.gui.crypto.coin_overview_panel] INFO: Executing task completed callback in thread 45516
2022/05/15 16:20:35 [nvh.gui.crypto.coin_overview_panel] INFO: The result value is: {'key1': 42, 'key2': True}
To be able to access the "qtasks" component, I of course had to register it as a **dynamic component** as described in a previous article 😉
===== Create local price database =====
* Before I can update the price data for my coins I need to restore my **CryptoviewLocalDB** class to store the price data locally in an sqlite database, and the **PriceManager** class responsible for providing the actual data to the gui elements.
* I actually implemented the latter as a component: """Module for PriceManager class definition"""
import logging
from nvp.nvp_context import NVPContext
from nvp.nvp_component import NVPComponent
from nvh.gui.app_base import AppBase
logger = logging.getLogger(__name__)
def create_component(ctx: NVPContext):
    """Build and return the PriceManager component for the given context."""
    manager = PriceManager(ctx)
    return manager
class PriceManager(NVPComponent):
    """Component holding the price data and requesting its updates."""

    def __init__(self, ctx: NVPContext):
        """Start with empty price data, fetch the "app" component and request a first update."""
        NVPComponent.__init__(self, ctx)

        self.price_data = {}
        self.app: AppBase = self.get_component("app")

        self.request_price_data_update()

    def get_price_data(self):
        """Return the price data dict held by this manager."""
        return self.price_data

    def request_price_data_update(self):
        """Post ``update_price_data`` on the app under the "update_price_data" task name."""
        self.app.post_task(self.update_price_data, name="update_price_data")

    def update_price_data(self, **kwargs):
        """Placeholder update step: log the thread id, then post the completion callback."""
        thread_id = self.get_thread_id()
        logger.info("Should update price data here, thread_id=%d", thread_id)
        self.app.post_main_task(self.on_price_data_updated)

    def on_price_data_updated(self):
        """Callback posted once the price data was updated: logs the thread id."""
        thread_id = self.get_thread_id()
        logger.info("On price data updated %d", thread_id)
* And while building this I also extended my **AppBase** a bit to support posting tasks either on the worker threads or on the main thread: def is_main_thread(self):
"""Check if we are on the main application thread."""
assert self.main_thread_id is not None, "Main ThreadID not set yet."
return self.main_thread_id == self.get_thread_id()
def on_task_received(self, desc):
    """Execute a task description on the main thread.

    desc: dict with the keys "func" (callable), "args" (positional
        arguments) and "kwargs" (keyword arguments), as built by
        ``post_main_task``.

    Asserts that the call happens on the main thread.
    """
    assert self.is_main_thread(), "Should be on the main thread here."

    # We execute the task:
    func = desc['func']
    args = desc["args"]
    kwargs = desc["kwargs"]
    func(*args, **kwargs)
def post_main_task(self, func, *args, **kwargs):
    """Post a task on the main thread.

    The callable and its arguments are packed in a dict with the keys
    "func", "args" and "kwargs" and emitted on ``self.signals.task_received``.
    """
    assert func is not None, "Invalid task."

    self.signals.task_received.emit({
        "func": func,
        "args": args,
        "kwargs": kwargs,
    })
def post_task(self, func, name=None, on_done=None, on_error=None, on_progress=None):
    """Post a task on the QTask manager (the "qtasks" component).

    func: callable to run.
    name: optional task name.
    on_done / on_error / on_progress: optional callbacks forwarded to the
        manager's ``add_task``.
    """
    tman = self.get_component("qtasks")

    options = {"name": name, "on_done": on_done, "on_error": on_error}
    # Only forward on_progress when the caller supplied one, so this keeps
    # working with an add_task() that does not take that keyword.
    if on_progress is not None:
        options["on_progress"] = on_progress

    tman.add_task(func, **options)
===== Updating coin prices =====
* Next, with the support to collect the price data from my PostgreSQL database and to store it locally for faster processing, I could then continue with an update on the **PriceManager** to support updating the price data for each monitored coin: def request_price_data_update(self):
"""Request an update of the price data"""
logger.info("Requesting price update...")
self.app.post_task(self.update_price_data, name="update_price_data")
def update_price_data(self, check_cancel, **_kwargs):
    """Perform actual update of the price data.

    For every monitored coin, the local "history_<coin>" and
    "highres_<coin>" tables are synchronised from the remote price DB, and
    the full local content is stored in ``self.price_data[coin]`` as numpy
    arrays. ``compute_coins_data`` is then called.

    check_cancel: callable invoked between steps; it is expected to raise
        when the task has been cancelled.
    """
    logger.info("Should update price data here, thread_id=%d", self.get_thread_id())
    gecko = Coingecko(self.ctx)
    pdb = gecko.get_price_db()
    coins = gecko.get_monitored_coins()
    self.coins = coins

    logger.info("Collecting price data for %d currencies", len(coins))

    # The local database lives in the "data" folder of the application:
    db_dir = self.get_path(self.app.get_app_dir(), "data")
    self.make_folder(db_dir)
    db_file = self.get_path(db_dir, "cryptoview.local.db")
    ldb = CryptoviewLocalDB(db_file)

    for cidx, cname in enumerate(coins, start=1):
        check_cancel()

        hisdata = self._sync_price_table(ldb, pdb, PriceDB.HISTORY, "history", cname, cidx, check_cancel)
        hresdata = self._sync_price_table(ldb, pdb, PriceDB.HIGHRES, "highres", cname, cidx, check_cancel)

        # The history/highres data should be a numpy array with the columns:
        # timestamp,usd_price,usd_marketcap,usd_volume,btc_price,eth_price
        self.price_data[cname] = {
            'history': np.array(hisdata),
            'highres': np.array(hresdata),
        }

    logger.info("Done collecting price data.")
    self.compute_coins_data(check_cancel)

def _sync_price_table(self, ldb, pdb, kind, label, cname, cidx, check_cancel):
    """Bring the local "<label>_<cname>" table up to date and return all its rows.

    ldb: local database; pdb: remote price database.
    kind: remote data kind (PriceDB.HISTORY or PriceDB.HIGHRES).
    label: "history" or "highres"; used for the table name and the logs.
    cidx: 1-based coin index, only used in the log message.
    """
    hname = f"{label}_{cname}"
    stamp = 0
    if not ldb.has_table(hname):
        logger.info("Initializing local %s data", hname)
        ldb.create_price_table(hname)
    else:
        stamp = ldb.get_last_timestamp(hname)

    # Retrieve all the current available data on this coin:
    rows = pdb.get_all_price_data(kind, cname, stamp)
    check_cancel()

    # NOTE(review): a single returned row is not inserted - presumably the
    # remote query also returns the row at `stamp` itself; confirm.
    if len(rows) > 1:
        logger.info("Coin %d: Retrieved %d rows of remote %s data for %s", cidx, len(rows), label, cname)
        ldb.insert_price_data(hname, rows)
    check_cancel()

    rows = ldb.get_all_price_data(hname)
    check_cancel()
    return rows
def get_quote_price(self, row, ref_price):
    """Return ``(price, symbol_suffix)`` for *row* in the current quote coin.

    row: price row indexed as timestamp, usd_price, usd_marketcap,
        usd_volume, btc_price, eth_price.
    ref_price: USD price of the quote coin; only used when the quote coin
        is neither bitcoin nor ethereum.

    With no quote coin the USD price and "$" are returned.
    """
    quote = self.quote_coin
    if quote is None:
        return row[1], "$"

    qname = quote.name()
    if qname == 'bitcoin':
        return row[4], " BTC"
    if qname == 'ethereum':
        return row[5], " ETH"

    # otherwise we use the USD price ratios from our current coin and the reference row:
    return row[1] / ref_price, " " + quote.symbol().upper()
def compute_coins_data(self, check_cancel, **_kwargs):
    """Use the latest price data to compute the coins data.

    For each coin in ``self.coins`` with highres data, the current price,
    the percentage change over ``self.period`` and related ratios are
    stored in ``self.coins_data[coin]``. Coins without usable data are
    logged and skipped. ``notify_coins_data_update`` is finally posted on
    the app via ``post_main_task``.

    check_cancel: callable invoked between steps; it is expected to raise
        when the task has been cancelled.
    """
    assert self.coins is not None, "coins list not retrieved yet."
    coins = self.coins

    # Collect all the percentage changes:
    pos_percent = []
    neg_percent = []

    self.oldest_stamp = None

    ref_last_price = None
    ref_prev_price = None
    if self.quote_coin is not None:
        qname = self.quote_coin.get_id()
        arr = self.price_data[qname]['highres']
        last_row = arr[-1]
        ref_last_price = last_row[1]
        stamp = last_row[0]
        last_day_ts = stamp - self.period
        older = arr[arr[:, 0] <= last_day_ts, :]
        # When the stored history is shorter than the period there is no
        # row old enough: fall back on the oldest row instead of failing.
        prev_row = older[-1] if len(older) > 0 else arr[0]
        ref_prev_price = prev_row[1]

    check_cancel()

    # Coins for which an entry was actually computed during this pass:
    updated = []

    for cname in coins:
        # In some cases we might still not have any highres data for that coin,
        # So we ignore it in that case:
        if cname not in self.price_data:
            logger.warning("No price data available for coin %s", cname)
            continue

        # Retrieve the highres data we have for that coin and get the last price value from there:
        arr = self.price_data[cname]['highres']
        if len(arr) == 0:
            logger.warning("No price data available for coin %s", cname)
            continue

        # Compute the required coin data:
        check_cancel()

        last_row = arr[-1]
        usd_price, psym = self.get_quote_price(last_row, ref_last_price)

        # The current timestamp is given by the value from the first column (in seconds)
        stamp = last_row[0]

        if self.oldest_stamp is None or stamp < self.oldest_stamp:
            self.oldest_stamp = stamp

        # Compute the timestamp one period before:
        last_day_ts = stamp - self.period

        # We might have some missing data here, so we use the first row
        # at or above last_day_ts (the last row itself always qualifies):
        # last_row = arr[arr[:,0]<=last_day_ts,:][-1]
        prev_row = arr[arr[:, 0] >= last_day_ts, :][0]

        # We compute the validity factor for that prev_row
        val_ratio = (stamp - prev_row[0]) / self.period

        # From there we get the previous price:
        prev_usd_price, psym = self.get_quote_price(prev_row, ref_prev_price)

        # And we compute the percentage change:
        percent = ((usd_price / prev_usd_price) - 1.0) * 100.0

        if percent >= 0.0:
            pos_percent.append(percent)
        else:
            neg_percent.append(percent)

        self.coins_data[cname] = {
            "price": usd_price,
            "percent_change": percent,
            "percent_ratio": 1.0,
            "validity_ratio": val_ratio,
            "quote_symbol": psym,
        }
        updated.append(cname)

    # Should apply gradient colors so we use the percent ratio value:
    max_pos_change = max(pos_percent) if pos_percent else None
    min_neg_change = min(neg_percent) if neg_percent else None

    # Only the coins processed above have a fresh entry; the skipped ones
    # may have no entry at all.
    for cname in updated:
        # Check what is the ratio of that percentage change compared to the other:
        cdata = self.coins_data[cname]
        percent = cdata["percent_change"]
        if percent >= 0:
            cdata['percent_ratio'] = percent / max_pos_change if percent > 0 else 0.0
        else:
            cdata['percent_ratio'] = percent / min_neg_change

    # Compute the max age of the data (in mins)
    if self.oldest_stamp is not None:
        data_age = (self.get_timestamp() - self.oldest_stamp) / 60
        logger.info("Max price data age: %.3f mins", data_age)

    self.app.post_main_task(self.notify_coins_data_update)
def notify_coins_data_update(self):
    """Notify that coins data were updated.

    Calls every callback stored in ``self.coins_data_callbacks``, in
    registration order and without arguments.
    """
    for func in self.coins_data_callbacks:
        func()
def on_coins_data_updated(self, cb):
    """Register a callback to execute when the coins data is updated.

    The callback is appended to ``self.coins_data_callbacks``; those
    callbacks are run by ``notify_coins_data_update``.
    """
    assert cb is not None, "Invalid calback"
    self.coins_data_callbacks.append(cb)
* And then we notify the **OverviewPanel** to display the updated price infos (with one of those **coins_data_callback**): self.pman: PriceManager = ctx.get_component("price_manager")
self.pman.on_coins_data_updated(self.update_coin_panels)
* def update_coin_panels(self):
"""Update the coins panels using the coins data from the price manager"""
alldata = self.pman.get_coins_data()
for cname, panel in self.coin_panels.items():
if cname not in alldata:
logger.warning("No data available for coin %s", cname)
continue
cdata = alldata[cname]
panel.set_price(cdata["price"])
panel.set_quote_symbol(cdata["quote_symbol"])
panel.set_percent_change(cdata["percent_change"])
panel.set_percent_ratio(cdata["percent_ratio"])
panel.update()
logger.info("Done updating coin panels.")
* With those main changes (and many other small additional changes), here is the visual I get on the overview panel:
{{ blog:2022:0514:new_overview_panel.png }}
=> So this is starting to look like something 👍😎!
===== Adding support to select quote token =====
* Next thing I'm going to add on this OverviewPanel is the possibility to select the quote token: could be set to USD or BTC or ETH or any other monitored coin in fact.
* I start with building the row in the GUI: lyt.addLayout(grid)
# Add row to contains the other controls:
row = qwd.QHBoxLayout()
row.setContentsMargins(0, 0, 0, 0)
row.setSpacing(5)
lbl = qwd.QLabel()
lbl.setText("Quote: ")
row.addWidget(lbl, stretch=0, alignment=Qt.AlignCenter)
btn = qwd.QRadioButton()
btn.setText("USD")
btn.setChecked(True)
row.addWidget(btn, stretch=0, alignment=Qt.AlignCenter)
btn = qwd.QRadioButton()
btn.setText("BTC")
row.addWidget(btn, stretch=0, alignment=Qt.AlignCenter)
btn = qwd.QRadioButton()
btn.setText("ETH")
row.addWidget(btn, stretch=0, alignment=Qt.AlignCenter)
btn = qwd.QRadioButton()
btn.setText("Other")
row.addWidget(btn, stretch=0, alignment=Qt.AlignCenter)
items = ["item1", "item2", "item3"]
wid = qwd.QComboBox()
wid.addItems(items)
row.addWidget(wid)
row.addStretch(1)
# Add the row to the layout:
lyt.addLayout(row)
self.setLayout(lyt)
* Next I have to populate the list of "other" coins with the corresponding coin symbols when this is possible, so when building the grid I also collect the symbols for each coin: # Collect all the symbols in the process:
syms = []
for i in range(ncoins):
coin_id = self.coins[i]
coin = self.cgk.get_coin(coin_id)
if coin is None:
logger.error("Cannot retrieve coin for: %s", coin_id)
continue
sym = coin.symbol().upper()
syms.append(sym)
row = i//ncols
col = i % ncols
panel = CoinPanel(coin)
self.coin_panels[coin_id] = panel
grid.addWidget(panel, row, col)
* Then I can assign that list of symbols directly to the combobox: # sort the symbols:
syms.sort()
wid = qwd.QComboBox()
wid.addItems(syms)
row.addWidget(wid)
self.quote_cb = wid
The list of symbols retrieved above will also contain 'BTC' and 'ETH', but not 'USD': but that's not a big deal.
* To make this more obvious, I should disable the combobox when "Other" is not selected (ie. calling ''self.quote_cb.setEnabled(False)'' initially). And in the process I also added the required button click handlers to change the quote coin: def set_quote_coin(self, coin):
"""Assign the quote coin in the price manager"""
self.pman.set_quote_coin(coin)
def on_usd_quote_selected(self):
    """Select USD as quote.

    Disables the quote combobox and assigns None as the quote coin.
    """
    self.quote_cb.setEnabled(False)
    self.set_quote_coin(None)
def on_btc_quote_selected(self):
    """Select BTC as quote.

    Disables the quote combobox and assigns the "bitcoin" coin retrieved
    from ``self.cgk`` as the quote coin.
    """
    self.quote_cb.setEnabled(False)
    coin = self.cgk.get_coin("bitcoin")
    self.set_quote_coin(coin)
def on_eth_quote_selected(self):
    """Select ETH as quote.

    Disables the quote combobox and assigns the "ethereum" coin retrieved
    from ``self.cgk`` as the quote coin.
    """
    self.quote_cb.setEnabled(False)
    coin = self.cgk.get_coin("ethereum")
    self.set_quote_coin(coin)
def on_other_quote_selected(self):
    """Select other as quote.

    Enables the quote combobox and assigns the coin matching the symbol
    currently shown in it as the quote coin. Asserts that a coin exists
    for that symbol.
    """
    self.quote_cb.setEnabled(True)

    # Get current symbol selected:
    sym = self.quote_cb.currentText()
    coin = self.get_coin_by_symbol(sym)
    assert coin is not None, f"Cannot retrieve coin with symbol {sym}"
    self.set_quote_coin(coin)
===== Adding support for period selection =====
* When computing the percentage change I should be able to use either a period of 24 hours, 48 hours, 72 hours or 7 days, or 1 month, so let's add another dropdown item to select that:
* First I add the period names and values in the CoinOverviewPanel class constructor: self.period_names = ["24h", "48h", "72h", "7 Days", "1 Month"]
self.periods = [24*3600, 48*3600, 72*3600, 7*24*3600, 30.5*24*3600]
* Then we build the additional widget: lbl = qwd.QLabel()
lbl.setText("Period: ")
row.addWidget(lbl, stretch=0, alignment=Qt.AlignCenter)
wid = qwd.QComboBox()
wid.addItems(self.period_names)
wid.activated.connect(self.on_period_selected)
row.addWidget(wid)
self.period_cb = wid
* And finally we add the event handler connected just above: def on_period_selected(self):
"""Change the period value"""
pname = self.period_cb.currentText()
idx = self.period_names.index(pname)
assert idx >= 0, f"Invalid index for period name {pname}"
# Get the corresponding period value:
period = self.periods[idx]
# Assign the period value to the price manager:
self.pman.set_period(period)
* Naturally we also add the ''set_period()'' method in PriceManager: def set_period(self, period):
"""Assign the period value to use for change computations"""
self.period = period
self.request_coins_data_update()
* => **OK** Now I can select the period of interest for the overview display:
{{ blog:2022:0514:period_selection.png }}
===== Adding support for coin sorting =====
* I also want to be able to sort the coins on the overview panel by percentage, by symbol, or by quote value, so I'm adding another dropdown menu for that: lbl = qwd.QLabel()
lbl.setText("Sort: ")
row.addWidget(lbl, stretch=0, alignment=Qt.AlignCenter)
wid = qwd.QComboBox()
wid.addItems(self.sort_names)
wid.activated.connect(self.sort_coin_panels)
row.addWidget(wid)
self.sort_cb = wid
* And the corresponding handler for it: def sort_coin_panels(self):
"""Sort the coin panels"""
panels = [p for p in self.coin_panels.values()]
self.sort_mode = self.sort_cb.currentText()
if self.sort_mode == 'Symbol':
panels.sort(key=lambda p: p.get_symbol())
elif self.sort_mode == 'Percentage':
panels.sort(key=lambda p: p.get_percent_change())
elif self.sort_mode == 'Price':
panels.sort(key=lambda p: p.get_price())
# Remove all those widgets:
for panel in panels:
self.grid.removeWidget(panel)
for i, panel in enumerate(panels):
row = i//self.ncols
col = i % self.ncols
self.grid.addWidget(panel, row, col)
* => This works just fine already:
{{ blog:2022:0514:sort_selection.png?800 }}
===== Conclusion =====
* As usual, there would be a lot more to add on this app, but this article is way too long already. So let's stop here for this time and continue this journey another time ;-)
* Side note: the next step will probably be to restore support to display the graph for each coin.