Commit 2c487c1a authored by jichao's avatar jichao

依赖注入实现中

parent 3052be21
......@@ -223,6 +223,68 @@ class PortfoliosBuilder(ABC):
pass
class Solver(ABC):
'''
解算器
'''
def solve_max_rtn(self):
'''
:return: max_rtn, max_var, minCVaR_whenMaxR
'''
pass
def solve_min_rtn(self):
'''
:return: min_rtn, min_var, maxCVaR_whenMinR
'''
pass
def solve_mpt(self, min_rtn, max_rtn):
'''
常规mpt计算
:param min_rtn: 最小回报率
:param max_rtn: 最大回报率
:return: 投组,cvar
'''
pass
def solve_poem(self, min_rtn, max_rtn, base_cvar, max_cvar):
'''
poem方式的mpt计算
:param min_rtn: 最小回报率
:param max_rtn: 最大回报率
:param base_cvar: 基础cvar
:param max_cvar: 最大cvar
:return: 投组,cvar
'''
pass
def reset_navs(self, day):
'''
根据指定的日期,重置当前解算器,其他计算,全部依赖这里重置后的基金净值数据
:param day: 指定的日期
:return: 根据指定日期获取的,基金净值数据
'''
pass
@property
@abstractmethod
def navs(self):
'''
:return: 当前解算器需要的基金净值
'''
pass
class SolverFactory(ABC):
'''
解算器工厂
'''
@abstractmethod
def create_solver(self, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL) -> Solver:
pass
class SignalBuilder(ABC):
'''
控制信号,发起是否调仓服务
......
import json
from api import DatumType, Datum
from api import DatumType, Datum, PortfoliosRisk
from basic.dao import robo_base_datum as rbd
from framework import component, parse_date
......@@ -17,3 +17,6 @@ class DefaultDatum(Datum):
result = rbd.get_base_datums(type=DatumType.INDEX, ticker=ticker, datum_ids=index_ids)
return [{**json.loads(x['datas']), 'id': x['id']} for x in result]
def get_high_risk_datums(self, risk: PortfoliosRisk):
pass
......@@ -74,7 +74,7 @@ asset-pool:
threshold: -0.03
coef: 0.95
portfolios:
builder:
solver:
tol: 1E-10
navs:
months: 3
......
import datetime
import json
import os
import sys
from logging import DEBUG
import pandas as pd
from dateutil.relativedelta import relativedelta
from numpy import NAN
from pyomo.environ import *
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType
from framework import component, autowired, get_config, format_date, get_logger, parse_date
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory
from framework import component, autowired, format_date, get_logger
from portfolios.dao import robo_mpt_portfolios as rmp
logger = get_logger(__name__)
def create_solver():
if sys.platform.find('win') == 0:
executor = 'bonmin.exe'
elif sys.platform == 'linux':
executor = 'bonmin_linux'
else:
executor = 'bonmin_mac'
return SolverFactory('Bonmin', executable=os.path.join(os.path.dirname(__file__), executor))
class MptSolver:
@autowired
def __init__(self, risk: PortfoliosRisk, type: PortfoliosType, assets: AssetPool = None, navs: Navs = None,
datum: Datum = None):
self.__navs = None
self.risk = risk
self.type = type or PortfoliosType.NORMAL
self._assets = assets
self._navs = navs
self._datum = datum
self._config = get_config(__name__)
self._solver = create_solver()
self._solver.options['tol'] = float(self.get_config('tol') or 1E-10)
@property
def navs(self):
return self.__navs
@property
def rtn_matrix(self):
result = self.navs / self.navs.shift(self.get_config('matrix-rtn-days')) - 1
result.dropna(inplace=True)
return result
@property
def rtn_annualized(self):
return list(self.rtn_matrix.mean() * 12)
@property
def sigma(self):
rtn = (self.navs / self.navs.shift(1) - 1)[1:]
return rtn.cov() * 252
@property
def rtn_history(self):
result = self.rtn_matrix * 12
return result.values
@property
def beta(self):
return self.get_config('mpt.cvar-beta')
@property
def k_beta(self):
return round(len(self.rtn_history) * self.beta + 0.499999)
@property
def quantile(self):
return self.get_config('mpt.quantile')
def solve_max_rtn(self):
model = self.create_model()
model.objective = Objective(expr=sum([model.w[i] * self.rtn_annualized[i] for i in model.indices]),
sense=maximize)
self._solver.solve(model)
self.debug_solve_result(model)
max_rtn = self.calc_port_rtn(model)
max_var = self.calc_port_var(model)
minCVaR_whenMaxR = self.calc_port_cvar(model)
logger.debug({
'max_rtn': max_rtn,
'max_var': max_var,
'minCVaR_whenMaxR': minCVaR_whenMaxR,
})
return max_rtn, max_var, minCVaR_whenMaxR
def solve_min_rtn(self):
model = self.create_model()
model.objective = Objective(
expr=sum([model.w[i] * model.w[j] * self.sigma.iloc[i, j] for i in model.indices for j in model.indices]),
sense=minimize)
self._solver.solve(model)
self.debug_solve_result(model)
min_rtn = self.calc_port_rtn(model)
min_var = self.calc_port_var(model)
maxCVaR_whenMinV = self.calc_port_cvar(model)
logger.debug({
'min_rtn': min_rtn,
'min_var': min_var,
'maxCVaR_whenMinV': maxCVaR_whenMinV,
})
return min_rtn, min_var, maxCVaR_whenMinV
def solve_mpt(self, min_rtn, max_rtn):
logger.debug(f'...... ...... ...... ...... ...... ...... ...... ...... '
f'MPT ... sub risk : pct_value = {self.quantile}')
big_y = min_rtn + self.quantile * (max_rtn - min_rtn)
logger.debug(f'big_Y = target_Return = {big_y}')
model = self.create_model()
model.cons_rtn = Constraint(expr=sum([model.w[i] * self.rtn_annualized[i] for i in model.indices]) >= big_y)
model.objective = Objective(
expr=sum([model.w[i] * model.w[j] * self.sigma.iloc[i, j] for i in model.indices for j in model.indices]),
sense=minimize)
result = self._solver.solve(model)
if result.solver.termination_condition == TerminationCondition.infeasible:
logger.debug('...... MPT: Infeasible Optimization Problem.')
return None, None
logger.debug('...... MPT: Has solution.')
self.debug_solve_result(model)
return self.calc_port_weight(model), self.calc_port_cvar(model)
def solve_poem(self, min_rtn, max_rtn, base_cvar, max_cvar):
k_history = len(self.rtn_history)
quantile = self.quantile
logger.debug(f'...... ...... ...... ...... ...... ...... ...... ...... '
f'POEM With CVaR constraints ... sub risk : pct_value = {quantile}')
big_y = min_rtn + quantile * (max_rtn - min_rtn)
small_y = base_cvar + (max_cvar - base_cvar) * self.get_config('poem.cvar-scale-factor') * quantile
logger.debug(f'big_Y = target_Return = {big_y} | small_y = target_cvar = {small_y}')
model = self.create_model()
model.alpha = Var(domain=Reals)
model.x = Var(range(k_history), domain=NonNegativeReals)
model.cons_cvar_aux = Constraint(range(k_history), rule=lambda m, k: m.x[k] >= m.alpha - sum(
[m.w[i] * self.rtn_history[k][i] for i in m.indices]))
model.cons_rtn = Constraint(expr=sum([model.w[i] * self.rtn_annualized[i] for i in model.indices]) >= big_y)
model.cons_cvar = Constraint(
expr=model.alpha - (1 / self.k_beta) * sum([model.x[k] for k in range(k_history)]) >= small_y)
result = self._solver.solve(model)
if result.solver.termination_condition == TerminationCondition.infeasible:
logger.debug('...... POEM: Infeasible Optimization Problem.')
return None, None
logger.debug('...... POEM: Has solution.')
self.debug_solve_result(model)
return self.calc_port_weight(model), self.calc_port_cvar(model)
def calc_port_weight(self, model):
id_list = self.navs.columns
weight_list = []
for i in model.indices:
weight_list.append(model.w[i]._value * model.z[i]._value)
df_w = pd.DataFrame(data=weight_list, index=id_list, columns=['weight'])
df_w.replace(0, NAN, inplace=True)
df_w.dropna(axis=0, inplace=True)
df_w['weight'] = self.format_weight(df_w['weight'])
dict_w = df_w.to_dict()['weight']
return dict_w
@staticmethod
def format_weight(weight_series):
weight_series = weight_series.fillna(0)
minidx = weight_series[weight_series > 0].idxmin()
maxidx = weight_series.idxmax()
weight_series = weight_series.apply(lambda x: round(x, 2))
if weight_series.sum() < 1:
weight_series[minidx] += 1 - weight_series.sum()
elif weight_series.sum() > 1:
weight_series[maxidx] += 1 - weight_series.sum()
return weight_series.apply(lambda x: round(float(x), 2))
def calc_port_rtn(self, model):
return sum([model.w[i]._value * self.rtn_annualized[i] for i in model.indices])
def calc_port_var(self, model):
return sum([model.w[i]._value * model.w[j]._value * self.sigma.iloc[i, j] for i in model.indices for j in
model.indices])
def calc_port_cvar(self, model):
port_r_hist = []
for k in range(len(self.rtn_history)):
port_r_hist.append(
sum([model.w[i]._value * model.z[i]._value * self.rtn_history[k][i] for i in model.indices]))
port_r_hist.sort()
return sum(port_r_hist[0: self.k_beta]) / self.k_beta
def create_model(self):
count = self.get_config('asset-count')
min_count = count[0] if isinstance(count, list) else count
max_count = count[1] if isinstance(count, list) else count
low_weight = self.get_config('mpt.low-weight')
high_weight = self.get_config('mpt.high-weight')
if isinstance(high_weight, list):
high_weight = high_weight[min(len(self.navs.columns), min_count, len(high_weight)) - 1]
model = ConcreteModel()
model.indices = range(0, len(self.navs.columns))
model.w = Var(model.indices, domain=NonNegativeReals)
model.z = Var(model.indices, domain=Binary)
model.cons_sum_weight = Constraint(expr=sum([model.w[i] for i in model.indices]) == 1)
model.cons_num_asset = Constraint(
expr=inequality(min_count, sum([model.z[i] for i in model.indices]), max_count, strict=False))
model.cons_bounds_low = Constraint(model.indices, rule=lambda m, i: m.z[i] * low_weight <= m.w[i])
model.cons_bounds_up = Constraint(model.indices, rule=lambda m, i: m.z[i] * high_weight >= m.w[i])
return model
def reset_navs(self, day):
asset_ids = self._assets.get_pool(day)
asset_risk = self.get_config('navs.risk')
datum = self._datum.get_fund_datums(fund_ids=asset_ids, risk=asset_risk)
exclude = self.get_config('navs.exclude-asset-type') or []
asset_ids = list(set(asset_ids) & set([x['id'] for x in datum if x['assetType'] not in exclude]))
min_date = day - relativedelta(months=self.get_config('navs.months'))
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_ids, max_date=day, min_date=min_date))
navs['nav_date'] = pd.to_datetime(navs['nav_date'])
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
navs = navs.sort_index()
navs_nan = navs.isna().sum()
navs.drop(columns=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.asset')],
inplace=True)
navs_nan = navs.apply(lambda r: r.isna().sum() / len(r), axis=1)
navs.drop(index=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.day')],
inplace=True)
navs.fillna(method='ffill', inplace=True)
self.__navs = navs
def get_config(self, name):
def load_config(config):
for key in name.split('.'):
if key in config:
config = config[key]
else:
return None
return config
value = load_config(self._config[self.type.value] if self.type is not PortfoliosType.NORMAL else self._config)
if value is None:
value = load_config(self._config)
return value[f'ft{self.risk.value}'] if value and isinstance(value, dict) else value
def debug_solve_result(self, model):
if logger.isEnabledFor(DEBUG):
logger.debug('===============================')
logger.debug('solution: id | w(id)')
w_sum = 0
for i in model.indices:
if model.z[i]._value == 1:
logger.debug(f'{self.navs.columns[i]} | {model.w[i]._value}')
w_sum += model.w[i]._value
logger.debug(f'w_sum = {w_sum}')
logger.debug({
'beta': self.beta,
'kbeta': self.k_beta,
'port_R': self.calc_port_rtn(model),
'port_V': self.calc_port_cvar(model),
'port_CVaR': self.calc_port_cvar(model)
})
logger.debug('-------------------------------')
@component(bean_name='mpt')
class MptPortfoliosBuilder(PortfoliosBuilder):
@autowired
def __init__(self, assets: AssetPool = None, navs: Navs = None, datum: Datum = None):
def __init__(self, assets: AssetPool = None, navs: Navs = None, datum: Datum = None, factory: SolverFactory = None):
self._assets = assets
self._navs = navs
self._datum = datum
self._factory = factory
def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL):
portfolio = rmp.get_one(day, type, risk)
......@@ -297,7 +37,7 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
for risk in PortfoliosRisk:
logger.info(
f"start build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}]")
solver = MptSolver(risk, type)
solver = self._factory.create_solver(risk, type)
solver.reset_navs(day)
logger.debug({
'Khist': len(solver.rtn_history),
......@@ -333,7 +73,7 @@ class PoemPortfoliosBuilder(MptPortfoliosBuilder):
for risk in PortfoliosRisk:
if result[risk]['solve'] is SolveType.INFEASIBLE:
continue
solver = MptSolver(risk, type)
solver = self._factory.create_solver(risk, type)
solver.reset_navs(day)
min_rtn = detail[risk]['min_rtn']
max_rtn = detail[risk]['max_rtn']
......
import os
import sys
from logging import DEBUG
import pandas as pd
from dateutil.relativedelta import relativedelta
from numpy import NAN
from pyomo.environ import *
from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum
from framework import component, autowired, get_config, get_logger
logger = get_logger(__name__)
def create_solver():
if sys.platform.find('win') == 0:
executor = 'bonmin.exe'
elif sys.platform == 'linux':
executor = 'bonmin_linux'
else:
executor = 'bonmin_mac'
return SolverFactory('Bonmin', executable=os.path.join(os.path.dirname(__file__), executor))
@component
class DefaultFactory(Factory):
def create_solver(self, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL) -> Solver:
return DefaultSolver(risk, type)
class DefaultSolver(Solver):
@autowired
def __init__(self, risk: PortfoliosRisk, type: PortfoliosType, assets: AssetPool = None, navs: Navs = None,
datum: Datum = None):
self.__navs = None
self.risk = risk
self.type = type or PortfoliosType.NORMAL
self._assets = assets
self._navs = navs
self._datum = datum
self._config = get_config(__name__)
self._solver = create_solver()
self._solver.options['tol'] = float(self.get_config('tol') or 1E-10)
@property
def navs(self):
return self.__navs
@property
def rtn_matrix(self):
result = self.navs / self.navs.shift(self.get_config('matrix-rtn-days')) - 1
result.dropna(inplace=True)
return result
@property
def rtn_annualized(self):
return list(self.rtn_matrix.mean() * 12)
@property
def sigma(self):
rtn = (self.navs / self.navs.shift(1) - 1)[1:]
return rtn.cov() * 252
@property
def rtn_history(self):
result = self.rtn_matrix * 12
return result.values
@property
def beta(self):
return self.get_config('mpt.cvar-beta')
@property
def k_beta(self):
return round(len(self.rtn_history) * self.beta + 0.499999)
@property
def quantile(self):
return self.get_config('mpt.quantile')
def solve_max_rtn(self):
model = self.create_model()
model.objective = Objective(expr=sum([model.w[i] * self.rtn_annualized[i] for i in model.indices]),
sense=maximize)
self._solver.solve(model)
self.debug_solve_result(model)
max_rtn = self.calc_port_rtn(model)
max_var = self.calc_port_var(model)
minCVaR_whenMaxR = self.calc_port_cvar(model)
logger.debug({
'max_rtn': max_rtn,
'max_var': max_var,
'minCVaR_whenMaxR': minCVaR_whenMaxR,
})
return max_rtn, max_var, minCVaR_whenMaxR
def solve_min_rtn(self):
model = self.create_model()
model.objective = Objective(
expr=sum([model.w[i] * model.w[j] * self.sigma.iloc[i, j] for i in model.indices for j in model.indices]),
sense=minimize)
self._solver.solve(model)
self.debug_solve_result(model)
min_rtn = self.calc_port_rtn(model)
min_var = self.calc_port_var(model)
maxCVaR_whenMinV = self.calc_port_cvar(model)
logger.debug({
'min_rtn': min_rtn,
'min_var': min_var,
'maxCVaR_whenMinV': maxCVaR_whenMinV,
})
return min_rtn, min_var, maxCVaR_whenMinV
def solve_mpt(self, min_rtn, max_rtn):
logger.debug(f'...... ...... ...... ...... ...... ...... ...... ...... '
f'MPT ... sub risk : pct_value = {self.quantile}')
big_y = min_rtn + self.quantile * (max_rtn - min_rtn)
logger.debug(f'big_Y = target_Return = {big_y}')
model = self.create_model()
model.cons_rtn = Constraint(expr=sum([model.w[i] * self.rtn_annualized[i] for i in model.indices]) >= big_y)
model.objective = Objective(
expr=sum([model.w[i] * model.w[j] * self.sigma.iloc[i, j] for i in model.indices for j in model.indices]),
sense=minimize)
result = self._solver.solve(model)
if result.solver.termination_condition == TerminationCondition.infeasible:
logger.debug('...... MPT: Infeasible Optimization Problem.')
return None, None
logger.debug('...... MPT: Has solution.')
self.debug_solve_result(model)
return self.calc_port_weight(model), self.calc_port_cvar(model)
def solve_poem(self, min_rtn, max_rtn, base_cvar, max_cvar):
k_history = len(self.rtn_history)
quantile = self.quantile
logger.debug(f'...... ...... ...... ...... ...... ...... ...... ...... '
f'POEM With CVaR constraints ... sub risk : pct_value = {quantile}')
big_y = min_rtn + quantile * (max_rtn - min_rtn)
small_y = base_cvar + (max_cvar - base_cvar) * self.get_config('poem.cvar-scale-factor') * quantile
logger.debug(f'big_Y = target_Return = {big_y} | small_y = target_cvar = {small_y}')
model = self.create_model()
model.alpha = Var(domain=Reals)
model.x = Var(range(k_history), domain=NonNegativeReals)
model.cons_cvar_aux = Constraint(range(k_history), rule=lambda m, k: m.x[k] >= m.alpha - sum(
[m.w[i] * self.rtn_history[k][i] for i in m.indices]))
model.cons_rtn = Constraint(expr=sum([model.w[i] * self.rtn_annualized[i] for i in model.indices]) >= big_y)
model.cons_cvar = Constraint(
expr=model.alpha - (1 / self.k_beta) * sum([model.x[k] for k in range(k_history)]) >= small_y)
result = self._solver.solve(model)
if result.solver.termination_condition == TerminationCondition.infeasible:
logger.debug('...... POEM: Infeasible Optimization Problem.')
return None, None
logger.debug('...... POEM: Has solution.')
self.debug_solve_result(model)
return self.calc_port_weight(model), self.calc_port_cvar(model)
def calc_port_weight(self, model):
id_list = self.navs.columns
weight_list = []
for i in model.indices:
weight_list.append(model.w[i]._value * model.z[i]._value)
df_w = pd.DataFrame(data=weight_list, index=id_list, columns=['weight'])
df_w.replace(0, NAN, inplace=True)
df_w.dropna(axis=0, inplace=True)
df_w['weight'] = self.format_weight(df_w['weight'])
dict_w = df_w.to_dict()['weight']
return dict_w
@staticmethod
def format_weight(weight_series):
weight_series = weight_series.fillna(0)
minidx = weight_series[weight_series > 0].idxmin()
maxidx = weight_series.idxmax()
weight_series = weight_series.apply(lambda x: round(x, 2))
if weight_series.sum() < 1:
weight_series[minidx] += 1 - weight_series.sum()
elif weight_series.sum() > 1:
weight_series[maxidx] += 1 - weight_series.sum()
return weight_series.apply(lambda x: round(float(x), 2))
def calc_port_rtn(self, model):
return sum([model.w[i]._value * self.rtn_annualized[i] for i in model.indices])
def calc_port_var(self, model):
return sum([model.w[i]._value * model.w[j]._value * self.sigma.iloc[i, j] for i in model.indices for j in
model.indices])
def calc_port_cvar(self, model):
port_r_hist = []
for k in range(len(self.rtn_history)):
port_r_hist.append(
sum([model.w[i]._value * model.z[i]._value * self.rtn_history[k][i] for i in model.indices]))
port_r_hist.sort()
return sum(port_r_hist[0: self.k_beta]) / self.k_beta
def create_model(self):
count = self.get_config('asset-count')
min_count = count[0] if isinstance(count, list) else count
max_count = count[1] if isinstance(count, list) else count
low_weight = self.get_config('mpt.low-weight')
high_weight = self.get_config('mpt.high-weight')
if isinstance(high_weight, list):
high_weight = high_weight[min(len(self.navs.columns), min_count, len(high_weight)) - 1]
model = ConcreteModel()
model.indices = range(0, len(self.navs.columns))
model.w = Var(model.indices, domain=NonNegativeReals)
model.z = Var(model.indices, domain=Binary)
model.cons_sum_weight = Constraint(expr=sum([model.w[i] for i in model.indices]) == 1)
model.cons_num_asset = Constraint(
expr=inequality(min_count, sum([model.z[i] for i in model.indices]), max_count, strict=False))
model.cons_bounds_low = Constraint(model.indices, rule=lambda m, i: m.z[i] * low_weight <= m.w[i])
model.cons_bounds_up = Constraint(model.indices, rule=lambda m, i: m.z[i] * high_weight >= m.w[i])
return model
def reset_navs(self, day):
asset_ids = self._assets.get_pool(day)
asset_risk = self.get_config('navs.risk')
datum = self._datum.get_fund_datums(fund_ids=asset_ids, risk=asset_risk)
exclude = self.get_config('navs.exclude-asset-type') or []
asset_ids = list(set(asset_ids) & set([x['id'] for x in datum if x['assetType'] not in exclude]))
min_date = day - relativedelta(months=self.get_config('navs.months'))
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_ids, max_date=day, min_date=min_date))
navs['nav_date'] = pd.to_datetime(navs['nav_date'])
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
navs = navs.sort_index()
navs_nan = navs.isna().sum()
navs.drop(columns=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.asset')],
inplace=True)
navs_nan = navs.apply(lambda r: r.isna().sum() / len(r), axis=1)
navs.drop(index=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.day')],
inplace=True)
navs.fillna(method='ffill', inplace=True)
self.__navs = navs
def get_config(self, name):
def load_config(config):
for key in name.split('.'):
if key in config:
config = config[key]
else:
return None
return config
value = load_config(self._config[self.type.value] if self.type is not PortfoliosType.NORMAL else self._config)
if value is None:
value = load_config(self._config)
return value[f'ft{self.risk.value}'] if value and isinstance(value, dict) else value
def debug_solve_result(self, model):
if logger.isEnabledFor(DEBUG):
logger.debug('===============================')
logger.debug('solution: id | w(id)')
w_sum = 0
for i in model.indices:
if model.z[i]._value == 1:
logger.debug(f'{self.navs.columns[i]} | {model.w[i]._value}')
w_sum += model.w[i]._value
logger.debug(f'w_sum = {w_sum}')
logger.debug({
'beta': self.beta,
'kbeta': self.k_beta,
'port_R': self.calc_port_rtn(model),
'port_V': self.calc_port_cvar(model),
'port_CVaR': self.calc_port_cvar(model)
})
logger.debug('-------------------------------')
import unittest
from framework import autowired, parse_date, get_logger
from api import PortfoliosBuilder, PortfoliosType
from api import PortfoliosBuilder, PortfoliosType, PortfoliosRisk
class PortfoliosTest(unittest.TestCase):
......@@ -8,14 +8,19 @@ class PortfoliosTest(unittest.TestCase):
logger = get_logger(__name__)
@autowired(names={'builder': 'poem'})
def test_poem_portfolio_builder(self, builder: PortfoliosBuilder = None):
result, detail = builder.build_portfolio(parse_date('2011-11-07'), PortfoliosType.NORMAL)
def test_poem_build_portfolio(self, builder: PortfoliosBuilder = None):
result, detail = builder.build_portfolio(parse_date('2022-11-07'), PortfoliosType.NORMAL)
self.logger.info("portfolios: ")
for risk, portfolio in result.items():
self.logger.info(risk.name)
self.logger.info(portfolio)
self.logger.info(detail[risk])
@autowired(names={'builder': 'poem'})
def test_poem_get_portfolio(self, builder: PortfoliosBuilder = None):
portfolio = builder.get_portfolios(parse_date('2022-11-07'), PortfoliosRisk.FT9)
self.logger.info(portfolio)
if __name__ == '__main__':
unittest.main()
import unittest
class MyTestCase(unittest.TestCase):
def test_something(self):
self.assertEqual(True, True) # add assertion here
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment