from py_jftech import read, write, where, mapper_columns, format_date,to_tuple __COLUMNS__ = { 'red_eco_id': 'eco_id', 'red_date': 'date', 'red_indicator': 'indicator', 'red_release_date': 'release_date', } @write def batch_insert(datas): datas = [mapper_columns(datas=x, columns=__COLUMNS__, ignore_none=False) for x in datas] values = ','.join([f'''({','.join([(f"'{x[j]}'" if j in x and x[j] is not None else 'null') for j in __COLUMNS__.keys()])})''' for x in datas]) return f'''insert into robo_eco_datas({','.join(__COLUMNS__.keys())}) values {values}''' @read def get_list(eco_ids=None, min_date=None, max_date=None): sqls = [] if min_date: sqls.append(f"red_date >= '{format_date(min_date)}'") if max_date: sqls.append(f"red_date <= '{format_date(max_date)}'") return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_eco_datas {where(*sqls, red_eco_id=to_tuple(eco_ids))} order by red_eco_id, red_date ''' @read(one=True) def get_last_one(eco_id, max_date=None, by_release_date=False): date_field = 'red_release_date' if by_release_date else 'red_date' sql = f"{date_field} <= '{format_date(max_date)}'" if max_date else None return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_eco_datas {where(sql, red_eco_id=eco_id)} order by red_date desc limit 1 ''' @read def get_last(eco_id, max_date=None, count=1, by_release_date=False): date_field = 'red_release_date' if by_release_date else 'red_date' sql = f"{date_field} <= '{format_date(max_date)}'" if max_date else None return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_eco_datas {where(sql, red_eco_id=eco_id)} order by red_date desc limit {count} ''' @read(one=True) def get_one(eco_id, date): return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_eco_datas {where(red_eco_id=eco_id, red_date=date)} '''