So, continuing the work started in my previous article on this topic, I'm now going to restore the “chart panel”, which was used to display 4 charts of the selected coin with 4 different time frames. As in the initial version I will use matplotlib to draw the chart. And in fact, I've already been trying to integrate that package with pyqt5 and it works just fine, so this task here should not be too hard hopefully . Let's get started!
"""ChartPanel widget module""" import logging from PyQt5 import QtCore from PyQt5.QtCore import Qt import PyQt5.QtWidgets as qwd from nvp.nvp_context import NVPContext from nvh.app.cryptoview.coin_panel import CoinPanel from nvh.app.cryptoview.price_manager import PriceManager logger = logging.getLogger(__name__) class ChartPanel(qwd.QWidget): """ChartPanel class""" def __init__(self): """Constructor of the ChartPanel class""" super().__init__() # Create a CoinGecko() instance: ctx = NVPContext.get() self.pman: PriceManager = ctx.get_component("price_manager") self.pman.on_coins_data_updated(self.update_coin_panels) self.coin_panels = {} self.selected_panel = None self.build_panel() setattr(self, "sizeHint", lambda: QtCore.QSize(360, 300)) def build_panel(self): """Build the panel for this widget""" lyt = qwd.QHBoxLayout() lyt.setContentsMargins(5, 5, 5, 5) lyt.setSpacing(5) col = qwd.QVBoxLayout() col.setContentsMargins(0, 0, 0, 0) col.setSpacing(5) coins = self.pman.get_monitored_coins() panel = qwd.QWidget() # panel.setSizePolicy(qwd.QSizePolicy.MinimumExpanding, # qwd.QSizePolicy.MinimumExpanding) col2 = qwd.QVBoxLayout() col2.setContentsMargins(5, 5, 5, 5) col2.setSpacing(5) for cid, coin in coins.items(): cpanel = CoinPanel(coin) cpanel.on_click(self.on_coin_panel_clicked) self.coin_panels[cid] = cpanel col2.addWidget(cpanel, stretch=0) panel.setLayout(col2) # QtCore.QTimer.singleShot(0, panel.adjustSize) scroll = qwd.QScrollArea() # scroll.setStyleSheet("QScrollArea { border: none; }") scroll.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOn) scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) scroll.setWidget(panel) scroll.setWidgetResizable(True) # QtCore.QTimer.singleShot(0, scroll.adjustSize) col.addWidget(scroll) lyt.addLayout(col) lyt.addStretch(1) self.setLayout(lyt) # Select bitcoin by default: self.selected_panel = self.coin_panels['bitcoin'] self.selected_panel.highlight(True) def update_coin_panels(self): """Update the coins panels using the coins data from the price manager""" alldata = self.pman.get_coins_data() for cname, panel in self.coin_panels.items(): if cname not in alldata: logger.warning("No data available for coin %s", cname) continue cdata = alldata[cname] panel.set_price(cdata["price"]) panel.set_quote_symbol(cdata["quote_symbol"]) panel.set_percent_change(cdata["percent_change"]) panel.set_percent_ratio(cdata["percent_ratio"]) panel.update() logger.info("Done updating coin panels on ChartPanel") def on_coin_panel_clicked(self, panel, _event): """Event when a given panel is clicked.""" if self.selected_panel == panel: # Nothing to change return if self.selected_panel is not None: self.selected_panel.highlight(False) logger.info("Selecting coin %s", panel.get_symbol()) self.selected_panel = panel self.selected_panel.highlight(True)
# Adding the plots here: chart_col = qwd.QVBoxLayout() chart_col.setContentsMargins(0, 0, 0, 0) chart_col.setSpacing(5) lyt.addLayout(chart_col, stretch=1) row = qwd.QHBoxLayout() row.setContentsMargins(0, 0, 0, 0) row.setSpacing(5) chart_col.addLayout(row) def create_chart(layout): clyt = qwd.QVBoxLayout() clyt.setContentsMargins(0, 0, 0, 0) chart = PlotCanvas(self, width=2, height=2, dpi=100) toolbar = chart.create_toolbar(self) clyt.addWidget(toolbar) clyt.addWidget(chart) layout.addLayout(clyt) self.charts.append(chart) create_chart(row) # chart 0 create_chart(row) # chart 1 row = qwd.QHBoxLayout() row.setContentsMargins(0, 0, 0, 0) row.setSpacing(5) chart_col.addLayout(row) create_chart(row) # chart 2 create_chart(row) # chart 3 self.setLayout(lyt) # Select bitcoin by default: self.selected_panel = self.coin_panels['bitcoin'] self.selected_panel.highlight(True)
"""matplotlib canvas module""" import logging import matplotlib from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg, NavigationToolbar2QT as NavigationToolbar from matplotlib.figure import Figure matplotlib.use('Qt5Agg') logger = logging.getLogger(__name__) class PlotCanvas(FigureCanvasQTAgg): """Helper class to display a single matplotlib figure""" def __init__(self, _parent=None, width=5, height=4, dpi=100): fig = Figure(figsize=(width, height), dpi=dpi) self.axes = fig.add_subplot(111) super(PlotCanvas, self).__init__(fig) def create_toolbar(self, parent): """Create a matplotlib toolbar for this canvas""" return NavigationToolbar(self, parent)
def set_plot_data(self, plot, title, prices, vols, times): """Assign the data to a given PlotWidget""" # logDEBUG("Dates: %s" % xx[:10]) plot.chart.clear() plot.chart.set_title(title) if prices.size == 0: logger.warning("No price data for '%s'", title) plot.draw(prices, vols, times) def load_price_charts(self, coin_id): """Load the price charts for a given coin.""" # Retrieve the coin from the panel: coin = self.coin_panels[coin_id].get_coin() sym = coin.symbol().upper() # get the quote from the price manager: quote = self.pman.get_quote_coin() # Build the 24h/7days charts from the highres data: price_data = self.pman.get_price_data() hres = price_data[coin_id]['highres'] hist = price_data[coin_id]['history'] qhres = None if quote is None else price_data[quote]['highres'] qhist = None if quote is None else price_data[quote]['history'] # get the current timestamp: now_ts = self.pman.get_timestamp() def get_data(dset, start_ts): """Retrieve the data from a dataset, starting from a given timestamp""" data = dset[dset[:, 0] >= start_ts, :] # Check if we actually had more data before that and otherwise assume this was the start of the currency: if dset[0, 0] > start_ts+1: arr = np.empty((2, data.shape[1]), dtype=np.float) arr[:] = np.nan arr[0, 0] = start_ts arr[1, 0] = dset[0, 0]-1 data = np.vstack([arr, data]) return data def draw_chart(chart_idx, dset, quote_dset, start_ts, title): """Draw a given chart by index""" data = get_data(dset, start_ts) prices = None if quote is None: prices = data[:, 1] elif quote.get_id() == "bitcoin": prices = data[:, 4] elif quote.get_id() == "ethereum": prices = data[:, 5] else: # Get the quote data from that start timestamp: qdata = get_data(quote_dset, start_ts) # we might not have the same number of entries, but in that case we simply interpolate the quote on the # current timestamps: qprices = np.interp(data[:, 0].astype(float), qdata[:, 0].astype(float), qdata[:, 1].astype(float)) bprices = data[:, 1] # Then we do the division: prices = bprices / qprices vols = data[:, 3] times = data[:, 0].astype(np.int64) times = np.datetime64('1970-01-01T00:00:00') + times.astype('timedelta64[ms]') self.set_plot_data(self.charts[chart_idx], title, prices, vols, times) qstr = "USD" if quote is None else quote.symbol().upper() # 24H chart: draw_chart(0, hres, qhres, now_ts - 24*3600, f"{sym} - 24h [{qstr}]") # 7 days chart: draw_chart(1, hres, qhres, now_ts - 7*24*3600, f"{sym} - 7 Days [{qstr}]") # 30 days chart: # To draw the history charts we should also include the newest data from the high res datasets: last_hist_ts = hist[-1, 0] # Select all entries newer than the "last_hist_ts" from the high res dataset: append_hres = hres[hres[:, 0] > last_hist_ts, :] hist = np.vstack([hist, append_hres]) if qhist is not None: assert qhist[-1, 0] == last_hist_ts, f"Mismatch in last history timestamp: {qhist[-1, 0]} != {last_hist_ts}" append_hres2 = qhres[qhres[:, 0] > last_hist_ts, :] len1 = len(append_hres) len2 = len(append_hres2) assert len1 == len2, f"Mismatch in apendHRes length: {len1} != {len2}" qhist = np.vstack([hist, append_hres2]) draw_chart(2, hist, qhist, now_ts - 30*24*3600, f"{sym} - 30 Days [{qstr}]") # Max Days chat: max_days = coin.get_num_days() dset = hist if max_days > 7 else hres qds = qhist if max_days > 7 else qhres draw_chart(3, dset, qds, now_ts - max_days*24*3600, f"{sym} - Max ({max_days} Days) [{qstr}]")
"""PlotWidget widget module""" import logging import PyQt5.QtWidgets as qwd import numpy as np from nvh.gui.plot_canvas import PlotCanvas logger = logging.getLogger(__name__) class PlotWidget(qwd.QFrame): """PlotWidget class""" def __init__(self): """Constructor of the PlotWidget class""" super().__init__() lyt = qwd.QVBoxLayout() lyt.setContentsMargins(0, 0, 0, 0) self.chart = PlotCanvas(self, width=2, height=2, dpi=100) self.toolbar = self.chart.create_toolbar(self) lyt.addWidget(self.toolbar) lyt.addWidget(self.chart) self.setLayout(lyt) def draw(self, prices, vols, xpos, allow_nan=True): """Draw price and volume data.""" if prices.size == 0: self.chart.axes.text(0.5, 0.5, "No price data.", size="large", ha='center', va='center', weight='demi') return assert prices.size > 0, "Invalid prices array" if xpos is None: xpos = np.arange(prices.size) prices = prices.astype(np.float) vols = vols.astype(np.float) if not allow_nan: np.nan_to_num(prices, copy=False) np.nan_to_num(vols, copy=False) assert not np.isnan(prices).any(), "Should not have nan price values here!" pmin = np.nanmin(prices) pmax = np.nanmax(prices) # Rescale the volume prices in 1/5 of the price range: vrange = (pmax-pmin)/5.0 vmin = np.nanmin(vols) vmax = np.nanmax(vols) vscale = (vrange/(vmax - vmin)) if vmax > vmin else 1.0 vols = (vols-vmin)*vscale + pmin-vrange # draw a white line to ensure we have all the xx raneg covered even with Nan values: dum = np.ones((xpos.size))*pmin self.chart.axes.plot(xpos, dum, color='white', linewidth=1) self.chart.axes.plot(xpos, prices, color='blue', linewidth=1) self.chart.axes.plot(xpos, vols, color='red', linewidth=1) self.chart.draw()
def on_coin_panel_clicked(self, panel, _event): """Event when a given panel is clicked.""" if self.selected_panel == panel: # Nothing to change return if self.selected_panel is not None: self.selected_panel.highlight(False) logger.info("Selecting coin %s", panel.get_symbol()) self.selected_panel = panel self.selected_panel.highlight(True) self.update_charts() def update_charts(self): """Should update the charts here.""" coin = self.selected_panel.get_coin() self.load_price_charts(coin.get_id())
times = data[:, 0].astype(np.int64) # times = np.datetime64('1970-01-01T00:00:00') + times.astype('timedelta64[ms]') # use the current timestamp as reference: times = now_ts - times self.set_plot_data(self.charts[chart_idx], title, prices, vols, times)
wid = PlotWidget(3600) row.addWidget(wid) self.charts.append(wid) wid = PlotWidget(3600*24) row.addWidget(wid) self.charts.append(wid)
# xpos are times given in seconds from current timestamp, so we rescale given our period: # This is given in seconds, so we convert that to hours or days, or weeks depending on the number of entries: xpos = xpos.astype(np.float32) / self.period xmin = np.amin(xpos) xmax = np.amax(xpos) offset = self.offset * (xmax - xmin) self.chart.axes.set_xlim([xmax+offset, xmin-offset])