from py_jftech import read, write, format_date, to_tuple, where, mapper_columns __COLUMNS__ = { 'rid_index_id': 'index_id', 'rid_date': 'date', 'rid_high': 'high', 'rid_open': 'open', 'rid_low': 'low', 'rid_close': 'close', 'rid_pe': 'pe', 'rid_pb': 'pb', 'rid_volume': 'volume', } @write def batch_insert(datas): datas = [mapper_columns(x, __COLUMNS__, ignore_none=False) for x in datas] values = ','.join([f'''({','.join([(f"'{x[j]}'" if j in x and x[j] is not None else 'null') for j in __COLUMNS__.keys()])})''' for x in datas]) return f'''insert into robo_index_datas({','.join(__COLUMNS__.keys())}) values {values}''' @read def get_list(index_ids=None, min_date=None, max_date=None): sqls = [] if min_date: sqls.append(f"rid_date >= '{format_date(min_date)}'") if max_date: sqls.append(f"rid_date <= '{format_date(max_date)}'") return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas {where(*sqls, rid_index_id=to_tuple(index_ids))} order by rid_index_id, rid_date ''' @read(one=True) def get_last_one(index_id, max_date=None): sql = f"rid_date <= '{format_date(max_date)}'" if max_date else None return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas {where(sql, rid_index_id=index_id)} order by rid_date desc limit 1 ''' @read def get_last(index_id, max_date=None, count=1): sql = f"rid_date <= '{format_date(max_date)}'" if max_date else None return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas {where(sql, rid_index_id=index_id)} order by rid_date desc limit {count} ''' @read(one=True) def get_one(index_id, date): return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas {where(rid_index_id=index_id, rid_date=date)} '''