Commit 4fb82792 authored by jichao's avatar jichao

依赖注入实现中

parent ce024def
......@@ -5,6 +5,7 @@ from enum import Enum, unique
@unique
class DatumType(Enum):
FUND = 'FUND'
INDEX = 'INDEX'
@unique
......@@ -32,6 +33,7 @@ class PortfoliosType(Enum):
CRISIS_2 = 'crisis_2'
RIGHT_SIDE = 'right_side'
NORMAL = 'normal'
CUSTOM = 'custom'
@unique
......@@ -41,12 +43,15 @@ class SolveType(Enum):
POEM = 2
class BusinessException(Exception):
def __init__(self, msg):
self.__msg = msg
def __str__(self):
return self.__msg
@unique
class SignalType(Enum):
INIT = 0
CRISIS_EXP = 1
CRISIS_ONE = 2
CRISIS_TWO = 3
MARKET_RIGHT = 4
HIGH_BUY = 5
LOW_BUY = 6
class Datum(ABC):
......@@ -65,6 +70,16 @@ class Datum(ABC):
'''
pass
@abstractmethod
def get_index_datums(self, ticker=None, index_ids=None):
'''
获取指标资料数据
:param ticker: 指标的彭博ticker
:param index_ids: 指标id列表
:return: 即表资料信息
'''
pass
class Navs(ABC):
'''
......@@ -82,6 +97,30 @@ class Navs(ABC):
'''
pass
@abstractmethod
def get_index_close(self, datum_ids=None, min_date=None, max_date=None, ticker=None):
'''
获取指标收盘价
:param datum_ids: 指标资料id
:param min_date: 起始时间
:param max_date: 截止时间
:param ticker: 指标资料ticker
:return: 指标收盘价信息
'''
pass
@abstractmethod
def get_last_index_close(self, max_date, datum_id=None, ticker=None, count=1):
'''
获取指定资料或ticker,指定日期之前最后count个收盘价,当指定datum_id后,ticker参数无效
:param max_date: 指定日期
:param datum_id: 指标id,只能指定一个
:param ticker: 指标ticker,只能指定一个,当指标id有值后,该参数无效
:param count: 指定要返回数据的个数
:return: 如果存在,则返回指定日期最后count个收盘价,否则返回None
'''
pass
class AssetOptimize(ABC):
'''
......@@ -175,10 +214,11 @@ class PortfoliosBuilder(ABC):
pass
class RebalanceSignal(ABC):
class SignalBuilder(ABC):
'''
控制信号,发起是否调仓服务
'''
@abstractmethod
def do_signal(self, day, risk: PortfoliosRisk):
def get_signal(self, day, risk: PortfoliosRisk):
pass
......@@ -6,7 +6,7 @@ import pandas as pd
from dateutil.relativedelta import relativedelta
from empyrical import sortino_ratio
from api import AssetOptimize, Navs, BusinessException, Datum, AssetPoolType
from api import AssetOptimize, Navs, Datum, AssetPoolType
from asset_pool.dao import robo_assets_pool as rop
from framework import filter_weekend, dict_remove, get_config, component, autowired, get_quarter_start
......@@ -21,8 +21,7 @@ class SortinoAssetOptimize(AssetOptimize, ABC):
} for x in optimize_config['sortino-weight']] if 'sortino-weight' in optimize_config else []
def find_optimize(self, fund_ids, day):
if not self._config:
raise BusinessException(f"find optimize, but not found sortino config.")
assert self._config, "find optimize, but not found sortino config."
pct_change = pd.DataFrame(self.get_pct_change(fund_ids, day))
pct_change.set_index('date', inplace=True)
sortino = pd.DataFrame()
......
......@@ -34,7 +34,7 @@ class CvarEwmaAssetRisk(AssetRisk):
def get_risk_pool(self, day):
asset_pool = rap.get_one(day, AssetPoolType.RISK)
if not asset_pool:
result = block_execute(self.is_risk, {x['id']: (x['id'], day) for x in self._datum.get_fund_datums()})
result = block_execute(self.is_risk, {x['id']: (x['id'], day) for x in self._datum.get_fund_datums(risk=(3, 4, 5))})
risk_ids = [x[0] for x in result.items() if x[1]]
rap.insert(day, AssetPoolType.RISK, risk_ids)
asset_pool = rap.get_one(day, AssetPoolType.RISK)
......@@ -85,8 +85,7 @@ class CvarEwmaAssetRisk(AssetRisk):
if row['rtn'] < rtns[rtns.date == cvar_start_date].iloc[0].rtn:
# 当日回报率跌破最低点, 则直接触发
tigger = True
elif row['rtn'] <= self._config['cvar']['threshold'] and len(cvar_rtns) >= self._config['cvar'][
'min-volume']:
elif row['rtn'] <= self._config['cvar']['threshold'] and len(cvar_rtns) >= self._config['cvar']['min-volume']:
# 当日回报率小于等于阀值并且有足够cvar累计计算数据,则计算cvar判断
alpha = 1 - self._config['cvar']['coef']
mean = cvar_rtns['rtn'].mean()
......
......@@ -52,5 +52,25 @@ CREATE TABLE IF NOT EXISTS `robo_exrate`
INDEX (`re_date`)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '基金数据表';
DEFAULT CHARSET = utf8mb4 COMMENT '汇率数据表';
CREATE TABLE IF NOT EXISTS `robo_index_datas`
(
`rid_index_id` BIGINT UNSIGNED NOT NULL COMMENT '指标id',
`rid_date` DATETIME NOT NULL COMMENT '指标数据日期',
`rid_high` DOUBLE DEFAULT NULL COMMENT '最高价',
`rid_open` DOUBLE DEFAULT NULL COMMENT '最高价',
`rid_low` DOUBLE DEFAULT NULL COMMENT '最高价',
`rid_close` DOUBLE NOT NULL COMMENT '收盘价',
`rid_pe` DOUBLE DEFAULT NULL COMMENT '市盈率',
`rid_volume` DOUBLE DEFAULT NULL COMMENT '成交量',
`rid_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`rid_update_time` DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`rid_index_id`, `rid_date`),
INDEX (`rid_date`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT '指标数据表';
......@@ -3,11 +3,12 @@ from framework import read, where, to_tuple
@read
def get_base_datums(type: DatumType = None, crncy=None, risk=None, fund_ids=None):
def get_base_datums(type: DatumType = None, crncy=None, risk=None, datum_ids=None, ticker=None):
kwargs = {
'rbd_id': to_tuple(fund_ids),
'rbd_id': to_tuple(datum_ids),
'v_rbd_type': type,
'v_rbd_crncy': crncy,
'v_rbd_risk': risk
'v_rbd_risk': risk,
'v_rbd_bloomberg_ticker': to_tuple(ticker)
}
return f'''select rbd_id as id, rbd_datas as datas from robo_base_datum {where(**kwargs)}'''
......@@ -14,4 +14,7 @@ def get_navs(fund_id=None, min_date=None, max_date=None):
sqls.append(f"rfn_date >= '{format_date(min_date)}'")
if max_date:
sqls.append(f"rfn_date <= '{format_date(max_date)}'")
return f'''select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from robo_fund_navs {where(*sqls, rfn_fund_id=to_tuple(fund_id))}'''
return f'''
select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from robo_fund_navs
{where(*sqls, rfn_fund_id=to_tuple(fund_id))} order by rfn_fund_id, rfn_date
'''
from framework import read, write, format_date, to_tuple, where
import requests
from datetime import datetime
import pandas as pd
__COLUMNS__ = {
'rid_index_id': 'index_id',
'rid_date': 'date',
'rid_high': 'high',
'rid_open': 'open',
'rid_low': 'low',
'rid_close': 'close',
'rid_pe': 'pe',
'rid_volume': 'volume',
}
@write
def insert(datas):
values = ','.join([f'''({','.join([(f"'{x[j]}'" if j in x and x[j] is not None else 'null') for j in __COLUMNS__.keys()])})''' for x in datas])
return f'''insert into robo_index_datas({','.join(__COLUMNS__.keys())}) values {values}'''
@read
def get_list(index_ids=None, min_date=None, max_date=None):
sqls = []
if min_date:
sqls.append(f"rid_date >= '{format_date(min_date)}'")
if max_date:
sqls.append(f"rid_date <= '{format_date(max_date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas
{where(*sqls, rid_index_id=to_tuple(index_ids))} order by rid_index_id, rid_date
'''
@read(one=True)
def get_last_one(index_id, max_date):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas
{where(f"rid_date <= '{format_date(max_date)}'", rid_index_id=index_id)} order by rid_date desc limit 1
'''
@read
def get_last(index_id, max_date, count=1):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas
{where(f"rid_date <= '{format_date(max_date)}'", rid_index_id=index_id)} order by rid_date desc limit {count}
'''
......@@ -9,6 +9,11 @@ from framework import component, parse_date
class DefaultDatum(Datum):
def get_fund_datums(self, crncy=None, risk=None, fund_ids=None):
result = rbd.get_base_datums(type=DatumType.FUND, crncy=crncy, risk=risk, fund_ids=fund_ids)
result = rbd.get_base_datums(type=DatumType.FUND, crncy=crncy, risk=risk, datum_ids=fund_ids)
result = [{**json.loads(x['datas']), 'id': x['id']} for x in result]
return [{**x, 'inceptDate': parse_date(x['inceptDate'])} for x in result]
def get_index_datums(self, ticker=None, index_ids=None):
result = rbd.get_base_datums(type=DatumType.INDEX, ticker=ticker, datum_ids=index_ids)
return [{**json.loads(x['datas']), 'id': x['id']} for x in result]
import pandas as pd
from api import Navs, Datum
from basic.dao import robo_exrate as re, robo_fund_navs as rfn
from framework import get_config, component, autowired
from basic.dao import robo_exrate as re, robo_fund_navs as rfn, robo_index_datas as rid
from framework import get_config, component, autowired, to_tuple
@component
......@@ -32,3 +32,34 @@ class DefaultNavs(Navs):
navs.sort_values(by=['fund_id', 'nav_date'], inplace=True)
navs = navs.to_dict('records')
return navs
def get_index_close(self, datum_ids=None, min_date=None, max_date=None, ticker=None):
datum_ids = to_tuple(datum_ids)
if ticker:
datums = self._datum.get_index_datums(ticker=ticker)
datum_ids = tuple(set(list(datum_ids or []) + [x['id'] for x in datums]))
results = rid.get_list(index_ids=datum_ids, min_date=min_date, max_date=max_date)
return [{'index_id': x['index_id'], 'date': x['date'], 'close': x['close']} for x in results]
def get_last_index_close(self, max_date, datum_id=None, ticker=None, count=1):
if not datum_id:
assert ticker, "get last index close, datum_id and ticker give at least one"
datum = self._datum.get_index_datums(ticker=ticker)
datum_id = datum[0]['id'] if datum else None
assert datum_id, "get last index close, datum id is not found"
assert max_date, "get last index close, start_date is not found"
if count == 1:
last = rid.get_last_one(index_id=datum_id, max_date=max_date)
return {
'index_id': last['index_id'],
'date': last['date'],
'close': last['close']
} if last else None
else:
last = rid.get_last(index_id=datum_id, max_date=max_date, count=count)
return [{
'index_id': x['index_id'],
'date': x['date'],
'close': x['close']
} for x in last] if last else None
import unittest
from api import Navs
from framework import autowired, parse_date, get_logger
class BasicTest(unittest.TestCase):
logger = get_logger(__name__)
@autowired
def test_index_close(self, navs: Navs = None):
closes = navs.get_index_close(ticker='SPX Index', min_date=parse_date('2022-11-01'), datum_ids=67)
self.logger.info(closes)
if __name__ == '__main__':
unittest.main()
......@@ -110,6 +110,24 @@ portfolios:
risk: [ 1, 2 ]
mpt:
quantile: 0.1
rebalance:
crisis-signal:
exp-years: 3
exp-init: 2022-03-04
inversion-years: 1
inversion-threshold: 0.3
crisis-1:
mean-count: 850
consecut-days: 5
crisis-2:
negative-growth: 1
fed-months: 3
fed-threshold: 0.75
right-side:
rtn-days: 5
min-threshold: -0.05
coef: 0.95
......@@ -76,23 +76,31 @@ def write(func=None, config=None):
if func is None:
return functools.partial(write, config=config)
def get_result(db, sql, res):
if sql.find('insert into') >= 0:
return db.connect.insert_id()
else:
return res
def execute(db, sqls):
if isinstance(sqls, list):
results = []
for sql in sqls:
db.cursor.execute(sql)
return [x for x in get_result(db, sql, db.cursor.execute(sql))]
else:
db.cursor.execute(sqls)
return get_result(db, sqls, db.cursor.execute(sqls))
@functools.wraps(func)
def wraps(*args, **kwargs):
sqls = func(*args, **kwargs)
if hasattr(__local__, 'db'):
execute(__local__.db, sqls)
return execute(__local__.db, sqls)
else:
with Database(config) as db:
try:
execute(db, sqls)
result = execute(db, sqls)
db.connect.commit()
return result
except Exception as e:
db.connect.rollback()
raise e
......@@ -129,15 +137,18 @@ def where(*args, **kwargs) -> str:
for k, v in kwargs.items():
if isinstance(v, str):
result.append(f"{k} = '{v}'")
elif isinstance(v, bool):
result.append(f"{k} = {1 if v else 0}")
elif isinstance(v, datetime):
result.append(f"{k} = '{format_date(v)}'")
elif isinstance(v, Enum):
result.append(f"{k} = '{v.value}'")
elif isinstance(v, tuple) or isinstance(v, list):
if len(v) > 0:
result.append(f"{k} in {tuple(v)}" if len(v) > 1 else f"{k} = {v[0]}")
v = tuple([(x.value if isinstance(x, Enum) else x) for x in v])
result.append(f"{k} in {v}" if len(v) > 1 else f"{k} = '{v[0]}'")
elif v is not None:
result.append(f"{k} = {v}")
result.append(f"{k} = '{v}'")
if args:
result.extend([x for x in args if x])
return f"where {' and '.join(result)}" if result else ''
......
......@@ -75,7 +75,7 @@ def autowired(func=None, names=None):
target_name = types_config[class_name(p_type.annotation)]
find_cls = [x for x in components if class_name(x) == target_name]
if find_cls:
cls = get_instance(find_cls[0])
cls = find_cls[0]
kwargs[p_name] = get_instance(cls)
func(*args, **kwargs)
......
import datetime
import json
import os
import sys
......@@ -9,7 +10,7 @@ from numpy import NAN
from pyomo.environ import *
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType
from framework import component, autowired, get_config, format_date, get_logger
from framework import component, autowired, get_config, format_date, get_logger, parse_date
from portfolios.dao import robo_mpt_portfolios as rmp
logger = get_logger(__name__)
......@@ -172,7 +173,7 @@ class MptSolver:
weight_series[minidx] += 1 - weight_series.sum()
elif weight_series.sum() > 1:
weight_series[maxidx] += 1 - weight_series.sum()
return weight_series.apply(lambda x: float(x))
return weight_series.apply(lambda x: round(float(x), 2))
def calc_port_rtn(self, model):
return sum([model.w[i]._value * self.rtn_annualized[i] for i in model.indices])
......
from abc import ABC
import pandas as pd
from dateutil.relativedelta import relativedelta
from api import SignalBuilder, PortfoliosRisk, SignalType, Navs, PortfoliosBuilder, PortfoliosType
from framework import get_config, autowired, component
from rebalance.dao import robo_rebalance_signal as rrs
class CrisisSignal(SignalBuilder, ABC):
@autowired
def __init__(self, navs: Navs = None):
self._navs = navs
self._config = get_config(__name__)
@property
def exp_init(self):
return pd.to_datetime(self._config['exp-init']) if 'exp-init' in self._config else None
@property
def exp_years(self):
return self._config['exp-years'] if 'exp-years' in self._config else 1
@property
def inversion_years(self):
return self._config['inversion-years'] if 'inversion-years' in self._config else 1
@property
def inversion_threshold(self):
return self._config['inversion-threshold'] if 'inversion-threshold' in self._config else 1
def get_exp_start_date(self, day, risk: PortfoliosRisk):
assert day, "get crisis exp start date, day can not be none"
assert risk, "get crisis exp start date, PortfoliosRisk can not be none"
exp_date = day - relativedelta(years=self.exp_years)
if self.exp_init and self.exp_init >= exp_date:
return self.exp_init
exp_signal = rrs.get_first_after(type=SignalType.CRISIS_EXP, risk=risk, min_date=exp_date)
if not exp_signal:
inversion_date = day - relativedelta(years=self.inversion_years)
ten_before = self._navs.get_last_index_close(max_date=inversion_date, ticker='USGG10YR Index')
ten_today = self._navs.get_last_index_close(max_date=day, ticker='USGG10YR Index')
two_before = self._navs.get_last_index_close(max_date=inversion_date, ticker='USGG2YR Index')
two_today = self._navs.get_last_index_close(max_date=day, ticker='USGG2YR Index')
if ten_today['close'] - two_today['close'] <= ten_before['close'] - two_before['close'] and \
ten_today['close'] - two_today['close'] <= self.inversion_threshold:
rrs.insert({
'date': day,
'type': SignalType.CRISIS_EXP,
'risk': risk,
})
exp_signal = rrs.get_first_after(type=SignalType.CRISIS_EXP, risk=risk, min_date=exp_date)
return exp_signal['date'] if exp_signal else None
@component(bean_name='crisis_one')
class CrisisOneSignal(CrisisSignal):
@autowired
def __init__(self, builder: PortfoliosBuilder = None):
super(CrisisOneSignal, self).__init__()
self._builder = builder
@property
def consecut_days(self):
return self._config['crisis-1']['consecut-days']
@property
def mean_count(self):
return self._config['crisis-1']['mean-count']
def get_signal(self, day, risk: PortfoliosRisk):
exp_date = self.get_exp_start_date(day, risk)
if exp_date:
crisis_one = rrs.get_first_after(type=SignalType.CRISIS_ONE, risk=risk, min_date=exp_date)
if not crisis_one:
spx = self._navs.get_last_index_close(max_date=day, ticker='SPX Index', count=self.mean_count)
spx_ma850 = pd.DataFrame(spx).close.mean()
if len([x for x in spx[0:5] if x['close'] > spx_ma850]) == 0:
portfolio = self._builder.get_portfolios(day, risk, PortfoliosType.CRISIS_1)
id = rrs.insert({
'date': day,
'type': SignalType.CRISIS_ONE,
'risk': risk,
'portfolio_type': PortfoliosType.CRISIS_1,
'portfolio': portfolio
})
return rrs.get_by_id(id)
return None
@component(bean_name='crisis_two')
class CrisisTwoSignal(CrisisSignal):
@autowired
def __init__(self, builder: PortfoliosBuilder = None):
super(CrisisSignal, self).__init__()
self._builder = builder
@property
def negative_growth_years(self):
return self._config['crisis-2']['negative-growth']
@property
def fed_months(self):
return self._config['crisis-2']['fed-months']
@property
def fed_threshold(self):
return self._config['crisis-2']['fed-threshold']
def get_signal(self, day, risk: PortfoliosRisk):
exp_date = self.get_exp_start_date(day, risk)
if exp_date:
crisis_one = rrs.get_last_one(type=SignalType.CRISIS_ONE, risk=risk, max_date=day, effective=True)
if crisis_one:
return None
crisis_two = rrs.get_first_after(type=SignalType.CRISIS_TWO, risk=risk, min_date=exp_date, effective=True)
if not crisis_two:
ng_date = day - relativedelta(years=self.negative_growth_years)
ten_today = self._navs.get_last_index_close(max_date=day, ticker='USGG10YR Index')
cpi_today = self._navs.get_last_index_close(max_date=day, ticker='CPI YOY Index')
ten_before = self._navs.get_last_index_close(max_date=ng_date, ticker='USGG10YR Index')
cpi_before = self._navs.get_last_index_close(max_date=ng_date, ticker='CPI YOY Index')
before = ten_before['close'] - cpi_before['close']
today = ten_today['close'] - cpi_today['close']
fed_today = self._navs.get_last_index_close(max_date=day, ticker='FDTR_Index')
fed_before = self._navs.get_last_index_close(max_date=day-relativedelta(months=self.fed_months), ticker='FDTR_Index')
if today <= before and fed_today['close'] - fed_before['close'] < -self.fed_threshold:
portfolio = self._builder.get_portfolios(day, risk, PortfoliosType.CRISIS_2)
id = rrs.insert({
'date': day,
'type': SignalType.CRISIS_TWO,
'risk': risk,
'portfolio_type': PortfoliosType.CRISIS_2,
'portfolio': portfolio
})
return rrs.get_by_id(id)
return None
CREATE TABLE IF NOT EXISTS robo_rebalance_signal
(
rrs_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rrs_date DATETIME NOT NULL COMMENT '信号日期',
rrs_type TINYINT NOT NULL COMMENT '信号类型',
rrs_risk TINYINT NOT NULL COMMENT '风险等级',
rrs_p_type VARCHAR(255) DEFAULT NULL COMMENT '投组类型',
rrs_p_weight JSON DEFAULT NULL COMMENT '投组信息',
rrs_effective TINYINT NOT NULL DEFAULT 0 COMMENT '是否生效',
rrs_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rrs_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rrs_id),
INDEX (rrs_date),
INDEX (rrs_type),
INDEX (rrs_risk)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '再平衡信号表';
from framework import read, write, where, format_date
from api import SignalType, PortfoliosRisk
import json
from datetime import datetime
from enum import Enum
__COLUMNS__ = {
'rrs_id': 'id',
'rrs_date': 'date',
'rrs_type': 'type',
'rrs_risk': 'risk',
'rrs_p_type': 'portfolio_type',
'rrs_p_weight': 'portfolio',
'rrs_effective': 'effective',
}
@read(one=True)
def get_by_id(id):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=id)}'''
@read(one=True)
def get_first_after(type: SignalType, risk: PortfoliosRisk, min_date, effective=None):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
{where(f"rrs_date >= {format_date(min_date)}", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date limit 1
'''
@read(one=True)
def get_last_one(max_date, risk: PortfoliosRisk, type: SignalType = None, effective=None):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
{where(f"rrs_date <= {format_date(max_date)}", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date desc limit 1
'''
@write
def insert(datas):
datas = {x[0]: datas[x[1]] for x in __COLUMNS__.items() if x[1] in datas and datas[x[1]] is not None}
datas = {
**datas,
**{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)},
**{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)},
**{x[0]: json.dumps(x[1]) for x in datas.items() if isinstance(x[1], dict)},
**{x[0]: (1 if x[1] else 0) for x in datas.items() if isinstance(x[1], bool)}
}
return f'''
insert into robo_rebalance_signal({','.join([x for x in datas.keys()])})
values ({','.join([f"'{x[1]}'" for x in datas.items()])})
'''
import pandas as pd
from framework import component, autowired, get_config
from api import SignalBuilder, SignalType, PortfoliosRisk, Navs, PortfoliosBuilder, PortfoliosType
from rebalance.dao import robo_rebalance_signal as rrs
from scipy.stats import norm
@component(bean_name='market-right')
class MarketRight(SignalBuilder):
@autowired
def __init__(self, navs: Navs = None, builder: PortfoliosBuilder = None):
self._navs = navs
self._builder = builder
self._config = get_config(__name__)
@property
def rtn_days(self):
return self._config['rtn-days']
@property
def min_threshold(self):
return self._config['min-threshold']
@property
def coef(self):
return self._config['coef']
def get_signal(self, day, risk: PortfoliosRisk):
last_re = rrs.get_last_one(risk=risk, max_date=day, effective=True)
if last_re is not None and SignalType(last_re['type']) in [SignalType.CRISIS_ONE, SignalType.CRISIS_TWO, SignalType.MARKET_RIGHT]:
return None
spx = self.load_spx_close_rtns(day)
if spx[-1]['rtn'] > self.min_threshold:
return None
cvar = self.get_cvar(day, risk, spx=spx)
if spx[-1]['rtn'] < cvar:
portfolio = self._builder.get_portfolios(day, risk, PortfoliosType.RIGHT_SIDE)
id = rrs.insert({
'date': day,
'type': SignalType.MARKET_RIGHT,
'risk': risk,
'portfolio_type': PortfoliosType.RIGHT_SIDE,
'portfolio': portfolio
})
return rrs.get_by_id(id)
return None
def get_cvar(self, day, risk: PortfoliosRisk, spx = None):
if spx is None:
spx = self.load_spx_close_rtns(day)
start_date = self.find_cvar_start_date(day, risk, spx=spx)
if start_date:
spx = pd.DataFrame(spx)
spx = spx[(pd_spx.date >= start_date) & (spx.date <= day)]
alpha = 1 - self.coef
mean = spx.rtn.mean()
std = spx.rtn.std()
return mean - std * norm.pdf(norm.ppf(alpha)) / alpha
return None
def find_cvar_start_date(self, day, risk: PortfoliosRisk, spx = None):
if spx is None:
spx = self.load_spx_close_rtns(day)
spx = pd.DataFrame(spx)
last_right = rrs.get_last_one(type=(SignalType.MARKET_RIGHT, SignalType.INIT), max_date=day, risk=risk, effective=True)
last_buy = rrs.get_last_one(type=(SignalType.LOW_BUY, SignalType.HIGH_BUY), max_date=day, risk=risk, effective=True)
if not last_buy or not last_right or last_buy['date'] <= last_right['date']:
return None
spx = spx[(spx['date'] >= last_right['date']) & (spx['date'] <= last_buy)]
if not spx.empty and len(spx) > 2:
return spx.loc[spx.close.idxmin()].date
return None
def load_spx_close_rtns(self, day):
spx = pd.DataFrame(self._navs.get_index_close(ticker='SPX Index', max_date=day))
spx.sort_values('date', inplace=True)
spx['rtn'] = spx['close'] / spx['close'].shift(self.rtn_days) - 1
spx.dropna(inplace=True)
spx = spx[['date', 'close', 'rtn']]
return spx.to_dict('records')
import unittest
from api import SignalBuilder, PortfoliosRisk
from framework import autowired, parse_date, get_logger
class RebalanceTest(unittest.TestCase):
logger = get_logger(__name__)
@autowired(names={'builder': 'crisis_one'})
def test_crisis_one(self, builder: SignalBuilder = None):
signal = builder.get_signal(parse_date('2022-10-13'), PortfoliosRisk.FT9)
self.logger.info(signal)
@autowired(names={'builder': 'market-right'})
def test_market_right(self, builder: SignalBuilder = None):
signal = builder.get_signal(parse_date('2022-10-13'), PortfoliosRisk.FT9)
self.logger.info(signal)
if __name__ == '__main__':
unittest.main()
import unittest
class MyTestCase(unittest.TestCase):
def test_something(self):
self.assertEqual(True, True) # add assertion here
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment