Commit db33dba7 authored by jichao's avatar jichao

依赖注入实现中

parent 42c68afc
......@@ -21,6 +21,27 @@ class AssetPoolType(Enum):
RISK = 2
@unique
class PortfoliosRisk(Enum):
FT3 = 3
FT6 = 6
FT9 = 9
@unique
class PortfoliosType(Enum):
CRISIS_1 = 'crisis_1'
CRISIS_2 = 'crisis_2'
RIGHT_SIDE = 'right_side'
NORMAL = 'normal'
@unique
class SolveType(Enum):
INFEASIBLE = 0
MPT = 1
POEM = 2
class BusinessException(Exception):
def __init__(self, msg):
......@@ -118,6 +139,7 @@ class AssetPool(ABC):
'''
资产池相关服务
'''
@abstractmethod
def get_pool(self, day):
'''
......@@ -126,3 +148,19 @@ class AssetPool(ABC):
:return: 资产id列表
'''
pass
class PortfoliosBuilder(ABC):
'''
投组组合构建器
'''
@abstractmethod
def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL):
'''
获取指定日期,指定风险等级的投资组合
:param day: 指定日期
:param risk: 风险等级
:return: 资产组合字典{id: weight}
'''
pass
import time
import traceback
import pandas as pd
import json
from scipy.stats import norm
from datetime import datetime as dt
import pandas as pd
from dateutil.relativedelta import relativedelta
from framework import component, autowired, get_config, log, format_date
from scipy.stats import norm
from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType
from asset_pool.dao import asset_risk_dates as ard, asset_ewma_value as aev, robo_assets_pool as rap
from concurrent.futures import ProcessPoolExecutor, wait, as_completed
from framework import component, autowired, get_config, log, format_date, block_execute
def get_risk_start_date():
......@@ -16,10 +15,6 @@ def get_risk_start_date():
return config['start-date'] - relativedelta(months=3)
def test(*args, **kwargs):
print(args, kwargs)
@component
class CvarEwmaAssetRisk(AssetRisk):
'''
......@@ -36,14 +31,14 @@ class CvarEwmaAssetRisk(AssetRisk):
def get_risk_pool(self, day):
asset_pool = rap.get_one(day, AssetPoolType.RISK)
if not asset_pool:
with ProcessPoolExecutor(max_workers=4) as process:
futures = {process.submit(self.is_risk, x['id'], day): x for x in self._datum.get_fund_datums()}
risk_ids = [futures[x]['id'] for x in as_completed(futures) if x.result()]
result = block_execute(self.is_risk, {x['id']: (x['id'], day) for x in self._datum.get_fund_datums()})
risk_ids = [x[0] for x in result.items() if x[1]]
rap.insert(day, AssetPoolType.RISK, risk_ids)
asset_pool = rap.get_one(day, AssetPoolType.RISK)
return json.loads(asset_pool['asset_ids'])
def is_risk(self, id, day) -> bool:
print(id, day)
asset_pool = rap.get_one(day, AssetPoolType.RISK)
if asset_pool:
return id in json.loads(asset_pool['asset_ids'])
......@@ -51,8 +46,8 @@ class CvarEwmaAssetRisk(AssetRisk):
last = ard.get_last_one(id, day)
return DateType(last['type']) is DateType.START_DATE if last else False
def build_risk_date(self, asset_id, day=None):
risk_date = "first"
def build_risk_date(self, asset_id, day=dt.today()):
risk_date = not None
try:
log.debug(f"start build risk date for asset[{asset_id}] to date[{format_date(day)}]")
while risk_date is not None:
......@@ -123,15 +118,10 @@ class CvarEwmaAssetRisk(AssetRisk):
def get_income_return(self, asset_id, min_date=None, max_date=None):
fund_navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_id, max_date=max_date))
fund_navs['rtn'] = fund_navs['nav_cal'] / fund_navs['nav_cal'].shift(self._config['rtn-day']) - 1
fund_navs['rtn'] = fund_navs['nav_cal'] / fund_navs['nav_cal'].shift(self._config['rtn-days']) - 1
fund_navs.dropna(inplace=True)
if min_date:
fund_navs = fund_navs[fund_navs.nav_date >= pd.to_datetime(min_date)]
fund_navs.rename(columns={'nav_date': 'date'}, inplace=True)
fund_navs = fund_navs[['date', 'rtn']]
return fund_navs.to_dict('records')
@autowired
def build_date_task(id, day=dt.today(), asset_risk: CvarEwmaAssetRisk = None):
asset_risk.build_risk_date(id, day)
\ No newline at end of file
......@@ -32,7 +32,7 @@ framework:
level: INFO
stream: ext://sys.stdout
file:
class: logging.handlers.TimedRotatingFileHandler
class: logging.handlers.TimedRotatingFileHandler
level: INFO
formatter: brief
filename: logs/info.log
......@@ -42,12 +42,12 @@ framework:
when: D
loggers:
prod:
handlers: [console,file]
handlers: [ console,file ]
level: INFO
propagate: 0
root:
level: INFO
handlers: [console]
handlers: [ console ]
main:
start-date: 2022-09-01
basic:
......@@ -61,12 +61,12 @@ asset-pool:
- months: 3
weight: 0.5
- months: 6
weight: 0.2
weight: 0.3
- years: 1
weight: 0.2
asset-risk:
advance-months: 3
rtn-day: 5
rtn-days: 5
ewma:
condition-total: 6
condition-meet: 4
......@@ -76,5 +76,43 @@ asset-pool:
min-volume: 30
threshold: -0.03
coef: 0.95
portfolios:
builder:
tol: 1E-10
navs:
months: 3
max-nan:
asset: 8
day: 0.5
risk:
ft3: [ 2, 3 ]
ft6: [ 2, 3, 4 ]
ft9: [ 2, 3, 4, 5 ]
matrix-rtn-days: 21
asset-count: 5 # count or [min, max]
mpt:
cvar-beta: 0.2
quantile: 0.9
low-weight: 0.05
high-weight: [ 1, 0.6, 0.35 ]
poem:
cvar-scale-factor: 0.1
right-side:
navs:
risk: [1, 2]
exclude-asset-type: ['STOCK', 'BALANCED']
mpt:
quantile: 0.3
crisis_1:
navs:
risk: [1, 2]
mpt:
quantile: 0.1
crisis_2:
navs:
risk: [ 1, 2 ]
mpt:
quantile: 0.1
......@@ -4,7 +4,7 @@ from .database import read, write, transaction, where, to_columns
from .env_config import config, get_config
from .logger import build_logger, logger as log
from .injectable import component, autowired, get_instance, init_injectable as _init_injectable
from .mulit_process import process_pool, process
from .mulit_process import process_pool, create_process_pool, block_execute
_init_injectable()
del injectable, logger, env_config, database, base, date_utils, _init_injectable, mulit_process
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor, as_completed
from framework.env_config import get_config
from functools import partial, wraps
config = get_config(__name__)
process_pool = ProcessPoolExecutor(max_workers=config['max-workers'] or 2)
def process(func=None):
if func is None:
return partial(process)
def create_process_pool(max_workers=None):
return ProcessPoolExecutor(max_workers=max_workers or config['max-workers'])
@wraps(func)
def wrap(*args, **kwargs):
return process_pool.submit(func, *args, **kwargs)
return wrap
def block_execute(func, params: dict, isolate=False) -> dict:
if isolate:
with create_process_pool() as process:
futures = {process.submit(func, *x[1]): x[0] for x in params.items()}
return {futures[x]: x.result() for x in as_completed(futures)}
else:
futures = {process_pool.submit(func, *x[1]): x[0] for x in params.items()}
return {futures[x]: x.result() for x in as_completed(futures)}
from framework import autowired, parse_date, log
from api import AssetPool
from api import PortfoliosBuilder, PortfoliosRisk
@autowired
def start(pool: AssetPool = None):
def start(builder: PortfoliosBuilder = None):
day = parse_date('2022-11-07')
log.info(pool.get_pool(day))
log.info(builder.get_portfolios(day, PortfoliosRisk.FT3))
def test(arg):
if arg:
return 1, 1
else:
return None
if __name__ == '__main__':
log.info("start")
start()
start()
\ No newline at end of file
import os
import sys
import json
import pandas as pd
from dateutil.relativedelta import relativedelta
from numpy import NAN
from pyomo.environ import *
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType
from framework import component, autowired, get_config
from portfolios.dao import robo_mpt_portfolios as rmp
def create_solver():
if sys.platform.find('win') == 0:
executor = 'bonmin.exe'
elif sys.platform == 'linux':
executor = 'bonmin_linux'
else:
executor = 'bonmin_mac'
return SolverFactory('Bonmin', executable=os.path.join(os.path.dirname(__file__), executor))
class MptSolver:
@autowired
def __init__(self, risk: PortfoliosRisk, type: PortfoliosType, assets: AssetPool = None, navs: Navs = None, datum: Datum = None):
self.__navs = None
self.risk = risk
self.type = type or PortfoliosType.NORMAL
self._assets = assets
self._navs = navs
self._datum = datum
self._config = get_config(__name__)
self._solver = create_solver()
self._solver.options['tol'] = float(self.get_config('tol') or 1E-10)
@property
def navs(self):
return self.__navs
@property
def rtn_matrix(self):
result = self.navs / self.navs.shift(self.get_config('matrix-rtn-days')) - 1
result.dropna(inplace=True)
return result
@property
def rtn_annualized(self):
return list(self.rtn_matrix.mean() * 12)
@property
def sigma(self):
rtn = (self.navs / self.navs.shift(1) - 1)[1:]
return rtn.cov() * 252
@property
def rtn_history(self):
result = self.rtn_matrix * 12
return result.values
@property
def k_beta(self):
return round(len(self.rtn_history) * self.get_config('mpt.cvar-beta') + 0.499999)
def solve_max_rtn(self):
model = self.create_model()
model.objective = Objective(expr=sum([model.w[i] * self.rtn_annualized[i] for i in model.indices]), sense=maximize)
self._solver.solve(model)
max_rtn = self.calc_port_rtn(model)
max_var = self.calc_port_var(model)
minCVaR_whenMaxR = self.calc_port_cvar(model)
return max_rtn, max_var, minCVaR_whenMaxR
def solve_min_rtn(self):
model = self.create_model()
model.objective = Objective(
expr=sum([model.w[i] * model.w[j] * self.sigma.iloc[i, j] for i in model.indices for j in model.indices]),
sense=minimize)
self._solver.solve(model)
min_rtn = self.calc_port_rtn(model)
min_var = self.calc_port_var(model)
maxCVaR_whenMinV = self.calc_port_cvar(model)
return min_rtn, min_var, maxCVaR_whenMinV
def solve_mpt(self, min_rtn, max_rtn):
big_y = min_rtn + self.get_config('mpt.quantile') * (max_rtn - min_rtn)
model = self.create_model()
model.cons_rtn = Constraint(expr=sum([model.w[i] * self.rtn_annualized[i] for i in model.indices]) >= big_y)
model.objective = Objective(
expr=sum([model.w[i] * model.w[j] * self.sigma.iloc[i, j] for i in model.indices for j in model.indices]),
sense=minimize)
result = self._solver.solve(model)
if result.solver.termination_condition == TerminationCondition.infeasible:
return None, None
return self.calc_port_weight(model), self.calc_port_cvar(model)
def solve_poem(self, min_rtn, max_rtn, base_cvar, max_cvar):
k_history = len(self.rtn_history)
quantile = self.get_config('mpt.quantile')
big_y = min_rtn + quantile * (max_rtn - min_rtn)
small_y = base_cvar + (max_cvar - base_cvar) * self.get_config('poem.cvar-scale-factor') * quantile
model = self.create_model()
model.alpha = Var(domain=Reals)
model.x = Var(range(k_history), domain=NonNegativeReals)
model.cons_cvar_aux = Constraint(range(k_history), rule=lambda m, k: m.x[k] >= m.alpha - sum([m.w[i] * self.rtn_history[k][i] for i in m.indices]))
model.cons_rtn = Constraint(expr=sum([model.w[i] * self.rtn_annualized[i] for i in model.indices]) >= big_y)
model.cons_cvar = Constraint(expr=model.alpha - (1 / self.k_beta) * sum([model.x[k] for k in range(k_history)]) >= small_y)
result = self._solver.solve(model)
if result.solver.termination_condition == TerminationCondition.infeasible:
return None, None
return self.calc_port_weight(model), self.calc_port_cvar(model)
def calc_port_weight(self, model):
id_list = self._navs.columns
weight_list = []
for i in model.indices:
weight_list.append(model.w[i]._value * model.z[i]._value)
df_w = pd.DataFrame(data=weight_list, index=id_list, columns=['weight'])
df_w.replace(0, NAN, inplace=True)
df_w.dropna(axis=0, inplace=True)
df_w['weight'] = self.format_weight(df_w['weight'])
dict_w = df_w.to_dict()['weight']
return dict_w
@staticmethod
def format_weight(weight_series):
weight_series = weight_series.fillna(0)
minidx = weight_series[weight_series > 0].idxmin()
maxidx = weight_series.idxmax()
weight_series = weight_series.apply(lambda x: round(x, 2))
if weight_series.sum() < 1:
weight_series[minidx] += 1 - weight_series.sum()
elif weight_series.sum() > 1:
weight_series[maxidx] += 1 - weight_series.sum()
return weight_series.apply(lambda x: float(x))
def calc_port_rtn(self, model):
return sum([model.w[i]._value * self.rtn_annualized[i] for i in model.indices])
def calc_port_var(self, model):
return sum([model.w[i]._value * model.w[j]._value * self.sigma.iloc[i, j] for i in model.indices for j in model.indices])
def calc_port_cvar(self, model):
port_r_hist = []
for k in range(len(self.rtn_history)):
port_r_hist.append(sum([model.w[i]._value * model.z[i]._value * self.rtn_history[k][i] for i in model.indices]))
port_r_hist.sort()
return sum(port_r_hist[0: self.k_beta]) / self.k_beta
def create_model(self):
count = self.get_config('asset-count')
min_count = count[0] if isinstance(count, list) else count
max_count = count[1] if isinstance(count, list) else count
low_weight = self.get_config('mpt.low-weight')
high_weight = self.get_config('mpt.high-weight')
if isinstance(high_weight, list):
high_weight = high_weight[min(len(self.navs.columns), min_count, len(high_weight)) - 1]
model = ConcreteModel()
model.indices = range(0, len(self._navs.columns))
model.w = Var(model.indices, domain=NonNegativeReals)
model.z = Var(model.indices, domain=Binary)
model.cons_sum_weight = Constraint(expr=sum([model.w[i] for i in model.indices]) == 1)
model.cons_num_asset = Constraint(expr=inequality(min_count, sum([model.z[i] for i in model.indices]), max_count, strict=False))
model.cons_bounds_low = Constraint(model.indices, rule=lambda m, i: m.z[i] * low_weight <= m.w[i])
model.cons_bounds_up = Constraint(model.indices, rule=lambda m, i: m.z[i] * high_weight >= m.w[i])
return model
def reset_navs(self, day):
asset_ids = self._assets.get_pool(day)
asset_risk = self.get_config('navs.risk')
datum = self._datum.get_fund_datums(fund_ids=asset_ids, risk=asset_risk)
exclude = self.get_config('navs.exclude-asset-type') or []
asset_ids = list(set(asset_ids) & set([x['id'] for x in datum if x['assetType'] not in exclude]))
min_date = day - relativedelta(months=self.get_config('navs.months'))
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_ids, max_date=day, min_date=min_date))
navs['nav_date'] = pd.to_datetime(navs['nav_date'])
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
navs = navs.sort_index()
navs_nan = navs.isna().sum()
navs.drop(columns=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.asset')], inplace=True)
navs_nan = navs.apply(lambda r: r.isna().sum() / len(r), axis=1)
navs.drop(index=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.day')], inplace=True)
navs.fillna(method='ffill', inplace=True)
self.__navs = navs
def get_config(self, name):
def load_config(config):
for key in name.split('.'):
if key in config:
config = config[key]
else:
return None
return config
value = load_config(self._config[self.type.value] if self.type is not PortfoliosType.NORMAL else self._config)
if value is None:
value = load_config(self._config)
return value[f'ft{self.risk.value}'] if value and isinstance(value, dict) else value
@component(bean_name='mpt')
class MptPortfoliosBuilder(PortfoliosBuilder):
@autowired
def __init__(self, assets: AssetPool = None, navs: Navs = None, datum: Datum = None):
self._assets = assets
self._navs = navs
self._datum = datum
def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL):
portfolio = rmp.get_one(day, type, risk)
if not portfolio:
self.build_portfolio(day, type)
portfolio = rmp.get_one(day, type, risk)
return json.loads(portfolio['portfolio']) if SolveType(portfolio['rmp_rolve']) is not SolveType.INFEASIBLE else None
def build_portfolio(self, day, type: PortfoliosType):
for risk in PortfoliosRisk:
solver = MptSolver(risk, type)
solver.reset_navs(day)
max_rtn, max_var, minCVaR_whenMaxR = solver.solve_max_rtn()
min_rtn, min_var, maxCVaR_whenMinV = solver.solve_min_rtn()
portfolio, cvar = solver.solve_mpt(min_rtn, max_rtn)
if portfolio:
rmp.insert({
'date': day,
'risk': risk,
'type': type,
'solve': SolveType.MPT,
'portfolio': json.dumps(portfolio),
'cvar': cvar
})
else:
rmp.insert({
'date': day,
'risk': risk,
'type': type,
'solve': SolveType.INFEASIBLE
})
@component(bean_name='poem')
class PoemPortfoliosBuilder(MptPortfoliosBuilder):
def build_portfolio(self, day, type: PortfoliosType):
for risk in PortfoliosRisk:
solver = MptSolver(risk, type)
solver.reset_navs(day)
max_rtn, max_var, minCVaR_whenMaxR = solver.solve_max_rtn()
min_rtn, min_var, maxCVaR_whenMinV = solver.solve_min_rtn()
portfolio, cvar = solver.solve_mpt(min_rtn, max_rtn)
solve = SolveType.MPT
if portfolio is not None:
poem_port, poem_cvar = solver.solve_poem(min_rtn, max_rtn, cvar, maxCVaR_whenMinV)
if poem_port:
portfolio = poem_port
cvar = poem_cvar
solve = SolveType.POEM
if portfolio:
rmp.insert({
'date': day,
'risk': risk,
'type': type,
'solve': solve,
'portfolio': json.dumps(portfolio),
'cvar': cvar
})
else:
rmp.insert({
'date': day,
'risk': risk,
'type': type,
'solve': SolveType.INFEASIBLE
})
CREATE TABLE IF NOT EXISTS robo_mpt_portfolios
(
rmp_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rmp_date DATETIME NOT NULL COMMENT '日期',
rmp_risk TINYINT NOT NULL COMMENT '风险等级',
rmp_type VARCHAR(255) NOT NULL COMMENT '投组类型',
rmp_rolve TINYINT NOT NULL COMMENT '求解方式',
rmp_portfolio JSON DEFAULT NULL COMMENT '投组权重',
rmp_cvar DOUBLE DEFAULT NULL COMMENT '投组cvar',
rmp_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rmp_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rmp_id),
UNIQUE INDEX (rmp_date, rmp_risk, rmp_type),
INDEX (rmp_risk),
INDEX (rmp_type)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '最优投组表';
\ No newline at end of file
from datetime import datetime
from framework import read, write, where, format_date
from enum import Enum
from api import PortfoliosRisk, PortfoliosType
__COLUMNS__ = {
'rmp_id': 'id',
'rmp_date': 'date',
'rmp_risk': 'risk',
'rmp_type': 'type',
'rmp_rolve': 'solve',
'rmp_portfolio': 'portfolio',
'rmp_cvar': 'cvar',
}
@write
def insert(datas):
datas = {x[0]: datas[x[1]] for x in __COLUMNS__.items() if x[1] in datas and datas[x[1]] is not None}
datas = {
**datas,
**{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)},
**{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)}
}
return f'''
insert into robo_mpt_portfolios({','.join([x for x in datas.keys()])})
values ({','.join([f"'{x[1]}'" for x in datas.items()])})
'''
@read(one=True)
def get_one(day, type: PortfoliosType, risk: PortfoliosRisk):
return f'''
select {','.join([x for x in __COLUMNS__.keys()])} from robo_mpt_portfolios
{where(rmp_date=day, rmp_risk=risk, rmp_type=type)}
'''
......@@ -6,7 +6,9 @@ lxml==4.9.0
numpy==1.23.4
pandas==1.5.1
pandas-datareader==0.10.0
ply==3.11
PyMySQL==1.0.2
Pyomo @ git+https://github.com/Pyomo/pyomo.git@3cb35f91c7c1ab4e50871110450429ec29950273
python-dateutil==2.8.2
pytz==2022.6
PyYAML==6.0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment