Okay so, I have a coin monitoring system I built some time ago: it's in Python and it will retrieve price data for many coins on a regular basis, and store that data into a PostgreSQL database. It was occasionally producing some errors, conflicts, or corruption, but on the whole it was working reasonably well. Until I decided to get rid of my fully corrupted RAID array, and thus completely messed up my main server structure lol.
Yet that tool was actually very handy (I then built a GUI on top of it to get some visuals on the coin price action), so I think it's now high time I tried to put it back on track!
* * * * * su kenshin -c "bash /home/kenshin/scripts/cron/every_1min_operations.sh"
defi_gecko --updateHighResDatasets >>$lfile 2>&1
defi_gecko() { cd `defi_dir` nv_py_setup_paths nv_call_python gecko.py "$@" cd - > /dev/null }
"custom_python_envs": { "defi_env": { "packages": ["requests"] } }, "scripts": { // Update the coingecko prices "defi_gecko_update": { "custom_python_env": "defi_env", "cmd": "${PYTHON} nvh/defi/coingecko.py --updateHighResDatasets", "cwd": "${PROJECT_ROOT_DIR}", "python_path": ["${PROJECT_ROOT_DIR}"] } }
"""Module for Coingecko class definition"""

import logging

# Module-level logger, named after this module:
logger = logging.getLogger(__name__)


def _main() -> None:
    """Entry point used when the module is executed as a script."""
    logger.info("This is the main script for coingecko")


if __name__ == "__main__":
    _main()
$ nvp run defi_gecko_update Traceback (most recent call last): File "D:\Projects\NervProj\cli.py", line 5, in <module> ctx.run() File "D:\Projects\NervProj\nvp\nvp_context.py", line 315, in run if comp.process_command(cmd): File "D:\Projects\NervProj\nvp\components\runner.py", line 39, in process_command self.run_script(sname, proj) File "D:\Projects\NervProj\nvp\components\runner.py", line 120, in run_script self.execute(cmd, cwd=cwd, env=env) File "D:\Projects\NervProj\nvp\nvp_object.py", line 383, in execute subprocess.check_call(cmd, stdout=stdout, stderr=stderr, cwd=cwd, env=env) File "D:\Projects\NervProj\tools\windows\python-3.10.1\lib\subprocess.py", line 364, in check_call retcode = call(*popenargs, **kwargs) File "D:\Projects\NervProj\tools\windows\python-3.10.1\lib\subprocess.py", line 345, in call with Popen(*popenargs, **kwargs) as p: File "D:\Projects\NervProj\tools\windows\python-3.10.1\lib\subprocess.py", line 966, in __init__ self._execute_child(args, executable, preexec_fn, close_fds, File "D:\Projects\NervProj\tools\windows\python-3.10.1\lib\subprocess.py", line 1435, in _execute_child hp, ht, pid, tid = _winapi.CreateProcess(executable, args, FileNotFoundError: [WinError 2] Le fichier spécifié est introuvable
"""Module for Coingecko class definition"""

import logging

from nvp.nvp_context import NVPContext
from nvp.nvp_component import NVPComponent

logger = logging.getLogger(__name__)


class Coingecko(NVPComponent):
    """Coingecko component class"""

    def __init__(self, ctx: NVPContext):
        """Script runner constructor"""
        NVPComponent.__init__(self, ctx)
        # Replace the global config entry with our dedicated "coingecko"
        # section read from the current base dir:
        self.config = self.config["coingecko"]

    def process_command(self, cmd):
        """Check if this component can process the given command"""
        if cmd == 'update-highres':
            self.update_highres_datasets()
            return True
        return False

    # NOTE(review): previous PriceDB accessor kept for reference; restore it
    # once PostgreSQLDB/PriceDB are imported again in this module.
    # def get_price_db(self, user=None, password=None):
    #     if self.priceDb is None:
    #         db_name = self.config['price_db_name']
    #         if user is None:
    #             user = self.config['price_db_user']
    #         if password is None:
    #             password = self.config['price_db_password']
    #         sqldb = PostgreSQLDB(dbName=db_name, user=user, password=password)
    #         self.priceDb = PriceDB(sqldb)
    #     return self.priceDb

    def get_monitored_coins(self):
        """Retrieve the full list of monitored coins"""
        return self.config["monitored_coins"]

    def update_highres_datasets(self, cids=None, period=60):
        """High frequency dataset update of the selected coins.

        When *cids* is None the full monitored-coin list is used.
        """
        if cids is None:
            cids = self.get_monitored_coins()

        logger.info("Should update the monitored coins: %s", cids)
        # TODO(review): actual update logic still to be restored here:
        # start_time = self.get_time()
        # pdb = self.get_price_db()
        # last_datas = {}
        # check for too long gaps in the datasets:
        # now_stamp = self.get_timestamp()


if __name__ == "__main__":
    # Create the context:
    context = NVPContext()

    # Register our component:
    comp = context.register_component("coingecko", Coingecko(context))

    # Declare the supported sub-commands on the main parser:
    context.define_subparsers("main", {'update-highres': None})

    # psr = context.get_parser('main')
    # psr.add_argument("--updateHighResDatasets", dest="update_highres", action="store_true",
    #                  help="Update the highres data from coingecko")

    comp.run()
$ nvp -p nvh admin init -p $ nvp home nvh # => Update the tools/requirements.txt file here. $ ./cli.sh --install-py-reqs
$ nvp run defi_gecko_update 2022/05/07 17:52:29 [nvh.core.postgresql_db] INFO: Opening connection to crypto_prices_v2 2022/05/07 17:52:33 [__main__] INFO: Writing 8 new price entries 2022/05/07 17:52:33 [__main__] INFO: Done updating highres data for bitcoin 2022/05/07 17:52:37 [__main__] INFO: Writing 8 new price entries 2022/05/07 17:52:37 [__main__] INFO: Done updating highres data for ethereum 2022/05/07 17:52:41 [__main__] INFO: Writing 8 new price entries 2022/05/07 17:52:41 [__main__] INFO: Done updating highres data for litecoin 2022/05/07 17:52:45 [__main__] INFO: Writing 8 new price entries 2022/05/07 17:52:45 [__main__] INFO: Done updating highres data for binancecoin 2022/05/07 17:52:48 [__main__] INFO: Writing 9 new price entries 2022/05/07 17:52:48 [__main__] INFO: Done updating highres data for elrond-erd-2 2022/05/07 17:52:50 [__main__] INFO: Last written timestamp for bitcoin: 1651941900 2022/05/07 17:52:50 [__main__] INFO: Writing 1 new price observations for bitcoin 2022/05/07 17:52:50 [__main__] INFO: Last written timestamp for ethereum: 1651941900 2022/05/07 17:52:50 [__main__] INFO: Writing 1 new price observations for ethereum 2022/05/07 17:52:50 [__main__] INFO: Last written timestamp for litecoin: 1651941900 2022/05/07 17:52:50 [__main__] INFO: Writing 1 new price observations for litecoin 2022/05/07 17:52:50 [__main__] INFO: Last written timestamp for binancecoin: 1651941900 2022/05/07 17:52:50 [__main__] INFO: Writing 1 new price observations for binancecoin 2022/05/07 17:52:50 [__main__] INFO: Last written timestamp for elrond-erd-2: 1651942200 2022/05/07 17:52:50 [__main__] INFO: Updated all high res datasets in 20.780 seconds
def update_highres_datasets(self, cids=None, period=300):
source /mnt/data1/dev/projects/NervProj/cli.sh log_dir="/mnt/data1/admin/logs" lfile="${log_dir}/coingecko_highres_updates.log" nvp run defi_gecko_update 2>&1 | tee -a $lfile
$ nvp git pull $ nvp -p nvh git pull $ nvp pyenv setup defi_env $ nvp run defi_gecko_update
*/5 * * * * su kenshin -c "bash /home/kenshin/cron/every_5mins_operations.sh"
# By default our start_stamp should be the date of release of the coin: start_stamp = self.getCoin(cid).getStartTimestamp()
"gecko_update_history": { "custom_python_env": "defi_env", "cmd": "${PYTHON} nvh/defi/coingecko.py update-history", "cwd": "${PROJECT_ROOT_DIR}", "python_path": ["${PROJECT_ROOT_DIR}", "${NVP_ROOT_DIR}"] }
$ nvp run gecko_update_history 2022/05/08 15:27:47 [__main__] INFO: Default start timestamp for bitcoin is: 1367107200 2022/05/08 15:27:47 [__main__] INFO: Creating history price table for bitcoin 2022/05/08 15:27:51 [__main__] INFO: Writing 89 new price entries 2022/05/08 15:27:56 [__main__] INFO: Writing 89 new price entries 2022/05/08 15:28:01 [__main__] INFO: Writing 89 new price entries 2022/05/08 15:28:06 [__main__] INFO: Writing 89 new price entries 2022/05/08 15:28:12 [__main__] INFO: Writing 89 new price entries 2022/05/08 15:28:16 [__main__] INFO: Writing 89 new price entries 2022/05/08 15:28:21 [__main__] INFO: Writing 89 new price entries 2022/05/08 15:28:26 [__main__] INFO: Writing 89 new price entries
lfile="${log_dir}/coingecko_history_updates.log" nvp run gecko_update_history 2>&1 | tee -a $lfile
def parse_args(self):
    """Parse the command line arguments.

    Uses parse_known_args() so that arguments unknown to the 'main' parser
    are collected into self.additional_args instead of raising an error.
    cf. https://docs.python.org/3.4/library/argparse.html#partial-parsing
    """
    # self.settings = vars(self.parsers['main'].parse_args())
    known, extras = self.parsers['main'].parse_known_args()
    self.additional_args = extras
    # Store the recognized settings as a plain dict:
    self.settings = vars(known)
# Check if we have additional args to pass to the command: args = self.ctx.get_additional_args() if len(args) > 0: cmd += args # Execute that command: logger.info("Executing script command: %s (cwd=%s)", cmd, cwd) self.execute(cmd, cwd=cwd, env=env)
"coingecko": { "custom_python_env": "defi_env", "cmd": "${PYTHON} nvh/defi/coingecko.py", "cwd": "${PROJECT_ROOT_DIR}", "python_path": ["${PROJECT_ROOT_DIR}", "${NVP_ROOT_DIR}"] }
$ nvp run coingecko update-history 2022/05/08 19:07:07 [nvp.components.runner] INFO: Executing script command: ['D:\\Projects\\NervProj\\.pyenvs\\defi_env\\python.exe', 'nvh/defi/coingecko.py', 'update-history'] (cwd=D:\Projects\NervHome) 2022/05/08 19:07:09 [__main__] INFO: Default start timestamp for bitcoin is: 1367107200 2022/05/08 19:07:13 [__main__] INFO: Writing 4 new price entries 2022/05/08 19:07:13 [__main__] INFO: Done updating history data for bitcoin 2022/05/08 19:07:13 [__main__] INFO: Default start timestamp for ethereum is: 1438905600 2022/05/08 19:07:17 [__main__] INFO: Writing 4 new price entries 2022/05/08 19:07:17 [__main__] INFO: Done updating history data for ethereum 2022/05/08 19:07:17 [__main__] INFO: Default start timestamp for litecoin is: 1367107200 2022/05/08 19:07:21 [__main__] INFO: Writing 4 new price entries 2022/05/08 19:07:21 [__main__] INFO: Done updating history data for litecoin 2022/05/08 19:07:21 [__main__] INFO: Default start timestamp for binancecoin is: 1505520000 2022/05/08 19:07:26 [__main__] INFO: Writing 4 new price entries 2022/05/08 19:07:26 [__main__] INFO: Done updating history data for binancecoin 2022/05/08 19:07:26 [__main__] INFO: Default start timestamp for elrond-erd-2 is: 1599091200 2022/05/08 19:07:30 [__main__] INFO: Writing 4 new price entries 2022/05/08 19:07:30 [__main__] INFO: Done updating history data for elrond-erd-2
nvp run coingecko --help
will show the help for the run command: the --help argument is not "unknown" to it, so it's collected as usual... Not quite sure how to deal with this, but well, I can live with it for now anyway.
--show-help
to the runner parser, and when specified, this will add the --help
to our script command line (tested and working š!): if len(args) > 0: cmd += args if self.get_param("show_script_help", False): cmd += ["--help"]
context.define_subparsers("main", { 'update-highres': None, 'update-history': None, 'monitored': { 'list': None, 'add': None, 'remove': None, 'drop': None } })
if cmd == 'monitored': cmd1 = self.ctx.get_command(1) if cmd1 == 'list': # We should list all the monitored coins: mcoins = self.get_coin_db().get_all_monitored_coins() logger.info("List of monitored coins: %s", self.pretty_print(mcoins)) return True if cmd1 == 'remove': # We should list all the monitored coins: mcoin = self.get_param("mcoin_name") self.get_coin_db().delete_monitored_coin(mcoin) logger.info("Removed monitored coin %s", mcoin) return True if cmd1 == 'add': # We should list all the monitored coins: mcoin = self.get_param("mcoin_name") # Convert to list: mcoin = mcoin.split(",") self.get_coin_db().insert_monitored_coins(mcoin) logger.info("Added monitored coins: %s", mcoin) return True if cmd1 == 'drop': logger.info("TODO: Should drop monitored coins table here.") return True
$ nvp run coingecko monitored list 2022/05/08 20:11:39 [__main__] INFO: List of monitored coins: [ 'bitcoin', 'ethereum', 'litecoin', 'binancecoin', 'elrond-erd-2', # ( large list here ) 'meld']
$ nvp run coingecko monitored remove meld 2022/05/08 20:12:27 [__main__] INFO: Removed monitored coin meld
$ nvp run coingecko monitored add meld 2022/05/08 20:12:52 [__main__] INFO: Added monitored coins: ['meld']
def get_monitored_coins(self):
    """Retrieve the full list of monitored coins.

    The list now comes from the coins database instead of the static
    config entry (previously self.config["monitored_coins"]).
    """
    # return self.config["monitored_coins"]
    coin_db = self.get_coin_db()
    return coin_db.get_all_monitored_coins()
$ nvp run coingecko update-history
coingecko_highres_updates.log
, I get: 2022/05/09 07:55:04 [__main__] INFO: Updated 69 highres datasets in 1.929 seconds 2022/05/09 08:00:05 [__main__] INFO: Updated 69 highres datasets in 2.541 seconds 2022/05/09 08:05:04 [__main__] INFO: Updated 69 highres datasets in 1.922 seconds 2022/05/09 08:10:04 [__main__] INFO: Updated 69 highres datasets in 2.024 seconds 2022/05/09 08:15:04 [__main__] INFO: Updated 69 highres datasets in 1.939 seconds
// Backup of postgresql server data: "postgresql_server": { "source_dir": "/mnt/data1/containers/postgresql_server", "backup_dir": "${slot1}/containers", "repository_url": "ssh://git@gitlab.nervtech.org:22002/backups/postgresql_server.git", "clone_dirs": [ "${slot1}/git/archives_1", "${slot2}/git/archives_1", "${slot3}/git/archives_1" ] },
docker exec -t postgresql_server pg_dump crypto_coins -c -U crypto_user > my_dump.sql
// Backup of individual SQL databases: "postgresql_crypto_databases": { "type": "postgresql", "container": "postgresql_server", "user": "crypto_user", "databases": ["crypto_coins", "crypto_prices_v2"], "backup_dirs": [ "${slot1}/sql/crypto", "${slot2}/sql/crypto", "${slot3}/sql/crypto" ] },
def backup_postgresql_databases(self, tgt_name, desc):
    """Handle a postgresql database backup target.

    Placeholder implementation: only reports the target description for now.
    """
    logger.info("Should backup postgresql databases with desc: %s", desc)
$ nvp backup run postgresql_crypto_databases 2022/05/09 08:21:58 [components.backup_manager] INFO: Should backup postgresql databases with desc: {'type': 'postgresql', 'container': ' postgresql_server', 'user': 'crypto_user', 'databases': ['crypto_coins', 'crypto_prices_v2'], 'backup_dirs': ['${slot1}/sql/crypto', '${s lot2}/sql/crypto', '${slot3}/sql/crypto']} 2022/05/09 08:21:58 [components.backup_manager] INFO: Backup target postgresql_crypto_databases processed in 0.00 seconds
def backup_postgresql_databases(self, tgt_name, desc):
    """Handle a postgresql database backup target.

    For each database listed in desc["databases"], dump it with pg_dump
    (run inside the given docker container), compress the dump into a
    .tar.xz package in the first valid backup slot, then replicate that
    package into the remaining slots, pruning expired backups everywhere.
    """
    # Prepare the work folder (first valid slot):
    backup_dirs = self.collect_valid_slot_paths(desc["backup_dirs"])
    num_bdirs = len(backup_dirs)
    if num_bdirs == 0:
        logger.warning("No valid backup dir provided for %s", tgt_name)
        return

    logger.info("postgresql db backup dirs: %s", backup_dirs)

    # Prepare the work folder:
    work_dir = backup_dirs[0]

    # Ensure our work folder is created:
    self.make_folder(work_dir)

    # Iterate on each database:
    dbs = desc["databases"]
    user = desc["user"]
    container = desc["container"]

    now = self.get_now()
    now_str = now.strftime("%Y%m%d_%H%M%S")
    tools = self.get_component('tools')

    # Rolling backup type (daily/weekly/monthly) decides the retention:
    btype = self.get_rolling_backup_type(now.date())
    suffix = f"{btype}_{now_str}.sql"

    for db_name in dbs:
        # Prepare the command line:
        cmd = ["docker", "exec", "-t", container, "pg_dump", db_name, "-c", "-U", user]

        filename = f"{db_name}_{suffix}"
        outfile = self.get_path(work_dir, filename)

        logger.info("Dumping database %s...", db_name)
        with open(outfile, "w", encoding="utf-8") as file:
            self.execute(cmd, outfile=file)

        # Next we should compress that file:
        logger.info("Generating archive for %s...", db_name)
        # BUG FIX: derive the package name from the dump filename; a constant
        # name would make every database overwrite the same archive.
        pkg_name = f"{filename}.tar.xz"
        tools.create_package(outfile, work_dir, pkg_name)

        # remove the source sql file:
        self.remove_file(outfile)

        # Move the package into its per-database folder and prune old files:
        pkg_file = self.get_path(work_dir, pkg_name)
        dest_folder = self.get_path(work_dir, db_name)
        self.make_folder(dest_folder)
        dest_file = self.get_path(dest_folder, pkg_name)
        self.rename_file(pkg_file, dest_file)
        self.remove_old_backup_files(dest_folder)

        # Finally we also copy that file into the additional backup slots:
        for i in range(1, num_bdirs):
            bck_dir = backup_dirs[i]
            dest_folder = self.get_path(bck_dir, db_name)
            self.make_folder(dest_folder)
            logger.debug("Copying %s into %s", pkg_name, dest_folder)
            self.copy_file(dest_file, self.get_path(dest_folder, pkg_name))
            # Remove the old backups there too:
            self.remove_old_backup_files(dest_folder)

def get_rolling_backup_type(self, date):
    """Retrieve the type of rolling backup based on the given date.

    Returns "mbak" on the first day of the month, "wbak" on Mondays,
    and "dbak" otherwise.
    """
    if date.day == 1:
        # first day of the month:
        return "mbak"
    if date.weekday() == 0:
        # This is a monday:
        return "wbak"
    # ordinary day backup:
    return "dbak"

def remove_old_backup_files(self, folder):
    """Remove the too old backup files in a given folder.

    Retention is inferred from the filename marker: monthly backups
    (_mbak_) are kept 366 days, weekly (_wbak_) 28 days, daily (_dbak_)
    7 days; anything else is left untouched with a warning.
    """
    files = self.get_all_files(folder, recursive=True)
    now_stamp = self.get_timestamp()

    # Iterate on each file:
    for fname in files:
        if "_mbak_" in fname:
            # keep for 366 days!
            offset = 366*24*3600
        elif "_wbak_" in fname:
            # keep for 28 days:
            offset = 28*24*3600
        elif "_dbak_" in fname:
            # Keep the files for 7 days
            offset = 7*24*3600
        else:
            logger.warning("Ignoring non-rolling backup file %s", fname)
            continue

        min_stamp = now_stamp - offset

        # Check the file timestamp:
        file_path = self.get_path(folder, fname)
        mtime = self.get_file_mtime(file_path)
        if mtime < min_stamp:
            logger.info("Removing old backup file %s", file_path)
            self.remove_file(file_path)
        else:
            ndays = (mtime - min_stamp)/(24*3600)
            logger.debug("Keeping %s for %.3f more days", file_path, ndays)
# Create The par files: redun = 10 pkg_file = self.get_path(work_dir, pkg_name) fsize = self.get_file_size(pkg_file) # We allocate 1 block per 1kB with min=10, max=3000 nblocks = max(min(fsize/1024, 3000), 10) logger.info("Generating PAR2 files with %d source blocks...", nblocks) tools.create_par2_archives(pkg_file, redun, nblocks)