blog:2022:0517_cryptoview_restoring_overview_panel

Cryptoview: Restoring coins overview

Okay, so, I desperately need to restore my cryptoview project now. This was formerly based on wxPython and I would really like to translate this into pyqt5 in the process. I just hope this wont take me too long…

  • As usual starting with a minimal nervproj app component:
    """Module for CryptoView class definition"""
    
    import logging
    from PyQt5.QtCore import Qt
    import PyQt5.QtWidgets as qwd
    from PyQt5 import QtCore
    
    from nvp.nvp_context import NVPContext
    
    from nvh.gui.dock_widget import DockWidget
    from nvh.gui.app_base import AppBase
    from nvh.gui.crypto.coin_overview_panel import CoinOverviewPanel
    
    logger = logging.getLogger(__name__)
    
    
    class CryptoView(AppBase):
        """CryptoView component class"""
    
        def __init__(self, ctx: NVPContext):
            """class constructor"""
            AppBase.__init__(self, ctx, "cryptoview")
    
        def process_command(self, cmd):
            """Check if this component can process the given command"""
    
            if cmd == 'run':
                self.run_app()
                return True
    
            return False
    
        def build_app(self):
            """Re-implementation of the build function"""
            super().build_app()
    
            win = self.get_main_window()
            # win.setWindowTitle("CryptoView")
    
            # win.setFixedSize(800, 600)
    
            dock1 = DockWidget("Overview")
            dock1.setWidget(CoinOverviewPanel())
    
            dock1.setAllowedAreas(Qt.AllDockWidgetAreas)
    
            win.addDockWidget(Qt.RightDockWidgetArea, dock1)
            win.setDockOptions(win.GroupedDragging | win.AllowTabbedDocks | win.AllowNestedDocks)
            win.setTabPosition(Qt.AllDockWidgetAreas, qwd.QTabWidget.North)
    
            # Set a size hint on the window:
            setattr(win, "sizeHint", lambda: QtCore.QSize(800, 600))
    
    
    if __name__ == "__main__":
        # Create the context:
        context = NVPContext()
    
        # Add our component:
        comp = context.register_component("app", CryptoView(context))
    
        context.define_subparsers("main", {
            'run': None,
        })
    
        psr = context.get_parser('main.run')
        # psr.add_argument("message", type=str,
        #                  help="Simple message that we should send")
    
        comp.run()
    
  • Also prepared a minimal “CoinOverviewPanel” class:
    """CoinOverviewPanel widget module"""
    
    import logging
    from PyQt5 import QtCore
    from PyQt5.QtCore import Qt
    import PyQt5.QtWidgets as qwd
    
    logger = logging.getLogger(__name__)
    
    
    class CoinOverviewPanel(qwd.QWidget):
        """CoinOverviewPanel class"""
    
        def __init__(self):
            """Constructor of the CoinOverviewPanel class"""
            super().__init__()
    
            lyt = qwd.QVBoxLayout()
            lyt.setContentsMargins(5, 5, 5, 5)
            lyt.setSpacing(5)
    
            # Add a simple label:
            label = qwd.QLabel(self)
            label.setText("This is the CoinOverviewPanel")
            label.setAlignment(Qt.AlignCenter)
            lyt.addWidget(label)
    
            self.setLayout(lyt)
    
            setattr(self, "sizeHint", lambda: QtCore.QSize(360, 300))
    
  • Then we add the python environment and the script on the NervHome nvp_config.json file:
        "cryptoview_env": {
          "packages": [
            "requests",
            "jstyleson",
            "xxhash",
            "PyQt5",
            "psycopg2",
            "numpy"
          ]
        }
  • And:
        "cryptoview": {
          "custom_python_env": "cryptoview_env",
          "cmd": "${PYTHON} ${PROJECT_ROOT_DIR}/nvh/app/crypto_view.py run",
          "python_path": ["${PROJECT_ROOT_DIR}", "${NVP_ROOT_DIR}"]
        }
  • And next we try to run the app:
    $ nvp run cryptoview
  • And here is the initial (rather empty but working) view that we get:

  • So I then updated the CoinOverviewPanel to collect all the monitored coins from coingecko, and build a CoinPanel for each of them:
    class CoinOverviewPanel(qwd.QWidget):
        """CoinOverviewPanel class"""
    
        def __init__(self):
            """Constructor of the CoinOverviewPanel class"""
            super().__init__()
    
            # Create a CoinGecko() instance:
            ctx = NVPContext.get()
    
            self.cgk = Coingecko(ctx)
    
            self.build_panel()
    
            setattr(self, "sizeHint", lambda: QtCore.QSize(360, 300))
    
        def build_panel(self):
            """Build the panel for this widget"""
    
            coins = self.cgk.get_monitored_coins()
            # logger.info("List of monitored coins: %s", coins)
    
            lyt = qwd.QGridLayout()
            lyt.setContentsMargins(5, 5, 5, 5)
            lyt.setSpacing(5)
    
            ncoins = len(coins)
            for i in range(ncoins):
                coin_id = coins[i]
                coin = self.cgk.get_coin(coin_id)
                if coin is None:
                    logger.error("Cannot retrieve coin for: %s", coin_id)
                    continue
    
                row = i//5
                col = i % 5
    
                lyt.addWidget(CoinPanel(coin), row, col)
    
            self.setLayout(lyt)
  • And my initial CoinPanel class looks like this:
    """CoinPanel widget module"""
    
    import logging
    from PyQt5 import QtCore
    from PyQt5.QtCore import Qt
    import PyQt5.QtWidgets as qwd
    
    logger = logging.getLogger(__name__)
    
    
    class CoinPanel(qwd.QWidget):
        """CoinPanel class"""
    
        def __init__(self, coin):
            """Constructor of the CoinPanel class"""
            super().__init__()
            self.coin = coin
            self.symbol = coin.symbol().upper()
    
            self.build_panel()
    
            # setattr(self, "sizeHint", lambda: QtCore.QSize(360, 300))
    
        def build_panel(self):
            """Build the panel for this widget"""
    
            lyt = qwd.QHBoxLayout()
            lyt.setContentsMargins(0, 0, 0, 0)
            lyt.setSpacing(0)
    
            label = qwd.QLabel(self)
            label.setText(self.symbol)
            label.setAlignment(Qt.AlignCenter)
            lyt.addWidget(label)
    
            self.setLayout(lyt)
  • And now I get the symbols of all the coins shown as expected:

  • So I implemented a QTaskManager component as follow:
    """Module for QTaskManager class definition"""
    
    # cf. https://www.pythonguis.com/tutorials/multithreading-pyqt-applications-qthreadpool/
    # cf. https://realpython.com/python-pyqt-qthread/
    
    import sys
    import traceback
    import logging
    # from PyQt5.QtGui import *
    # from PyQt5.QtWidgets import
    from PyQt5.QtCore import QObject, QRunnable, pyqtSignal, QThreadPool
    
    from nvp.nvp_context import NVPContext
    from nvp.nvp_component import NVPComponent
    
    logger = logging.getLogger(__name__)
    
    
    def create_component(ctx: NVPContext):
        """Create an instance of the component"""
        return QTaskManager(ctx)
    
    
    class TaskCancelledException(Exception):
        """Simple exception used to cancel a task."""
    
    
    class QTaskSignals(QObject):
        """Signal provider for a given task."""
        # finished = pyqtSignal()
        error = pyqtSignal(object)
        result = pyqtSignal(object)
        progress = pyqtSignal(object)
    
    
    class QTask(QRunnable):
        """Simple task encapsulation class."""
    
        def __init__(self, manager, tid, name, func, *args, **kwargs):
            super(QTask, self).__init__()
    
            # Store constructor arguments (re-used for processing)
            self.func = func
            self.manager = manager
            self.task_id = tid
            self.name = name
            self.args = args
            self.kwargs = kwargs
            self.signals = QTaskSignals()
            self.kwargs['check_cancel'] = self.check_cancel
            self.kwargs['report_progress'] = self.report_progress
    
        def get_name(self):
            """Retrieve the name for this task"""
            return self.name
    
        def get_id(self):
            """Retrieve the id for this task"""
            return self.task_id
    
        def report_progress(self, value):
            """Report progress in this task."""
            self.signals.progress.emit(value)
    
        def check_cancel(self):
            """Check if this task is cancelled"""
            if self.name is None:
                # Cannot cancel a task without name:
                return False
    
            if self.task_id < self.manager.get_current_index(self.name):
                raise TaskCancelledException(f"Cancelling task {self.name} [{self.task_id}]")
            return False
    
        def run(self):
            """Initialise the runner function with passed args, kwargs."""
            # Retrieve args/kwargs here; and fire processing using them
            result = None
            try:
                res = self.func(*self.args, **self.kwargs)
                result = res
            except TaskCancelledException:
                logger.info("Cancelled task %s [%d]", self.name, self.task_id)
            except:  # pylint: disable=bare-except
                traceback.print_exc()
                exctype, value = sys.exc_info()[:2]
                self.signals.error.emit({"exctype": exctype, "value": value, "traceback": traceback.format_exc()})
            else:
                self.signals.result.emit(result)  # Return the result of the processing
                # finally:
                # self.signals.finished.emit(result)  # Done
    
    
    class QTaskManager(NVPComponent):
        """QTaskManager component class"""
    
        def __init__(self, ctx: NVPContext):
            """class constructor"""
            NVPComponent.__init__(self, ctx)
    
            self.tpool = QThreadPool()
            self.unnamed_index = 0
            self.task_indices = {}
            logger.info("Running QThreadPool with at max %d worker threads.", self.tpool.maxThreadCount())
    
        def cancel_task(self, name):
            """Explicitly cancel a task by name."""
            tid = self.task_indices.get(name, 0)
            tid += 1
            self.task_indices[name] = tid
            return tid
    
        def get_current_index(self, task_name):
            """Get the current index corresponding to a given task name"""
            return self.task_indices.get(task_name, 0)
    
        def add_task(self, func, name=None, on_done=None, on_error=None):
            """Method used to add/process a simple task."""
            if name is None:
                self.unnamed_index += 1
                name = f"unnamed_{self.unnamed_index}"
    
            # check if we already have a task with that name and cancel it if any:
            # returning a new id for this task:
            tid = self.cancel_task(name)
    
            task = QTask(self, tid, name, func)
    
            if on_error is not None:
                task.signals.error.connect(on_error)
            else:
                task.signals.error.connect(lambda err: logger.error(
                    "An exception occured in task %s [%d]: %s", name, tid, self.pretty_print(err)))
    
            if on_done is not None:
                task.signals.result.connect(on_done)
    
            # Finally we start the task:
            self.tpool.start(task)
    
  • And then I added a quick test in the CoinOverviewPanel class I created above to confirm it is working fine:
        def __init__(self):
            """Constructor of the CoinOverviewPanel class"""
            super().__init__()
    
            # Create a CoinGecko() instance:
            ctx = NVPContext.get()
    
            self.cgk = Coingecko(ctx)
            self.coin_panels = {}
    
            self.build_panel()
    
            setattr(self, "sizeHint", lambda: QtCore.QSize(360, 300))
    
            # Let's try a simple task here:
            logger.info("The main thread id is %s", threading.get_ident())
            tman: QTaskManager = ctx.get_component("qtasks")
            tman.add_task(self.handle_large_task, on_done=self.task_completed)
    
        def handle_large_task(self, **kwargs):
            """Simple test function"""
            logger.info("Executing large task in thread %s", threading.get_ident())
            time.sleep(5)
            return {"key1": 42, "key2": True}
    
        def task_completed(self, result):
            """Task completed callback"""
            logger.info("Executing task completed callback in thread %s", threading.get_ident())
            logger.info("The result value is: %s", result)
  • And indeed, when running the cryptoview app I got the following results:
    $ nvp run cryptoview
    2022/05/15 16:20:29 [nvh.gui.crypto.coin_overview_panel] INFO: The main thread id is 45516
    2022/05/15 16:20:29 [nvh.gui.qtask_manager] INFO: Running QThreadPool with at max 8 worker threads.
    2022/05/15 16:20:30 [nvh.gui.crypto.coin_overview_panel] INFO: Executing large task in thread 55992
    2022/05/15 16:20:35 [nvh.gui.crypto.coin_overview_panel] INFO: Executing task completed callback in thread 45516
    2022/05/15 16:20:35 [nvh.gui.crypto.coin_overview_panel] INFO: The result value is: {'key1': 42, 'key2': True}
To be able to access the “qtasks” component, I of course had to register it as a dynamic component as described in a previous article 😉
  • Before I can update the price data for my coins I need to restore my CryptoviewLocalDB class to store the price data locally in an sqlite database, and the PriceManager class responsible for providing the actual data to the gui elements.
  • I actually implemented the later as a component:
    """Module for PriceManager class definition"""
    
    import logging
    
    from nvp.nvp_context import NVPContext
    from nvp.nvp_component import NVPComponent
    from nvh.gui.app_base import AppBase
    
    logger = logging.getLogger(__name__)
    
    
    def create_component(ctx: NVPContext):
        """Create an instance of the component"""
        return PriceManager(ctx)
    
    
    class PriceManager(NVPComponent):
        """PriceManager component class"""
    
        def __init__(self, ctx: NVPContext):
            """PriceManager constructor"""
            NVPComponent.__init__(self, ctx)
            self.price_data = {}
            self.app: AppBase = self.get_component("app")
    
            self.request_price_data_update()
    
        def get_price_data(self):
            """Retrieve the price data"""
            return self.price_data
    
        def request_price_data_update(self):
            """Request an update of the price data"""
    
            self.app.post_task(self.update_price_data, name="update_price_data")
    
        def update_price_data(self, **kwargs):
            """Perform actual update of the price data."""
            logger.info("Should update price data here, thread_id=%d", self.get_thread_id())
            self.app.post_main_task(self.on_price_data_updated)
    
        def on_price_data_updated(self):
            """Called when price data was updated."""
            logger.info("On price data updated %d", self.get_thread_id())
    
  • And while building this I also extended my AppBase a bit to support posting tasks either on the worker threads or on the main thread:
        def is_main_thread(self):
            """Check if we are on the main application thread."""
            assert self.main_thread_id is not None, "Main ThreadID not set yet."
            return self.main_thread_id == self.get_thread_id()
    
        def on_task_received(self, desc):
            """Execute a task on the main thread."""
            assert self.is_main_thread(), "Should be on the main thread here."
    
            # We execute the task:
            func = desc['func']
            args = desc["args"]
            kwargs = desc["kwargs"]
            func(*args, **kwargs)
    
        def post_main_task(self, func, *args, **kwargs):
            """Post a task on the main thread."""
            assert func is not None, "Invalid task."
            self.signals.task_received.emit({
                "func": func,
                "args": args,
                "kwargs": kwargs})
    
        def post_task(self, func, name=None, on_done=None, on_error=None, on_progress=None):
            """Post a task on the QTask manager."""
            tman = self.get_component("qtasks")
            tman.add_task(func, name=name, on_done=on_done, on_error=on_error, on_progress=on_progress)
    
  • Next, with the support to collect the price data from my PostgresSQL database and to store it locally for faster processing, I could then continue with an update on the PriceManager to support updating the price data for each monitored coin:
        def request_price_data_update(self):
            """Request an update of the price data"""
            logger.info("Requesting price update...")
            self.app.post_task(self.update_price_data, name="update_price_data")
    
        def update_price_data(self, check_cancel, **_kwargs):
            """Perform actual update of the price data."""
            logger.info("Should update price data here, thread_id=%d", self.get_thread_id())
            gecko = Coingecko(self.ctx)
    
            pdb = gecko.get_price_db()
            coins = gecko.get_monitored_coins()
            self.coins = coins
            logger.info("Collecting price data for %d currencies", len(coins))
    
            db_dir = self.get_path(self.app.get_app_dir(), "data")
            self.make_folder(db_dir)
            db_file = self.get_path(db_dir, "cryptoview.local.db")
            ldb = CryptoviewLocalDB(db_file)
    
            cidx = 0
            for cname in coins:
                cidx += 1
    
                check_cancel()
                hname = f"history_{cname}"
                stamp = 0
                if not ldb.has_table(hname):
                    logger.info("Initializing local %s data", hname)
                    ldb.create_price_table(hname)
                else:
                    stamp = ldb.get_last_timestamp(hname)
    
                # Retrieve all the current available data on this coin:
                hisdata = pdb.get_all_price_data(PriceDB.HISTORY, cname, stamp)
                check_cancel()
    
                if len(hisdata) > 1:
                    logger.info("Coin %d: Retrieved %d rows of remote history data for %s", cidx, len(hisdata), cname)
                ldb.insert_price_data(hname, hisdata)
                check_cancel()
    
                hisdata = ldb.get_all_price_data(hname)
                check_cancel()
                # logger.info("Coin %d: Collected %d rows of local history data for %s" % (cidx, len(hisdata), cname))
    
                hname = f"highres_{cname}"
                stamp = 0
                if not ldb.has_table(hname):
                    logger.info("Initializing local %s data", hname)
                    ldb.create_price_table(hname)
                else:
                    stamp = ldb.get_last_timestamp(hname)
    
                # Retrieve all the current available data on this coin:
                hresdata = pdb.get_all_price_data(PriceDB.HIGHRES, cname, stamp)
                check_cancel()
    
                if len(hresdata) > 1:
                    logger.info("Coin %d: Retrieved %d rows of remote highres data for %s", cidx, len(hresdata), cname)
                ldb.insert_price_data(hname, hresdata)
                check_cancel()
    
                hresdata = ldb.get_all_price_data(hname)
                check_cancel()
                # logger.info("Coin %d: Collected %d rows of local highres data for %s" % (cidx, len(hresdata), cname))
    
                # The history/highres data should be a numpy array with the columns:
                # timestamp,usd_price,usd_marketcap,usd_volume,btc_price,eth_price
                self.price_data[cname] = {
                    'history': np.array(hisdata),
                    'highres': np.array(hresdata)
                }
    
            logger.info("Done collecting price data.")
    
            self.compute_coins_data(check_cancel)
    
        def get_quote_price(self, row, ref_price):
            """Return the current price value on the given row given the current quote coin:"""
            if self.quote_coin is None:
                return row[1], "$"
            if self.quote_coin.name() == 'bitcoin':
                return row[4], " BTC"
            if self.quote_coin.name() == 'ethereum':
                return row[5], " ETH"
    
            # otherwise we use the USD price ratios from our current coin and the reference row:
            return row[1]/ref_price, " "+self.quote_coin.symbol().upper()
    
        def compute_coins_data(self, check_cancel, **_kwargs):
            """Use the latest price data to compute the coins data."""
            assert self.coins is not None, "coins list not retrieved yet."
    
            coins = self.coins
    
            # Collect all the percentage changes:
            pos_percent = []
            neg_percent = []
    
            self.oldest_stamp = None
            ref_last_price = None
            ref_prev_price = None
    
            if self.quote_coin is not None:
                qname = self.quote_coin.get_id()
                arr = self.price_data[qname]['highres']
                last_row = arr[-1]
                ref_last_price = last_row[1]
    
                stamp = last_row[0]
                last_day_ts = stamp - self.period
    
                last_row = arr[arr[:, 0] <= last_day_ts, :][-1]
                ref_prev_price = last_row[1]
    
            check_cancel()
    
            for cname in coins:
                # In somre cases we might still not have any highres data for that coin,
                # So we ignore it in that case:
                if not cname in self.price_data:
                    logger.warning("No price data available for coin %s", cname)
                    continue
    
                # Retrieve the highres data we have for that coin and get the last price value from there:
                arr = self.price_data[cname]['highres']
    
                # Compute the required coin data:
                check_cancel()
    
                last_row = arr[-1]
                usd_price, psym = self.get_quote_price(last_row, ref_last_price)
    
                # The current timestamp is given by the value from the first column (in seconds)
                stamp = last_row[0]
    
                if self.oldest_stamp is None or stamp < self.oldest_stamp:
                    self.oldest_stamp = stamp
    
                # Compute timestamp 24hours before:
                last_day_ts = stamp - self.period
    
                # We should now navigate in the high res data to select the last row with a timestamp smaller than that value:
                # Actually we might have some missing data here, so we rather use the firt value above that last_day_ts:
                # last_row = arr[arr[:,0]<=last_day_ts,:][-1]
                prev_row = arr[arr[:, 0] >= last_day_ts, :][0]
    
                # We compute the validity factor for that prev_row
                val_ratio = (stamp - prev_row[0])/self.period
    
                # From there we get the previous USD price:
                prev_usd_price, psym = self.get_quote_price(prev_row, ref_prev_price)
    
                # And we compute the percentage change:
                percent = ((usd_price/prev_usd_price)-1.0)*100.0
    
                if percent >= 0.0:
                    pos_percent.append(percent)
                else:
                    neg_percent.append(percent)
    
                self.coins_data[cname] = {
                    "price": usd_price,
                    "percent_change": percent,
                    "percent_ratio": 1.0,
                    "validity_ratio": val_ratio,
                    "quote_symbol": psym
                }
    
            # Should apply gradient colors so we use the percent ratio value:
            max_pos_change = max(pos_percent) if len(pos_percent) > 0 else None
            min_neg_change = min(neg_percent) if len(neg_percent) > 0 else None
    
            for cname in coins:
    
                # Check what is the ratio of that percentage change compared to the other:
                cdata = self.coins_data[cname]
                percent = cdata["percent_change"]
    
                if percent >= 0:
                    cdata['percent_ratio'] = percent/max_pos_change if percent > 0 else 0.0
                else:
                    cdata['percent_ratio'] = percent/min_neg_change
    
            # Compute the max age of the data (in mins)
            data_age = (self.get_timestamp() - self.oldest_stamp)/60
            logger.info("Max price data age: %.3f mins", data_age)
    
            self.app.post_main_task(self.notify_coins_data_update)
    
        def notify_coins_data_update(self):
            """Notify that coins data were updated."""
            for func in self.coins_data_callbacks:
                func()
    
        def on_coins_data_updated(self, cb):
            """Add a callback that should be executed (on the main thread)
            when the coins data is updated."""
            assert cb is not None, "Invalid calback"
            self.coins_data_callbacks.append(cb)
  • And then we notify the OverviewPanel to display the updated price infos (with one of those coins_data_callback):
            self.pman: PriceManager = ctx.get_component("price_manager")
            self.pman.on_coins_data_updated(self.update_coin_panels)
  •     def update_coin_panels(self):
            """Update the coins panels using the coins data from the price manager"""
    
            alldata = self.pman.get_coins_data()
            for cname, panel in self.coin_panels.items():
                if cname not in alldata:
                    logger.warning("No data available for coin %s", cname)
                    continue
    
                cdata = alldata[cname]
                panel.set_price(cdata["price"])
                panel.set_quote_symbol(cdata["quote_symbol"])
                panel.set_percent_change(cdata["percent_change"])
                panel.set_percent_ratio(cdata["percent_ratio"])
                panel.update()
    
            logger.info("Done updating coin panels.")
    
  • With those main changes (and many other small additional changes), here is the visual I get ont he overview panel:

⇒ So this is starting to look like something 👍😎!

  • Next thing I'm going to add on this OverviewPanel is the possibility to select the quote token: could be set to USD or BTC or ETH or any other monitored coin in fact.
  • I start with building the row in the GUI:
            lyt.addLayout(grid)
    
            # Add row to contains the other controls:
            row = qwd.QHBoxLayout()
            row.setContentsMargins(0, 0, 0, 0)
            row.setSpacing(5)
    
            lbl = qwd.QLabel()
            lbl.setText("Quote: ")
            row.addWidget(lbl, stretch=0, alignment=Qt.AlignCenter)
    
            btn = qwd.QRadioButton()
            btn.setText("USD")
            btn.setChecked(True)
            row.addWidget(btn, stretch=0, alignment=Qt.AlignCenter)
    
            btn = qwd.QRadioButton()
            btn.setText("BTC")
            row.addWidget(btn, stretch=0, alignment=Qt.AlignCenter)
    
            btn = qwd.QRadioButton()
            btn.setText("ETH")
            row.addWidget(btn, stretch=0, alignment=Qt.AlignCenter)
    
            btn = qwd.QRadioButton()
            btn.setText("Other")
            row.addWidget(btn, stretch=0, alignment=Qt.AlignCenter)
    
            items = ["item1", "item2", "item3"]
            wid = qwd.QComboBox()
            wid.addItems(items)
            row.addWidget(wid)
    
            row.addStretch(1)
    
            # Add the row to the layout:
            lyt.addLayout(row)
            self.setLayout(lyt)
  • Next I have to populate the list of “other” coins with the corresponding coin symbols when this is possible, so when building the grid I also collect the symbols for each coin:
            # Collect all the symbols in the process:
            syms = []
    
            for i in range(ncoins):
                coin_id = self.coins[i]
                coin = self.cgk.get_coin(coin_id)
                if coin is None:
                    logger.error("Cannot retrieve coin for: %s", coin_id)
                    continue
    
                sym = coin.symbol().upper()
                syms.append(sym)
    
                row = i//ncols
                col = i % ncols
    
                panel = CoinPanel(coin)
                self.coin_panels[coin_id] = panel
                grid.addWidget(panel, row, col)
  • Then I can assign that list of symbols directly to the combobox:
            # sort the symbols:
            syms.sort()
            wid = qwd.QComboBox()
            wid.addItems(syms)
            row.addWidget(wid)
            self.quote_cb = wid
The list of symbols retrieved above will also contains 'BTC' and 'ETH', but not 'USD': but that's not a big deal.
  • To make this more obvious, I should disable the combobox when “Other” is not selected (ie. calling self.quote_cb.setEnabled(False) initially). And in the process I also added the required button click handlers to change the quote coin:
        def set_quote_coin(self, coin):
            """Assign the quote coin in the price manager"""
            self.pman.set_quote_coin(coin)
    
        def on_usd_quote_selected(self):
            """Select USD as quote"""
            self.quote_cb.setEnabled(False)
            self.set_quote_coin(None)
    
        def on_btc_quote_selected(self):
            """Select BTC as quote"""
            self.quote_cb.setEnabled(False)
            coin = self.cgk.get_coin("bitcoin")
            self.set_quote_coin(coin)
    
        def on_eth_quote_selected(self):
            """Select ETH as quote"""
            self.quote_cb.setEnabled(False)
            coin = self.cgk.get_coin("ethereum")
            self.set_quote_coin(coin)
    
        def on_other_quote_selected(self):
            """Select other as quote"""
            self.quote_cb.setEnabled(True)
            # Get current symbol selected:
            sym = self.quote_cb.currentText()
    
            coin = self.get_coin_by_symbol(sym)
            assert coin is not None, f"Cannot retrieve coin with symbol {sym}"
            self.set_quote_coin(coin)
  • When computing the percentage change I should be able to use either a period of 24 hours, 48 hours, 72 hours or 7 days, or 1 month, so let's add another dropdown item to select that:
  • First I add the period names and values in the CoinOverviewPanel class constructor:
            self.period_names = ["24h", "48h", "72h", "7 Days", "1 Month"]
            self.periods = [24*3600, 48*3600, 72*3600, 7*24*3600, 30.5*24*3600]
  • Then we build the additional widget:
            lbl = qwd.QLabel()
            lbl.setText("Period: ")
            row.addWidget(lbl, stretch=0, alignment=Qt.AlignCenter)
    
            wid = qwd.QComboBox()
            wid.addItems(self.period_names)
            wid.activated.connect(self.on_period_selected)
            row.addWidget(wid)
            self.period_cb = wid
  • And finally we add the event handler connected just above:
        def on_period_selected(self):
            """Change the period value"""
            pname = self.period_cb.currentText()
            idx = self.period_names.index(pname)
            assert idx >= 0, f"Invalid index for period name {pname}"
    
            # Get the corresponding period value:
            period = self.periods[idx]
    
            # Assign the period value to the price manager:
            self.pman.set_period(period)
  • Naturally we also add the set_period() method in PriceManager:
        def set_period(self, period):
            """Assign the period value to use for change computations"""
            self.period = period
            self.request_coins_data_update()
  • OK Now I cal select the period of interest for the overview display:

  • I also want to be able to sort the coins on the overview panel either by percentage, of symbol, of quote value, so I'm adding another dropdown menu for that:
            lbl = qwd.QLabel()
            lbl.setText("Sort: ")
            row.addWidget(lbl, stretch=0, alignment=Qt.AlignCenter)
    
            wid = qwd.QComboBox()
            wid.addItems(self.sort_names)
            wid.activated.connect(self.sort_coin_panels)
            row.addWidget(wid)
            self.sort_cb = wid
    
  • And the corresponding handler for it:
        def sort_coin_panels(self):
            """Sort the coin panels"""
            panels = [p for p in self.coin_panels.values()]
            self.sort_mode = self.sort_cb.currentText()
    
            if self.sort_mode == 'Symbol':
                panels.sort(key=lambda p: p.get_symbol())
            elif self.sort_mode == 'Percentage':
                panels.sort(key=lambda p: p.get_percent_change())
            elif self.sort_mode == 'Price':
                panels.sort(key=lambda p: p.get_price())
    
            # Remove all those widgets:
            for panel in panels:
                self.grid.removeWidget(panel)
    
            for i, panel in enumerate(panels):
                row = i//self.ncols
                col = i % self.ncols
    
                self.grid.addWidget(panel, row, col)
  • ⇒ This works just fine already:

  • As usual, there would be a lot more to add on this app, but this article is way too long already. So let's stop here for this time and continue this journey another time ;-)
  • Side note: the next step will probably be to restore support to display the graph for each coin.
  • blog/2022/0517_cryptoview_restoring_overview_panel.txt
  • Last modified: 2022/05/17 18:39
  • by 127.0.0.1