Commit 55b338de authored by wenwen.tang's avatar wenwen.tang 😕

定期配息产品,回测

parent 65c7bb8d
from abc import ABC, abstractmethod
from datetime import datetime as dt
from enum import Enum, unique
from typing import List, Dict
from py_jftech import get_config
......@@ -13,30 +12,18 @@ class DatumType(Enum):
ECO = 'ECO'
@unique
class AssetRiskDateType(Enum):
START_DATE = 1
STOP_DATE = 2
@unique
class AssetPoolType(Enum):
OPTIMIZE = 1
RISK = 2
@unique
class PortfoliosRisk(Enum):
FT3 = 3
FT6 = 6
FT9 = 9
@unique
class PortfoliosType(Enum):
CRISIS_1 = 'crisis_1'
CRISIS_2 = 'crisis_2'
RIGHT_SIDE = 'right_side'
NORMAL = 'normal'
CUSTOM = 'custom'
......@@ -48,37 +35,6 @@ class SolveType(Enum):
POEM = 2
@unique
class SignalType(Enum):
NONE = -1
INIT = 0
CRISIS_EXP = 1
CRISIS_ONE = 2
CRISIS_TWO = 3
MARKET_RIGHT = 4
HIGH_BUY = 5
LOW_BUY = 6
DRIFT_BUY = 7
# 信号处理优先级
SignalType.CRISIS_ONE.level = 1
SignalType.CRISIS_TWO.level = 1
SignalType.MARKET_RIGHT.level = 3
SignalType.HIGH_BUY.level = 4
SignalType.LOW_BUY.level = 5
SignalType.DRIFT_BUY.level = 5
SignalType.INIT.level = 6
# 对应需要再平衡的投组类型
SignalType.CRISIS_ONE.p_type = PortfoliosType.CRISIS_1
SignalType.CRISIS_TWO.p_type = PortfoliosType.CRISIS_2
SignalType.MARKET_RIGHT.p_type = PortfoliosType.RIGHT_SIDE
SignalType.HIGH_BUY.p_type = PortfoliosType.NORMAL
SignalType.LOW_BUY.p_type = PortfoliosType.NORMAL
SignalType.DRIFT_BUY.p_type = PortfoliosType.NORMAL
SignalType.INIT.p_type = PortfoliosType.RIGHT_SIDE
class DataSync(ABC):
'''
数据同步服务,需要同步数据的服务,可以实现该接口
......@@ -232,48 +188,6 @@ class AssetOptimize(ABC):
pass
class AssetRisk(ABC):
'''
ewma相关服务
'''
@abstractmethod
def get_risk_pool(self, day):
'''
获取指定日期的风控池
:param day: 指定的日期
:return: 风控id列表
'''
pass
@abstractmethod
def is_risk(self, id, day) -> bool:
'''
判断指定的id,在指定的日期,是处于风控状态
:param id: 指定的资产id
:param day: 指定的日期
:return: 如果处于风控状态则返回True,否则返回False
'''
pass
@abstractmethod
def build_risk_date(self, asset_id, day=dt.today()):
'''
构建指定资产的所有风险时间点
:param asset_id: 指定的资产id
:param day: 构建的截止日期
'''
pass
@abstractmethod
def clear(self, day=None):
'''
清除指定日期之后的资产风控ewma数据,如果没有给日期,则全部清空
:param day: 指定清除的开始日期,可选
'''
pass
class AssetPool(ABC):
'''
资产池相关服务
......@@ -383,6 +297,27 @@ class Solver(ABC):
'''
pass
@abstractmethod
def set_navs(self, navs):
'''
根据指定的navs,重置当前解算器
:param navs: 指定的navs
'''
pass
@abstractmethod
def set_category(self, category):
'''
根据指定的category,重置当前解算器
:param category: 指定的category
'''
pass
@property
@abstractmethod
def category(self):
pass
@property
@abstractmethod
def navs(self):
......@@ -391,6 +326,15 @@ class Solver(ABC):
'''
pass
@property
@abstractmethod
def transfer_type(self):
"""
得出调仓类型
@return:
"""
pass
class SolverFactory(ABC):
'''
......@@ -462,15 +406,6 @@ class PortfoliosHolder(ABC):
'''
pass
@abstractmethod
def get_rebalance_date_by_signal(self, signal_id):
'''
获取指定调仓信号触发的实际调仓日期
:param signal_id: 指定的调仓信号
:return: 实际调仓日期
'''
pass
@property
@abstractmethod
def interval_days(self):
......@@ -490,92 +425,6 @@ class PortfoliosHolder(ABC):
pass
class DriftSolver(ABC):
'''
漂移解算器
'''
@abstractmethod
def get_drift(self, day, risk: PortfoliosRisk):
'''
获取指定日期,指定风险等级的漂移计算结果
:param day: 指定日期
:param risk: 指定风险等级
:return: 漂移计算结果
'''
pass
class RebalanceSignal(ABC):
'''
控制信号,发起是否调仓服务
'''
@abstractmethod
def get_signal(self, day, risk: PortfoliosRisk):
pass
@property
@abstractmethod
def signal_type(self) -> SignalType:
'''
返回信号类型
:return: 信号类型
'''
pass
class RebalanceRuler(ABC):
'''
再平衡信号分配器,根据既定的规则,再众多信号中,选出进行再平衡的信号
'''
@abstractmethod
def take_next_signal(self, day, risk: PortfoliosRisk):
'''
取出指定日期,指定风险等级的再平衡信号数据,注意取出消费后,无法退回,非幂等函数
:param day: 指定日期
:param risk: 指定风险等级
:return: 如果存在,则返回取出的再平衡信号信息,否则返回None
'''
pass
@abstractmethod
def commit_signal(self, sign_id):
'''
提交信号ID为已消费状态
:param sign_id: 信号ID
'''
pass
@abstractmethod
def get_signal_type(self, sign_id) -> SignalType | Dict[int, SignalType]:
'''
获取指定id的信号类型
:param sign_id: 信号id, 可以多个,使用元祖包裹
:return: 信号类型
'''
pass
@abstractmethod
def get_signal_date(self, sign_id):
'''
获取指定id的信号日期
:param sign_id: 信号id, 可以多个,使用元祖包裹
:return: 信号日期
'''
pass
@abstractmethod
def clear_signal(self, day=None, risk: PortfoliosRisk = None):
'''
清除指定风险等级,指定日期之后的调仓信号
:param day: 指定清除的开始日期,可选,如果没给,则清除全部日期
:param risk: 指定风险等级,如果没给,则清除全部风险等级
'''
pass
class RoboExecutor(ABC):
'''
ROBO执行器,整合以上逻辑,进行实盘或回测
......@@ -599,43 +448,3 @@ class RoboExecutor(ABC):
@staticmethod
def use_name():
return get_config('robo-executor')['use']
class RoboReportor(ABC):
'''
投组报告器
'''
@property
@abstractmethod
def report_name(self) -> str:
'''
返回报告名称
:return: 报告名称
'''
pass
@abstractmethod
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
'''
获取指定日期的报告
:param max_date: 指定截止日期
:param min_date: 指定开始日期
:return: 报告数据
'''
pass
class RoboExportor(ABC):
'''
投组导出器
'''
@abstractmethod
def export(self, max_date=dt.today(), min_date=None):
'''
根据参数以及配置信息执行导出相关操作
:param max_date: 指定截止日期
:param min_date: 指定开始日期
'''
pass
import json
from abc import ABC, abstractmethod
from datetime import datetime as dt, timedelta
from sys import exception
import pandas as pd
from dateutil.relativedelta import relativedelta
from empyrical import sortino_ratio
from py_jftech import filter_weekend, dict_remove, get_config, component, autowired, get_quarter_start, next_workday, is_workday
from py_jftech import filter_weekend, dict_remove, get_config, component, autowired, next_workday, \
is_workday
from api import AssetOptimize, Navs, Datum, AssetPoolType, DatumType
from asset_pool.dao import robo_assets_pool as rop
......@@ -30,41 +31,9 @@ class SortinoAssetOptimize(AssetOptimize, ABC):
return result
def find_optimize(self, fund_ids, day):
assert self._config, "find optimize, but not found sortino config."
pct_change = pd.DataFrame(self.get_pct_change(fund_ids, day))
pct_change.set_index('date', inplace=True)
sortino = pd.DataFrame()
for item in self._config:
ratio = dict(sortino_ratio(pct_change.truncate(before=(day - relativedelta(**dict_remove(item, ('weight', 'name')))))))
sortino = pd.concat([sortino, pd.DataFrame([ratio], index=[item['name']])])
sortino = sortino.T
sortino['score'] = sortino.apply(lambda r: sum([x['weight'] * r[x['name']] for x in self._config]), axis=1)
sortino.sort_values('score', ascending=False, inplace=True)
return pct_change.columns[sortino.index[0]]
pass
def get_optimize_pool(self, day):
opt_pool = rop.get_one(day=day, type=AssetPoolType.OPTIMIZE)
if opt_pool is not None:
return json.loads(opt_pool['asset_ids'])
last_one = rop.get_last_one(day=day, type=AssetPoolType.OPTIMIZE)
start = get_quarter_start(day or dt.today())
if not last_one or start > last_one['date'] or self.has_incept_asset(last_one['date'] + timedelta(1), day) or self.has_change(day):
pool = []
min_dates = self.nav_min_dates
max_incept_date = sorted([(day - relativedelta(**x)) for x in self.delta_kwargs])[0]
max_incept_date = max_incept_date if is_workday(max_incept_date) else next_workday(max_incept_date)
for fund_group in self.get_groups():
fund_group = [x for x in fund_group if min_dates[x] <= max_incept_date]
if len(fund_group) > 1:
pool.append(self.find_optimize(tuple(fund_group), day))
elif len(fund_group) == 1:
pool.append(fund_group[0])
rop.insert(day, AssetPoolType.OPTIMIZE, sorted(pool))
last_one = rop.get_last_one(day=day, type=AssetPoolType.OPTIMIZE)
return json.loads(last_one['asset_ids'])
@abstractmethod
def has_incept_asset(self, start_date, end_date):
pass
@property
......@@ -94,41 +63,81 @@ class SortinoAssetOptimize(AssetOptimize, ABC):
return False
@component
class FundSortinoAssetOptimize(SortinoAssetOptimize):
'''
@component(bean_name='dividend')
class FundDividendSortinoAssetOptimize(SortinoAssetOptimize):
"""
根据索提诺比率计算基金优选的优选实现
'''
以美国资产为主:US_STOCK、US_HY_BOND、US_IG_BOND
Sortino ratio对资产进行排序,选出排名靠前的资产(非一类选一只)
"""
@autowired
def __init__(self, navs: Navs = None, datum: Datum = None):
super().__init__()
self._navs = navs
self._datum = datum
self._conf = get_config(__name__)
@property
def asset_include(self):
return self._conf['asset-include']
@property
def optimize_count(self):
return self._conf['optimize-count']
@property
def nav_min_dates(self) -> dict:
return self._navs.get_nav_start_date()
def has_incept_asset(self, start_date, end_date):
start_date = sorted([(start_date - relativedelta(**x)) for x in self.delta_kwargs])[0]
end_date = sorted([(end_date - relativedelta(**x)) for x in self.delta_kwargs])[0]
return len([x for x in self.nav_min_dates.items() if start_date <= x[1] <= end_date]) > 0
def has_change(self, day):
return self._datum.update_change(day)
def find_optimize(self, fund_ids, day):
assert self._config, "find optimize, but not found sortino config."
pct_change = pd.DataFrame(self.get_pct_change(fund_ids, day))
pct_change.set_index('date', inplace=True)
sortino = pd.DataFrame()
for item in self._config:
ratio = dict(sortino_ratio(
pct_change.truncate(before=(day - relativedelta(**dict_remove(item, ('weight', 'name')))))))
sortino = pd.concat([sortino, pd.DataFrame([ratio], index=[item['name']])])
sortino = sortino.T
sortino['score'] = sortino.apply(lambda r: sum([x['weight'] * r[x['name']] for x in self._config]), axis=1)
sortino.sort_values('score', ascending=False, inplace=True)
# 取得分数高的前optimize_count个
return pct_change.columns[sortino.index[0:self.optimize_count]].values
def get_optimize_pool(self, day):
opt_pool = rop.get_one(day=day, type=AssetPoolType.OPTIMIZE)
if opt_pool is not None:
return json.loads(opt_pool['asset_ids'])
last_one = rop.get_last_one(day=day, type=AssetPoolType.OPTIMIZE)
if not last_one or day > last_one['date']:
pool = []
min_dates = self.nav_min_dates
max_incept_date = sorted([(day - relativedelta(**x)) for x in self.delta_kwargs])[0]
max_incept_date = max_incept_date if is_workday(max_incept_date) else next_workday(max_incept_date)
for fund_group in self.get_groups():
fund_group = [x for x in fund_group if min_dates[x] <= max_incept_date]
if len(fund_group) > self.optimize_count:
pool.extend(self.find_optimize(tuple(fund_group), day))
elif len(fund_group) <= self.optimize_count:
pool.extend(fund_group)
rop.insert(day, AssetPoolType.OPTIMIZE, sorted(pool))
last_one = rop.get_last_one(day=day, type=AssetPoolType.OPTIMIZE)
return json.loads(last_one['asset_ids'])
def get_groups(self):
funds = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND))
min_dates = self._navs.get_nav_start_date()
result = []
for (category, asset_type), fund_group in funds.groupby(by=['category', 'assetType']):
result.append(tuple(fund_group['id']))
if category in self.asset_include:
result.append(tuple(fund_group['id']))
return result
def get_pct_change(self, fund_ids, day):
if not self._config:
raise BusinessException(f"find optimize, but not found sortino config.")
raise exception(f"find optimize, but not found sortino config.")
start = filter_weekend(
sorted([day - relativedelta(days=1, **dict_remove(x, ('weight', 'name'))) for x in self._config])[0])
fund_navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(fund_ids), min_date=start, max_date=day))
......
import json
import logging
from datetime import datetime as dt
import pandas as pd
from dateutil.relativedelta import relativedelta
from py_jftech import component, autowired, get_config, format_date, transaction, asynchronized
from scipy.stats import norm
from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType, RoboExecutor, DatumType
from asset_pool.dao import asset_risk_dates as ard, asset_ewma_value as aev, robo_assets_pool as rap
logger = logging.getLogger(__name__)
@component
class CvarEwmaAssetRisk(AssetRisk):
'''
CVAR方式决定风控开始。风控开始后,开始计算ewma寻找风控结束日期,也就是ewma的起始日期
EWMA方式决定风控结束:风控结束后,就可以找到风控期的最低点日期,该日期作为下一轮cvar计算的起始日期
'''
@autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, navs: Navs = None, datum: Datum = None, executor: RoboExecutor = None):
self._navs = navs
self._datum = datum
self._executor = executor
self._config = get_config(__name__)
@property
def risk_start_date(self):
return self._executor.start_date - relativedelta(months=self._config['advance-months'])
def get_risk_pool(self, day):
asset_pool = rap.get_one(day, AssetPoolType.RISK)
if not asset_pool:
result = {x['id']: self.async_is_risk(x['id'], day) for x in self._datum.get_datums(type=DatumType.FUND, risk=(3, 4, 5))}
risk_ids = [x[0] for x in result.items() if x[1].result()]
rap.insert(day, AssetPoolType.RISK, risk_ids)
asset_pool = rap.get_one(day, AssetPoolType.RISK)
return json.loads(asset_pool['asset_ids'])
@asynchronized
def async_is_risk(self, id, day):
return self.is_risk(id, day)
def is_risk(self, id, day) -> bool:
asset_pool = rap.get_one(day, AssetPoolType.RISK)
if asset_pool:
return id in json.loads(asset_pool['asset_ids'])
last = ard.get_last_one(fund_id=id)
if last and last['date'] < day:
self.build_risk_date(id, day)
result = ard.get_last_one(id, day)
return DateType(result['type']) is DateType.START_DATE if result else True
def build_risk_date(self, asset_id, day=dt.today()):
risk_date = not None
try:
logger.debug(f"start build risk date for asset[{asset_id}] to date[{format_date(day)}]")
while risk_date is not None:
risk_date = self.get_next_date(asset_id, day=day)
except Exception as e:
logger.exception(f"build risk date for asset[{asset_id}] after date[{risk_date}] to date[{format_date(day)}] error", e)
@transaction
def clear(self, day=None):
ard.delete(day)
aev.delete(day)
def get_next_date(self, asset_id, day=dt.today()):
last = ard.get_last_one(asset_id, day)
if not last or DateType(last['type']) is DateType.START_DATE:
start_date = last['date'] if last else self.risk_start_date
ewma = pd.DataFrame(self.get_ewma_value(asset_id, min_date=start_date, max_date=day))
total = self._config['ewma']['condition-total']
meet = self._config['ewma']['condition-meet']
threshold = self._config['ewma']['threshold']
if len(ewma) < total:
return None
for index in range(total, len(ewma) - 1):
sub_ewma = ewma[index - total:index]
if len(sub_ewma[sub_ewma['ewma'] >= threshold]) >= meet:
stop_date = sub_ewma.iloc[-1]['date']
ard.insert(asset_id, DateType.STOP_DATE, stop_date)
return {'date': stop_date, 'type': DateType.STOP_DATE}
elif DateType(last['type']) is DateType.STOP_DATE:
last_start = ard.get_last_one(asset_id, last['date'], type=DateType.START_DATE)
start_date = last_start['date'] if last_start else self.risk_start_date
rtns = pd.DataFrame(self.get_income_return(asset_id, min_date=start_date, max_date=day))
risk_rtns = rtns[rtns.date <= last['date']]
cvar_start_date = risk_rtns.loc[risk_rtns.nav.idxmin()].date
for index, row in rtns[rtns.date >= cvar_start_date].iterrows():
tigger = False
cvar_rtns = rtns[(rtns.date >= cvar_start_date) & (rtns.date <= row['date'])]
if row.nav < rtns[rtns.date == cvar_start_date].iloc[0].nav:
tigger = True
elif row['rtn'] <= self._config['cvar']['threshold'] and len(cvar_rtns) >= self._config['cvar']['min-volume']:
# 当日回报率小于等于阀值并且有足够cvar累计计算数据,则计算cvar判断
alpha = 1 - self._config['cvar']['coef']
mean = cvar_rtns['rtn'].mean()
std = cvar_rtns['rtn'].std()
cvar = mean - std * norm.pdf(norm.ppf(alpha)) / alpha
tigger = row['rtn'] < cvar
if tigger:
ard.insert(asset_id, DateType.START_DATE, row['date'])
return {'date': row['date'], 'type': DateType.START_DATE}
return None
def get_ewma_value(self, id, min_date=None, max_date=None):
rtn = pd.DataFrame(self.get_income_return(id, min_date=min_date or self.risk_start_date, max_date=max_date))
if rtn.empty:
return []
rtn.sort_values('date', inplace=True)
last_one = aev.get_last_one(id, max_date=max_date)
if not last_one:
aev.insert(asset_id=id, date=rtn.iloc[0].date, value=rtn.iloc[0].rtn)
last_one = aev.get_last_one(id, max_date=max_date)
last_day = last_one['date']
if last_day < max_date:
ewma = last_one['value']
factor = self._config['ewma']['factor']
for index, row in rtn[rtn['date'] > last_day].iterrows():
ewma = factor * row['rtn'] + (1 - factor) * ewma
aev.insert(id, row['date'], ewma)
result = aev.get_list(id, min_date=min_date, max_date=max_date)
return [{'date': x['date'], 'ewma': x['value']} for x in result]
def get_income_return(self, asset_id, min_date=None, max_date=None):
fund_navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_id, max_date=max_date))
if not fund_navs.empty:
fund_navs['rtn'] = fund_navs['nav_cal'] / fund_navs['nav_cal'].shift(self._config['rtn-days']) - 1
fund_navs.dropna(inplace=True)
if min_date:
fund_navs = fund_navs[fund_navs.nav_date >= pd.to_datetime(min_date)]
fund_navs.rename(columns={'nav_date': 'date', 'nav_cal': 'nav'}, inplace=True)
fund_navs = fund_navs[['date', 'nav', 'rtn']]
return fund_navs.to_dict('records')
return []
from py_jftech import read, write, where, format_date
__COLUMNS__ = {
'aev_id': 'id',
'aev_date': 'date',
'aev_asset_id': 'asset_id',
'aev_value': 'value',
}
@write
def insert(asset_id, date, value):
return f'''
insert into asset_ewma_value(aev_date, aev_asset_id, aev_value)
values ('{format_date(date)}', {asset_id}, {value})
'''
@write
def delete(day=None):
if day:
return f"delete from asset_ewma_value where aev_date >= '{format_date(day)}'"
else:
return 'truncate table asset_ewma_value'
@read(one=True)
def get_last_one(asset_id, max_date=None):
sqls = []
if max_date:
sqls.append(f"aev_date <= '{format_date(max_date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from asset_ewma_value
{where(*sqls, aev_asset_id=asset_id)} order by aev_date desc limit 1
'''
@read
def get_list(asset_id, min_date=None, max_date=None):
sqls = []
if min_date:
sqls.append(f"aev_date >= '{format_date(min_date)}'")
if max_date:
sqls.append(f"aev_date <= '{format_date(max_date)}'")
return f'''select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from asset_ewma_value {where(*sqls, aev_asset_id=asset_id)}'''
@read
def get_last(asset_id, max_date=None, limit=1):
sqls = []
if max_date:
sqls.append(f"aev_date <= '{format_date(max_date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from asset_ewma_value
{where(*sqls, aev_asset_id=asset_id)} order by aev_date desc limit {limit}
'''
from py_jftech import read, write, where, format_date
from api import AssetRiskDateType as DateType
__COLUMNS__ = {
'ard_id': 'id',
'ard_date': 'date',
'ard_type': 'type',
'ard_asset_id': 'asset_id',
}
@write
def insert(asset_id, type: DateType, date):
return f'''
insert into asset_risk_dates(ard_asset_id, ard_type, ard_date)
values ({asset_id}, {type.value}, '{format_date(date)}')
'''
@read
def get_list(asset_ids=None, max_date=None, min_date=None):
sqls = []
if max_date:
sqls.append(f"rap_date <= '{format_date(max_date)}'")
if min_date:
sqls.append(f"rap_date >= '{format_date(min_date)}'")
return f'''
select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from asset_risk_dates
{where(*sqls, ard_asset_id=asset_ids)} order by ard_asset_id, ard_date
'''
@read(one=True)
def get_last_one(fund_id, date=None, type: DateType = None):
kwargs = {
'ard_asset_id': fund_id,
'ard_type': type.value if type is not None else None
}
sql = f"ard_date <= '{format_date(date)}'" if date else None
return f'''
select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])}
from asset_risk_dates {where(sql, **kwargs)} order by ard_date desc, ard_type asc limit 1
'''
@write
def delete(day=None):
if day:
return f"delete from asset_risk_dates where ard_date >= '{format_date(day)}'"
else:
return 'truncate table asset_risk_dates'
......@@ -13,35 +13,3 @@ CREATE TABLE IF NOT EXISTS robo_assets_pool
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '资产池';
CREATE TABLE IF NOT EXISTS asset_risk_dates
(
ard_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
ard_date DATETIME NOT NULL COMMENT '风控日期',
ard_type TINYINT NOT NULL COMMENT '日期类型',
ard_asset_id BIGINT UNSIGNED NOT NULL COMMENT '资产ID',
ard_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
ard_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (ard_id),
INDEX (ard_date),
INDEX (ard_type),
INDEX (ard_asset_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '资产风控日期表';
CREATE TABLE IF NOT EXISTS asset_ewma_value
(
aev_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
aev_date DATETIME NOT NULL COMMENT '日期',
aev_asset_id BIGINT UNSIGNED NOT NULL COMMENT '资产ID',
aev_value DOUBLE NOT NULL COMMENT 'ewma值',
aev_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
aev_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (aev_id),
UNIQUE INDEX (aev_asset_id, aev_date),
INDEX (aev_date)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '资产EWMA数据';
\ No newline at end of file
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired, workday_range, parse_date
from py_jftech import component, autowired
from api import AssetPool, AssetOptimize, AssetRisk, RoboReportor, Datum, DatumType, RoboExecutor
from api import AssetPool, AssetOptimize
from asset_pool.dao import robo_assets_pool as rap
......@@ -12,42 +10,11 @@ from asset_pool.dao import robo_assets_pool as rap
class FundAssetPool(AssetPool):
@autowired
def __init__(self, optimize: AssetOptimize = None, risk: AssetRisk = None):
def __init__(self, optimize: AssetOptimize = None):
self._optimize = optimize
self._risk = risk
def get_pool(self, day=dt.today()):
opti_pool = self._optimize.get_optimize_pool(day)
risk_pool = self._risk.get_risk_pool(day)
return [x for x in opti_pool if x not in risk_pool]
return self._optimize.get_optimize_pool(day)
def clear(self, day=None):
rap.delete(day)
@component(bean_name='asset-pool-report')
class AssetPoolReportor(RoboReportor):
@autowired
def __init__(self, optimize: AssetOptimize = None, risk: AssetRisk = None, datum: Datum = None, executor: RoboExecutor = None):
self._optimize = optimize
self._risk = risk
self._datum = datum
self._executor = executor
@property
def report_name(self) -> str:
return '基金池'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datums = self._datum.get_datums(type=DatumType.FUND)
result = []
for date in workday_range(min_date or self._executor.start_date, max_date):
opts = self._optimize.get_optimize_pool(date)
risk = self._risk.get_risk_pool(date)
datas = {x['bloombergTicker']: 0 if x['id'] in risk else 1 if x['id'] in opts else -1 for x in datums}
result.append({'date': date, **datas})
return result
import logging
import unittest
from py_jftech import autowired, parse_date, to_str
from py_jftech import autowired, parse_date
from api import AssetPool, RoboReportor, AssetRisk
from api import AssetOptimize
logger = logging.getLogger(__name__)
class AssetPoolTest(unittest.TestCase):
@autowired
def test_asset_pool(self, pool: AssetPool = None):
result = pool.get_pool(parse_date('2008-11-05'))
@autowired(names={'report': 'asset-pool-report'})
def test_pool_report(self, report: RoboReportor = None):
result = report.load_report(max_date=parse_date('2009-12-31'))
logger.info(to_str(result))
@autowired
def test_next_risk_date(self, risk: AssetRisk = None):
risk.build_risk_date(asset_id=46)
@autowired(names={'asset': 'dividend'})
def test_dividend_asset_optimize(self, asset: AssetOptimize = None):
asset.get_optimize_pool(parse_date('2023-03-01'))
if __name__ == '__main__':
......
import json
import os
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, parse_date, get_config, to_tuple, autowired, get_project_path, transaction
from py_jftech import component, parse_date, get_config, to_tuple, get_project_path, transaction
from api import DatumType, Datum, PortfoliosRisk, RoboReportor, RoboExecutor
from api import DatumType, Datum, PortfoliosRisk, RoboExecutor
from basic.dao import robo_base_datum as rbd
......@@ -55,7 +53,8 @@ class DefaultDatum(Datum):
datum_ids = tuple(set(datum_ids or []) | {x['id'] for x in datums})
result = rbd.get_base_datums(type=type, crncy=crncy, risk=risk, datum_ids=datum_ids)
result = [{**json.loads(x['datas']), 'id': x['id']} for x in result]
return [self.format_datum(x) for x in result if not exclude or x['id'] in (datum_ids or []) or x['bloombergTicker'] not in self.excludes]
return [self.format_datum(x) for x in result if
not exclude or x['id'] in (datum_ids or []) or x['bloombergTicker'] not in self.excludes]
def get_high_risk_datums(self, risk: PortfoliosRisk):
risk3 = self.get_datums(type=DatumType.FUND, risk=3)
......@@ -80,22 +79,3 @@ class DefaultDatum(Datum):
}))
return True
return False
@component(bean_name='funds-report')
class FundReportor(RoboReportor):
@autowired
def __init__(self, datum: Datum = None):
self._datum = datum
@property
def report_name(self) -> str:
return '基金资料'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datums = self._datum.get_datums(type=DatumType.FUND)
datums = pd.DataFrame(datums)
datums = datums[
['id', 'ftTicker', 'bloombergTicker', 'chineseName', 'englishName', 'lipperKey', 'isin', 'currency', 'risk', 'inceptDate', 'category', 'assetType']]
return datums.to_dict('records')
......@@ -4,7 +4,7 @@ from typing import List
import pandas as pd
from py_jftech import get_config, component, autowired, to_tuple
from api import Navs, Datum, DatumType, RoboReportor
from api import Navs, Datum, DatumType
from basic.dao import robo_exrate as re, robo_fund_navs as rfn, robo_index_datas as rid, robo_eco_datas as red
......@@ -88,34 +88,3 @@ class DefaultNavs(Navs):
else:
return red.get_last(eco_id=datum_id, max_date=max_date, count=count, by_release_date=by_release_date)
@component(bean_name='navs-report')
class NavsReportor(RoboReportor):
@autowired
def __init__(self, datum: Datum = None, navs: Navs = None):
self._datum = datum;
self._navs = navs
self._config = get_config('reports.navs')
@property
def report_name(self) -> str:
return "基金净值"
@property
def tickers(self):
return self._config['tickers'] if 'tickers' in self._config else None
@property
def type(self):
return DatumType[self._config['type']]
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
asset_ids = {x['id']: x for x in self._datum.get_datums(ticker=self.tickers, type=self.type)}
if self.type == DatumType.FUND:
result = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(asset_ids.keys()), max_date=max_date, min_date=min_date))
result = result.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
result.rename(columns={x[0]: x[1]['bloombergTicker'] for x in asset_ids.items()}, inplace=True)
result.reset_index(inplace=True)
return result.to_dict('records')
return []
......@@ -4,7 +4,7 @@ from typing import List
from py_jftech import autowired, parse_date, to_str
from api import Navs, Datum, PortfoliosRisk, DataSync, RoboReportor
from api import Navs, Datum, PortfoliosRisk, DataSync
logger = logging.getLogger(__name__)
......@@ -42,16 +42,6 @@ class BasicTest(unittest.TestCase):
for sync in syncs:
sync.do_sync()
@autowired(names={'report': 'navs-report'})
def test_export_navs(self, report: RoboReportor = None):
result = report.load_report()
logger.info(to_str(result))
@autowired(names={'report': 'funds-report'})
def test_export_funds(self, report: RoboReportor = None):
result = report.load_report()
logger.info(to_str(result))
if __name__ == '__main__':
unittest.main()
This diff is collapsed.
This diff is collapsed.
py-jftech:
logger:
version: 1
formatters:
brief:
format: "%(asctime)s - %(levelname)s - %(message)s"
simple:
format: "%(asctime)s - %(filename)s - %(levelname)s - %(message)s"
handlers:
console:
class: logging.StreamHandler
formatter: simple
level: DEBUG
stream: ext://sys.stdout
file:
class: logging.handlers.TimedRotatingFileHandler
level: INFO
formatter: brief
filename: ${LOG_FILE:logs/info.log}
interval: 1
backupCount: 30
encoding: utf8
when: D
# loggers:
# basic.sync:
# level: DEBUG
# handlers: [console]
# propagate: no
root:
level: ${LOG_LEVEL:INFO}
handlers: ${LOG_HANDLERS:[ console ]}
database:
host: ${MYSQL_HOST:192.168.68.81}
port: ${MYSQL_PORT:3306}
user: ${MYSQL_USER:root}
password: ${MYSQL_PWD:changeit}
dbname: ${MYSQL_DBNAME:jftech_robo}
injectable:
types:
api.PortfoliosBuilder: portfolios.builder.PoemPortfoliosBuilder
email:
server: smtphz.qiye.163.com
user: jft-ra@thizgroup.com
password: 5dbb#30ec6d3
mulit-process:
max-workers: ${MAX_PROCESS:4}
basic: # 基础信息模块
sync:
start-date: 2007-01-01 # 同步数据开始日期
datum: # 资料模块
change:
date: ${DATUM_CHANGE_DATE}
file: ${DATUM_CHANGE_FILE}
excludes: # 排除的资料彭博ticker
backtest:
- 'TEMUSGI LX Equity'
real:
- 'FGFSACU LX Equity'
# navs: # 净值模块
# exrate: # 汇率,如果不开启,整个这块注释掉
# - from: EUR # 需要转换的货币类型
# ticker: EURUSD BGN Curncy # 汇率值的彭博ticker
asset-pool: # 资产池模块
asset-optimize: # 资产优选模块
sortino-weight: # sortino计算需要的权重,下面每一条为一次计算,e.g. months: 3, weight: 0.5 表示 3个月数据使用权重0.5来计算分值
- months: 3
weight: 0.5
- months: 6
weight: 0.3
- years: 1
weight: 0.2
asset-include: ['US_STOCK','US_IG_BOND','US_HY_BOND']
optimize-count: 3 #基金优选个数
portfolios: # 投组模块
holder: # 持仓投组相关
init-nav: 100 # 初始金额
min-interval-days: 10 # 两次实际调仓最小间隔期,单位交易日
solver: # 解算器相关
tol: 1E-10 # 误差满足条件
navs: # 净值要求
range: # 需要净值数据的区间, days: 90 表示90自然日,months: 3 表示3个自然月
days: 90
max-nan: # 最大缺失净值条件
asset: 8 # 单一资产最多缺少多少交易日数据,则踢出资产池
day: 0.5 # 单一交易日最多缺少百分之多少净值,则删除该交易日
normal-ratio: #US_STOCK:US_HY_BOND:US_IG_BOND三者分别对应低中高风险所占比率
US_STOCK: [ 0.3, 0.5, 0.7 ]
US_HY_BOND: [ 0.6, 0.4, 0.2 ]
US_IG_BOND: [ 0.1, 0.1, 0.1 ]
dividend-rate: 0.09
riskctl-ratio:
US_STOCK: [ 0.2, 0.4, 0.6 ]
US_HY_BOND: [ 0.5, 0.3, 0.1 ]
US_IG_BOND: [ 0.3, 0.3, 0.3 ]
dividend-rate: 0.09
matrix-rtn-days: 20 # 计算回报率矩阵时,回报率滚动天数
asset-count: [1,3] # 投组资产个数。e.g. count 或 [min, max] 分别表示 最大最小都为count 或 最小为min 最大为max,另外这里也可以类似上面给不同风险等级分别配置
mpt: # mpt计算相关
cvar-beta: 0.2 # 计算Kbeta 需要用到
quantile: 0.9 # 分位点,也可以给不同风险等级分别配置
low-weight: 0.05 # 最低权重
high-weight: [ 1 ] # 最高权重比例,可给一个值,也可以给多个值,当多个值时,第一个表示只有一个资产时权重,第二个表示只有两个资产时权重,以此类推,最后一个表示其他资产个数时的权重
poem: # poem相关
cvar-scale-factor: 0.1 # 计算时用到的系数
robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:backtest} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据
backtest: # 回测执行器相关
start-date: 2022-09-01 # 回测起始日期
end-date: 2023-03-03 # 回测截止日期
start-step: ${BACKTEST_START_STEP:3} # 回测从哪一步开始执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
end-step: ${BACKTEST_END_STEP:3} # 回测从哪一步执行完成后结束执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
clean-up: true
real: # 实盘执行器
start-date: 2022-09-01 # 实盘开始时间
import json
import logging
from datetime import datetime as dt
from typing import List
from py_jftech import component, autowired, format_date
from pymysql import IntegrityError, constants
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory, RoboReportor, DatumType
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory
from portfolios.dao import robo_mpt_portfolios as rmp
logger = logging.getLogger(__name__)
......@@ -26,7 +24,7 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
try:
portfolio = rmp.get_one(day, type, risk)
if not portfolio:
result, detail = self.build_portfolio(day, type)
result = self.build_portfolio(day, type)
for build_risk, datas in result.items():
try:
rmp.insert({
......@@ -45,41 +43,37 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
return {int(x[0]): x[1] for x in result.items()}
return None
except Exception as e:
logger.exception(f"build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}] failure.", e)
logger.exception(
f"build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}] failure.", e)
raise e
def build_portfolio(self, day, type: PortfoliosType):
result = {}
detail = {}
portfolios = {}
for risk in PortfoliosRisk:
logger.info(
f"start to build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}]")
solver = self._factory.create_solver(risk, type)
solver.reset_navs(day)
logger.debug({
'Khist': len(solver.rtn_history),
'beta': solver.get_config('mpt.cvar-beta'),
'Kbeta': solver.k_beta,
})
max_rtn, max_var, minCVaR_whenMaxR = solver.solve_max_rtn()
min_rtn, min_var, maxCVaR_whenMinV = solver.solve_min_rtn()
portfolio, cvar = solver.solve_mpt(min_rtn, max_rtn)
result[risk] = {
'solve': SolveType.MPT,
'portfolio': json.dumps(portfolio),
'cvar': cvar
} if portfolio else {
'solve': SolveType.INFEASIBLE
}
detail[risk] = {
'max_rtn': max_rtn,
'max_var': max_var,
'minCVaR_whenMaxR': minCVaR_whenMaxR,
'min_rtn': min_rtn,
'min_var': min_var,
'maxCVaR_whenMinV': maxCVaR_whenMinV,
}
return result, detail
navs_group = solver.reset_navs(day)
for category, navs in navs_group.items():
solver.set_navs(navs)
solver.set_category(category)
logger.debug({
'Khist': len(solver.rtn_history),
'beta': solver.get_config('mpt.cvar-beta'),
'Kbeta': solver.k_beta,
})
max_rtn, max_var, minCVaR_whenMaxR = solver.solve_max_rtn()
min_rtn, min_var, maxCVaR_whenMinV = solver.solve_min_rtn()
portfolio, cvar = solver.solve_mpt(min_rtn, max_rtn)
portfolios = {**portfolios, **portfolio}
result[risk] = {
'solve': SolveType.MPT,
'portfolio': json.dumps(portfolios),
} if portfolios else {
'solve': SolveType.INFEASIBLE
}
return result
def clear(self, day=None, risk: PortfoliosRisk = None):
rmp.delete(min_date=day, risk=risk)
......@@ -89,62 +83,24 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
class PoemPortfoliosBuilder(MptPortfoliosBuilder):
def build_portfolio(self, day, type: PortfoliosType):
result, detail = super(PoemPortfoliosBuilder, self).build_portfolio(day, type)
result = {}
portfolios = {}
for risk in PortfoliosRisk:
if result[risk]['solve'] is SolveType.INFEASIBLE:
continue
solver = self._factory.create_solver(risk, type)
solver.reset_navs(day)
min_rtn = detail[risk]['min_rtn']
max_rtn = detail[risk]['max_rtn']
mpt_cvar = result[risk]['cvar']
maxCVaR_whenMinV = detail[risk]['maxCVaR_whenMinV']
portfolio, cvar = solver.solve_poem(min_rtn, max_rtn, mpt_cvar, maxCVaR_whenMinV)
if portfolio:
navs_group = solver.reset_navs(day)
for category, navs in navs_group.items():
solver.set_navs(navs)
solver.set_category(category)
max_rtn, max_var, minCVaR_whenMaxR = solver.solve_max_rtn()
min_rtn, min_var, maxCVaR_whenMinV = solver.solve_min_rtn()
mpt_portfolio, mpt_cvar = solver.solve_mpt(min_rtn, max_rtn)
portfolio, cvar = solver.solve_poem(min_rtn, max_rtn, mpt_cvar, maxCVaR_whenMinV)
if not portfolio:
portfolio = mpt_portfolio
portfolios = {**portfolios, **portfolio }
if portfolios:
result[risk] = {
'solve': SolveType.POEM,
'portfolio': json.dumps(portfolio),
'cvar': cvar
'portfolio': json.dumps(portfolios),
}
detail[risk]['mpt_cvar'] = mpt_cvar
return result, detail
@component(bean_name='mpt-report')
class MptReportor(RoboReportor):
@autowired
def __init__(self, datum: Datum = None):
self._datum = datum
@property
def report_name(self) -> str:
return '最优投组'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
results = []
datums = {x['id']: x for x in self._datum.get_datums(type=DatumType.FUND, exclude=False)}
for portfolio in rmp.get_list(max_date=max_date, min_date=min_date):
solve_type = SolveType(portfolio['solve'])
datas = {
'date': portfolio['date'],
'risk': PortfoliosRisk(portfolio['risk']).name,
'type': PortfoliosType(portfolio['type']).name,
'solve': solve_type.name,
'cvar': portfolio['cvar']
}
if solve_type is not SolveType.INFEASIBLE:
for asset_id, weight in json.loads(portfolio['portfolio']).items():
datum = datums[int(asset_id)]
results.append({
**datas,
'ft_ticker': datum['ftTicker'],
'lipper_id': datum['lipperKey'],
'bloomberg_ticker': datum['bloombergTicker'],
'name': datum['chineseName'],
'weight': weight,
})
else:
results.append(datas)
return results
return result
import json
import logging
from datetime import datetime as dt
from typing import List
from functools import reduce
import pandas as pd
from py_jftech import (
component, autowired, get_config, next_workday, prev_workday, transaction, workday_range, format_date
component, autowired, get_config, next_workday, format_date
)
from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler, Navs, SignalType, RoboExecutor, PortfoliosType, RoboReportor, Datum, DatumType
from api import PortfoliosHolder, PortfoliosRisk, Navs, RoboExecutor, PortfoliosType
from portfolios.dao import robo_hold_portfolios as rhp
from portfolios.utils import format_weight
logger = logging.getLogger(__name__)
@component(bean_name='next-re')
class NextReblanceHolder(PortfoliosHolder):
@component(bean_name='dividend-holder')
class DividendPortfoliosHolder(PortfoliosHolder):
@autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, rule: RebalanceRuler, navs: Navs = None, executor: RoboExecutor = None):
self._rule = rule
def __init__(self, navs: Navs = None, executor: RoboExecutor = None):
self._navs = navs
self._executor = executor
self._config = get_config(__name__)
def get_portfolio_type(self, day, risk: PortfoliosRisk) -> PortfoliosType:
hold = rhp.get_one(day, risk)
if hold:
signal_type = self._rule.get_signal_type(hold['signal_id'])
return signal_type.p_type if signal_type else PortfoliosType.NORMAL
return PortfoliosType.NORMAL
def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None):
......@@ -38,10 +30,6 @@ class NextReblanceHolder(PortfoliosHolder):
last = rhp.get_last_one(max_date=max_date, risk=risk, rebalance=True)
return last['date'] if last else None
def get_rebalance_date_by_signal(self, signal_id):
last = rhp.get_last_one(signal_id=signal_id, rebalance=True)
return last['date'] if last else None
def get_portfolios_weight(self, day, risk: PortfoliosRisk):
hold = rhp.get_one(day, risk)
if hold:
......@@ -56,50 +44,16 @@ class NextReblanceHolder(PortfoliosHolder):
last_nav = rhp.get_last_one(max_date=day, risk=risk)
start = next_workday(last_nav['date'] if last_nav else self._executor.start_date)
try:
if not last_nav:
pass
while start <= day:
logger.info(f"start to build hold portfolio[{risk.name}] for date[{format_date(start)}]")
signal = None
if last_nav:
last_re_date = self.get_last_rebalance_date(risk=risk, max_date=start)
if len(workday_range(last_re_date, start)) > self.interval_days:
signal = self._rule.take_next_signal(prev_workday(start), risk)
else:
signal = self._rule.take_next_signal(prev_workday(start), risk)
if signal and not signal['effective']:
logger.info(f"start to rebalance hold portfolio[{risk.name}] for date[{format_date(start)}] "
f"with signal[{SignalType(signal['type']).name}]")
self.do_rebalance(start, risk, signal, last_nav)
elif last_nav and signal is None:
self.no_rebalance(start, risk, last_nav)
self.no_rebalance(start, risk, last_nav)
start = next_workday(start)
last_nav = rhp.get_last_one(max_date=day, risk=risk)
except Exception as e:
logger.exception(f"build hold portfolio[{risk.name}] for date[{format_date(start)}] failure.", e)
@transaction
def do_rebalance(self, day, risk: PortfoliosRisk, signal, last_nav):
weight = {int(x[0]): x[1] for x in json.loads(signal['portfolio']).items()}
if last_nav:
share = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share'].items()}
navs = self.get_navs(fund_ids=tuple(set(weight) | set(share)), day=day)
nav = round(sum([navs[x] * y for x, y in share.items()]), 4)
else:
nav = self.init_nav
navs = self.get_navs(fund_ids=tuple(weight), day=day)
share = {x: nav * w / navs[x] for x, w in weight.items()}
rhp.insert({
'date': day,
'risk': risk,
'signal_id': signal['id'],
'rebalance': True,
'portfolios': {
'weight': weight,
'share': share,
},
'nav': nav,
})
self._rule.commit_signal(signal['id'])
def no_rebalance(self, day, risk: PortfoliosRisk, last_nav):
share = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share'].items()}
navs = self.get_navs(fund_ids=tuple(share), day=day)
......@@ -134,62 +88,3 @@ class NextReblanceHolder(PortfoliosHolder):
@property
def init_nav(self):
return self._config['init-nav']
@component(bean_name='hold-report')
class HoldReportor(RoboReportor):
@autowired
def __init__(self, rule: RebalanceRuler = None):
self._rule = rule
@property
def report_name(self) -> str:
return '投组净值'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
holds = pd.DataFrame(rhp.get_list(max_date=max_date, min_date=min_date))
if not holds.empty:
signal_types = self._rule.get_signal_type(tuple(set(holds['signal_id'])))
holds['signal_type'] = holds.apply(lambda row: signal_types[row['signal_id']].name, axis=1)
holds['risk'] = holds.apply(lambda row: PortfoliosRisk(row['risk']).name, axis=1)
holds = holds[['risk', 'date', 'nav', 'signal_type']]
return holds.to_dict('records')
return []
@component(bean_name='daily-hold-report')
class DailyHoldReportor(RoboReportor):
@autowired
def __init__(self, rule: RebalanceRuler = None, datum: Datum = None):
self._rule = rule
self._datum = datum
@property
def report_name(self) -> str:
return '每日持仓信息'
def load_report(self, max_date=prev_workday(dt.today()), min_date=None) -> List[dict]:
holds = pd.DataFrame(rhp.get_list(max_date=max_date, min_date=min_date))
holds = holds[holds['date'].dt.date == max_date.date()]
if not holds.empty:
signal_types = self._rule.get_signal_type(tuple(set(holds['signal_id'])))
signal_dates = self._rule.get_signal_date(tuple(set(holds['signal_id'])))
datum_ids = reduce(lambda x, y: x | y, holds['portfolios'].apply(lambda x: set(json.loads(x)['weight'].keys())))
datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=datum_ids))
datums.set_index('id', inplace=True)
holds['rebalance_type'] = holds.apply(lambda row: signal_types[row['signal_id']].name, axis=1)
holds['rebalance_date'] = holds.apply(lambda row: signal_dates[row['signal_id']], axis=1)
holds['risk'] = holds.apply(lambda row: PortfoliosRisk(row['risk']).name, axis=1)
holds['portfolios'] = holds.apply(lambda row: [x for x in json.loads(row['portfolios'])['weight'].items()], axis=1)
holds = holds.explode('portfolios', ignore_index=True)
holds['weight'] = holds.apply(lambda row: row['portfolios'][1], axis=1)
holds['asset_ids'] = holds.apply(lambda row: datums.loc[int(row['portfolios'][0])]['ftTicker'], axis=1)
holds['name'] = holds.apply(lambda row: datums.loc[int(row['portfolios'][0])]['chineseName'], axis=1)
holds['lipper_id'] = holds.apply(lambda row: datums.loc[int(row['portfolios'][0])]['lipperKey'], axis=1)
holds = holds[['risk', 'date', 'asset_ids', 'weight', 'rebalance_type', 'rebalance_date', 'name', 'lipper_id']]
return holds.to_dict('records')
return []
......@@ -5,7 +5,7 @@ from logging import DEBUG, getLogger
import pandas as pd
from dateutil.relativedelta import relativedelta
from numpy import NAN
from py_jftech import component, autowired, get_config, is_workday
from py_jftech import component, autowired, get_config
from pyomo.environ import *
from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum, DatumType
......@@ -36,6 +36,8 @@ class DefaultSolver(Solver):
@autowired
def __init__(self, risk: PortfoliosRisk, type: PortfoliosType, assets: AssetPool = None, navs: Navs = None,
datum: Datum = None):
self._category = None
self._transfer_type = None
self.__navs = None
self.risk = risk
self.type = type or PortfoliosType.NORMAL
......@@ -82,6 +84,21 @@ class DefaultSolver(Solver):
def quantile(self):
return self.get_config('mpt.quantile')
@property
def category(self):
return self._category
@property
def transfer_type(self):
self._transfer_type = self.get_config("normal-ratio")
return self._transfer_type
def set_navs(self, navs):
self.__navs = navs
def set_category(self, category):
self._category = category
def solve_max_rtn(self):
model = self.create_model()
model.objective = Objective(expr=sum([model.w[i] * self.rtn_annualized[i] for i in model.indices]),
......@@ -165,7 +182,7 @@ class DefaultSolver(Solver):
df_w = pd.DataFrame(data=weight_list, index=id_list, columns=['weight'])
df_w.replace(0, NAN, inplace=True)
df_w.dropna(axis=0, inplace=True)
df_w['weight'] = pd.Series(format_weight(dict(df_w['weight'])))
df_w['weight'] = pd.Series(format_weight(dict(df_w['weight']), self.get_weight()))
dict_w = df_w.to_dict()['weight']
return dict_w
......@@ -184,6 +201,10 @@ class DefaultSolver(Solver):
port_r_hist.sort()
return sum(port_r_hist[0: self.k_beta]) / self.k_beta
def get_weight(self):
# todo 根据self.risk找配置
return self.transfer_type[self.category][0]
def create_model(self):
count = self.get_config('asset-count')
min_count = count[0] if isinstance(count, list) else count
......@@ -194,13 +215,12 @@ class DefaultSolver(Solver):
high_weight = self.get_config('mpt.high-weight')
if isinstance(high_weight, list):
high_weight = high_weight[min(len(self.navs.columns), min_count, len(high_weight)) - 1]
model = ConcreteModel()
model.indices = range(0, len(self.navs.columns))
model.w = Var(model.indices, domain=NonNegativeReals)
model.z = Var(model.indices, domain=Binary)
model.cons_sum_weight = Constraint(expr=sum([model.w[i] for i in model.indices]) == 1)
model.cons_sum_weight = Constraint(expr=sum([model.w[i] for i in model.indices]) == high_weight)
model.cons_num_asset = Constraint(
expr=inequality(min_count, sum([model.z[i] for i in model.indices]), max_count, strict=False))
model.cons_bounds_low = Constraint(model.indices, rule=lambda m, i: m.z[i] * low_weight <= m.w[i])
......@@ -209,28 +229,29 @@ class DefaultSolver(Solver):
def reset_navs(self, day):
asset_ids = self._assets.get_pool(day)
asset_risk = self.get_config('navs.risk')
datum = self._datum.get_datums(type=DatumType.FUND, datum_ids=asset_ids, risk=asset_risk)
exclude = self.get_config('navs.exclude-asset-type') or []
asset_ids = list(set(asset_ids) & set([x['id'] for x in datum if x['assetType'] not in exclude]))
min_date = day - relativedelta(**self.get_config('navs.range'))
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_ids, max_date=day, min_date=min_date))
navs = navs[navs['nav_date'].dt.day_of_week < 5]
navs['nav_date'] = pd.to_datetime(navs['nav_date'])
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
navs = navs.sort_index()
navs_nan = navs.isna().sum()
navs.drop(columns=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.asset')],
inplace=True)
navs_nan = navs.apply(lambda r: r.isna().sum() / len(r), axis=1)
navs.drop(index=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.day')],
inplace=True)
navs.fillna(method='ffill', inplace=True)
if navs.iloc[0].isna().sum() > 0:
navs.fillna(method='bfill', inplace=True)
self.__navs = navs
datum = self._datum.get_datums(type=DatumType.FUND, datum_ids=asset_ids)
asset_ids_group = {k: [d['id'] for d in datum if d['category'] == k] for k in set(d['category'] for d in datum)}
navs_group = {}
for category, asset_ids in asset_ids_group.items():
min_date = day - relativedelta(**self.get_config('navs.range'))
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_ids, max_date=day, min_date=min_date))
navs = navs[navs['nav_date'].dt.day_of_week < 5]
navs['nav_date'] = pd.to_datetime(navs['nav_date'])
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
navs = navs.sort_index()
navs_nan = navs.isna().sum()
navs.drop(columns=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.asset')],
inplace=True)
navs_nan = navs.apply(lambda r: r.isna().sum() / len(r), axis=1)
navs.drop(index=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.day')],
inplace=True)
navs.fillna(method='ffill', inplace=True)
if navs.iloc[0].isna().sum() > 0:
navs.fillna(method='bfill', inplace=True)
navs_group[category] = navs
self.__navs = navs_group
return navs_group
def get_config(self, name):
def load_config(config):
......@@ -244,7 +265,8 @@ class DefaultSolver(Solver):
value = load_config(self._config[self.type.value] if self.type is not PortfoliosType.NORMAL else self._config)
if value is None:
value = load_config(self._config)
return value[f'ft{self.risk.value}'] if value and isinstance(value, dict) and f'ft{self.risk.value}' in value else value
return value[f'ft{self.risk.value}'] if value and isinstance(value,
dict) and f'ft{self.risk.value}' in value else value
def debug_solve_result(self, model):
if logger.isEnabledFor(DEBUG):
......
import logging
import unittest
from py_jftech import autowired, parse_date, to_str
from py_jftech import autowired, parse_date
from api import PortfoliosBuilder, PortfoliosType, PortfoliosRisk, PortfoliosHolder, RoboReportor
from api import PortfoliosBuilder, PortfoliosType, PortfoliosRisk, PortfoliosHolder
class PortfoliosTest(unittest.TestCase):
......@@ -23,28 +23,18 @@ class PortfoliosTest(unittest.TestCase):
portfolio = builder.get_portfolios(parse_date('2022-11-07'), PortfoliosRisk.FT9)
self.logger.info(portfolio)
@autowired(names={'hold': 'next-re'})
@autowired(names={'hold': 'dividend-holder'})
def test_has_hold(self, hold: PortfoliosHolder = None):
self.logger.info(hold.has_hold(PortfoliosRisk.FT3))
@autowired(names={'hold': 'next-re'})
@autowired(names={'hold': 'dividend-holder'})
def test_build_hold(self, hold: PortfoliosHolder = None):
hold.build_hold_portfolio(parse_date('2023-02-23'), PortfoliosRisk.FT9)
@autowired(names={'reportor': 'hold-report'})
def test_hold_report(self, reportor: RoboReportor = None):
report = reportor.load_report()
self.logger.info(to_str(report))
@autowired(names={'hold': 'next-re'})
@autowired(names={'hold': 'dividend-holder'})
def test_clear(self, hold: PortfoliosHolder = None):
hold.clear()
@autowired(names={'reportor': 'daily-hold-report'})
def test_daily_hold_report(self, reportor: RoboReportor = None):
report = reportor.load_report()
self.logger.info(to_str(report))
if __name__ == '__main__':
unittest.main()
import pandas as pd
def format_weight(weight: dict) -> dict:
def format_weight(weight: dict, to=1) -> dict:
"""
对权重的小数点进行截取,到指定权重
@param weight:
@param to: 指定权重
@return:
"""
weight_series = pd.Series(weight)
weight_series = weight_series.fillna(0)
minidx = weight_series[weight_series > 0].idxmin()
maxidx = weight_series.idxmax()
weight_series = weight_series.apply(lambda x: round(x, 2))
if weight_series.sum() == 1:
if weight_series.sum() == to:
return dict(weight_series)
elif weight_series.sum() < 1:
weight_series[minidx] += 1 - weight_series.sum()
elif weight_series.sum() > 1:
weight_series[maxidx] += 1 - weight_series.sum()
elif weight_series.sum() < to:
weight_series[minidx] += to - weight_series.sum()
elif weight_series.sum() > to:
weight_series[maxidx] += to - weight_series.sum()
return dict(weight_series.apply(lambda x: round(float(x), 2)))
if __name__ == '__main__':
print(format_weight({19: 0.13, 27: 0.17, 31: 0.35, 56: 0.36}))
print(format_weight({19: 0.13, 27: 0.17, 56: 0.36}))
from abc import ABC, abstractmethod
from py_jftech import autowired
from api import RebalanceSignal, PortfoliosBuilder, PortfoliosRisk
from rebalance.dao import robo_rebalance_signal as rrs
class BaseRebalanceSignal(RebalanceSignal, ABC):
@autowired
def __init__(self, builder: PortfoliosBuilder = None):
self._builder = builder
def get_signal(self, day, risk: PortfoliosRisk):
signal = rrs.get_one(type=self.signal_type, risk=risk, date=day)
if signal:
return signal
trigger = self.is_trigger(day, risk)
if trigger:
portfolio = self._builder.get_portfolios(day, risk, self.signal_type.p_type)
id = rrs.insert({
'date': day,
'type': self.signal_type,
'risk': risk,
'portfolio_type': self.signal_type.p_type,
'portfolio': portfolio
})
return rrs.get_by_id(id)
return None
@abstractmethod
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
pass
CREATE TABLE IF NOT EXISTS robo_rebalance_signal
(
rrs_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rrs_date DATETIME NOT NULL COMMENT '信号日期',
rrs_type TINYINT NOT NULL COMMENT '信号类型',
rrs_risk TINYINT NOT NULL COMMENT '风险等级',
rrs_p_type VARCHAR(255) DEFAULT NULL COMMENT '投组类型',
rrs_p_weight JSON DEFAULT NULL COMMENT '投组信息',
rrs_effective TINYINT NOT NULL DEFAULT 0 COMMENT '是否生效',
rrs_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rrs_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rrs_id),
INDEX (rrs_date),
INDEX (rrs_type),
INDEX (rrs_risk)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '再平衡信号表';
CREATE TABLE IF NOT EXISTS robo_weight_drift
(
rwd_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rwd_date DATETIME NOT NULL COMMENT '日期',
rwd_risk TINYINT NOT NULL COMMENT '风险等级',
rwd_weight DOUBLE NOT NULL COMMENT '高风险资产权重',
rwd_drift DOUBLE NOT NULL COMMENT '资产权重漂移计算值',
rwd_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rwd_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rwd_id),
UNIQUE INDEX (rwd_date, rwd_risk),
INDEX (rwd_risk)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '高风险资产权重漂移表';
from py_jftech import read, write, where, format_date, mapper_columns, to_tuple
from api import SignalType, PortfoliosRisk
__COLUMNS__ = {
'rrs_id': 'id',
'rrs_date': 'date',
'rrs_type': 'type',
'rrs_risk': 'risk',
'rrs_p_type': 'portfolio_type',
'rrs_p_weight': 'portfolio',
'rrs_effective': 'effective',
}
@read
def get_list(min_date=None, max_date=None, risk: PortfoliosRisk = None, type: SignalType = None, effective: bool = None):
sqls = []
if min_date:
sqls.append(f"rrs_date >= '{format_date(min_date)}'")
if max_date:
sqls.append(f"rrs_date <= '{format_date(max_date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
{where(*sqls, rrs_risk=risk, rrs_type=type, rrs_effective=effective)} order by rrs_risk, rrs_date
'''
@read
def get_by_ids(ids):
return f'''select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=to_tuple(ids))}'''
@read(one=True)
def get_by_id(id):
return f'''select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=id)}'''
@read(one=True)
def get_one(type: SignalType, risk: PortfoliosRisk, date):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
{where(rrs_date=date, rrs_type=type, rrs_risk=risk)}
'''
@read(one=True)
def get_first_after(type: SignalType, risk: PortfoliosRisk, min_date, effective=None):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
{where(f"rrs_date >= '{format_date(min_date)}'", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date limit 1
'''
@read(one=True)
def get_last_one(max_date, risk: PortfoliosRisk, type: SignalType = None, effective=None):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
{where(f"rrs_date <= '{format_date(max_date)}'", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date desc limit 1
'''
def get_count(risk: PortfoliosRisk = None, day=None, effective=None):
@read(one=True)
def exec():
return f"select count(*) as `count` from robo_rebalance_signal {where(rrs_risk=risk, rrs_date=day, rrs_effective=effective)}"
result = exec()
return result['count']
@write
def insert(datas):
datas = mapper_columns(datas=datas, columns=__COLUMNS__)
return f'''
insert into robo_rebalance_signal({','.join([x for x in datas.keys()])})
values ({','.join([f"'{x[1]}'" for x in datas.items()])})
'''
@write
def update(id, datas):
datas = mapper_columns(datas=datas, columns=__COLUMNS__)
return f'''
update robo_rebalance_signal
set {','.join([f"{x[0]} = '{x[1]}'" for x in datas.items()])}
where rrs_id = {id}
'''
@write
def delete_by_id(id):
return f"delete from robo_rebalance_signal where rrs_id = {id}"
@write
def delete(min_date=None, risk: PortfoliosRisk = None):
if min_date is None and risk is None:
return 'truncate table robo_rebalance_signal'
else:
sql = f"rrs_date >= '{format_date(min_date)}'" if min_date else None
return f"delete from robo_rebalance_signal {where(sql, rrs_risk=risk)}"
from py_jftech import read, write, where, format_date, mapper_columns
from api import PortfoliosRisk
__COLUMNS__ = {
'rwd_id': 'id',
'rwd_date': 'date',
'rwd_risk': 'risk',
'rwd_weight': 'weight',
'rwd_drift': 'drift',
}
@read(one=True)
def get_one(day, risk: PortfoliosRisk):
return f"select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_weight_drift {where(rwd_date=day, rwd_risk=risk)}"
@read(one=True)
def get_last_one(max_date, risk: PortfoliosRisk):
return f'''
select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_weight_drift
{where(f"rwd_date <= '{format_date(max_date)}'", rwd_risk=risk)} order by rwd_date desc limit 1
'''
@write
def insert(datas):
datas = mapper_columns(datas=datas, columns=__COLUMNS__)
return f'''
insert into robo_weight_drift({','.join([x for x in datas.keys()])})
values ({','.join([f"'{x[1]}'" for x in datas.items()])})
'''
@write
def lock():
return "lock tables robo_weight_drift read"
@write
def unlock():
return "UNLOCK TABLES"
from py_jftech import component, autowired, get_config, workday_range, next_workday
from api import DriftSolver, PortfoliosRisk, PortfoliosBuilder, Datum, RoboExecutor
from rebalance.dao import robo_rebalance_signal as rrs, robo_weight_drift as rwd
@component(bean_name='date-curve')
class DateCurve(DriftSolver):
def __init__(self):
self._config = get_config(__name__)['date-curve']
@property
def diff_threshold(self):
return self._config['diff-threshold']
@property
def init_factor(self):
return self._config['init-factor']
def get_drift(self, day, risk: PortfoliosRisk):
last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True)
result = self.diff_threshold - self.init_factor * (day - last_re['date']).days ** 4
return max(0, result)
@component(bean_name='high-weight')
class PortfolioHighWeight(DriftSolver):
@autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, builder: PortfoliosBuilder = None, datum: Datum = None, executor: RoboExecutor = None):
self._builder = builder
self._datum = datum
self._executor = executor
self._config = get_config(__name__)['high-weight']
@property
def drift_coef(self):
return self._config['coef']
def get_drift(self, day, risk: PortfoliosRisk):
drift = rwd.get_one(day, risk)
if not drift:
datum_ids = [x['id'] for x in self._datum.get_high_risk_datums(risk)]
last_one = rwd.get_last_one(max_date=day, risk=risk)
start = (next_workday(last_one['date'])) if last_one else self._executor.start_date
last_drift = last_one['drift'] if last_one else 0
for date in workday_range(start, day):
portfolio = self._builder.get_portfolios(date, risk)
weight = round(sum([x[1] for x in portfolio.items() if x[0] in datum_ids]), 2)
last_drift = round((weight * self.drift_coef + (1 - self.drift_coef) * last_drift) if last_drift else weight, 2)
rwd.insert({
'date': date,
'risk': risk,
'weight': weight,
'drift': last_drift,
})
drift = rwd.get_last_one(day, risk)
return drift['drift']
import json
from datetime import datetime as dt
from typing import List, Dict
from functools import reduce
import pandas as pd
from py_jftech import component, autowired, get_config, workday_range, next_workday, to_tuple, prev_workday
from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder, RoboReportor, Datum, DatumType
from rebalance.dao import robo_rebalance_signal as rrs
@component
class LevelRebalanceRuler(RebalanceRuler):
'''
定义:
1.定义所有调仓类型为非NORMAL类型的信号为清仓信号
2.定义所有调仓类型为NORMAL类型的信号为加仓信号
3.定义持久信号为上次选用调仓的信号时间到当前时间内,该信号都有效
4.定义临时信号为仅当天有效
规则:
1.所有清仓信号为持久信号,所有加仓信号为临时信号
2.对于持久信号规则如下:
2.1 上一次选用信号到当前时间内,是否有持久信号
2.2 如果有,则看级别是否高于上一次选用信号
2.3 如果高于,则输出该信号
3.如果没有持久信号,则从临时信号中根据级别排序找出第一个,作为输出信号
'''
@autowired
def __init__(self, signals: List[RebalanceSignal] = None, hold: PortfoliosHolder = None):
self._signals = signals
self._hold = hold
self._config = get_config(__name__)
@property
def disable_period(self):
result = self._config['disable-period']
if isinstance(result, dict):
return {PortfoliosType(x[0]): x[1] for x in result.items()}
else:
return {t: result for t in PortfoliosType}
def without_disable_period(self, day, risk: PortfoliosRisk) -> bool:
last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True)
if SignalType(last_re['type']).p_type in self.disable_period:
return len(workday_range(last_re['date'], day)) > self.disable_period[SignalType(last_re['type']).p_type]
return False
def take_next_signal(self, day, risk: PortfoliosRisk):
last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True)
if not last_re:
builder = [x for x in self._signals if x.signal_type is SignalType.INIT][0]
return builder.get_signal(day, risk)
risk_signals = [x for x in self._signals if x.signal_type.p_type is not PortfoliosType.NORMAL]
buy_signals = [x for x in self._signals if x.signal_type.p_type is PortfoliosType.NORMAL]
last_signal = rrs.get_last_one(max_date=day, risk=risk)
start = next_workday(last_signal['date'])
signals = rrs.get_list(min_date=last_re['date'], risk=risk, effective=False)
signals = {SignalType(x['type']): x for x in signals if SignalType(x['type']) is not SignalType.NONE and SignalType(x['type']).level > SignalType(last_re['type']).level}
while start <= day:
# 检查风控信号
today_signals = {x.signal_type: x.get_signal(start, risk) for x in risk_signals if x.signal_type.level <= SignalType(last_re['type']).level}
today_signals = {x[0]: x[1] for x in today_signals.items() if x[1] is not None}
signals = {**signals, **today_signals}
start = next_workday(start)
# 上次实际调仓类型为危机信号,本次危机信号不调仓
if signals and SignalType(last_re['type']) in [SignalType.CRISIS_ONE, SignalType.CRISIS_TWO]:
signals = {x[0]: x[1] for x in signals.items() if x[0] not in [SignalType.CRISIS_ONE, SignalType.CRISIS_TWO]}
# 检查买入信号,只有当天需要检查
if not signals and self.without_disable_period(day, risk):
signals = {x.signal_type: x.get_signal(day, risk) for x in buy_signals}
signals = {x[0]: x[1] for x in signals.items() if x[1] is not None}
if signals:
if SignalType(last_signal['type']) is SignalType.NONE:
rrs.delete_by_id(last_signal['id'])
return signals[sorted(signals.keys(), key=lambda x: x.level)[0]]
if SignalType(last_signal['type']) is SignalType.NONE:
rrs.update(last_signal['id'], {'date': day})
else:
rrs.insert({
'date': day,
'type': SignalType.NONE,
'risk': risk
})
return None
def get_signal_type(self, sign_id) -> SignalType | Dict[int, SignalType]:
sign_id = to_tuple(sign_id)
if len(sign_id) > 1:
return {x['id']: SignalType(x['type']) for x in rrs.get_by_ids(sign_id)}
else:
signal = rrs.get_by_id(sign_id[0])
return SignalType(signal['type']) if signal else None
def get_signal_date(self, sign_id):
sign_id = to_tuple(sign_id)
if len(sign_id) > 1:
return {x['id']: x['date'] for x in rrs.get_by_ids(sign_id)}
else:
signal = rrs.get_by_id(sign_id[0])
return signal['date'] if signal else None
def commit_signal(self, sign_id):
rrs.update(sign_id, {'effective': True})
def clear_signal(self, day=None, risk: PortfoliosRisk = None):
rrs.delete(min_date=day, risk=risk)
@component(bean_name='signal-report')
class SignalReportor(RoboReportor):
@autowired
def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None):
self._hold = hold
self._datum = datum
@property
def report_name(self) -> str:
return '调仓信号'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
result = []
datums = {str(x['id']): x for x in self._datum.get_datums(type=DatumType.FUND, exclude=False)}
for signal in rrs.get_list(max_date=max_date, min_date=min_date, effective=True):
rebalance_date = self._hold.get_rebalance_date_by_signal(signal['id'])
for fund_id, weight in json.loads(signal['portfolio']).items():
result.append({
'risk': PortfoliosRisk(signal['risk']).name,
'type': SignalType(signal['type']).name,
'signal_date': signal['date'],
'rebalance_date': rebalance_date,
'portfolio_type': PortfoliosType(signal['portfolio_type']).name,
'ft_ticker': datums[fund_id]['ftTicker'],
'blooberg_ticker': datums[fund_id]['bloombergTicker'],
'fund_name': datums[fund_id]['chineseName'],
'weight': weight
})
return result
@component(bean_name='daily-signal-report')
class DailySignalReportor(RoboReportor):
@autowired
def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None):
self._hold = hold
self._datum = datum
@property
def report_name(self) -> str:
return '每日调仓信号'
def load_report(self, max_date=prev_workday(dt.today()), min_date=None) -> List[dict]:
signals = pd.DataFrame(rrs.get_list(max_date=max_date, min_date=min_date))
signals = signals[(signals['date'].dt.date == max_date.date()) & (signals['type'] != SignalType.NONE.value)]
if not signals.empty:
datum_ids = reduce(lambda x, y: x | y, signals['portfolio'].apply(lambda x: set(json.loads(x).keys())))
datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=datum_ids))
datums.set_index('id', inplace=True)
signals['risk'] = signals.apply(lambda row: PortfoliosRisk(row['risk']).name, axis=1)
signals['rebalance_type'] = signals.apply(lambda row: SignalType(row['type']).name, axis=1)
signals['portfolio_type'] = signals.apply(lambda row: PortfoliosType(row['portfolio_type']).name, axis=1)
signals['portfolio'] = signals.apply(lambda row: [x for x in json.loads(row['portfolio']).items()], axis=1)
signals = signals.explode('portfolio', ignore_index=True)
signals['weight'] = signals.apply(lambda row: row['portfolio'][1], axis=1)
signals['asset_ids'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['ftTicker'], axis=1)
signals['name'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['chineseName'], axis=1)
signals['lipper_id'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['lipperKey'], axis=1)
signals = signals[['risk', 'date', 'rebalance_type', 'asset_ids', 'lipper_id', 'name', 'weight']]
return signals.to_dict('records')
return []
from abc import ABC
import pandas as pd
from dateutil.relativedelta import relativedelta
from py_jftech import get_config, autowired, component
from api import PortfoliosRisk, SignalType, Navs
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs
class CrisisSignal(BaseRebalanceSignal, ABC):
@autowired
def __init__(self, navs: Navs = None):
super().__init__()
self._navs = navs
self._config = get_config(__name__)
@property
def exp_init(self):
return pd.to_datetime(self._config['exp-init']) if 'exp-init' in self._config else None
@property
def exp_years(self):
return self._config['exp-years'] if 'exp-years' in self._config else 1
@property
def inversion_years(self):
return self._config['inversion-years'] if 'inversion-years' in self._config else 1
@property
def inversion_threshold(self):
return self._config['inversion-threshold'] if 'inversion-threshold' in self._config else 0.3
def get_exp_start_date(self, day, risk: PortfoliosRisk):
assert day, "get crisis exp start date, day can not be none"
assert risk, "get crisis exp start date, PortfoliosRisk can not be none"
exp_date = day - relativedelta(years=self.exp_years)
if self.exp_init and self.exp_init >= exp_date:
return self.exp_init
exp_signal = rrs.get_first_after(type=SignalType.CRISIS_EXP, risk=risk, min_date=exp_date)
if not exp_signal:
inversion_date = day - relativedelta(years=self.inversion_years)
ten_before = self._navs.get_last_index_close(max_date=inversion_date, ticker='USGG10YR Index')
ten_today = self._navs.get_last_index_close(max_date=day, ticker='USGG10YR Index')
two_before = self._navs.get_last_index_close(max_date=inversion_date, ticker='USGG2YR Index')
two_today = self._navs.get_last_index_close(max_date=day, ticker='USGG2YR Index')
if ten_today['close'] - two_today['close'] <= ten_before['close'] - two_before['close'] and \
ten_today['close'] - two_today['close'] <= self.inversion_threshold:
last_signal = rrs.get_last_one(max_date=day, risk=risk)
if SignalType(last_signal['type']) is SignalType.NONE:
rrs.update(last_signal['id'], {
'date': day,
'type': SignalType.CRISIS_EXP,
})
else:
rrs.insert({
'date': day,
'type': SignalType.CRISIS_EXP,
'risk': risk,
})
exp_signal = rrs.get_first_after(type=SignalType.CRISIS_EXP, risk=risk, min_date=exp_date)
return exp_signal['date'] if exp_signal else None
@component(bean_name='crisis_one')
class LastRateCrisisOneSignal(CrisisSignal, BaseRebalanceSignal):
'''
(close / 850ma – 1) < threshold,threshold=-0.05、-0.1
'''
@property
def mean_count(self):
return self._config['crisis-1']['mean-count']
@property
def threshold(self):
return self._config['crisis-1']['threshold']
@property
def signal_type(self):
return SignalType.CRISIS_ONE
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
exp_date = self.get_exp_start_date(day, risk)
if exp_date:
crisis_one = rrs.get_first_after(type=SignalType.CRISIS_ONE, risk=risk, min_date=exp_date)
if not crisis_one:
spx = self._navs.get_last_index_close(max_date=day, ticker='SPX Index', count=self.mean_count)
spx = pd.DataFrame(spx)
spx.sort_values(by='date', inplace=True)
return spx.iloc[-1]['close'] / spx['close'].mean() - 1 < self.threshold
return False
@component(bean_name='crisis_two')
class CrisisTwoSignal(CrisisSignal, BaseRebalanceSignal):
@property
def negative_growth_years(self):
return self._config['crisis-2']['negative-growth']
@property
def fed_months(self):
return self._config['crisis-2']['fed-months']
@property
def fed_threshold(self):
return self._config['crisis-2']['fed-threshold']
@property
def signal_type(self):
return SignalType.CRISIS_TWO
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
exp_date = self.get_exp_start_date(day, risk)
if exp_date:
crisis_two = rrs.get_first_after(type=SignalType.CRISIS_TWO, risk=risk, min_date=exp_date)
if not crisis_two:
ng_date = day - relativedelta(years=self.negative_growth_years)
ten_today = self._navs.get_last_index_close(max_date=day, ticker='USGG10YR Index')
cpi_today = self._navs.get_last_eco_values(max_date=day, ticker='CPI YOY Index', by_release_date=True)
ten_before = self._navs.get_last_index_close(max_date=ng_date, ticker='USGG10YR Index')
cpi_before = self._navs.get_last_eco_values(max_date=ng_date, ticker='CPI YOY Index', by_release_date=True)
before = ten_before['close'] - cpi_before['indicator']
today = ten_today['close'] - cpi_today['indicator']
fed_today = self._navs.get_last_eco_values(max_date=day, ticker='FDTR Index', by_release_date=True)
fed_before = self._navs.get_last_eco_values(max_date=day - relativedelta(months=self.fed_months), ticker='FDTR Index', by_release_date=True)
return today <= before and fed_today['indicator'] - fed_before['indicator'] < self.fed_threshold
return False
from py_jftech import component, autowired
from dateutil.relativedelta import relativedelta
from api import PortfoliosRisk, SignalType, Datum, PortfoliosHolder, DriftSolver
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs
@component(bean_name='curve-drift')
class CurveDrift(BaseRebalanceSignal):
@autowired(names={'solver': 'date-curve'})
def __init__(self, datum: Datum = None, hold: PortfoliosHolder = None, solver: DriftSolver = None):
super().__init__()
self._datum = datum
self._hold = hold
self._solver = solver
@property
def exclude_last_type(self):
return [
SignalType.CRISIS_ONE,
SignalType.CRISIS_TWO,
SignalType.MARKET_RIGHT,
SignalType.INIT
]
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True)
if last_re is None or SignalType(last_re['type']) in self.exclude_last_type:
return False
if last_re['date'] + relativedelta(days=120) <= day:
return True
hr_datums = self._datum.get_high_risk_datums(risk)
datum_ids = [x['id'] for x in hr_datums]
normal_portfolio = self._builder.get_portfolios(day, risk)
normal_weight = round(sum([x[1] for x in normal_portfolio.items() if x[0] in datum_ids]), 2)
hold_portfolio = self._hold.get_portfolios_weight(day, risk)
hold_weight = round(sum([x[1] for x in hold_portfolio.items() if x[0] in datum_ids]), 2)
return normal_weight - hold_weight >= self._solver.get_drift(day, risk)
@property
def signal_type(self) -> SignalType:
return SignalType.DRIFT_BUY
from py_jftech import component, autowired, get_config
from api import SignalType, PortfoliosRisk, DriftSolver
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs
@component(bean_name='high-buy')
class HighBuySignal(BaseRebalanceSignal):
@autowired(names={'solver': 'high-weight'})
def __init__(self, solver: DriftSolver = None):
super().__init__()
self._config = get_config(__name__)
self._solver = solver
@property
def include_last_type(self):
return [
SignalType.CRISIS_ONE,
SignalType.CRISIS_TWO,
SignalType.MARKET_RIGHT,
SignalType.LOW_BUY,
SignalType.INIT
]
@property
def signal_type(self) -> SignalType:
return SignalType.HIGH_BUY
def get_threshold(self, risk: PortfoliosRisk):
threshold = self._config['threshold']
if isinstance(threshold, dict):
threshold = threshold[f'ft{risk.value}']
return threshold
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True)
if last_re is None or SignalType(last_re['type']) not in self.include_last_type:
return False
drift = self._solver.get_drift(day, risk)
threshold = self.get_threshold(risk)
return drift >= threshold[1]
@component(bean_name='low-buy')
class LowBuySignal(HighBuySignal):
@property
def include_last_type(self):
return [
SignalType.CRISIS_ONE,
SignalType.CRISIS_TWO,
SignalType.MARKET_RIGHT,
SignalType.INIT
]
@property
def signal_type(self) -> SignalType:
return SignalType.LOW_BUY
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True)
if last_re is None or SignalType(last_re['type']) not in self.include_last_type:
return False
drift = self._solver.get_drift(day, risk)
threshold = self.get_threshold(risk)
return threshold[0] <= drift < threshold[1]
from py_jftech import component, autowired
from api import PortfoliosRisk, SignalType, RoboExecutor
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs
@component(bean_name='init')
class InitSignalBuilder(BaseRebalanceSignal):
@autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, executor: RoboExecutor = None):
super(InitSignalBuilder, self).__init__()
self._executor = executor
@property
def signal_type(self) -> SignalType:
return SignalType.INIT
def get_signal(self, day, risk: PortfoliosRisk):
signal = rrs.get_last_one(max_date=day, risk=risk, type=SignalType.INIT)
if signal:
return None if signal['effective'] else signal
return super().get_signal(self._executor.start_date, risk)
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
return True
import pandas as pd
from py_jftech import component, autowired, get_config
from scipy.stats import norm
from api import SignalType, PortfoliosRisk, Navs
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs
@component(bean_name='market-right')
class MarketRight(BaseRebalanceSignal):
@autowired
def __init__(self, navs: Navs = None):
super().__init__()
self._navs = navs
self._config = get_config(__name__)
@property
def rtn_days(self):
return self._config['rtn-days']
@property
def min_threshold(self):
return self._config['min-threshold']
@property
def coef(self):
return self._config['coef']
@property
def signal_type(self) -> SignalType:
return SignalType.MARKET_RIGHT
@property
def cvar_min_volume(self):
return self._config['cvar-min-volume']
@property
def exclude_last_type(self):
return [
SignalType.CRISIS_ONE,
SignalType.CRISIS_TWO,
SignalType.INIT,
SignalType.MARKET_RIGHT
]
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
last_re = rrs.get_last_one(risk=risk, max_date=day, effective=True)
if last_re is not None and SignalType(last_re['type']) in self.exclude_last_type:
return False
spx = self.load_spx_close_rtns(day)
if spx[-1]['rtn'] > self.min_threshold:
return False
cvar = self.get_cvar(day, risk, spx=spx)
return cvar is not None and spx[-1]['rtn'] < cvar
def get_cvar(self, day, risk: PortfoliosRisk, spx=None):
if spx is None:
spx = self.load_spx_close_rtns(day)
start_date = self.find_cvar_start_date(day, risk, spx=spx)
if start_date:
spx = pd.DataFrame(spx)
spx = spx[(spx.date >= start_date) & (spx.date <= day)]
if len(spx) >= self.cvar_min_volume:
alpha = round(1 - self.coef, 2)
mean = spx.rtn.mean()
std = spx.rtn.std()
return mean - std * norm.pdf(norm.ppf(alpha)) / alpha
return None
def find_cvar_start_date(self, day, risk: PortfoliosRisk, spx=None):
if spx is None:
spx = self.load_spx_close_rtns(day)
spx = pd.DataFrame(spx)
last_right = rrs.get_last_one(type=(SignalType.MARKET_RIGHT, SignalType.INIT), max_date=day, risk=risk,
effective=True)
last_buy = rrs.get_first_after(type=(SignalType.LOW_BUY, SignalType.HIGH_BUY), risk=risk, effective=True,
min_date=last_right['date'])
if not last_buy or not last_right or last_buy['date'] <= last_right['date']:
return None
spx = spx[(spx['date'] >= last_right['date']) & (spx['date'] <= last_buy['date'])]
if not spx.empty and len(spx) > 2:
return spx.loc[spx.close.idxmin()].date
return None
def load_spx_close_rtns(self, day):
spx = pd.DataFrame(self._navs.get_index_close(ticker='SPX Index', max_date=day))
spx.sort_values('date', inplace=True)
spx['rtn'] = spx['close'] / spx['close'].shift(self.rtn_days) - 1
spx.dropna(inplace=True)
spx = spx[['date', 'close', 'rtn']]
return spx.to_dict('records')
import logging
import unittest
from dateutil.relativedelta import relativedelta
from py_jftech import autowired, parse_date, to_str, next_workday
from api import RebalanceSignal, PortfoliosRisk, RebalanceRuler, RoboReportor
logger = logging.getLogger(__name__)
class RebalanceTest(unittest.TestCase):
@autowired(names={'builder': 'crisis_one'})
def test_crisis_one(self, builder: RebalanceSignal = None):
start = parse_date('2008-03-12')
end = start + relativedelta(years=3)
while start < end:
signal = builder.is_trigger(start, PortfoliosRisk.FT9)
if signal:
logger.info(start)
start = next_workday(start)
@autowired(names={'builder': 'crisis_two'})
def test_crisis_two(self, builder: RebalanceSignal = None):
start = parse_date('2008-01-02')
end = start + relativedelta(years=3)
while start < end:
signal = builder.is_trigger(start, PortfoliosRisk.FT9)
if signal:
logger.info(start)
start = next_workday(start)
@autowired(names={'builder': 'market-right'})
def test_market_right(self, builder: RebalanceSignal = None):
signal = builder.get_signal(parse_date('2008-01-07'), PortfoliosRisk.FT9)
logger.info(signal)
@autowired(names={'builder': 'curve-drift'})
def test_curve_drift(self, builder: RebalanceSignal = None):
signal = builder.get_signal(parse_date('2022-11-07'), PortfoliosRisk.FT3)
logger.info(signal)
@autowired(names={'builder': 'high-buy'})
def test_high_buy(self, builder: RebalanceSignal = None):
builder.get_signal(parse_date('2022-09-10'), PortfoliosRisk.FT3)
@autowired
def test_rebalance_builder(self, builder: RebalanceRuler = None):
builder.take_next_signal(parse_date('2020-04-29'), PortfoliosRisk.FT9)
@autowired(names={'reportor': 'signal-report'})
def test_signal_report(self, reportor: RoboReportor = None):
result = reportor.load_report()
logger.info(to_str(result, show_line=10))
@autowired(names={'reportor': 'daily-signal-report'})
def test_daily_signal_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-21'))
logger.info(to_str(result, show_line=10))
@autowired
def test_clear_signal(self, ruler: RebalanceRuler = None):
ruler.clear_signal()
if __name__ == '__main__':
unittest.main()
from datetime import datetime as dt, timedelta
from typing import List
import pandas as pd
from py_jftech import component, autowired, prev_workday, filter_weekend, next_workday, get_config
from api import RoboReportor, PortfoliosRisk, RoboExecutor, Navs, Datum, DatumType
from reports.dao import robo_benckmark as rb
@component(bean_name='benckmark-report')
class BenchmarkReportor(RoboReportor):
@autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, executor: RoboExecutor = None, navs: Navs = None, datum: Datum = None):
self._exec = executor
self._navs = navs
self._datum = datum
self._config = get_config(__name__)
@property
def report_name(self) -> str:
return 'benchmark'
@property
def risks(self):
return self._config['stock-rate'].keys()
@property
def init_amount(self):
return self._config['init-amount']
def stock_rate(self, risk):
return self._config['stock-rate'][risk]
def load_nav_rtn(self, risk, day):
last = rb.get_last_one(risk=risk, max_date=day, re=True)
start_date = next_workday(last['date']) if last else self._exec.start_date
datums = {x['id']: x for x in self._datum.get_datums(type=DatumType.FUND)}
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(datums.keys()), min_date=prev_workday(start_date - timedelta(10)), max_date=day))
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
navs.fillna(method='ffill', inplace=True)
nav_index = navs.shape[1]
for i in range(nav_index):
navs[f'rtn_{navs.columns[i]}'] = navs[navs.columns[i]] / navs[navs.columns[i]].shift() - 1
navs = navs[navs.index >= start_date]
return navs, nav_index
def find_datum_asset(self):
return {x['id']: x['assetType'] for x in self._datum.get_datums(type=DatumType.FUND)}
def build_benchmark(self, risk, day=dt.today()):
nav_rtn, nav_index = self.load_nav_rtn(risk=risk, day=day)
asset_types = {x['id']: x['assetType'] for x in self._datum.get_datums(type=DatumType.FUND)}
last = rb.get_last_one(risk=risk, max_date=day, re=True)
init_amount = last['nav'] if last else self.init_amount
stock_rate = self.stock_rate(risk)
other_rate = 1 - stock_rate
five_rtn = 0
last_day = None
fund_ids = None
for index, row in nav_rtn.iterrows():
if last_day is None or fund_ids is None:
fund_ids = list(row.iloc[:nav_index].dropna().index)
stock_count = len([x for x in fund_ids if asset_types[x] == 'STOCK'])
stock_average = init_amount * stock_rate / stock_count
other_average = init_amount * other_rate / (len(fund_ids) - stock_count)
nav_rtn.loc[index, f'{risk}_result'] = init_amount
nav_rtn.loc[index, f'{risk}_re'] = 1
for fund_id in fund_ids:
if fund_id and asset_types[fund_id] == 'STOCK':
nav_rtn.loc[index, f'stock_{fund_id}'] = stock_average
else:
nav_rtn.loc[index, f'other_{fund_id}'] = other_average
else:
for fund_id in fund_ids:
if fund_id and asset_types[fund_id] == 'STOCK':
nav_rtn.loc[index, f'stock_{fund_id}'] = nav_rtn.loc[last_day, f'stock_{fund_id}'] * (
1 + nav_rtn.loc[index, f'rtn_{fund_id}'])
else:
nav_rtn.loc[index, f'other_{fund_id}'] = nav_rtn.loc[last_day, f'other_{fund_id}'] * (
1 + nav_rtn.loc[index, f'rtn_{fund_id}'])
nav_rtn.loc[index, f'{risk}_result'] = nav_rtn.loc[index][-len(fund_ids):].sum()
nav_rtn.loc[index, f'{risk}_re'] = 0
if five_rtn == 5:
five_rtn = 0
fund_ids = list(row.iloc[:nav_index].dropna().index)
stock_count = len([x for x in fund_ids if asset_types[x] == 'STOCK'])
stock_average = nav_rtn.loc[index, f'{risk}_result'] * stock_rate / stock_count
other_average = nav_rtn.loc[index, f'{risk}_result'] * other_rate / (len(fund_ids) - stock_count)
nav_rtn.loc[index, f'{risk}_re'] = 1
for fund_id in fund_ids:
if fund_id and asset_types[fund_id] == 'STOCK':
nav_rtn.loc[index, f'stock_{fund_id}'] = stock_average
else:
nav_rtn.loc[index, f'other_{fund_id}'] = other_average
five_rtn += 1
last_day = index
result = nav_rtn.reindex(columns=[f'{risk}_result', f'{risk}_re'])
result.reset_index(inplace=True)
result['risk'] = risk
result.rename(columns={f'{risk}_result': 'nav', f'{risk}_re': 're', 'nav_date': 'date'}, inplace=True)
result['nav'] = round(result['nav'], 4)
return result.to_dict('records')
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
for risk in self.risks:
last = rb.get_last_one(max_date=max_date, risk=risk)
if not last or last['date'] < filter_weekend(max_date):
benchmarks = pd.DataFrame(self.build_benchmark(risk=risk, day=max_date))
if last:
benchmarks = benchmarks[benchmarks.date > last['date']]
if not benchmarks.empty:
rb.batch_insert(benchmarks.to_dict('records'))
result = pd.DataFrame(rb.get_list(max_date=max_date, min_date=min_date))
result = result.pivot_table(index='date', columns='risk', values='nav')
result.reset_index(inplace=True)
return result.to_dict('records')
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired
from empyrical import annual_return, annual_volatility, max_drawdown, sharpe_ratio
from api import RoboReportor, Navs
@component(bean_name='combo-report')
class ComboDatasReport(RoboReportor):
@autowired(names={'hold_reportor': 'hold-report', 'benchmark': 'benckmark-report'})
def __init__(self, hold_reportor: RoboReportor = None, benchmark: RoboReportor = None, navs: Navs = None):
self._hold_reportor = hold_reportor
self._benchmark = benchmark
self._navs = navs
@property
def report_name(self) -> str:
return '混合数据'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
holds = pd.DataFrame(self._hold_reportor.load_report(max_date=max_date, min_date=min_date))
if not holds.empty:
holds['risk'] = holds.apply(lambda row: row.risk, axis=1)
datas = holds.pivot_table(index='date', columns='risk', values='nav')
benchmark = pd.DataFrame(self._benchmark.load_report(max_date=max_date, min_date=min_date))
datas = datas.join(benchmark.set_index('date'))
spx = pd.DataFrame(self._navs.get_index_close(ticker='SPX Index', min_date=min_date, max_date=max_date))
spx = spx.pivot_table(index='date', columns='index_id', values='close')
spx.columns = ['SPX']
datas = datas.join(spx)
datas.fillna(method='ffill', inplace=True)
datas.reset_index(inplace=True)
return datas.to_dict('records')
return []
CREATE TABLE IF NOT EXISTS robo_benchmark
(
rb_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rb_date DATETIME NOT NULL COMMENT '日期',
rb_risk VARCHAR(255) NOT NULL COMMENT '风险等级',
rb_nav DOUBLE(16, 4) NOT NULL COMMENT '资产值',
rb_re TINYINT NOT NULL DEFAULT 0 COMMENT '是否再分配',
rb_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rb_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rb_id),
UNIQUE INDEX (rb_date, rb_risk),
INDEX (rb_risk)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT 'BENCHMARK数据表';
\ No newline at end of file
from py_jftech import read, write, where, mapper_columns, format_date
__COLUMNS__ = {
'rb_id': 'id',
'rb_date': 'date',
'rb_risk': 'risk',
'rb_nav': 'nav',
'rb_re': 're',
}
@write
def batch_insert(datas):
datas = [mapper_columns(x, __COLUMNS__) for x in datas]
values = ','.join([f'''({','.join([(f"'{x[j]}'" if j in x and x[j] is not None else 'null') for j in __COLUMNS__.keys() if j != 'rb_id'])})''' for x in datas])
return f'''insert into robo_benchmark({','.join([x for x in __COLUMNS__.keys() if x != 'rb_id'])}) values {values}'''
@read(one=True)
def get_last_one(max_date=None, risk=None, re: bool = None):
sql = f"rb_date <= '{format_date(max_date)}'" if max_date else None
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_benchmark
{where(sql, rb_risk=risk, rb_re=re)} order by rb_date desc limit 1
'''
@read
def get_list(max_date=None, min_date=None, risk=None, re: bool = None):
sqls = []
if max_date:
sqls.append(f"rb_date <= '{format_date(max_date)}'")
if min_date:
sqls.append(f"rb_date >= '{format_date(min_date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_benchmark
{where(*sqls, rb_risk=risk, rb_re=re)} order by rb_risk, rb_date
'''
import os
from datetime import datetime as dt
from typing import List
from abc import abstractmethod, ABCMeta
from tempfile import TemporaryDirectory
from shutil import copyfile
from copy import deepcopy
import pandas as pd
from py_jftech import component, autowired, get_config, get_instance_name, get_project_path, format_date, sendmail
from api import RoboReportor, RoboExportor
def include_report():
return get_config(__name__)['include-report']
class DefaultExportor(RoboExportor):
@autowired
def __init__(self, reportors: List[RoboReportor] = None):
self._reportors = {get_instance_name(x): x for x in reportors}
def export(self, max_date=dt.today(), min_date=None):
if not self.include_report:
return None
with TemporaryDirectory() as tmpdir:
filename = f"{self.file_name}_{format_date(max_date)}.xlsx"
if min_date:
filename = f"{self.file_name}_{format_date(min_date)}_to_{format_date(max_date)}.xlsx"
filepath = os.path.join(tmpdir, filename)
with pd.ExcelWriter(filepath) as writer:
for reportor_name in self.include_report:
reportor = self._reportors[reportor_name]
datas = pd.DataFrame(reportor.load_report(max_date=max_date, min_date=min_date))
if not datas.empty:
datas.to_excel(writer, sheet_name=reportor.report_name, index=False)
email = self.get_email(filepath)
if email:
receives = email['receives']
copies = email['copies'] if 'copies' in email else []
attach_paths = [filepath]
subject = email['subject'].format(today=format_date(dt.today()), max_date=max_date, min_date=min_date)
content = email['content'].format(today=format_date(dt.today()), max_date=max_date, min_date=min_date)
sendmail(receives=receives, copies=copies, attach_paths=attach_paths, subject=subject, content=content)
if self.save_path is not None:
os.makedirs(self.save_path, exist_ok=True)
save_file = os.path.join(self.save_path, filename)
copyfile(filepath, save_file)
def get_email(self, file):
return deepcopy(self.config['email']) if 'email' in self.config else None
@property
def save_path(self):
if 'save-path' not in self.config:
return None
save_path: str = self.config['save-path']
if save_path.startswith('.'):
return os.path.abspath(os.path.join(os.path.dirname(__file__), save_path))
elif save_path.startswith('/'):
return os.path.abspath(save_path)
return os.path.abspath(os.path.join(get_project_path(), save_path))
@property
def exist_build(self):
return self.config['exist-build'] if 'exist-build' in self.config else False
@property
def file_name(self):
return self.config['file-name'] if 'file-name' in self.config else 'export'
@property
def include_report(self):
return self.config['include-report'] if 'include-report' in self.config else []
@property
@abstractmethod
def config(self):
pass
@component(bean_name='backtest-export')
class BacktestExportor(DefaultExportor):
def __init__(self):
super(BacktestExportor, self).__init__()
self.__config = deepcopy(get_config(__name__))
@property
def config(self):
return self.__config['backtest']
@component(bean_name='real-daily-export')
class RealDailyExportor(DefaultExportor):
@autowired(names={'signal_reportor': 'daily-signal-report'})
def __init__(self, signal_reportor: RoboReportor = None):
super(RealDailyExportor, self).__init__()
self.__config = get_config(__name__)
self._signal_reportor = signal_reportor
def get_email(self, file):
result = super(RealDailyExportor, self).get_email(file)
if result is None:
return None
content = pd.read_excel(file, sheet_name=None)
if self._signal_reportor.report_name in content:
result['subject'] = str(result['subject']['rebalance'])
result['content'] = result['content']['rebalance']
else:
result['subject'] = result['subject']['default']
result['content'] = result['content']['rebalance']
return result
@property
def config(self):
return self.__config['real-daily']
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired, get_config, format_date, filter_weekend
from api import RoboReportor
@component(bean_name='fixed-range-report')
class FixedRangeReport(RoboReportor):
@autowired(names={'combo': 'combo-report'})
def __init__(self, combo: RoboReportor = None):
self._combo = combo
self._config = get_config(__name__)
@property
def report_name(self) -> str:
return '固定区间收益率'
@property
def range_dates(self):
return self._config['range-dates']
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datas = pd.DataFrame(self._combo.load_report(max_date=max_date, min_date=min_date))
datas.set_index('date', inplace=True)
if not datas.empty:
result = pd.DataFrame(columns=datas.columns)
for range in self.range_dates:
start = filter_weekend(range['start'])
end = filter_weekend(range['end'])
if not datas[start:end].empty:
row_name = f"{format_date(start)}~{format_date(end)}"
result.loc[row_name] = datas[start:end].values[-1] / datas[start:end].values[0] - 1
result = round(result, 4) * 100
result.reset_index(inplace=True)
result.rename(columns={'index': 'range-date'}, inplace=True)
return result.to_dict('records')
return []
from datetime import datetime as dt
from typing import List
import pandas as pd
from empyrical import annual_return, annual_volatility, max_drawdown, sharpe_ratio
from py_jftech import component, autowired
from api import RoboReportor
@component(bean_name='indicators-report')
class IndicatorsReportor(RoboReportor):
@autowired(names={'combo': 'combo-report'})
def __init__(self, combo: RoboReportor = None):
self._combo = combo
@property
def report_name(self) -> str:
return '指标'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datas = pd.DataFrame(self._combo.load_report(max_date=max_date, min_date=min_date))
datas.set_index('date', inplace=True)
if not datas.empty:
returns = round(datas.pct_change(), 5)
indicators = {
'annual_return': list(annual_return(returns, period='daily', annualization=None) * 100),
'annual_volatility': annual_volatility(returns, period='daily', annualization=None) * 100,
'max_drawdown': max_drawdown(returns, out=None) * 100,
'sharp': sharpe_ratio(returns, risk_free=0, period='daily', annualization=None),
}
indicators['calmar'] = abs(indicators['annual_return'] / indicators['max_drawdown'])
result = pd.DataFrame(indicators.values(), index=indicators.keys(), columns=list(returns.columns)).round(2)
result.reset_index(inplace=True)
result.rename(columns={'index': 'indicators'}, inplace=True)
return result.to_dict('records')
return []
from datetime import datetime as dt
from typing import List
import pandas as pd
from dateutil.relativedelta import relativedelta
from py_jftech import component, autowired, get_config, format_date, filter_weekend
from api import RoboReportor
@component(bean_name='relative-range-report')
class RelativeRangeReport(RoboReportor):
@autowired(names={'combo': 'combo-report'})
def __init__(self, combo: RoboReportor = None):
self._combo = combo
self._config = get_config(__name__)
@property
def report_name(self) -> str:
return '相对区间收益率'
@property
def range_dates(self):
return self._config['range-dates']
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datas = pd.DataFrame(self._combo.load_report(max_date=max_date, min_date=min_date))
datas.set_index('date', inplace=True)
if not datas.empty:
result = pd.DataFrame(columns=datas.columns)
for range in self.range_dates:
kwargs = range.copy()
del kwargs['name']
start = filter_weekend(max_date - relativedelta(**kwargs))
end = filter_weekend(max_date)
row_name = f"{range['name']}({format_date(start)}~{format_date(end)})"
result.loc[row_name] = datas[start:end].values[-1] / datas[start:end].values[0] - 1
result = round(result, 4) * 100
result.reset_index(inplace=True)
result.rename(columns={'index': 'range-date'}, inplace=True)
return result.to_dict('records')
return []
import unittest
import logging
import tempfile
from datetime import datetime as dt
from py_jftech import autowired, to_str, parse_date, prev_workday
from api import RoboReportor, RoboExportor
logger = logging.getLogger(__name__)
class ReportTest(unittest.TestCase):
@autowired(names={'reportor': 'benckmark-report'})
def test_benchmark_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'indicators-report'})
def test_indicator_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'fixed-range-report'})
def test_fixed_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'relative-range-report'})
def test_relative_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'exportor': 'backtest-export'})
def test_backtest_export(self, exportor: RoboExportor = None):
exportor.export(max_date=parse_date('2022-11-01'))
@autowired(names={'exportor': 'real-daily-export'})
def test_daliy_export(self, exportor: RoboExportor = None):
exportor.export(max_date=prev_workday(dt.today()))
if __name__ == '__main__':
unittest.main()
import logging
import os
import sys
from concurrent.futures import wait
from datetime import datetime as dt
......@@ -9,12 +8,12 @@ from typing import List
import pandas as pd
from py_jftech import (
component, autowired, get_config, filter_weekend, asynchronized,
workday_range, format_date, prev_workday, parse_date, is_workday
parse_date
)
from api import (
RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder, DatumType, RoboExportor,
PortfoliosRisk, PortfoliosHolder, PortfoliosType, RebalanceRuler, DataSync
RoboExecutor, Datum, AssetPool, PortfoliosBuilder,
PortfoliosRisk, PortfoliosHolder, DataSync
)
logger = logging.getLogger(__name__)
......@@ -22,10 +21,9 @@ logger = logging.getLogger(__name__)
@unique
class BacktestStep(Enum):
EWMA_VALUE = 1
ASSET_POOL = 2
NORMAL_PORTFOLIO = 3
HOLD_PORTFOLIO = 4
ASSET_POOL = 1
NORMAL_PORTFOLIO = 2
HOLD_PORTFOLIO = 3
def within(self, step: Enum):
return self.value <= step.value
......@@ -37,19 +35,29 @@ class BacktestStep(Enum):
@component(bean_name='backtest')
class BacktestExecutor(RoboExecutor):
@autowired(names={'export': 'backtest-export'})
def __init__(self, risk: AssetRisk = None, datum: Datum = None, pool: AssetPool = None, syncs: List[DataSync] = None,
builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, rule: RebalanceRuler = None, export: RoboExportor = None):
self._risk = risk
@autowired(names={'optimize': 'dividend'})
def __init__(self, datum: Datum = None, pool: AssetPool = None,
syncs: List[DataSync] = None,
builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None):
self._datum = datum
self._pool = pool
self._builder = builder
self._hold = hold
self._rule = rule
self._syncs = syncs
self._export = export
self._config = get_config(__name__)['backtest']
@staticmethod
def get_first_business_day(start_date, end_date):
# 生成日期范围并转换为DataFrame
dates = pd.date_range(start_date, end_date, freq='MS')
df = pd.DataFrame({'dates': dates})
# 提取每个月的第一个工作日
df['first_business_day'] = df['dates'].apply(
lambda x: pd.date_range(start=x, end=x + pd.offsets.MonthEnd(0), freq='B')[0]
)
# 返回第一个工作日列表
return list(df['first_business_day'])
@property
def start_date(self):
return pd.to_datetime(filter_weekend(self._config['start-date']))
......@@ -75,19 +83,16 @@ class BacktestExecutor(RoboExecutor):
return self._config['clean-up'] if 'clean-up' in self._config else True
def clear_datas(self):
if self.start_step.within(BacktestStep.EWMA_VALUE) and self.end_step.without(BacktestStep.EWMA_VALUE):
logger.info('start to clear fund ewma value'.center(50, '-'))
self._risk.clear()
if self.start_step.within(BacktestStep.ASSET_POOL) and self.end_step.without(BacktestStep.ASSET_POOL):
logger.info('start to clear asset pool'.center(50, '-'))
self._pool.clear()
if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO) and self.end_step.without(BacktestStep.NORMAL_PORTFOLIO):
if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO) and self.end_step.without(
BacktestStep.NORMAL_PORTFOLIO):
logger.info('start to clear normal portfolios'.center(50, '-'))
self._builder.clear()
if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO):
logger.info('start to clear hold portfolios'.center(50, '-'))
self._hold.clear()
self._rule.clear_signal()
def start_exec(self):
if self.is_sync_data:
......@@ -95,24 +100,19 @@ class BacktestExecutor(RoboExecutor):
sync.do_sync()
if self.is_clean_up:
self.clear_datas()
if self.start_step.within(BacktestStep.EWMA_VALUE) and self.end_step.without(BacktestStep.EWMA_VALUE):
logger.info("start to build fund ewma value.".center(50, '-'))
now = dt.now()
wait([self.async_build_risk_date(x['id']) for x in self._datum.get_datums(type=DatumType.FUND, risk=(3, 4, 5))])
logger.info(f"build fund ewma value success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.ASSET_POOL) and self.end_step.without(BacktestStep.ASSET_POOL):
logger.info("start to build asset pool".center(50, '-'))
now = dt.now()
workdays = workday_range(self.start_date, self.end_date)
for date in workdays:
self._risk.get_risk_pool(date)
workdays = self.get_first_business_day(self.start_date, self.end_date)
for date in workdays:
self._pool.get_pool(date)
logger.info(f"build asset pool success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO) and self.end_step.without(BacktestStep.NORMAL_PORTFOLIO):
logger.info(f"build asset pool success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.NORMAL_PORTFOLIO) and self.end_step.without(
BacktestStep.NORMAL_PORTFOLIO):
logger.info("start to build normal portfolios".center(50, '-'))
now = dt.now()
wait([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in workday_range(self.start_date, self.end_date)])
wait([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in
self.get_first_business_day(self.start_date, self.end_date)])
logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO):
logger.info("start to build hold portfolios".center(50, '-'))
......@@ -120,9 +120,6 @@ class BacktestExecutor(RoboExecutor):
wait([self.async_build_hold(x) for x in PortfoliosRisk])
logger.info(f"build hold portfolios success, use[{(dt.now() - now).seconds}s]")
logger.info("start to export report".center(50, '-'))
now = dt.now()
file = self._export.export(max_date=self.end_date, min_date=self.start_date)
logger.info(f"report file[{os.path.basename(file)}] exported successfully. use[{(dt.now() - now).seconds}s].")
@asynchronized(isolate=True)
def async_build_risk_date(self, asset_id):
......@@ -141,11 +138,11 @@ class BacktestExecutor(RoboExecutor):
class RealExecutor(RoboExecutor):
@autowired
def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, syncs: List[DataSync] = None, ruler: RebalanceRuler = None):
def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, syncs: List[DataSync] = None,
):
self._builder = builder
self._hold = hold
self._syncs = syncs
self._ruler = ruler
self._config = get_config(__name__)['real']
@property
......@@ -173,18 +170,3 @@ class RealExecutor(RoboExecutor):
if self.is_sync_data:
for sync in self._syncs:
sync.do_sync()
date = self.curt_date
if is_workday(date) or date in self.include_date:
date = prev_workday(date)
for risk in PortfoliosRisk:
logger.info(f"start to build risk[{risk.name}] real for date[{format_date(date)}]".center(50, '-'))
now = dt.now()
# 因为每天都必须有NORMAL最优投组,不管用不用
self._builder.get_portfolios(date, risk)
self._hold.build_hold_portfolio(date, risk)
self._ruler.take_next_signal(date, risk)
# 如果当前持仓为风控投组,则还要计算风控投组,不管用不用
p_type = self._hold.get_portfolio_type(date, risk)
if p_type is not PortfoliosType.NORMAL:
self._builder.get_portfolios(date, risk, type=p_type)
logger.info(f"build risk[{risk.name}] real for date[{format_date(date)}] success, use[{(dt.now() - now).seconds}s]")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment