Commit 16c1760c authored by jichao's avatar jichao

但任务回测完成

parent 3bcbe8b1
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime as dt
from enum import Enum, unique from enum import Enum, unique
from framework import get_config
@unique @unique
class DatumType(Enum): class DatumType(Enum):
...@@ -208,6 +211,15 @@ class AssetRisk(ABC): ...@@ -208,6 +211,15 @@ class AssetRisk(ABC):
''' '''
pass pass
@abstractmethod
def build_risk_date(self, asset_id, day=dt.today()):
'''
构建指定资产的所有风险时间点
:param asset_id: 指定的资产id
:param day: 构建的截止日期
'''
pass
class AssetPool(ABC): class AssetPool(ABC):
''' '''
...@@ -437,3 +449,25 @@ class RebalanceRuler(ABC): ...@@ -437,3 +449,25 @@ class RebalanceRuler(ABC):
:param sign_id: 信号ID :param sign_id: 信号ID
''' '''
pass pass
class RoboExecutor(ABC):
'''
ROBO执行器,整合以上逻辑,进行实盘或回测
'''
@abstractmethod
def start_exec(self):
'''
开始执行测试逻辑
'''
pass
@property
@abstractmethod
def start_date(self):
pass
@staticmethod
def use_name():
return get_config('robo-executor')['use']
...@@ -5,18 +5,13 @@ import pandas as pd ...@@ -5,18 +5,13 @@ import pandas as pd
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
from scipy.stats import norm from scipy.stats import norm
from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType, RoboExecutor
from asset_pool.dao import asset_risk_dates as ard, asset_ewma_value as aev, robo_assets_pool as rap from asset_pool.dao import asset_risk_dates as ard, asset_ewma_value as aev, robo_assets_pool as rap
from framework import component, autowired, get_config, format_date, block_execute, get_logger from framework import component, autowired, get_config, format_date, block_execute, get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
def get_risk_start_date():
config = get_config("main")
return config['start-date'] - relativedelta(months=3)
@component @component
class CvarEwmaAssetRisk(AssetRisk): class CvarEwmaAssetRisk(AssetRisk):
''' '''
...@@ -24,12 +19,17 @@ class CvarEwmaAssetRisk(AssetRisk): ...@@ -24,12 +19,17 @@ class CvarEwmaAssetRisk(AssetRisk):
EWMA方式决定风控结束:风控结束后,就可以找到风控期的最低点日期,该日期作为下一轮cvar计算的起始日期 EWMA方式决定风控结束:风控结束后,就可以找到风控期的最低点日期,该日期作为下一轮cvar计算的起始日期
''' '''
@autowired @autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, navs: Navs = None, datum: Datum = None): def __init__(self, navs: Navs = None, datum: Datum = None, executor: RoboExecutor = None):
self._navs = navs self._navs = navs
self._datum = datum self._datum = datum
self._executor = executor
self._config = get_config(__name__) self._config = get_config(__name__)
@property
def risk_start_date(self):
return self._executor.start_date - relativedelta(months=self._config['advance-months'])
def get_risk_pool(self, day): def get_risk_pool(self, day):
asset_pool = rap.get_one(day, AssetPoolType.RISK) asset_pool = rap.get_one(day, AssetPoolType.RISK)
if not asset_pool: if not asset_pool:
...@@ -61,7 +61,7 @@ class CvarEwmaAssetRisk(AssetRisk): ...@@ -61,7 +61,7 @@ class CvarEwmaAssetRisk(AssetRisk):
def get_next_date(self, asset_id, day=dt.today()): def get_next_date(self, asset_id, day=dt.today()):
last = ard.get_last_one(asset_id, day) last = ard.get_last_one(asset_id, day)
if not last or DateType(last['type']) is DateType.START_DATE: if not last or DateType(last['type']) is DateType.START_DATE:
start_date = last['date'] if last else get_risk_start_date() start_date = last['date'] if last else self.risk_start_date
ewma = pd.DataFrame(self.get_ewma_value(asset_id, min_date=start_date, max_date=day)) ewma = pd.DataFrame(self.get_ewma_value(asset_id, min_date=start_date, max_date=day))
total = self._config['ewma']['condition-total'] total = self._config['ewma']['condition-total']
meet = self._config['ewma']['condition-meet'] meet = self._config['ewma']['condition-meet']
...@@ -76,7 +76,7 @@ class CvarEwmaAssetRisk(AssetRisk): ...@@ -76,7 +76,7 @@ class CvarEwmaAssetRisk(AssetRisk):
return {'date': stop_date, 'type': DateType.STOP_DATE} return {'date': stop_date, 'type': DateType.STOP_DATE}
elif DateType(last['type']) is DateType.STOP_DATE: elif DateType(last['type']) is DateType.STOP_DATE:
last_start = ard.get_last_one(asset_id, last['date'], type=DateType.START_DATE) last_start = ard.get_last_one(asset_id, last['date'], type=DateType.START_DATE)
start_date = last_start['date'] if last_start else get_risk_start_date() start_date = last_start['date'] if last_start else self.risk_start_date
rtns = pd.DataFrame(self.get_income_return(asset_id, min_date=start_date, max_date=day)) rtns = pd.DataFrame(self.get_income_return(asset_id, min_date=start_date, max_date=day))
risk_rtns = rtns[rtns.date <= last['date']] risk_rtns = rtns[rtns.date <= last['date']]
cvar_start_date = risk_rtns.loc[risk_rtns.rtn.idxmin()].date cvar_start_date = risk_rtns.loc[risk_rtns.rtn.idxmin()].date
...@@ -98,8 +98,8 @@ class CvarEwmaAssetRisk(AssetRisk): ...@@ -98,8 +98,8 @@ class CvarEwmaAssetRisk(AssetRisk):
return {'date': row['date'], 'type': DateType.START_DATE} return {'date': row['date'], 'type': DateType.START_DATE}
return None return None
def get_ewma_value(self, id, min_date=get_risk_start_date(), max_date=None): def get_ewma_value(self, id, min_date=None, max_date=None):
rtn = pd.DataFrame(self.get_income_return(id, min_date=min_date, max_date=max_date)) rtn = pd.DataFrame(self.get_income_return(id, min_date=min_date or self.risk_start_date, max_date=max_date))
if rtn.empty: if rtn.empty:
return [] return []
rtn.sort_values('date', inplace=True) rtn.sort_values('date', inplace=True)
......
framework: framework:
database: database:
host: ${MYSQL_HOST:127.0.0.1} host: ${MYSQL_HOST:localhost}
port: ${MYSQL_PORT:3306} port: ${MYSQL_PORT:3306}
user: ${MYSQL_USER:root} user: ${MYSQL_USER:root}
password: ${MYSQL_PWD:123456} password: ${MYSQL_PWD:123456}
...@@ -45,8 +45,6 @@ framework: ...@@ -45,8 +45,6 @@ framework:
root: root:
level: INFO level: INFO
handlers: [ console ] handlers: [ console ]
main:
start-date: 2008-01-02
basic: basic:
datum: datum:
excludes: excludes:
...@@ -164,6 +162,11 @@ rebalance: ...@@ -164,6 +162,11 @@ rebalance:
init-factor: 0.000000002 init-factor: 0.000000002
high-low-buy: high-low-buy:
threshold: [ 0.5, 0.8 ] threshold: [ 0.5, 0.8 ]
robo-executor:
use: ${ROBO_EXECUTOR:backtest}
backtest:
start-date: 2008-01-02
end-date: 2009-01-01
......
from concurrent.futures import ProcessPoolExecutor, as_completed from concurrent.futures import ProcessPoolExecutor, as_completed, wait
from framework.env_config import get_config from framework.env_config import get_config
...@@ -10,11 +10,11 @@ def create_process_pool(max_workers=None): ...@@ -10,11 +10,11 @@ def create_process_pool(max_workers=None):
return ProcessPoolExecutor(max_workers=max_workers or config['max-workers']) return ProcessPoolExecutor(max_workers=max_workers or config['max-workers'])
def block_execute(func, params: dict, isolate=False) -> dict: def block_execute(func, params: dict, isolate=False, result=True) -> dict:
if isolate: if isolate:
with create_process_pool() as process: with create_process_pool() as process:
futures = {process.submit(func, *x[1]): x[0] for x in params.items()} futures = {process.submit(func, *x[1]): x[0] for x in params.items()}
return {futures[x]: x.result() for x in as_completed(futures)} return {futures[x]: x.result() for x in as_completed(futures)} if result else wait(futures.keys())
else: else:
futures = {process_pool.submit(func, *x[1]): x[0] for x in params.items()} futures = {process_pool.submit(func, *x[1]): x[0] for x in params.items()}
return {futures[x]: x.result() for x in as_completed(futures)} return {futures[x]: x.result() for x in as_completed(futures)} if result else wait(futures.keys())
from framework import autowired, parse_date, get_logger from framework import autowired
from api import PortfoliosBuilder, PortfoliosRisk from api import RoboExecutor
logger = get_logger('main')
@autowired(names={'executor': RoboExecutor.use_name()})
@autowired(names={'builder': 'poem'}) def start(executor: RoboExecutor = None):
def start(builder: PortfoliosBuilder = None): executor.start_exec()
day = parse_date('2022-11-07')
logger.info(builder.get_portfolios(day, PortfoliosRisk.FT3))
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -18,28 +18,31 @@ class MptPortfoliosBuilder(PortfoliosBuilder): ...@@ -18,28 +18,31 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
self._factory = factory self._factory = factory
def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL): def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL):
portfolio = rmp.get_one(day, type, risk) try:
if not portfolio:
result, detail = self.build_portfolio(day, type)
for build_risk, datas in result.items():
rmp.insert({
**datas,
'risk': build_risk,
'type': type,
'date': day
})
portfolio = rmp.get_one(day, type, risk) portfolio = rmp.get_one(day, type, risk)
if SolveType(portfolio['solve']) is not SolveType.INFEASIBLE: if not portfolio:
result = json.loads(portfolio['portfolio']) result, detail = self.build_portfolio(day, type)
return {int(x[0]): x[1] for x in result.items()} for build_risk, datas in result.items():
return None rmp.insert({
**datas,
'risk': build_risk,
'type': type,
'date': day
})
portfolio = rmp.get_one(day, type, risk)
if SolveType(portfolio['solve']) is not SolveType.INFEASIBLE:
result = json.loads(portfolio['portfolio'])
return {int(x[0]): x[1] for x in result.items()}
return None
except Exception as e:
logger.exception(f"build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}] failure.", e)
def build_portfolio(self, day, type: PortfoliosType): def build_portfolio(self, day, type: PortfoliosType):
result = {} result = {}
detail = {} detail = {}
for risk in PortfoliosRisk: for risk in PortfoliosRisk:
logger.info( logger.info(
f"start build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}]") f"start to build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}]")
solver = self._factory.create_solver(risk, type) solver = self._factory.create_solver(risk, type)
solver.reset_navs(day) solver.reset_navs(day)
logger.debug({ logger.debug({
......
...@@ -2,7 +2,7 @@ import json ...@@ -2,7 +2,7 @@ import json
import pandas as pd import pandas as pd
from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler, Navs, SignalType from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler, Navs, SignalType, RoboExecutor
from framework import ( from framework import (
component, autowired, get_config, next_workday, filter_weekend, component, autowired, get_config, next_workday, filter_weekend,
prev_workday, transaction, workday_range, format_date, get_logger prev_workday, transaction, workday_range, format_date, get_logger
...@@ -13,18 +13,14 @@ from portfolios.utils import format_weight ...@@ -13,18 +13,14 @@ from portfolios.utils import format_weight
logger = get_logger(__name__) logger = get_logger(__name__)
def get_start_date():
config = get_config('main')
return pd.to_datetime(filter_weekend(config['start-date']))
@component(bean_name='next-re') @component(bean_name='next-re')
class NextReblanceHolder(PortfoliosHolder): class NextReblanceHolder(PortfoliosHolder):
@autowired @autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, rule: RebalanceRuler, navs: Navs = None): def __init__(self, rule: RebalanceRuler, navs: Navs = None, executor: RoboExecutor = None):
self._rule = rule self._rule = rule
self._navs = navs self._navs = navs
self._executor = executor
self._config = get_config(__name__) self._config = get_config(__name__)
def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None, signal_id=None): def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None, signal_id=None):
...@@ -44,24 +40,27 @@ class NextReblanceHolder(PortfoliosHolder): ...@@ -44,24 +40,27 @@ class NextReblanceHolder(PortfoliosHolder):
def build_hold_portfolio(self, day, risk: PortfoliosRisk): def build_hold_portfolio(self, day, risk: PortfoliosRisk):
last_nav = rhp.get_last_one(max_date=day, risk=risk) last_nav = rhp.get_last_one(max_date=day, risk=risk)
start = next_workday(last_nav['date'] if last_nav else get_start_date()) start = next_workday(last_nav['date'] if last_nav else self._executor.start_date)
while start <= day: try:
logger.info(f"start to build hold portfolio for date[{format_date(start)}]") while start <= day:
signal = None logger.info(f"start to build hold portfolio[{risk.name}] for date[{format_date(start)}]")
if last_nav: signal = None
last_re = rhp.get_last_one(max_date=start, risk=risk, rebalance=True) if last_nav:
if len(workday_range(last_re['date'], start)) > self.interval_days: last_re = rhp.get_last_one(max_date=start, risk=risk, rebalance=True)
if len(workday_range(last_re['date'], start)) > self.interval_days:
signal = self._rule.take_next_signal(prev_workday(start), risk)
else:
signal = self._rule.take_next_signal(prev_workday(start), risk) signal = self._rule.take_next_signal(prev_workday(start), risk)
else: if signal and not signal['effective']:
signal = self._rule.take_next_signal(prev_workday(start), risk) logger.info(f"start to rebalance hold portfolio[{risk.name}] for date[{format_date(start)}] "
if signal and not signal['effective']: f"with signal[{SignalType(signal['type']).name}]")
logger.info(f"start to rebalance hold portfolio for date[{format_date(start)}] " self.do_rebalance(start, risk, signal, last_nav)
f"with signal[{SignalType(signal['type']).name}]") elif last_nav and signal is None:
self.do_rebalance(start, risk, signal, last_nav) self.no_rebalance(start, risk, last_nav)
elif last_nav and signal is None: start = next_workday(start)
self.no_rebalance(start, risk, last_nav) last_nav = rhp.get_last_one(max_date=day, risk=risk)
start = next_workday(start) except Exception as e:
last_nav = rhp.get_last_one(max_date=day, risk=risk) logger.exception(f"build hold portfolio[{risk.name}] for date[{format_date(start)}] failure.", e)
@transaction @transaction
def do_rebalance(self, day, risk: PortfoliosRisk, signal, last_nav): def do_rebalance(self, day, risk: PortfoliosRisk, signal, last_nav):
......
...@@ -30,3 +30,13 @@ def insert(datas): ...@@ -30,3 +30,13 @@ def insert(datas):
insert into robo_weight_drift({','.join([x for x in datas.keys()])}) insert into robo_weight_drift({','.join([x for x in datas.keys()])})
values ({','.join([f"'{x[1]}'" for x in datas.items()])}) values ({','.join([f"'{x[1]}'" for x in datas.items()])})
''' '''
@write
def lock():
return "lock tables robo_weight_drift read"
@write
def unlock():
return "UNLOCK TABLES"
from api import DriftSolver, PortfoliosRisk, PortfoliosBuilder, Datum from api import DriftSolver, PortfoliosRisk, PortfoliosBuilder, Datum, RoboExecutor
from framework import component, autowired, get_config, workday_range, filter_weekend, next_workday from framework import component, autowired, get_config, workday_range, filter_weekend, next_workday
from rebalance.dao import robo_rebalance_signal as rrs, robo_weight_drift as rwd from rebalance.dao import robo_rebalance_signal as rrs, robo_weight_drift as rwd
def get_start_date():
config = get_config('main')
return filter_weekend(config['start-date'])
@component(bean_name='date-curve') @component(bean_name='date-curve')
class DateCurve(DriftSolver): class DateCurve(DriftSolver):
...@@ -31,10 +26,11 @@ class DateCurve(DriftSolver): ...@@ -31,10 +26,11 @@ class DateCurve(DriftSolver):
@component(bean_name='high-weight') @component(bean_name='high-weight')
class PortfolioHighWeight(DriftSolver): class PortfolioHighWeight(DriftSolver):
@autowired @autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, builder: PortfoliosBuilder = None, datum: Datum = None): def __init__(self, builder: PortfoliosBuilder = None, datum: Datum = None, executor: RoboExecutor = None):
self._builder = builder self._builder = builder
self._datum = datum self._datum = datum
self._executor = executor
self._config = get_config(__name__)['high-weight'] self._config = get_config(__name__)['high-weight']
@property @property
...@@ -46,7 +42,7 @@ class PortfolioHighWeight(DriftSolver): ...@@ -46,7 +42,7 @@ class PortfolioHighWeight(DriftSolver):
if not drift: if not drift:
datum_ids = [x['id'] for x in self._datum.get_high_risk_datums(risk)] datum_ids = [x['id'] for x in self._datum.get_high_risk_datums(risk)]
last_one = rwd.get_last_one(max_date=day, risk=risk) last_one = rwd.get_last_one(max_date=day, risk=risk)
start = (next_workday(last_one['date'])) if last_one else get_start_date() start = (next_workday(last_one['date'])) if last_one else self._executor.start_date
last_drift = last_one['drift'] if last_one else 0 last_drift = last_one['drift'] if last_one else 0
for date in workday_range(start, day): for date in workday_range(start, day):
portfolio = self._builder.get_portfolios(date, risk) portfolio = self._builder.get_portfolios(date, risk)
......
from api import PortfoliosRisk, SignalType from api import PortfoliosRisk, SignalType, RoboExecutor
from framework import component, filter_weekend, get_config from framework import component, filter_weekend, get_config, autowired
from rebalance.base_signal import BaseRebalanceSignal from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs from rebalance.dao import robo_rebalance_signal as rrs
def get_start_date():
config = get_config('main')
return filter_weekend(config['start-date'])
@component(bean_name='init') @component(bean_name='init')
class InitSignalBuilder(BaseRebalanceSignal): class InitSignalBuilder(BaseRebalanceSignal):
@autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, executor: RoboExecutor = None):
super(InitSignalBuilder, self).__init__()
self._executor = executor
@property @property
def signal_type(self) -> SignalType: def signal_type(self) -> SignalType:
return SignalType.INIT return SignalType.INIT
...@@ -20,7 +20,7 @@ class InitSignalBuilder(BaseRebalanceSignal): ...@@ -20,7 +20,7 @@ class InitSignalBuilder(BaseRebalanceSignal):
signal = rrs.get_last_one(max_date=day, risk=risk, type=SignalType.INIT) signal = rrs.get_last_one(max_date=day, risk=risk, type=SignalType.INIT)
if signal: if signal:
return None if signal['effective'] else signal return None if signal['effective'] else signal
return super().get_signal(get_start_date(), risk) return super().get_signal(self._executor.start_date, risk)
def is_trigger(self, day, risk: PortfoliosRisk) -> bool: def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
return True return True
import pandas as pd
from framework import component, autowired, block_execute, get_config, get_logger, filter_weekend, workday_range, format_date
from api import RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder, PortfoliosRisk, PortfoliosHolder
from datetime import datetime as dt
import time
logger = get_logger(__name__)
@component(bean_name='backtest')
class BacktestExector(RoboExecutor):
@autowired
def __init__(self, risk: AssetRisk = None, datum: Datum = None, pool: AssetPool = None,
builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None):
self._risk = risk
self._datum = datum
self._pool = pool
self._builder = builder
self._hold = hold
self._config = get_config(__name__)['backtest']
@property
def start_date(self):
return pd.to_datetime(filter_weekend(self._config['start-date']))
@property
def end_date(self):
return pd.to_datetime(self._config['end-date'])
def start_exec(self):
logger.info("start to build fund ewma value.".center(50, '-'))
now = dt.now()
block_execute(self._risk.build_risk_date, {x['id']: (x['id'], self.end_date) for x in self._datum.get_fund_datums(risk=(3, 4, 5))}, isolate=True, result=False)
logger.info(f"build fund ewma value success, use[{(dt.now() - now).seconds}s]")
logger.info("start to build asset pool".center(50, '-'))
now = dt.now()
workdays = workday_range(self.start_date, self.end_date)
for date in workdays:
self._risk.get_risk_pool(date)
time.sleep(0.05)
for date in workdays:
self._pool.get_pool(date)
logger.info(f"build fund ewma value success, use[{(dt.now() - now).seconds}s]")
logger.info("start to build normal portfolios".center(50, '-'))
now = dt.now()
block_execute(self._builder.get_portfolios, {f'{x.name}_{format_date(j)}': (j, x) for x in PortfoliosRisk for j in workday_range(self.start_date, self.end_date)}, isolate=True, result=False)
logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]")
logger.info("start to build hold portfolios".center(50, '-'))
now = dt.now()
block_execute(self._hold.build_hold_portfolio, {x: (self.end_date, x) for x in PortfoliosRisk}, isolate=True, result=False)
logger.info(f"build hold portfolios success, use[{(dt.now() - now).seconds}s]")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment