import logging import os import sys from concurrent.futures import wait from datetime import datetime as dt from enum import Enum, unique from typing import List import pandas as pd from py_jftech import ( component, autowired, get_config, filter_weekend, asynchronized, workday_range, format_date, prev_workday, parse_date ) from api import ( RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder, DatumType, RoboExportor, PortfoliosRisk, PortfoliosHolder, PortfoliosType, RebalanceRuler, DataSync ) logger = logging.getLogger(__name__) @unique class BacktestStep(Enum): EWMA_VALUE = 1 ASSET_POOL = 2 NORMAL_PORTFOLIO = 3 HOLD_PORTFOLIO = 4 def within(self, step: Enum): return self.value <= step.value def without(self, step: Enum): return self.value >= step.value @component(bean_name='backtest') class BacktestExecutor(RoboExecutor): @autowired(names={'export': 'backtest-export'}) def __init__(self, risk: AssetRisk = None, datum: Datum = None, pool: AssetPool = None, syncs: List[DataSync] = None, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, rule: RebalanceRuler = None, export: RoboExportor = None): self._risk = risk self._datum = datum self._pool = pool self._builder = builder self._hold = hold self._rule = rule self._syncs = syncs self._export = export self._config = get_config(__name__)['backtest'] @property def start_date(self): return pd.to_datetime(filter_weekend(self._config['start-date'])) @property def start_step(self) -> BacktestStep: return BacktestStep(self._config['start-step']) @property def end_step(self) -> BacktestStep: return BacktestStep(self._config['end-step']) if 'end-step' in self._config else BacktestStep.HOLD_PORTFOLIO @property def is_sync_data(self): return get_config(__name__)['sync-data'] @property def end_date(self): return pd.to_datetime(self._config['end-date']) @property def is_clean_up(self): return self._config['clean-up'] if 'clean-up' in self._config else True def clear_datas(self): if self.start_step.within(BacktestStep.EWMA_VALUE) and self.end_step.without(BacktestStep.EWMA_VALUE): logger.info('start to clear fund ewma value'.center(50, '-')) self._risk.clear() if self.start_step.within(BacktestStep.ASSET_POOL) and self.end_step.without(BacktestStep.ASSET_POOL): logger.info('start to clear asset pool'.center(50, '-')) self._pool.clear() if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO) and self.end_step.without(BacktestStep.NORMAL_PORTFOLIO): logger.info('start to clear normal portfolios'.center(50, '-')) self._builder.clear() if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO): logger.info('start to clear hold portfolios'.center(50, '-')) self._hold.clear() self._rule.clear_signal() def start_exec(self): if self.is_sync_data: for sync in self._syncs: sync.do_sync() if self.is_clean_up: self.clear_datas() if self.start_step.within(BacktestStep.EWMA_VALUE) and self.end_step.without(BacktestStep.EWMA_VALUE): logger.info("start to build fund ewma value.".center(50, '-')) now = dt.now() wait([self.async_build_risk_date(x['id']) for x in self._datum.get_datums(type=DatumType.FUND, risk=(3, 4, 5))]) logger.info(f"build fund ewma value success, use[{(dt.now() - now).seconds}s]") if self.start_step.within(BacktestStep.ASSET_POOL) and self.end_step.without(BacktestStep.ASSET_POOL): logger.info("start to build asset pool".center(50, '-')) now = dt.now() workdays = workday_range(self.start_date, self.end_date) for date in workdays: self._risk.get_risk_pool(date) for date in workdays: self._pool.get_pool(date) logger.info(f"build asset pool success, use[{(dt.now() - now).seconds}s]") if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO) and self.end_step.without(BacktestStep.NORMAL_PORTFOLIO): logger.info("start to build normal portfolios".center(50, '-')) now = dt.now() wait([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in workday_range(self.start_date, self.end_date)]) logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]") if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO): logger.info("start to build hold portfolios".center(50, '-')) now = dt.now() wait([self.async_build_hold(x) for x in PortfoliosRisk]) logger.info(f"build hold portfolios success, use[{(dt.now() - now).seconds}s]") logger.info("start to export report".center(50, '-')) now = dt.now() file = self._export.export(max_date=self.end_date, min_date=self.start_date) logger.info(f"report file[{os.path.basename(file)}] exported successfully. use[{(dt.now() - now).seconds}s].") @asynchronized(isolate=True) def async_build_risk_date(self, asset_id): self._risk.build_risk_date(asset_id, self.end_date) @asynchronized(isolate=True) def async_build_portfolios(self, day, risk: PortfoliosRisk): self._builder.get_portfolios(day, risk) @asynchronized(isolate=True) def async_build_hold(self, risk: PortfoliosRisk): self._hold.build_hold_portfolio(day=self.end_date, risk=risk) @component(bean_name='real') class RealExecutor(RoboExecutor): @autowired def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, syncs: List[DataSync] = None, ): self._builder = builder self._hold = hold self._syncs = syncs self._config = get_config(__name__)['real'] @property def start_date(self): return pd.to_datetime(filter_weekend(self._config['start-date'])) @property def is_sync_data(self): return get_config(__name__)['sync-data'] @property def curt_date(self): if len(sys.argv) > 1: try: return parse_date(sys.argv[1]) except Exception as e: logger.warning(f'get curt date from argv failure.', e) return dt.combine(dt.today().date(), dt.min.time()) def start_exec(self): if self.is_sync_data: for sync in self._syncs: sync.do_sync() date = self.curt_date for risk in PortfoliosRisk: logger.info(f"start to build risk[{risk.name}] real for date[{format_date(date)}]".center(50, '-')) now = dt.now() # 因为每天都必须有NORMAL最优投组,不管用不用 self._builder.get_portfolios(prev_workday(date), risk) self._hold.build_hold_portfolio(date, risk) # 如果当前持仓为风控投组,则还要计算风控投组,不管用不用 p_type = self._hold.get_portfolio_type(date, risk) if p_type is not PortfoliosType.NORMAL: self._builder.get_portfolios(prev_workday(date), p_type) logger.info(f"build risk[{risk.name}] real for date[{format_date(date)}] success, use[{(dt.now() - now).seconds}s]")