from framework import read, write, where, format_date from api import SignalType, PortfoliosRisk import json from datetime import datetime from enum import Enum __COLUMNS__ = { 'rrs_id': 'id', 'rrs_date': 'date', 'rrs_type': 'type', 'rrs_risk': 'risk', 'rrs_p_type': 'portfolio_type', 'rrs_p_weight': 'portfolio', 'rrs_effective': 'effective', } @read(one=True) def get_by_id(id): return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=id)}''' @read(one=True) def get_first_after(type: SignalType, risk: PortfoliosRisk, min_date, effective=None): return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(f"rrs_date >= {format_date(min_date)}", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date limit 1 ''' @read(one=True) def get_last_one(max_date, risk: PortfoliosRisk, type: SignalType = None, effective=None): return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(f"rrs_date <= {format_date(max_date)}", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date desc limit 1 ''' def get_count(risk: PortfoliosRisk): def exec(): return f"select count(*) as `count` from robo_rebalance_signal {where(rrs_risk=risk)}" result = exec() return result['count'] @write def insert(datas): datas = {x[0]: datas[x[1]] for x in __COLUMNS__.items() if x[1] in datas and datas[x[1]] is not None} datas = { **datas, **{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)}, **{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)}, **{x[0]: json.dumps(x[1]) for x in datas.items() if isinstance(x[1], dict)}, **{x[0]: (1 if x[1] else 0) for x in datas.items() if isinstance(x[1], bool)} } return f''' insert into robo_rebalance_signal({','.join([x for x in datas.keys()])}) values ({','.join([f"'{x[1]}'" for x in datas.items()])}) '''