import json from py_jftech import read, write, where, mapper_columns, format_date from api import PortfoliosRisk, LoggerType __COLUMNS__ = { 'rdl_id': 'id', 'rdl_date': 'date', 'rdl_risk': 'risk', 'rdl_type': 'type', 'rdl_datas': 'datas', } @write def batch_insert(datas): datas = [mapper_columns(x, __COLUMNS__) for x in datas] values = ','.join([f'''({','.join([(f"'{x[j]}'" if j in x and x[j] is not None else 'null') for j in __COLUMNS__.keys() if j != 'rb_id'])})''' for x in datas]) return f'''insert into robo_data_logger({','.join([x for x in __COLUMNS__.keys() if x != 'rb_id'])}) values {values}''' @write def insert(datas): datas = mapper_columns(datas=datas, columns=__COLUMNS__) return f''' insert into robo_data_logger({','.join([x for x in datas.keys()])}) values ({','.join([f"'{x[1]}'" for x in datas.items()])}) ''' def update(id, datas): return f''' update robo_data_logger set rdl_datas = '{json.dumps(datas, ensure_ascii=False)}' where rdl_id = {id} ''' @read(one=True) def get_one(date, risk: PortfoliosRisk, type: LoggerType): return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_data_logger {where(rdl_date=date, rdl_risk=risk, rdl_type=type)} ''' @read(one=True) def get_last_one(max_date=None, risk: PortfoliosRisk = None, type: LoggerType = None, like_type=False): sqls = [] if max_date: sqls.append(f"rdl_date <= '{format_date(max_date)}'") if like_type and type: sqls.append(f"rdl_type like '{type.value}%'") return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_data_logger {where(*sqls, rdl_risk=risk, rdl_type=type if not like_type else None)} order by rdl_date, rdl_id desc limit 1 ''' @read def get_list(min_date=None, max_date=None, risk: PortfoliosRisk = None, type: LoggerType = None, like_type=False): sqls = [] if max_date: sqls.append(f"rdl_date <= '{format_date(max_date)}'") if min_date: sqls.append(f"rdl_date >= '{format_date(min_date)}'") if like_type and type: sqls.append(f"rdl_type like '{type.value}%'") return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_data_logger {where(*sqls, rdl_risk=risk, rdl_type=type if not like_type else None)} order by rdl_risk, rdl_date, rdl_id ''' @write def delete(min_date=None, risk: PortfoliosRisk = None, type: LoggerType = None, like_type=False): sqls = [] if min_date: sqls.append(f"rdl_date >= '{format_date(min_date)}'") if like_type and type: sqls.append(f"rdl_type like '{type.value}%'") delete_where = where(*sqls, rdl_risk=risk, rdl_type=type if not like_type else None) if delete_where: return f"delete from robo_data_logger {delete_where}" else: return "truncate table robo_data_logger"