import logging
import os
import sys
from concurrent.futures import wait
from datetime import datetime as dt
from enum import Enum, unique
from typing import List

import pandas as pd
from py_jftech import (
    component, autowired, get_config, filter_weekend, asynchronized,
    workday_range, format_date, prev_workday, parse_date
)

from api import (
    RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder, DatumType, RoboExportor, PortfoliosRisk,
    PortfoliosHolder, PortfoliosType, RebalanceRuler, DataSync
)

logger = logging.getLogger(__name__)


def _elapsed_seconds(start: dt) -> int:
    """Return the whole seconds elapsed since *start*.

    ``timedelta.seconds`` only holds the sub-day remainder and wraps back to 0 after 24h;
    ``total_seconds()`` does not, so long runs are reported correctly.
    """
    return int((dt.now() - start).total_seconds())


def _wait_all(futures, task_name: str) -> None:
    """Block until every future in *futures* has finished, then log each one that raised.

    ``concurrent.futures.wait`` never re-raises exceptions stored in the futures, so without
    this check a failed async task would go completely unnoticed. Failures are logged rather
    than raised to keep the original best-effort flow.
    """
    done, _ = wait(futures)
    for future in done:
        error = future.exception()
        if error is not None:
            logger.error("%s failed", task_name, exc_info=error)


@unique
class BacktestStep(Enum):
    """Ordered stages of a backtest; a run may start at any stage and proceeds to the later ones."""
    EWMA_VALUE = 1
    ASSET_POOL = 2
    NORMAL_PORTFOLIO = 3
    HOLD_PORTFOLIO = 4

    def within(self, step: Enum):
        """Return True when *step* is this stage or a later one.

        In other words, a run that starts at ``self`` includes *step*.
        """
        return self.value <= step.value


@component(bean_name='backtest')
class BacktestExecutor(RoboExecutor):
    """Executor that replays the whole pipeline over the configured [start-date, end-date] range.

    Reads the ``backtest`` section of this module's config (``start-date``, ``end-date``,
    ``start-step``, optional ``clean-up``) plus the module-level ``sync-data`` flag.
    """

    @autowired(names={'export': 'backtest-export'})
    def __init__(self, risk: AssetRisk = None, datum: Datum = None, pool: AssetPool = None,
                 syncs: List[DataSync] = None, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None,
                 rule: RebalanceRuler = None, export: RoboExportor = None):
        self._risk = risk
        self._datum = datum
        self._pool = pool
        self._builder = builder
        self._hold = hold
        self._rule = rule
        self._syncs = syncs
        self._export = export
        self._config = get_config(__name__)['backtest']

    @property
    def start_date(self):
        """Configured ``start-date`` after ``filter_weekend``, as a pandas Timestamp."""
        return pd.to_datetime(filter_weekend(self._config['start-date']))

    @property
    def start_step(self) -> BacktestStep:
        """Stage the backtest starts from (``start-step`` config value mapped onto BacktestStep)."""
        return BacktestStep(self._config['start-step'])

    @property
    def is_sync_data(self):
        """Module-level ``sync-data`` flag: whether to run every DataSync before executing."""
        return get_config(__name__)['sync-data']

    @property
    def end_date(self):
        """Configured ``end-date`` as a pandas Timestamp."""
        return pd.to_datetime(self._config['end-date'])

    @property
    def is_clean_up(self):
        """Whether to clear previously built data before running; defaults to True when unset."""
        return self._config['clean-up'] if 'clean-up' in self._config else True

    def clear_datas(self):
        """Clear stored results for every stage from ``start_step`` onward.

        Stages earlier than ``start_step`` are left untouched so that a run can resume from them.
        """
        if self.start_step.within(BacktestStep.EWMA_VALUE):
            logger.info('start to clear fund ewma value'.center(50, '-'))
            self._risk.clear()
        if self.start_step.within(BacktestStep.ASSET_POOL):
            logger.info('start to clear asset pool'.center(50, '-'))
            self._pool.clear()
        if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO):
            logger.info('start to clear normal portfolios'.center(50, '-'))
            self._builder.clear()
        if self.start_step.within(BacktestStep.HOLD_PORTFOLIO):
            logger.info('start to clear hold portfolios'.center(50, '-'))
            self._hold.clear()
            self._rule.clear_signal()

    def start_exec(self):
        """Run the backtest from ``start_step`` onward and export the report.

        The run proceeds as follows:

        1. Optionally sync data and clear previous results.
        2. Build, in order, fund EWMA risk values, asset pools, normal portfolios and hold
           portfolios, skipping the stages before ``start_step``.
        3. Always export the report up to ``end_date``.
        """
        if self.is_sync_data:
            for sync in self._syncs or []:
                sync.do_sync()
        if self.is_clean_up:
            self.clear_datas()
        if self.start_step.within(BacktestStep.EWMA_VALUE):
            logger.info("start to build fund ewma value.".center(50, '-'))
            now = dt.now()
            funds = self._datum.get_datums(type=DatumType.FUND, risk=(3, 4, 5))
            _wait_all([self.async_build_risk_date(x['id']) for x in funds], 'build fund ewma value')
            logger.info(f"build fund ewma value success, use[{_elapsed_seconds(now)}s]")
        if self.start_step.within(BacktestStep.ASSET_POOL):
            logger.info("start to build asset pool".center(50, '-'))
            now = dt.now()
            workdays = workday_range(self.start_date, self.end_date)
            for date in workdays:
                self._risk.get_risk_pool(date)
                # NOTE: a short sleep (e.g. 0.05s) was once placed here because rapid
                # multi-process calls made database connections time out; it is currently disabled.
            for date in workdays:
                self._pool.get_pool(date)
            logger.info(f"build asset pool success, use[{_elapsed_seconds(now)}s]")
        if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO):
            logger.info("start to build normal portfolios".center(50, '-'))
            now = dt.now()
            # Computed once instead of once per risk level; the range does not depend on the risk.
            workdays = workday_range(self.start_date, self.end_date)
            _wait_all([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in workdays],
                      'build normal portfolios')
            logger.info(f"build normal portfolios success, use[{_elapsed_seconds(now)}s]")
        if self.start_step.within(BacktestStep.HOLD_PORTFOLIO):
            logger.info("start to build hold portfolios".center(50, '-'))
            now = dt.now()
            _wait_all([self.async_build_hold(x) for x in PortfoliosRisk], 'build hold portfolios')
            logger.info(f"build hold portfolios success, use[{_elapsed_seconds(now)}s]")
        logger.info("start to export report".center(50, '-'))
        now = dt.now()
        file = self._export.export(max_date=self.end_date)
        logger.info(f"report file[{os.path.basename(file)}] exported successfully. use[{_elapsed_seconds(now)}s].")

    @asynchronized(isolate=True)
    def async_build_risk_date(self, asset_id):
        """Build the risk data of one asset up to ``end_date`` (scheduled via ``asynchronized``)."""
        self._risk.build_risk_date(asset_id, self.end_date)

    @asynchronized(isolate=True)
    def async_build_portfolios(self, day, risk: PortfoliosRisk):
        """Build the portfolio for one day and risk level (scheduled via ``asynchronized``)."""
        self._builder.get_portfolios(day, risk)

    @asynchronized(isolate=True)
    def async_build_hold(self, risk: PortfoliosRisk):
        """Build the hold portfolio of one risk level up to ``end_date`` (scheduled via ``asynchronized``)."""
        self._hold.build_hold_portfolio(day=self.end_date, risk=risk)


@component(bean_name='real')
class RealExecutor(RoboExecutor):
    """Executor that builds portfolios for a single "current" date.

    The date is taken from argv[1], or today's date when argv[1] is absent or unparsable.
    """

    @autowired
    def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, syncs: List[DataSync] = None, ):
        self._builder = builder
        self._hold = hold
        self._syncs = syncs
        self._config = get_config(__name__)['real']

    @property
    def start_date(self):
        """Configured ``real.start-date`` after ``filter_weekend``, as a pandas Timestamp."""
        return pd.to_datetime(filter_weekend(self._config['start-date']))

    @property
    def is_sync_data(self):
        """Module-level ``sync-data`` flag: whether to run every DataSync before executing."""
        return get_config(__name__)['sync-data']

    @property
    def curt_date(self):
        """Date to run for.

        Returns ``parse_date(sys.argv[1])`` when that argument is given and parses. Otherwise
        it falls back to today at midnight.
        """
        if len(sys.argv) > 1:
            try:
                return parse_date(sys.argv[1])
            except Exception as e:
                # The message must contain a placeholder: extra positional args are %-formatted
                # into it, and a mismatch makes the logging call itself fail.
                logger.warning('get curt date from argv failure: %s', e)
        return dt.combine(dt.today().date(), dt.min.time())

    def start_exec(self):
        """Optionally sync data, then build the normal and hold portfolios of every risk level for ``curt_date``."""
        if self.is_sync_data:
            for sync in self._syncs or []:
                sync.do_sync()
        date = self.curt_date
        for risk in PortfoliosRisk:
            logger.info(f"start to build risk[{risk.name}] real for date[{format_date(date)}]".center(50, '-'))
            now = dt.now()
            # A NORMAL optimal portfolio must exist for every day, whether or not it ends up being used.
            self._builder.get_portfolios(prev_workday(date), risk)
            self._hold.build_hold_portfolio(date, risk)
            # If the current holding is a risk-control portfolio, that portfolio must be computed
            # as well, whether or not it ends up being used.
            p_type = self._hold.get_portfolio_type(date, risk)
            if p_type is not PortfoliosType.NORMAL:
                # NOTE(review): elsewhere the second argument of get_portfolios is a PortfoliosRisk,
                # but here a PortfoliosType is passed — confirm the builder's signature accepts this
                # (e.g. whether it should be get_portfolios(day, risk, p_type)).
                self._builder.get_portfolios(prev_workday(date), p_type)
            logger.info(f"build risk[{risk.name}] real for date[{format_date(date)}] success, use[{_elapsed_seconds(now)}s]")