Commit a4e059a9 authored by jichao's avatar jichao

使用异步框架

parent 050358c5
...@@ -504,6 +504,15 @@ class RebalanceRuler(ABC): ...@@ -504,6 +504,15 @@ class RebalanceRuler(ABC):
''' '''
pass pass
@abstractmethod
def clear_signal(self, day=None, risk: PortfoliosRisk = None):
'''
清除指定风险等级,指定日期之后的调仓信号
:param day: 指定清除的开始日期,可选,如果没给,则清除全部日期
:param risk: 指定风险等级,如果没给,则清除全部风险等级
'''
pass
class RoboExecutor(ABC): class RoboExecutor(ABC):
''' '''
......
...@@ -4,7 +4,7 @@ from datetime import datetime as dt ...@@ -4,7 +4,7 @@ from datetime import datetime as dt
import pandas as pd import pandas as pd
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
from py_jftech import component, autowired, get_config, format_date, block_execute, transaction from py_jftech import component, autowired, get_config, format_date, transaction, asynchronized
from scipy.stats import norm from scipy.stats import norm
from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType, RoboExecutor from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType, RoboExecutor
...@@ -34,12 +34,16 @@ class CvarEwmaAssetRisk(AssetRisk): ...@@ -34,12 +34,16 @@ class CvarEwmaAssetRisk(AssetRisk):
def get_risk_pool(self, day): def get_risk_pool(self, day):
asset_pool = rap.get_one(day, AssetPoolType.RISK) asset_pool = rap.get_one(day, AssetPoolType.RISK)
if not asset_pool: if not asset_pool:
result = block_execute(self.is_risk, {x['id']: (x['id'], day) for x in self._datum.get_fund_datums(risk=(3, 4, 5))}) result = {x['id']: self.is_risk(x['id'], day) for x in self._datum.get_fund_datums(risk=(3, 4, 5))}
risk_ids = [x[0] for x in result.items() if x[1]] risk_ids = [x[0] for x in result.items() if x[1].result()]
rap.insert(day, AssetPoolType.RISK, risk_ids) rap.insert(day, AssetPoolType.RISK, risk_ids)
asset_pool = rap.get_one(day, AssetPoolType.RISK) asset_pool = rap.get_one(day, AssetPoolType.RISK)
return json.loads(asset_pool['asset_ids']) return json.loads(asset_pool['asset_ids'])
@asynchronized
def async_is_risk(self, id, day):
return self.is_risk(id, day)
def is_risk(self, id, day) -> bool: def is_risk(self, id, day) -> bool:
asset_pool = rap.get_one(day, AssetPoolType.RISK) asset_pool = rap.get_one(day, AssetPoolType.RISK)
if asset_pool: if asset_pool:
......
...@@ -2,6 +2,7 @@ import json ...@@ -2,6 +2,7 @@ import json
import logging import logging
from py_jftech import component, autowired, format_date from py_jftech import component, autowired, format_date
from pymysql import IntegrityError, constants
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory
from portfolios.dao import robo_mpt_portfolios as rmp from portfolios.dao import robo_mpt_portfolios as rmp
...@@ -25,12 +26,17 @@ class MptPortfoliosBuilder(PortfoliosBuilder): ...@@ -25,12 +26,17 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
if not portfolio: if not portfolio:
result, detail = self.build_portfolio(day, type) result, detail = self.build_portfolio(day, type)
for build_risk, datas in result.items(): for build_risk, datas in result.items():
rmp.insert({ try:
**datas, rmp.insert({
'risk': build_risk, **datas,
'type': type, 'risk': build_risk,
'date': day 'type': type,
}) 'date': day
})
except IntegrityError as e:
code, msg = e.args
if code != constants.ER.DUP_ENTRY:
raise e
portfolio = rmp.get_one(day, type, risk) portfolio = rmp.get_one(day, type, risk)
if SolveType(portfolio['solve']) is not SolveType.INFEASIBLE: if SolveType(portfolio['solve']) is not SolveType.INFEASIBLE:
result = json.loads(portfolio['portfolio']) result = json.loads(portfolio['portfolio'])
...@@ -38,6 +44,7 @@ class MptPortfoliosBuilder(PortfoliosBuilder): ...@@ -38,6 +44,7 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
return None return None
except Exception as e: except Exception as e:
logger.exception(f"build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}] failure.", e) logger.exception(f"build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}] failure.", e)
raise e
def build_portfolio(self, day, type: PortfoliosType): def build_portfolio(self, day, type: PortfoliosType):
result = {} result = {}
......
...@@ -52,4 +52,4 @@ def delete(min_date=None, risk: PortfoliosRisk = None): ...@@ -52,4 +52,4 @@ def delete(min_date=None, risk: PortfoliosRisk = None):
return 'truncate table robo_hold_portfolios' return 'truncate table robo_hold_portfolios'
else: else:
sql = f"rhp_date >= '{format_date(min_date)}'" if min_date else None sql = f"rhp_date >= '{format_date(min_date)}'" if min_date else None
return f"delete from robo_hold_portfolios {where(sql, rhp_risk=risk)}" return f"delete from robo_hold_portfolios {where(sql, rhp_risk=risk)}"
\ No newline at end of file
...@@ -5,7 +5,7 @@ from logging import DEBUG, getLogger ...@@ -5,7 +5,7 @@ from logging import DEBUG, getLogger
import pandas as pd import pandas as pd
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
from numpy import NAN from numpy import NAN
from py_jftech import component, autowired, get_config from py_jftech import component, autowired, get_config, is_workday
from pyomo.environ import * from pyomo.environ import *
from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum
...@@ -216,6 +216,7 @@ class DefaultSolver(Solver): ...@@ -216,6 +216,7 @@ class DefaultSolver(Solver):
min_date = day - relativedelta(months=self.get_config('navs.months')) min_date = day - relativedelta(months=self.get_config('navs.months'))
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_ids, max_date=day, min_date=min_date)) navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_ids, max_date=day, min_date=min_date))
navs = navs[navs['nav_date'].dt.day_of_week < 5]
navs['nav_date'] = pd.to_datetime(navs['nav_date']) navs['nav_date'] = pd.to_datetime(navs['nav_date'])
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal') navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
navs = navs.sort_index() navs = navs.sort_index()
......
...@@ -3,16 +3,17 @@ import unittest ...@@ -3,16 +3,17 @@ import unittest
from py_jftech import autowired, parse_date from py_jftech import autowired, parse_date
from api import PortfoliosBuilder, PortfoliosType, PortfoliosRisk, PortfoliosHolder from api import PortfoliosBuilder, PortfoliosType, PortfoliosRisk, PortfoliosHolder, SolveType
from portfolios.dao import robo_mpt_portfolios as rmp
from pymysql import IntegrityError, constants
class PortfoliosTest(unittest.TestCase): class PortfoliosTest(unittest.TestCase):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@autowired(names={'builder': 'poem'}) @autowired(names={'builder': 'poem'})
def test_poem_build_portfolio(self, builder: PortfoliosBuilder = None): def test_poem_build_portfolio(self, builder: PortfoliosBuilder = None):
result, detail = builder.build_portfolio(parse_date('2022-11-07'), PortfoliosType.NORMAL) result, detail = builder.build_portfolio(parse_date('2016-09-22'), PortfoliosType.NORMAL)
self.logger.info("portfolios: ") self.logger.info("portfolios: ")
for risk, portfolio in result.items(): for risk, portfolio in result.items():
self.logger.info(risk.name) self.logger.info(risk.name)
...@@ -30,7 +31,7 @@ class PortfoliosTest(unittest.TestCase): ...@@ -30,7 +31,7 @@ class PortfoliosTest(unittest.TestCase):
@autowired(names={'hold': 'next-re'}) @autowired(names={'hold': 'next-re'})
def test_build_hold(self, hold: PortfoliosHolder = None): def test_build_hold(self, hold: PortfoliosHolder = None):
hold.build_hold_portfolio(parse_date('2009-01-01'), PortfoliosRisk.FT9) hold.build_hold_portfolio(parse_date('2016-01-01'), PortfoliosRisk.FT9)
pass pass
......
...@@ -68,3 +68,12 @@ def update(id, datas): ...@@ -68,3 +68,12 @@ def update(id, datas):
set {','.join([f"{x[0]} = '{x[1]}'" for x in datas.items()])} set {','.join([f"{x[0]} = '{x[1]}'" for x in datas.items()])}
where rrs_id = {id} where rrs_id = {id}
''' '''
@write
def delete(min_date=None, risk: PortfoliosRisk = None):
if min_date is None and risk is None:
return 'truncate table robo_rebalance_signal'
else:
sql = f"rrs_date >= '{format_date(min_date)}'" if min_date else None
return f"delete from robo_rebalance_signal {where(sql, rrs_risk=risk)}"
...@@ -70,3 +70,6 @@ class LevelRebalanceRuler(RebalanceRuler): ...@@ -70,3 +70,6 @@ class LevelRebalanceRuler(RebalanceRuler):
def commit_signal(self, sign_id): def commit_signal(self, sign_id):
rrs.update(sign_id, {'effective': True}) rrs.update(sign_id, {'effective': True})
def clear_signal(self, day=None, risk: PortfoliosRisk = None):
rrs.delete(min_date=day, risk=risk)
...@@ -33,9 +33,6 @@ class RebalanceTest(unittest.TestCase): ...@@ -33,9 +33,6 @@ class RebalanceTest(unittest.TestCase):
def test_rebalance_builder(self, builder: RebalanceRuler = None): def test_rebalance_builder(self, builder: RebalanceRuler = None):
builder.take_next_signal(parse_date('2022-09-01'), PortfoliosRisk.FT3) builder.take_next_signal(parse_date('2022-09-01'), PortfoliosRisk.FT3)
def test_logger(self):
self.logger.info('123123')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -7,7 +7,7 @@ numpy==1.23.4 ...@@ -7,7 +7,7 @@ numpy==1.23.4
pandas==1.5.1 pandas==1.5.1
pandas-datareader==0.10.0 pandas-datareader==0.10.0
ply==3.11 ply==3.11
PyJFTech==1.0.0 PyJFTech==1.1.0
PyMySQL==1.0.2 PyMySQL==1.0.2
Pyomo==6.4.3 Pyomo==6.4.3
python-dateutil==2.8.2 python-dateutil==2.8.2
......
...@@ -3,14 +3,18 @@ import sys ...@@ -3,14 +3,18 @@ import sys
import time import time
from datetime import datetime as dt from datetime import datetime as dt
from enum import Enum, unique from enum import Enum, unique
from concurrent.futures import wait
import pandas as pd import pandas as pd
from py_jftech import ( from py_jftech import (
component, autowired, block_execute, get_config, filter_weekend, component, autowired, get_config, filter_weekend, asynchronized,
workday_range, format_date, prev_workday, parse_date workday_range, format_date, prev_workday, parse_date
) )
from api import RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder, PortfoliosRisk, PortfoliosHolder, PortfoliosType from api import (
RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder,
PortfoliosRisk, PortfoliosHolder, PortfoliosType, RebalanceRuler
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -31,12 +35,13 @@ class BacktestExector(RoboExecutor): ...@@ -31,12 +35,13 @@ class BacktestExector(RoboExecutor):
@autowired @autowired
def __init__(self, risk: AssetRisk = None, datum: Datum = None, pool: AssetPool = None, def __init__(self, risk: AssetRisk = None, datum: Datum = None, pool: AssetPool = None,
builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None): builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, rule: RebalanceRuler = None):
self._risk = risk self._risk = risk
self._datum = datum self._datum = datum
self._pool = pool self._pool = pool
self._builder = builder self._builder = builder
self._hold = hold self._hold = hold
self._rule = rule
self._config = get_config(__name__)['backtest'] self._config = get_config(__name__)['backtest']
@property @property
...@@ -64,14 +69,14 @@ class BacktestExector(RoboExecutor): ...@@ -64,14 +69,14 @@ class BacktestExector(RoboExecutor):
if self.start_step.within(BacktestStep.HOLD_PORTFOLIO): if self.start_step.within(BacktestStep.HOLD_PORTFOLIO):
logger.info('start to clear hold portfolios'.center(50, '-')) logger.info('start to clear hold portfolios'.center(50, '-'))
self._hold.clear() self._hold.clear()
self._rule.clear_signal()
def start_exec(self): def start_exec(self):
self.clear_datas() self.clear_datas()
if self.start_step.within(BacktestStep.EWMA_VALUE): if self.start_step.within(BacktestStep.EWMA_VALUE):
logger.info("start to build fund ewma value.".center(50, '-')) logger.info("start to build fund ewma value.".center(50, '-'))
now = dt.now() now = dt.now()
block_execute(self._risk.build_risk_date, {x['id']: (x['id'], self.end_date) for x in self._datum.get_fund_datums(risk=(3, 4, 5))}, isolate=True, wait([self.async_build_risk_date(x['id']) for x in self._datum.get_fund_datums(risk=(3, 4, 5))])
result=False)
logger.info(f"build fund ewma value success, use[{(dt.now() - now).seconds}s]") logger.info(f"build fund ewma value success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.ASSET_POOL): if self.start_step.within(BacktestStep.ASSET_POOL):
logger.info("start to build asset pool".center(50, '-')) logger.info("start to build asset pool".center(50, '-'))
...@@ -79,23 +84,34 @@ class BacktestExector(RoboExecutor): ...@@ -79,23 +84,34 @@ class BacktestExector(RoboExecutor):
workdays = workday_range(self.start_date, self.end_date) workdays = workday_range(self.start_date, self.end_date)
for date in workdays: for date in workdays:
self._risk.get_risk_pool(date) self._risk.get_risk_pool(date)
time.sleep(0.05) # 这里需要sleep,否则里面多进程太快,数据库连接容易超时 # time.sleep(0.05) # 这里需要sleep,否则里面多进程太快,数据库连接容易超时
for date in workdays: for date in workdays:
self._pool.get_pool(date) self._pool.get_pool(date)
logger.info(f"build asset pool success, use[{(dt.now() - now).seconds}s]") logger.info(f"build asset pool success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO): if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO):
logger.info("start to build normal portfolios".center(50, '-')) logger.info("start to build normal portfolios".center(50, '-'))
now = dt.now() now = dt.now()
block_execute(self._builder.get_portfolios, wait([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in workday_range(self.start_date, self.end_date)])
{f'{x.name}_{format_date(j)}': (j, x) for x in PortfoliosRisk for j in workday_range(self.start_date, self.end_date)}, isolate=True,
result=False)
logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]") logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.HOLD_PORTFOLIO): if self.start_step.within(BacktestStep.HOLD_PORTFOLIO):
logger.info("start to build hold portfolios".center(50, '-')) logger.info("start to build hold portfolios".center(50, '-'))
now = dt.now() now = dt.now()
block_execute(self._hold.build_hold_portfolio, {x: (self.end_date, x) for x in PortfoliosRisk}, isolate=True, result=False) wait([self.async_build_hold(x) for x in PortfoliosRisk])
logger.info(f"build hold portfolios success, use[{(dt.now() - now).seconds}s]") logger.info(f"build hold portfolios success, use[{(dt.now() - now).seconds}s]")
@asynchronized(isolate=True)
def async_build_risk_date(self, asset_id):
print(asset_id)
self._risk.build_risk_date(asset_id, self.end_date)
@asynchronized(isolate=True)
def async_build_portfolios(self, day, risk: PortfoliosRisk):
self._builder.get_portfolios(day, risk)
@asynchronized(isolate=True)
def async_build_hold(self, risk: PortfoliosRisk):
self._hold.build_hold_portfolio(day=self.end_date, risk=risk)
@component(bean_name='real') @component(bean_name='real')
class RealExecutor(RoboExecutor): class RealExecutor(RoboExecutor):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment