import logging import sys from concurrent.futures import wait from datetime import datetime as dt from typing import List import pandas as pd from py_jftech import ( component, autowired, get_config, filter_weekend, asynchronized, parse_date, workday_range, is_workday, prev_workday, format_date ) from api import ( RoboExecutor, Datum, AssetPool, PortfoliosBuilder, PortfoliosRisk, PortfoliosHolder, DataSync, RoboExportor, BacktestStep, RebalanceSignal ) logger = logging.getLogger(__name__) @component(bean_name='backtest') class BacktestExecutor(RoboExecutor): @autowired def __init__(self, datum: Datum = None, pool: AssetPool = None, syncs: List[DataSync] = None, export: RoboExportor = None, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, signal: RebalanceSignal = None): self._datum = datum self._pool = pool self._builder = builder self._hold = hold self._syncs = syncs self._export = export self._signal = signal self._config = get_config(__name__)['backtest'] @staticmethod def get_last_business_day(start_date, end_date): transfer_date = get_config('portfolios')['holder']['warehouse-transfer-date'] # 生成日期范围并转换为DataFrame dates = pd.date_range(start_date, end_date, freq='MS', closed='right') dates = [pd.to_datetime(f"{date.year}-{date.month}-{transfer_date}") for date in dates] dates.insert(0, start_date) df = pd.DataFrame({'dates': dates}) df['dates'] = df['dates'].apply(lambda x: prev_workday(x)) result = [] for i in range(0, len(df), get_config('portfolios')['holder']['warehouse-frequency']): result.append(df.iloc[i]['dates']) delta = workday_range(result[0], result[1]) period = get_config(__name__)['backtest']['sealing-period'] if len(delta) <= period: result.pop(1) return result @property def start_date(self): return pd.to_datetime(filter_weekend(self._config['start-date'])) @property def start_step(self) -> BacktestStep: return BacktestStep(self._config['start-step']) @property def end_step(self) -> BacktestStep: return BacktestStep(self._config['end-step']) if 'end-step' in self._config else BacktestStep.HOLD_PORTFOLIO @property def is_sync_data(self): return get_config(__name__)['sync-data'] @property def end_date(self): return pd.to_datetime(self._config['end-date']) @property def is_clean_up(self): return self._config['clean-up'] if 'clean-up' in self._config else True def clear_datas(self): if self.start_step.within(BacktestStep.ASSET_POOL) and self.end_step.without(BacktestStep.ASSET_POOL): logger.info('start to clear asset pool'.center(50, '-')) self._pool.clear() if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO) and self.end_step.without( BacktestStep.NORMAL_PORTFOLIO): logger.info('start to clear normal portfolios'.center(50, '-')) self._builder.clear() self._signal.clear() if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO): logger.info('start to clear hold portfolios'.center(50, '-')) self._hold.clear() def start_exec(self): if self.is_sync_data: for sync in self._syncs: sync.do_sync() if self.is_clean_up: self.clear_datas() if self.start_step.within(BacktestStep.ASSET_POOL) and self.end_step.without(BacktestStep.ASSET_POOL): logger.info("start to build asset pool".center(50, '-')) now = dt.now() workdays = self.get_last_business_day(self.start_date, self.end_date) for date in workdays: self._pool.get_pool(date) logger.info(f"build asset pool success, use[{(dt.now() - now).seconds}s]") if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO) and self.end_step.without( BacktestStep.NORMAL_PORTFOLIO): logger.info("start to build normal portfolios".center(50, '-')) now = dt.now() wait([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in self.get_last_business_day(self.start_date, self.end_date)]) logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]") if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO): logger.info("start to build hold portfolios".center(50, '-')) now = dt.now() wait([self.async_build_hold(x) for x in PortfoliosRisk]) logger.info(f"build hold portfolios success, use[{(dt.now() - now).seconds}s]") logger.info("start to export report".center(50, '-')) now = dt.now() self._export.export(max_date=self.end_date, min_date=self.start_date) logger.info(f"report file exported successfully. use[{(dt.now() - now).seconds}s].") @asynchronized(isolate=True) def async_build_risk_date(self, asset_id): self._risk.build_risk_date(asset_id, self.end_date) @asynchronized(isolate=True) def async_build_portfolios(self, day, risk: PortfoliosRisk): self._builder.get_portfolios(day, risk) @asynchronized(isolate=True) def async_build_hold(self, risk: PortfoliosRisk): self._hold.build_hold_portfolio(day=self.end_date, risk=risk) @component(bean_name='real') class RealExecutor(RoboExecutor): @autowired(names={'daily_export': 'daily-real-export', 'monitor_export': 'daily-monitor-export'}) def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, syncs: List[DataSync] = None, daily_export: RoboExportor = None, monitor_export: RoboExportor = None, pool: AssetPool = None, signal: RebalanceSignal = None): self._builder = builder self._pool = pool self._hold = hold self._syncs = syncs self._daily_export = daily_export self._monitor_export = monitor_export self._config = get_config(__name__)['real'] self._signal = signal @property def start_date(self): return pd.to_datetime(filter_weekend(self._config['start-date'])) @property def is_sync_data(self): return get_config(__name__)['sync-data'] @property def curt_date(self): if len(sys.argv) > 1: try: return parse_date(sys.argv[1]) except Exception as e: logger.warning(f'get curt date from argv failure.', e) return dt.combine(dt.today().date(), dt.min.time()) @property def include_date(self): return [dt.combine(x, dt.min.time()) for x in self._config['include-date']] @property def export(self): return self._config['export'] if 'export' in self._config else False def start_exec(self): if self.is_sync_data: for sync in self._syncs: sync.do_sync() date = self.curt_date if is_workday(date) or date in self.include_date: date = prev_workday(filter_weekend(date)) for risk in PortfoliosRisk: logger.info(f"start to build risk[{risk.name}] real for date[{format_date(date)}]".center(50, '-')) now = dt.now() # 因为每天都必须有NORMAL最优投组,不管用不用 self._builder.get_portfolios(date, risk) self._signal.get_signal(date, risk) # 更新持仓 self._hold.build_hold_portfolio(date, risk) logger.info( f"build risk[{risk.name}] real for date[{format_date(date)}] success, use[{(dt.now() - now).seconds}s]") if self.export: now = dt.now() # 每日实盘报告 self._daily_export.export(max_date=date) # 每日监测报告 self._monitor_export.export(max_date=date) logger.info( f'export email for date[{format_date(date)}] send success, use[{(dt.now() - now).seconds}s]') else: logger.info(f'today[{format_date(date)}] is a rest day, do not execute the daily real robo.')