robo_rebalance_signal.py 2.29 KB
from framework import read, write, where, format_date, mapper_columns
from api import SignalType, PortfoliosRisk
import json
from datetime import datetime
from enum import Enum

__COLUMNS__ = {
    'rrs_id': 'id',
    'rrs_date': 'date',
    'rrs_type': 'type',
    'rrs_risk': 'risk',
    'rrs_p_type': 'portfolio_type',
    'rrs_p_weight': 'portfolio',
    'rrs_effective': 'effective',
}


@read(one=True)
def get_by_id(id):
    return f'''
    select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=id)}'''


@read(one=True)
def get_one(type: SignalType, risk: PortfoliosRisk, date):
    return f'''
    select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
    {where(rrs_date=date, rrs_type=type, rrs_risk=risk)}
    '''


@read(one=True)
def get_first_after(type: SignalType, risk: PortfoliosRisk, min_date, effective=None):
    return f'''
    select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
    {where(f"rrs_date >= {format_date(min_date)}", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date limit 1
    '''


@read(one=True)
def get_last_one(max_date, risk: PortfoliosRisk, type: SignalType = None, effective=None):
    return f'''
    select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
    {where(f"rrs_date <= '{format_date(max_date)}'", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date desc limit 1
    '''


def get_count(risk: PortfoliosRisk = None, day=None, effective=None):
    @read(one=True)
    def exec():
        return f"select count(*) as `count` from robo_rebalance_signal {where(rrs_risk=risk, rrs_date=day, rrs_effective=effective)}"
    result = exec()
    return result['count']


@write
def insert(datas):
    datas = mapper_columns(datas=datas, columns=__COLUMNS__)
    return f'''
    insert into robo_rebalance_signal({','.join([x for x in datas.keys()])})
    values ({','.join([f"'{x[1]}'" for x in datas.items()])})
    '''


@write
def update(id, datas):
    datas = mapper_columns(datas=datas, columns=__COLUMNS__)
    return f'''
    update robo_rebalance_signal 
    set {','.join([f"{x[0]} = '{x[1]}'" for x in datas.items()])}
    where rrs_id = {id}
    '''