import sys
import time
from datetime import datetime as dt
from enum import Enum, unique

import pandas as pd

from api import (
    RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder,
    PortfoliosRisk, PortfoliosHolder, PortfoliosType
)
from framework import (
    component, autowired, block_execute, get_config, get_logger,
    filter_weekend, workday_range, format_date, prev_workday, parse_date
)

logger = get_logger(__name__)


def _elapsed_seconds(started: dt) -> int:
    """Return the whole seconds elapsed since *started*.

    Uses ``total_seconds()`` rather than ``timedelta.seconds`` because the
    latter only holds the seconds component (0..86399) and silently drops
    whole days on long runs.
    """
    return int((dt.now() - started).total_seconds())


@unique
class BacktestStep(Enum):
    """Ordered stages of a backtest run; lower values run earlier."""

    EWMA_VALUE = 1
    ASSET_POOL = 2
    NORMAL_PORTFOLIO = 3
    HOLD_PORTFOLIO = 4

    def within(self, step: Enum) -> bool:
        """Return True when this step's value is less than or equal to *step*'s."""
        return self.value <= step.value


@component(bean_name='backtest')
class BacktestExector(RoboExecutor):
    """Executor that clears and rebuilds backtest data from a configured step.

    The ``backtest`` section of the module config supplies ``start-date``,
    ``end-date`` and ``start-step``.
    """

    @autowired
    def __init__(self, risk: AssetRisk = None, datum: Datum = None, pool: AssetPool = None,
                 builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None):
        self._risk = risk
        self._datum = datum
        self._pool = pool
        self._builder = builder
        self._hold = hold
        self._config = get_config(__name__)['backtest']

    @property
    def start_date(self):
        """Configured ``start-date`` passed through ``filter_weekend``, as a pandas timestamp."""
        return pd.to_datetime(filter_weekend(self._config['start-date']))

    @property
    def start_step(self) -> BacktestStep:
        """Configured ``start-step`` converted to a :class:`BacktestStep`."""
        return BacktestStep(self._config['start-step'])

    @property
    def end_date(self):
        """Configured ``end-date`` as a pandas timestamp."""
        return pd.to_datetime(self._config['end-date'])

    def clear_datas(self):
        """Clear the stored data of every stage whose value is >= the start step."""
        first_step = self.start_step
        if first_step.within(BacktestStep.EWMA_VALUE):
            logger.info('start to clear fund ewma value'.center(50, '-'))
            self._risk.clear()
        if first_step.within(BacktestStep.ASSET_POOL):
            logger.info('start to clear asset pool'.center(50, '-'))
            self._pool.clear()
        if first_step.within(BacktestStep.NORMAL_PORTFOLIO):
            logger.info('start to clear normal portfolios'.center(50, '-'))
            self._builder.clear()
        if first_step.within(BacktestStep.HOLD_PORTFOLIO):
            logger.info('start to clear hold portfolios'.center(50, '-'))
            self._hold.clear()

    def start_exec(self):
        """Clear old data, then run each stage from the configured start step onward.

        Each stage logs a banner before it starts and the elapsed whole
        seconds once it has finished.
        """
        self.clear_datas()

        if self.start_step.within(BacktestStep.EWMA_VALUE):
            logger.info("start to build fund ewma value.".center(50, '-'))
            started = dt.now()
            # One task per fund datum returned for risk=(3, 4, 5), keyed by its id.
            risk_tasks = {
                x['id']: (x['id'], self.end_date)
                for x in self._datum.get_fund_datums(risk=(3, 4, 5))
            }
            block_execute(self._risk.build_risk_date, risk_tasks, isolate=True, result=False)
            logger.info(f"build fund ewma value success, use[{_elapsed_seconds(started)}s]")

        if self.start_step.within(BacktestStep.ASSET_POOL):
            logger.info("start to build asset pool".center(50, '-'))
            started = dt.now()
            workdays = workday_range(self.start_date, self.end_date)
            for date in workdays:
                self._risk.get_risk_pool(date)
                # Sleep is required here; otherwise the multiprocessing inside
                # runs too fast and database connections tend to time out.
                time.sleep(0.05)
            for date in workdays:
                self._pool.get_pool(date)
            logger.info(f"build asset pool success, use[{_elapsed_seconds(started)}s]")

        if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO):
            logger.info("start to build normal portfolios".center(50, '-'))
            started = dt.now()
            # One task per (risk, workday) pair, keyed "<risk name>_<formatted date>".
            portfolio_tasks = {
                f'{x.name}_{format_date(j)}': (j, x)
                for x in PortfoliosRisk
                for j in workday_range(self.start_date, self.end_date)
            }
            block_execute(self._builder.get_portfolios, portfolio_tasks, isolate=True, result=False)
            logger.info(f"build normal portfolios success, use[{_elapsed_seconds(started)}s]")

        if self.start_step.within(BacktestStep.HOLD_PORTFOLIO):
            logger.info("start to build hold portfolios".center(50, '-'))
            started = dt.now()
            hold_tasks = {x: (self.end_date, x) for x in PortfoliosRisk}
            block_execute(self._hold.build_hold_portfolio, hold_tasks, isolate=True, result=False)
            logger.info(f"build hold portfolios success, use[{_elapsed_seconds(started)}s]")


@component(bean_name='real')
class RealExecutor(RoboExecutor):
    """Executor that builds portfolios for a single current date.

    The date comes from the first command-line argument when it parses,
    otherwise from today's date at midnight.
    """

    @autowired
    def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None):
        self._builder = builder
        self._hold = hold
        self._config = get_config(__name__)['real']

    @property
    def start_date(self):
        """Configured ``start-date`` passed through ``filter_weekend``, as a pandas timestamp."""
        return pd.to_datetime(filter_weekend(self._config['start-date']))

    @property
    def curt_date(self):
        """Return the date to process.

        Tries ``parse_date(sys.argv[1])`` when an argument is present; if
        that raises, a warning is logged and today's date at 00:00 is
        returned instead.
        """
        if len(sys.argv) > 1:
            try:
                return parse_date(sys.argv[1])
            except Exception as e:
                # Deliberate best-effort fallback. The exception is embedded in
                # the message: passing it as a stray positional argument to a
                # message without a placeholder breaks log formatting.
                logger.warning(f'get curt date from argv failure: {e}')
        return dt.combine(dt.today().date(), dt.min.time())

    def start_exec(self):
        """Build the normal and hold portfolios of every risk level for the current date."""
        date = self.curt_date
        for risk in PortfoliosRisk:
            logger.info(
                f"start to build risk[{risk.name}] real for date[{format_date(date)}]".center(50, '-')
            )
            started = dt.now()
            self._builder.get_portfolios(prev_workday(date), risk)
            self._hold.build_hold_portfolio(date, risk)
            p_type = self._hold.get_portfolio_type(date, risk)
            if p_type is not PortfoliosType.NORMAL:
                # NOTE(review): p_type is passed in the position where other calls
                # pass a risk; confirm against PortfoliosBuilder.get_portfolios.
                self._builder.get_portfolios(prev_workday(date), p_type)
            logger.info(
                f"build risk[{risk.name}] real for date[{format_date(date)}] success, "
                f"use[{_elapsed_seconds(started)}s]"
            )