Commit 1a0b6814 authored by stephen.wang's avatar stephen.wang

Merge remote-tracking branch 'origin/dev-dividend' into dev-dividend

parents 4988594f 22ebf8c5
...@@ -55,6 +55,14 @@ class LoggerType(Enum): ...@@ -55,6 +55,14 @@ class LoggerType(Enum):
SIGNAL = 'signal' SIGNAL = 'signal'
@unique
class SignalType(Enum):
    '''Kind of rebalance signal; each member maps to a portfolio type.'''
    NORMAL = 1


# Attach the portfolio type this signal kind corresponds to as a dynamic
# attribute on the enum member (Enum members accept extra attributes).
SignalType.NORMAL.p_type = PortfoliosType.NORMAL
class DataSync(ABC): class DataSync(ABC):
''' '''
数据同步服务,需要同步数据的服务,可以实现该接口 数据同步服务,需要同步数据的服务,可以实现该接口
...@@ -99,7 +107,6 @@ class Cleanable(ABC): ...@@ -99,7 +107,6 @@ class Cleanable(ABC):
pass pass
# TODO: 动态更新资料,控制更新基金池
class Datum(ABC): class Datum(ABC):
''' '''
基础资料服务,基金资料数据,各种指数,指标资料数据 基础资料服务,基金资料数据,各种指数,指标资料数据
...@@ -297,14 +304,6 @@ class PortfoliosBuilder(ABC): ...@@ -297,14 +304,6 @@ class PortfoliosBuilder(ABC):
''' '''
pass pass
@abstractmethod
def get_all_portfolios(self, risk: PortfoliosRisk = None):
"""
查询所有优选基金
@param risk:
"""
pass
class Solver(ABC): class Solver(ABC):
''' '''
...@@ -446,11 +445,12 @@ class PortfoliosHolder(ABC): ...@@ -446,11 +445,12 @@ class PortfoliosHolder(ABC):
pass pass
@abstractmethod @abstractmethod
def build_hold_portfolio(self, day, risk: PortfoliosRisk): def build_hold_portfolio(self, day, risk: PortfoliosRisk, force_mpt=False):
''' '''
构建指定日期,指定风险等级的持仓投组,以day为截止日期,会持续补满 构建指定日期,指定风险等级的持仓投组,以day为截止日期,会持续补满
:param day: 指定日期 :param day: 指定日期
:param risk: 指定风险等级 :param risk: 指定风险等级
:param force_mpt: 如果为True,则强制计算当天mpt,否则不强制计算
:return: :return:
''' '''
pass pass
...@@ -465,6 +465,15 @@ class PortfoliosHolder(ABC): ...@@ -465,6 +465,15 @@ class PortfoliosHolder(ABC):
''' '''
pass pass
    @abstractmethod
    def get_rebalance_date_by_signal(self, signal_id):
        '''
        Return the actual rebalance date triggered by the given rebalance signal.

        :param signal_id: id of the rebalance signal to look up
        :return: the actual rebalance (trade) date for that signal
        '''
        pass
@property @property
@abstractmethod @abstractmethod
def interval_days(self): def interval_days(self):
...@@ -578,7 +587,6 @@ class RoboExportor(ABC): ...@@ -578,7 +587,6 @@ class RoboExportor(ABC):
pass pass
class DataLogger(ABC): class DataLogger(ABC):
@abstractmethod @abstractmethod
...@@ -604,3 +612,46 @@ class DataLogger(ABC): ...@@ -604,3 +612,46 @@ class DataLogger(ABC):
:return: 日志数据列表 :return: 日志数据列表
''' '''
pass pass
class RebalanceSignal(ABC):
    '''
    Rebalance-signal service: decides whether a rebalance should be initiated.
    '''

    @abstractmethod
    def get_signal(self, day, risk: PortfoliosRisk):
        '''
        Return the rebalance signal for the given date and risk level, if any.

        :param day: the date to check (a NAV date)
        :param risk: the risk level to check
        :return: signal data when a signal exists for that day, otherwise None
        '''
        pass

    @property
    @abstractmethod
    def signal_type(self) -> SignalType:
        '''
        Return the type of this signal.

        :return: the signal type
        '''
        pass

    @abstractmethod
    def get_last_signal(self, day, risk: PortfoliosRisk):
        '''
        Return the most recent rebalance signal for the given date and risk
        level, if any.

        :param day: the date to check (a NAV date)
        :param risk: the risk level to check
        :return: signal data when a recent signal exists, otherwise None
        '''

    @abstractmethod
    def clear(self, min_date=None, risk: PortfoliosRisk = None):
        '''
        Clear stored signal data.

        :param min_date: earliest date to clear from (None clears all dates)
        :param risk: risk level to clear (None clears all risk levels)
        '''
        pass
...@@ -146,9 +146,13 @@ class FundDividendSortinoAssetOptimize(SortinoAssetOptimize): ...@@ -146,9 +146,13 @@ class FundDividendSortinoAssetOptimize(SortinoAssetOptimize):
def get_groups(self): def get_groups(self):
funds = pd.DataFrame(self.get_filtered_funds()) funds = pd.DataFrame(self.get_filtered_funds())
result = [] result = []
include = list(self.asset_include.keys())[0] if self.asset_include:
for key, fund_group in funds.groupby(by=include): include = list(self.asset_include.keys())[0]
if key in self.asset_include[include]: for key, fund_group in funds.groupby(by=include):
if key in self.asset_include[include]:
result.append(tuple(fund_group['id']))
else:
for (category, asset_type), fund_group in funds.groupby(by=['category', 'assetType']):
result.append(tuple(fund_group['id'])) result.append(tuple(fund_group['id']))
return result return result
......
from py_jftech import read, write, where, mapper_columns, format_date from py_jftech import read, write, where, mapper_columns, format_date,to_tuple
__COLUMNS__ = { __COLUMNS__ = {
'red_eco_id': 'eco_id', 'red_eco_id': 'eco_id',
...@@ -24,7 +24,7 @@ def get_list(eco_ids=None, min_date=None, max_date=None): ...@@ -24,7 +24,7 @@ def get_list(eco_ids=None, min_date=None, max_date=None):
sqls.append(f"red_date <= '{format_date(max_date)}'") sqls.append(f"red_date <= '{format_date(max_date)}'")
return f''' return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_eco_datas select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_eco_datas
{where(*sqls, red_eco_id=to_tuple(index_ids))} order by red_eco_id, red_date {where(*sqls, red_eco_id=to_tuple(eco_ids))} order by red_eco_id, red_date
''' '''
......
from datetime import datetime as dt
from typing import List
import pandas as pd import pandas as pd
from py_jftech import get_config, component, autowired, to_tuple from py_jftech import get_config, component, autowired, to_tuple
...@@ -20,19 +17,20 @@ class DefaultNavs(Navs): ...@@ -20,19 +17,20 @@ class DefaultNavs(Navs):
navs = rfn.get_navs(fund_id=fund_ids, min_date=min_date, max_date=max_date) navs = rfn.get_navs(fund_id=fund_ids, min_date=min_date, max_date=max_date)
if navs and 'exrate' in self._config: if navs and 'exrate' in self._config:
navs = pd.DataFrame(navs) navs = pd.DataFrame(navs)
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
for exrate_config in self._config['exrate']: for exrate_config in self._config['exrate']:
exrate = pd.DataFrame(re.get_exrates(ticker=exrate_config['ticker'], min_date=navs.index.min(), exrate = pd.DataFrame(re.get_exrates(ticker=exrate_config['ticker'], min_date=min_date,
max_date=navs.index.max())) max_date=max_date))
exrate = exrate[['date', 'close']] exrate.rename(columns={'date': 'nav_date'}, inplace=True)
exrate.set_index('date', inplace=True) exrate = exrate[['nav_date', 'close']]
for fund in self._datum.get_datums(type=DatumType.FUND, crncy=exrate_config['from']): fund_ids = [x['id'] for x in self._datum.get_datums(type=DatumType.FUND, crncy=exrate_config['from'])]
if fund['id'] in navs.columns: merged = pd.merge(navs, exrate, on='nav_date', how='left')
navs[fund['id']] = round(navs[fund['id']] * exrate['close'], 4) merged.loc[merged['fund_id'].isin(fund_ids), 'nav_cal'] *= merged['close']
navs = navs.reset_index().melt(id_vars='nav_date', value_name='nav_cal') merged.loc[merged['fund_id'].isin(fund_ids), 'av'] *= merged['close']
navs.dropna(inplace=True) merged.loc[merged['fund_id'].isin(fund_ids), 'dividend'] *= merged['close']
navs = navs[['fund_id', 'nav_date', 'nav_cal']] merged['nav_cal'] = merged['nav_cal'].round(4) # 四舍五入保留四位小数
navs.sort_values(by=['fund_id', 'nav_date'], inplace=True) merged['av'] = merged['av'].round(4) # 四舍五入保留四位小数
merged['dividend'] = merged['dividend'].round(4) # 四舍五入保留四位小数
navs = merged.drop('close', axis=1)
navs = navs.to_dict('records') navs = navs.to_dict('records')
return navs return navs
...@@ -72,7 +70,7 @@ class DefaultNavs(Navs): ...@@ -72,7 +70,7 @@ class DefaultNavs(Navs):
def get_eco_values(self, datum_ids=None, min_date=None, max_date=None, ticker=None, by_release_date=False): def get_eco_values(self, datum_ids=None, min_date=None, max_date=None, ticker=None, by_release_date=False):
datum_ids = to_tuple(datum_ids) datum_ids = to_tuple(datum_ids)
if ticker: if ticker:
datums = self._datum.get_datums(type=DatumType.INDEX, ticker=ticker) datums = self._datum.get_datums(type=DatumType.ECO, ticker=ticker)
datum_ids = tuple(set(list(datum_ids or []) | {x['id'] for x in datums})) datum_ids = tuple(set(list(datum_ids or []) | {x['id'] for x in datums}))
return red.get_list(eco_ids=datum_ids, min_date=min_date, max_date=max_date, by_release_date=by_release_date) return red.get_list(eco_ids=datum_ids, min_date=min_date, max_date=max_date, by_release_date=by_release_date)
...@@ -87,4 +85,3 @@ class DefaultNavs(Navs): ...@@ -87,4 +85,3 @@ class DefaultNavs(Navs):
return red.get_last_one(eco_id=datum_id, max_date=max_date, by_release_date=by_release_date) return red.get_last_one(eco_id=datum_id, max_date=max_date, by_release_date=by_release_date)
else: else:
return red.get_last(eco_id=datum_id, max_date=max_date, count=count, by_release_date=by_release_date) return red.get_last(eco_id=datum_id, max_date=max_date, count=count, by_release_date=by_release_date)
import base64
import hashlib
import logging import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime as dt from datetime import datetime as dt, timedelta
from dateutil.relativedelta import relativedelta
from typing import List from typing import List
from urllib.parse import quote from urllib.parse import quote
import pytz
import requests import requests
from dateutil.relativedelta import relativedelta
from py_jftech import format_date, is_workday, component, autowired, get_config, filter_weekend, next_workday from py_jftech import format_date, is_workday, component, autowired, get_config, filter_weekend, next_workday
from api import DatumType, DataSync, Datum from api import DatumType, DataSync, Datum
...@@ -51,7 +54,7 @@ class JDCDataSync(DataSync, ABC): ...@@ -51,7 +54,7 @@ class JDCDataSync(DataSync, ABC):
while True: while True:
url = self.build_urls(datum=datum, page=page, start_date=start_date) url = self.build_urls(datum=datum, page=page, start_date=start_date)
if url is None: if url is None:
break raise Exception(f'''request data {datum['id']} not exist!''')
response = requests.get(url).json() response = requests.get(url).json()
if not response['success']: if not response['success']:
raise Exception(f'''request indictor failed: {response['status']}''') raise Exception(f'''request indictor failed: {response['status']}''')
...@@ -66,6 +69,98 @@ class JDCDataSync(DataSync, ABC): ...@@ -66,6 +69,98 @@ class JDCDataSync(DataSync, ABC):
page += 1 page += 1
class TWDataSync(DataSync, ABC):
    '''
    Base class for Taiwan data synchronization: fetches one batch of remote
    data and stores the relevant slice for each datum of `datum_type`.
    '''

    @autowired
    def __init__(self, datum: Datum = None):
        self._datum = datum
        self._config = get_config(__name__)

    def do_sync(self, max_date=None):
        '''
        Fetch all remote data starting from `start_date`, then store the
        per-datum slice for every datum of this sync's type.

        :param max_date: sync cutoff date; defaults to today *at call time*
        :raises Exception: re-raises any store failure after logging it
        '''
        # BUG FIX: the original signature was `max_date=dt.today()`, which is
        # evaluated once at import time and goes stale in a long-running
        # process. Resolve the default lazily instead.
        if max_date is None:
            max_date = dt.today()
        logger.info(f'start sync datas for type[{self.datum_type}]')
        response = self.get_all_data(self.start_date)
        for datum in self._datum.get_datums(type=self.datum_type):
            logger.debug(f'start sync ticker[{datum["ftTicker"]}]')
            try:
                self.store_date(datum['id'], datum["ftTicker"], response)
            except Exception as e:
                logger.exception(f'''{datum['id']} store data failed''')
                raise e

    @property
    def start_date(self):
        # Look back 5 calendar days, shifted off weekends.
        return filter_weekend(dt.today() - timedelta(days=5))

    @abstractmethod
    def last_datum(self, datum_id):
        '''Return the most recent stored record for the given datum id.'''
        pass

    @abstractmethod
    def get_all_data(self, start_date=None):
        '''Fetch all remote data modified on/after `start_date` (today if None).'''
        pass

    @property
    @abstractmethod
    def datum_type(self) -> DatumType:
        '''The datum type this sync handles.'''
        pass

    @abstractmethod
    def store_date(self, datumid, ft_ticker, datas: List[dict]):
        '''Persist the rows of `datas` that belong to `ft_ticker` under `datumid`.'''
        pass
@component(bean_name='navs-sync')
class TWFundNavSync(TWDataSync):
def get_all_data(self, start_date=dt.today()):
authori = 'chifufund' + dt.today().strftime('%Y%m%d') + 'FTDMNAVDATE'
authdest = base64.b64encode(hashlib.sha256(authori.encode()).digest()).decode()
req = requests.session()
req.headers['user'] = 'chifufund'
req.headers['Authorization'] = 'Basic ' + authdest
for i in range(30):
try:
resp = req.get('http://210.202.243.106:1688/api/public/NAV?parameter.modifyDate=' + start_date.strftime(
'%Y-%m-%d'), timeout=30)
return resp.json()
except Exception as e:
logger.error(str(e))
def last_datum(self, datum_id):
last = rfn.get_last_one(fund_id=datum_id)
return last
@property
def datum_type(self) -> DatumType:
return DatumType.FUND
def store_date(self, datumid, ft_ticker, datas: List[dict]):
last = self.last_datum(datum_id=datumid)
start_date = next_workday(last['nav_date']) if last else self.start_date
save_navs = []
last_av = last['av']
last_nav_cal = last['nav_cal']
for data in datas:
if dt.strptime(data['Nav_Date'], "%Y-%m-%dT%H:%M:%S") >= start_date and data['Fund_Id'] == ft_ticker:
nav = {
'fund_id': datumid,
'nav_date': dt.strptime(data['Nav_Date'], "%Y-%m-%dT%H:%M:%S"),
'av': data['Nav_P'],
'div': data['Nav_T_Div'],
'split': data['Nav_Spilt'],
'accrue_split': data['Nav_Spilt'],
'av_p': data['Nav_P'],
'div_p': data['Nav_T_Div'],
# 当日/上日
'nav_cal': round(((data['Nav_P'] + data['Nav_T_Div']) * data['Nav_Unit']) / (
last_av * data['Nav_Unit']) * last_nav_cal, 4)
}
last_av = nav['av']
last_nav_cal = nav['nav_cal']
save_navs.append(nav)
if save_navs:
rfn.batch_insert(save_navs)
@component(bean_name='index-sync') @component(bean_name='index-sync')
class IndexSync(JDCDataSync): class IndexSync(JDCDataSync):
...@@ -87,7 +182,7 @@ class IndexSync(JDCDataSync): ...@@ -87,7 +182,7 @@ class IndexSync(JDCDataSync):
def store_date(self, datumid, datas: List[dict]): def store_date(self, datumid, datas: List[dict]):
save_datas = [{ save_datas = [{
'index_id': datumid, 'index_id': datumid,
'date': dt.fromtimestamp(x['date'] / 1000).strftime('%Y-%m-%d'), 'date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d'),
'close': x['close'], 'close': x['close'],
'open': x['open'] if 'open' in x else None, 'open': x['open'] if 'open' in x else None,
'high': x['high'] if 'high' in x else None, 'high': x['high'] if 'high' in x else None,
...@@ -95,7 +190,7 @@ class IndexSync(JDCDataSync): ...@@ -95,7 +190,7 @@ class IndexSync(JDCDataSync):
'pe': x['peRatio'] if 'peRatio' in x else None, 'pe': x['peRatio'] if 'peRatio' in x else None,
'pb': x['pbRatio'] if 'pbRatio' in x else None, 'pb': x['pbRatio'] if 'pbRatio' in x else None,
'volume': x['volume'] if 'volume' in x else None, 'volume': x['volume'] if 'volume' in x else None,
} for x in datas if is_workday(dt.fromtimestamp(x['date'] / 1000)) and 'close' in x] } for x in datas if is_workday(dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai'))) and 'close' in x]
if save_datas: if save_datas:
rid.batch_insert(save_datas) rid.batch_insert(save_datas)
...@@ -121,9 +216,9 @@ class EcoSync(JDCDataSync): ...@@ -121,9 +216,9 @@ class EcoSync(JDCDataSync):
def store_date(self, datumid, datas: List[dict]): def store_date(self, datumid, datas: List[dict]):
save_datas = [{ save_datas = [{
'eco_id': datumid, 'eco_id': datumid,
'date': dt.fromtimestamp(x['date'] / 1000), 'date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
'indicator': x['close'], 'indicator': x['close'],
'release_date': dt.fromtimestamp(x['releaseDate'] / 1000), 'release_date': dt.fromtimestamp(x['releaseDate'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
} for x in datas if 'releaseDate' in x] } for x in datas if 'releaseDate' in x]
if save_datas: if save_datas:
red.batch_insert(save_datas) red.batch_insert(save_datas)
...@@ -151,18 +246,19 @@ class FundNavSync(JDCDataSync): ...@@ -151,18 +246,19 @@ class FundNavSync(JDCDataSync):
return f'http://jdcprod.thiztech.com/api/datas/asset-value?subjectKeys={key}&page={page}&size=200&sourceType=TW&startDate={format_date(start_date)}' return f'http://jdcprod.thiztech.com/api/datas/asset-value?subjectKeys={key}&page={page}&size=200&sourceType=TW&startDate={format_date(start_date)}'
def find_jdc_subject_key(self): def find_jdc_subject_key(self):
funds = self._datum.get_datums(type=DatumType.FUND) funds = {x['isin']: x for x in self._datum.get_datums(type=DatumType.FUND)}
funds = {x['ftTicker']: x for x in self._datum.get_datums(type=DatumType.FUND)}
response = requests.get('http://jdcprod.thiztech.com/api/subject?busiField=TW&sourceType=TW&subjectType=FUND') response = requests.get('http://jdcprod.thiztech.com/api/subject?busiField=TW&sourceType=TW&subjectType=FUND')
response = response.json() response = response.json()
if not response['success']: if not response['success']:
raise CollectError(f'''find fund subject failed: {response['status']}''') raise CollectError(f'''find fund subject failed: {response['status']}''')
return {funds[x['fundId']]['id']: x['key'] for x in response['body']['content'] if x['fundId'] in funds} content = response['body']['content']
content = [x for x in content if x.get('isin')]
return {funds[x['isin']]['id']: x['key'] for x in content if x['isin'] in funds}
def store_date(self, datumid, datas: List[dict]): def store_date(self, datumid, datas: List[dict]):
save_navs = [{ save_navs = [{
'fund_id': datumid, 'fund_id': datumid,
'nav_date': dt.fromtimestamp(x['date'] / 1000), 'nav_date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
'av': x['originValue'], 'av': x['originValue'],
'div': x['dividend'] if 'dividend' in x else 0, 'div': x['dividend'] if 'dividend' in x else 0,
'split': x['split'] if 'split' in x else 1, 'split': x['split'] if 'split' in x else 1,
...@@ -170,7 +266,7 @@ class FundNavSync(JDCDataSync): ...@@ -170,7 +266,7 @@ class FundNavSync(JDCDataSync):
'av_p': x['postValue'], 'av_p': x['postValue'],
'div_p': x['postDividend'] if 'postDividend' in x else 0, 'div_p': x['postDividend'] if 'postDividend' in x else 0,
'nav_cal': x['calibrateValue'] 'nav_cal': x['calibrateValue']
} for x in datas if is_workday(dt.fromtimestamp(x['date'] / 1000))] } for x in datas if is_workday(dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')))]
if save_navs: if save_navs:
rfn.batch_insert(save_navs) rfn.batch_insert(save_navs)
...@@ -207,7 +303,7 @@ class ExrateSync(DataSync): ...@@ -207,7 +303,7 @@ class ExrateSync(DataSync):
try: try:
save_dates = [{ save_dates = [{
'ticker': ticker, 'ticker': ticker,
'date': dt.fromtimestamp(x['date'] / 1000), 'date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
'close': x['close'], 'close': x['close'],
} for x in response['body']['content']] } for x in response['body']['content']]
if save_dates: if save_dates:
......
...@@ -42,6 +42,7 @@ py-jftech: ...@@ -42,6 +42,7 @@ py-jftech:
hold-report: portfolios.holder.DivHoldReportor hold-report: portfolios.holder.DivHoldReportor
mpt: portfolios.builder.PoemPortfoliosBuilder mpt: portfolios.builder.PoemPortfoliosBuilder
dividend-holder: portfolios.holder.DividendPortfoliosHolder dividend-holder: portfolios.holder.DividendPortfoliosHolder
navs-sync: basic.sync.FundNavSync
email: email:
server: smtphz.qiye.163.com server: smtphz.qiye.163.com
user: jft-ra@thizgroup.com user: jft-ra@thizgroup.com
......
...@@ -42,6 +42,7 @@ py-jftech: ...@@ -42,6 +42,7 @@ py-jftech:
hold-report: portfolios.holder.DivHoldReportor hold-report: portfolios.holder.DivHoldReportor
mpt: portfolios.builder.PoemPortfoliosBuilder mpt: portfolios.builder.PoemPortfoliosBuilder
dividend-holder: portfolios.holder.DividendPortfoliosHolder dividend-holder: portfolios.holder.DividendPortfoliosHolder
navs-sync: basic.sync.FundNavSync
email: email:
server: smtphz.qiye.163.com server: smtphz.qiye.163.com
user: jft-ra@thizgroup.com user: jft-ra@thizgroup.com
...@@ -234,8 +235,8 @@ robo-executor: # 执行器相关 ...@@ -234,8 +235,8 @@ robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:backtest} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest use: ${ROBO_EXECUTOR:backtest} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据 sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据
backtest: # 回测执行器相关 backtest: # 回测执行器相关
start-date: 2012-10-16 # 回测起始日期 start-date: 2022-10-25 # 回测起始日期
end-date: 2023-03-01 # 回测截止日期 end-date: 2023-06-01 # 回测截止日期
sealing-period: 10 #调仓封闭期 sealing-period: 10 #调仓封闭期
start-step: ${BACKTEST_START_STEP:1} # 回测从哪一步开始执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组 start-step: ${BACKTEST_START_STEP:1} # 回测从哪一步开始执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
end-step: ${BACKTEST_END_STEP:3} # 回测从哪一步执行完成后结束执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组 end-step: ${BACKTEST_END_STEP:3} # 回测从哪一步执行完成后结束执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
......
...@@ -40,8 +40,9 @@ py-jftech: ...@@ -40,8 +40,9 @@ py-jftech:
backtest: robo_executor.BacktestExecutor backtest: robo_executor.BacktestExecutor
datum: basic.datum.DefaultDatum datum: basic.datum.DefaultDatum
hold-report: portfolios.holder.DivHoldReportor hold-report: portfolios.holder.DivHoldReportor
mpt: portfolios.builder.PoemPortfoliosBuilder mpt: portfolios.builder.PoemARCPortfoliosBuilder
dividend-holder: portfolios.holder.InvTrustPortfoliosHolder dividend-holder: portfolios.holder.InvTrustPortfoliosHolder
navs-sync: basic.sync.FundNavSync
email: email:
server: smtphz.qiye.163.com server: smtphz.qiye.163.com
user: jft-ra@thizgroup.com user: jft-ra@thizgroup.com
...@@ -50,21 +51,21 @@ py-jftech: ...@@ -50,21 +51,21 @@ py-jftech:
max-workers: ${MAX_PROCESS:4} max-workers: ${MAX_PROCESS:4}
basic: # 基础信息模块 basic: # 基础信息模块
sync: sync:
start-date: 2007-01-01 # 同步数据开始日期 start-date: 1990-01-01 # 同步数据开始日期
datum: # 资料模块 datum: # 资料模块
change: change:
date: ${DATUM_CHANGE_DATE} date: ${DATUM_CHANGE_DATE}
file: ${DATUM_CHANGE_FILE} file: ${DATUM_CHANGE_FILE}
excludes: # 排除的资料彭博ticker excludes: # 排除的资料彭博ticker
backtest: # backtest:
- 'TEMUSGI LX Equity' # - 'TEMUSGI LX Equity'
real: real:
- 'FGFSACU LX Equity' - 'FGFSACU LX Equity'
- 'TEMUSGI LX Equity' - 'TEMUSGI LX Equity'
# navs: # 净值模块 navs: # 净值模块
# exrate: # 汇率,如果不开启,整个这块注释掉 exrate: # 汇率,如果不开启,整个这块注释掉
# - from: EUR # 需要转换的货币类型 - from: EUR # 需要转换的货币类型
# ticker: EURUSD BGN Curncy # 汇率值的彭博ticker ticker: EURUSD BGN Curncy # 汇率值的彭博ticker
asset-pool: # 资产池模块 asset-pool: # 资产池模块
asset-optimize: # 资产优选模块 asset-optimize: # 资产优选模块
sortino-weight: # sortino计算需要的权重,下面每一条为一次计算,e.g. months: 3, weight: 0.5 表示 3个月数据使用权重0.5来计算分值 sortino-weight: # sortino计算需要的权重,下面每一条为一次计算,e.g. months: 3, weight: 0.5 表示 3个月数据使用权重0.5来计算分值
...@@ -74,7 +75,7 @@ asset-pool: # 资产池模块 ...@@ -74,7 +75,7 @@ asset-pool: # 资产池模块
weight: 0.3 weight: 0.3
- years: 1 - years: 1
weight: 0.2 weight: 0.2
asset-include: {'category':['US_STOCK','US_IG_BOND','US_HY_BOND']} asset-include: {'customType':[1,2,3,4]}
optimize-count: 3 #基金优选个数 optimize-count: 3 #基金优选个数
portfolios: # 投组模块 portfolios: # 投组模块
holder: # 持仓投组相关 holder: # 持仓投组相关
...@@ -82,9 +83,14 @@ portfolios: # 投组模块 ...@@ -82,9 +83,14 @@ portfolios: # 投组模块
min-interval-days: 10 # 两次实际调仓最小间隔期,单位交易日 min-interval-days: 10 # 两次实际调仓最小间隔期,单位交易日
dividend-rate: 0.09 #设定年化配息率 dividend-rate: 0.09 #设定年化配息率
dividend-date: 15 #配息日,每月15号 dividend-date: 15 #配息日,每月15号
dividend-adjust-day: [1,4,7,10] #每年的首个季度调整配息
warehouse-frequency: 1 #每隔1个月调一次仓 warehouse-frequency: 1 #每隔1个月调一次仓
redeem-list: [ 'TEUSAAU LX Equity', 'LIGTRAA ID Equity', 'TEMFHAC LX Equity', 'LUSHUAA ID Equity' ] #从持仓中的低风险资产“直接”按序赎回 redeem-list: [ 'TEUSAAU LX Equity', 'LIGTRAA ID Equity', 'TEMFHAC LX Equity', 'LUSHUAA ID Equity' ] #从持仓中的低风险资产“直接”按序赎回
solver: # 解算器相关 solver: # 解算器相关
model: arc # 结算模型 ARC ,PRR, ~ 标准解算器
arc: on #是否开启ARC
brr: 0.01 #误差补偿值
trr: 3
tol: 1E-10 # 误差满足条件 tol: 1E-10 # 误差满足条件
navs: # 净值要求 navs: # 净值要求
range: # 需要净值数据的区间, days: 90 表示90自然日,months: 3 表示3个自然月 range: # 需要净值数据的区间, days: 90 表示90自然日,months: 3 表示3个自然月
...@@ -92,21 +98,16 @@ portfolios: # 投组模块 ...@@ -92,21 +98,16 @@ portfolios: # 投组模块
max-nan: # 最大缺失净值条件 max-nan: # 最大缺失净值条件
asset: 8 # 单一资产最多缺少多少交易日数据,则踢出资产池 asset: 8 # 单一资产最多缺少多少交易日数据,则踢出资产池
day: 0.5 # 单一交易日最多缺少百分之多少净值,则删除该交易日 day: 0.5 # 单一交易日最多缺少百分之多少净值,则删除该交易日
normal-ratio: #US_STOCK:US_HY_BOND:US_IG_BOND三者分别对应低中高风险所占比率 risk: [] # 资产风险等级要求,可分开写也可以合并写,e.g. risk:[ 2, 3 ] 则表示 所有投组资产风险等级都是 2 或 3
US_STOCK: [ 0.5, 0.5, 0.7 ] LARC: [0.5, 0.1, 0.1, 0.1] #低阈值
US_HY_BOND: [ 0.4, 0.4, 0.2 ] UARC: [0.7, 0.25, 0.25, 0.25] #高阈值
US_IG_BOND: [ 0.1, 0.1, 0.1 ]
riskctl-ratio:
US_STOCK: [ 0.2, 0.4, 0.6 ]
US_HY_BOND: [ 0.5, 0.3, 0.1 ]
US_IG_BOND: [ 0.3, 0.3, 0.3 ]
matrix-rtn-days: 20 # 计算回报率矩阵时,回报率滚动天数 matrix-rtn-days: 20 # 计算回报率矩阵时,回报率滚动天数
asset-count: [3,3] # 投组资产个数。e.g. count 或 [min, max] 分别表示 最大最小都为count 或 最小为min 最大为max,另外这里也可以类似上面给不同风险等级分别配置 asset-count: [5,5] # 投组资产个数。e.g. count 或 [min, max] 分别表示 最大最小都为count 或 最小为min 最大为max,另外这里也可以类似上面给不同风险等级分别配置
mpt: # mpt计算相关 mpt: # mpt计算相关
cvar-beta: 0.2 # 计算Kbeta 需要用到 cvar-beta: 0.2 # 计算Kbeta 需要用到
quantile: 0.9 # 分位点,也可以给不同风险等级分别配置 quantile: 0.9 # 分位点,也可以给不同风险等级分别配置
low-weight: 0.05 # 最低权重 low-weight: 0.05 # 最低权重
# high-weight: [ 1 ] # 最高权重比例,可给一个值,也可以给多个值,当多个值时,第一个表示只有一个资产时权重,第二个表示只有两个资产时权重,以此类推,最后一个表示其他资产个数时的权重 high-weight: [ 0.35 ] # 最高权重比例,可给一个值,也可以给多个值,当多个值时,第一个表示只有一个资产时权重,第二个表示只有两个资产时权重,以此类推,最后一个表示其他资产个数时的权重
poem: # poem相关 poem: # poem相关
cvar-scale-factor: 0.1 # 计算时用到的系数 cvar-scale-factor: 0.1 # 计算时用到的系数
reports: # 报告模块相关 reports: # 报告模块相关
...@@ -230,18 +231,18 @@ reports: # 报告模块相关 ...@@ -230,18 +231,18 @@ reports: # 报告模块相关
subject: "SVROBO6-实盘版-每日监测_{today}" subject: "SVROBO6-实盘版-每日监测_{today}"
content: "Dear All: 附件是今天生成的监测数据,請驗收,謝謝! 注>:該郵件為自動發送,如有問題請聯繫矽谷團隊 telan_qian@chifufund.com" content: "Dear All: 附件是今天生成的监测数据,請驗收,謝謝! 注>:該郵件為自動發送,如有問題請聯繫矽谷團隊 telan_qian@chifufund.com"
robo-executor: # 执行器相关 robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:real} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest use: ${ROBO_EXECUTOR:backtest} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:on} # 是否开启同步资料数据 sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据
backtest: # 回测执行器相关 backtest: # 回测执行器相关
start-date: 2022-09-30 # 回测起始日期 start-date: 2022-02-16 # 回测起始日期
end-date: 2023-03-01 # 回测截止日期 end-date: 2023-01-03 # 回测截止日期
sealing-period: 10 #调仓封闭期 sealing-period: 10 #调仓封闭期
start-step: ${BACKTEST_START_STEP:3} # 回测从哪一步开始执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组 start-step: ${BACKTEST_START_STEP:3} # 回测从哪一步开始执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
end-step: ${BACKTEST_END_STEP:3} # 回测从哪一步执行完成后结束执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组 end-step: ${BACKTEST_END_STEP:3} # 回测从哪一步执行完成后结束执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
clean-up: true clean-up: on
real: # 实盘执行器 real: # 实盘执行器
export: ${EXPORT_ENABLE:on} # 是否开启报告 export: ${EXPORT_ENABLE:off} # 是否开启报告
start-date: 2023-01-01 # 实盘开始时间 start-date: 2023-05-08 # 实盘开始时间
include-date: [] include-date: []
......
import json import json
import logging import logging
from datetime import datetime as dt, timedelta
from typing import List
import pandas as pd from py_jftech import component, autowired, format_date
from py_jftech import component, autowired, format_date, prev_workday, is_workday
from pymysql import IntegrityError, constants from pymysql import IntegrityError, constants
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory, \ from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory
RoboReportor, DatumType
from portfolios.dao import robo_mpt_portfolios as rmp from portfolios.dao import robo_mpt_portfolios as rmp
from portfolios.dao.robo_mpt_portfolios import get_list
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -87,11 +82,8 @@ class MptPortfoliosBuilder(PortfoliosBuilder): ...@@ -87,11 +82,8 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
def clear(self, day=None, risk: PortfoliosRisk = None): def clear(self, day=None, risk: PortfoliosRisk = None):
rmp.delete(min_date=day, risk=risk) rmp.delete(min_date=day, risk=risk)
def get_all_portfolios(self, risk: PortfoliosRisk = None):
return get_list(risk=risk)
@component(bean_name='mpt')
@component(bean_name='poem')
class PoemPortfoliosBuilder(MptPortfoliosBuilder): class PoemPortfoliosBuilder(MptPortfoliosBuilder):
def build_portfolio(self, day, type: PortfoliosType): def build_portfolio(self, day, type: PortfoliosType):
...@@ -118,104 +110,88 @@ class PoemPortfoliosBuilder(MptPortfoliosBuilder): ...@@ -118,104 +110,88 @@ class PoemPortfoliosBuilder(MptPortfoliosBuilder):
return result return result
@component(bean_name='signal-report') @component(bean_name='mpt')
class SignalReportor(RoboReportor): class MptARCPortfoliosBuilder(MptPortfoliosBuilder):
@autowired
def __init__(self, datum: Datum = None):
self._datum = datum
@property
def report_name(self) -> str:
return '调仓信号'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
result = []
datums = {str(x['id']): x for x in self._datum.get_datums(type=DatumType.FUND, exclude=False)}
for signal in rmp.get_list(max_date=max_date, min_date=min_date):
for fund_id, weight in json.loads(signal['portfolio']).items():
result.append({
'risk': PortfoliosRisk(signal['risk']).name,
'rebalance_date': signal['date'],
'portfolio_type': PortfoliosType.NORMAL.name,
'ft_ticker': datums[fund_id]['ftTicker'],
'blooberg_ticker': datums[fund_id]['bloombergTicker'],
'fund_name': datums[fund_id]['chineseName'],
'weight': weight
})
return result
@component(bean_name='daily-hold-report')
class DailyHoldReportor(RoboReportor):
@autowired def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL):
def __init__(self, datum: Datum = None): try:
self._datum = datum portfolio = rmp.get_one(day, type, risk)
if not portfolio:
result, detail = self.build_portfolio(day, type)
for build_risk, datas in result.items():
try:
rmp.insert({
**datas,
'risk': build_risk,
'type': type,
'date': day
})
except IntegrityError as e:
code, msg = e.args
if code != constants.ER.DUP_ENTRY:
raise e
portfolio = rmp.get_one(day, type, risk)
if SolveType(portfolio['solve']) is not SolveType.INFEASIBLE:
result = json.loads(portfolio['portfolio'])
return {int(x[0]): x[1] for x in result.items()}
return None
except Exception as e:
logger.exception(f"build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}] failure.", exc_info=e)
raise e
@property def build_portfolio(self, day, type: PortfoliosType):
def report_name(self) -> str:
return '每日持仓信息'
def load_report(self, max_date=prev_workday(dt.today()), min_date=None) -> List[dict]:
# 月初调仓,实际相当于调仓信号在上月月末
first_day = max_date.replace(day=1)
prev_month = first_day - timedelta(days=1)
prev_month.replace(day=prev_month.day)
prev_month = prev_month if is_workday(prev_month) else prev_workday(prev_month)
portfolio = rmp.get_one(prev_month, type=PortfoliosType.NORMAL, risk=PortfoliosRisk.FT3)
result = {} result = {}
if portfolio: detail = {}
datum_ids = list(json.loads(portfolio['portfolio']).keys()) risk = PortfoliosRisk.FT3
datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=datum_ids)) logger.info(
datums.set_index('id', inplace=True) f"start to build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}]")
solver = self._factory.create_solver(risk, type)
result['risk'] = [portfolio['risk'] for i in datum_ids] solver.reset_navs(day)
result['rebalance_type'] = [portfolio['type'] for i in datum_ids] logger.debug({
'Khist': len(solver.rtn_history),
result['weight'] = [format(i, '.0%') for i in json.loads(portfolio['portfolio']).values()] 'beta': solver.get_config('mpt.cvar-beta'),
result['asset_ids'] = [datums.loc[int(i)]['ftTicker'] for i in datum_ids] 'Kbeta': solver.k_beta,
result['name'] = [datums.loc[int(i)]['chineseName'] for i in datum_ids] })
result['lipper_id'] = [datums.loc[int(i)]['lipperKey'] for i in datum_ids] max_rtn, max_var, minCVaR_whenMaxR = solver.solve_max_rtn()
result['date'] = [max_date for i in datum_ids] min_rtn, min_var, maxCVaR_whenMinV = solver.solve_min_rtn()
result['rebalance_date'] = [portfolio['date'] for i in datum_ids] portfolio, cvar = solver.solve_mpt(min_rtn, max_rtn)
result = pd.DataFrame(result) result[risk] = {
result = result[ 'solve': SolveType.MPT,
['lipper_id', 'asset_ids', 'name', 'weight', 'risk', 'date', 'rebalance_type', 'rebalance_date']] 'portfolio': json.dumps(portfolio),
return result.to_dict('records') 'cvar': cvar
} if portfolio else {
return [] 'solve': SolveType.INFEASIBLE
}
detail[risk] = {
@component(bean_name='daily-signal-report') 'max_rtn': max_rtn,
class DailySignalReportor(RoboReportor): 'max_var': max_var,
'minCVaR_whenMaxR': minCVaR_whenMaxR,
@autowired 'min_rtn': min_rtn,
def __init__(self, datum: Datum = None): 'min_var': min_var,
self._datum = datum 'maxCVaR_whenMinV': maxCVaR_whenMinV,
}
return result, detail
@property
def report_name(self) -> str:
return '每日调仓信号'
def load_report(self, max_date=prev_workday(dt.today()), min_date=None) -> List[dict]: @component(bean_name='mpt')
portfolio = rmp.get_one(max_date, type=PortfoliosType.NORMAL, risk=PortfoliosRisk.FT3) class PoemARCPortfoliosBuilder(MptARCPortfoliosBuilder):
result = {} def build_portfolio(self, day, type: PortfoliosType):
result, detail = super(PoemARCPortfoliosBuilder, self).build_portfolio(day, type)
risk = PortfoliosRisk.FT3
# if result[risk]['solve'] is SolveType.INFEASIBLE:
# continue
solver = self._factory.create_solver(risk, type)
solver.reset_navs(day)
min_rtn = detail[risk]['min_rtn']
max_rtn = detail[risk]['max_rtn']
mpt_cvar = result[risk]['cvar']
maxCVaR_whenMinV = detail[risk]['maxCVaR_whenMinV']
portfolio, cvar = solver.solve_poem(min_rtn, max_rtn, mpt_cvar, maxCVaR_whenMinV)
if portfolio: if portfolio:
datum_ids = list(json.loads(portfolio['portfolio']).keys()) result[risk] = {
datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=datum_ids)) 'solve': SolveType.POEM,
datums.set_index('id', inplace=True) 'portfolio': json.dumps(portfolio),
'cvar': cvar
result['risk'] = [portfolio['risk'] for i in datum_ids] }
result['rebalance_type'] = [portfolio['type'] for i in datum_ids] detail[risk]['mpt_cvar'] = mpt_cvar
return result, detail
result['weight'] = [format(i, '.0%') for i in json.loads(portfolio['portfolio']).values()]
result['asset_ids'] = [datums.loc[int(i)]['ftTicker'] for i in datum_ids]
result['name'] = [datums.loc[int(i)]['chineseName'] for i in datum_ids]
result['lipper_id'] = [datums.loc[int(i)]['lipperKey'] for i in datum_ids]
result['date'] = [max_date for i in datum_ids]
result = pd.DataFrame(result)
result = result[['lipper_id', 'asset_ids', 'name', 'weight', 'risk', 'date', 'rebalance_type']]
return result.to_dict('records')
return []
...@@ -6,14 +6,16 @@ __COLUMNS__ = { ...@@ -6,14 +6,16 @@ __COLUMNS__ = {
'rhp_id': 'id', 'rhp_id': 'id',
'rhp_date': 'date', 'rhp_date': 'date',
'rhp_risk': 'risk', 'rhp_risk': 'risk',
'rhp_div': 'dividend',
'rhp_div_acc': 'div_acc', 'rhp_div_acc': 'div_acc',
'rhp_rrs_id': 'signal_id', 'rhp_rrs_id': 'signal_id',
'rhp_rebalance': 'rebalance', 'rhp_rebalance': 'rebalance',
'rhp_portfolios': 'portfolios', 'rhp_portfolios': 'portfolios',
'rhp_nav': 'nav', 'rhp_nav': 'nav',
'rhp_cash': 'cash',
'rhp_fund_av': 'fund_av', 'rhp_fund_av': 'fund_av',
'rhp_fund_nav': 'fund_nav',
'rhp_fund_div': 'fund_div', 'rhp_fund_div': 'fund_div',
'rhp_div_forecast': 'div_forecast',
'rhp_asset_nav': 'asset_nav', 'rhp_asset_nav': 'asset_nav',
'rhp_port_div': 'port_div', 'rhp_port_div': 'port_div',
'v_nav_div_acc': 'acc_av', 'v_nav_div_acc': 'acc_av',
...@@ -40,7 +42,7 @@ def get_one(day, risk: PortfoliosRisk): ...@@ -40,7 +42,7 @@ def get_one(day, risk: PortfoliosRisk):
@read(one=True) @read(one=True)
def get_last_one(risk: PortfoliosRisk = None, max_date=None, rebalance: bool = None, signal_id=None): def get_last_one(risk: PortfoliosRisk = None, max_date=None, rebalance: bool = None, signal_id=None):
sql = "rhp_date <= '{format_date(max_date)}'" if max_date else None sql = f"rhp_date <= '{format_date(max_date)}'" if max_date else None
return f''' return f'''
select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios
{where(sql, rhp_risk=risk, rhp_rrs_id=signal_id, rhp_rebalance=rebalance)} {where(sql, rhp_risk=risk, rhp_rrs_id=signal_id, rhp_rebalance=rebalance)}
......
...@@ -10,6 +10,7 @@ __COLUMNS__ = { ...@@ -10,6 +10,7 @@ __COLUMNS__ = {
'rmp_rolve': 'solve', 'rmp_rolve': 'solve',
'rmp_portfolio': 'portfolio', 'rmp_portfolio': 'portfolio',
'rmp_cvar': 'cvar', 'rmp_cvar': 'cvar',
'rmp_create_time': 'create_time'
} }
...@@ -51,3 +52,14 @@ def get_list(max_date=None, min_date=None, type: PortfoliosType = None, risk: Po ...@@ -51,3 +52,14 @@ def get_list(max_date=None, min_date=None, type: PortfoliosType = None, risk: Po
{where(*sqls, rmp_risk=risk, rmp_type=type)} {where(*sqls, rmp_risk=risk, rmp_type=type)}
order by rmp_date order by rmp_date
''' '''
@read(one=True)
def get_last_one(date=None, type: PortfoliosType = None, risk: PortfoliosRisk = None):
sqls = []
if date:
sqls.append(f"rmp_date <= '{format_date(date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_mpt_portfolios
{where(*sqls, rmp_risk=risk, rmp_type=type)}
order by rmp_date desc limit 1
'''
\ No newline at end of file
This diff is collapsed.
import math
import os import os
import sys import sys
from logging import DEBUG, getLogger from logging import DEBUG, getLogger
import numpy as np
import pandas as pd import pandas as pd
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
from numpy import NAN from numpy import NAN
from py_jftech import component, autowired, get_config from py_jftech import component, autowired, get_config, filter_weekend
from pyomo.environ import * from pyomo.environ import *
from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum, DatumType from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum, DatumType
...@@ -27,14 +29,26 @@ def create_solver(): ...@@ -27,14 +29,26 @@ def create_solver():
@component @component
class DefaultFactory(Factory): class DefaultFactory(Factory):
def create_solver(self, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL) -> Solver: def __init__(self):
return DefaultSolver(risk, type) self._config = get_config(__name__)
@property
def solver_model(self):
return self._config['model'].upper() if 'model' in self._config and self._config['model'] is not None else None
def create_solver(self, risk: PortfoliosRisk = None, type: PortfoliosType = PortfoliosType.NORMAL) -> Solver:
if self.solver_model == 'ARC':
return ARCSolver(type=type, risk=risk)
if self.solver_model == 'PRR':
if risk == PortfoliosRisk.FT3:
return PRRSolver(type=type, risk=risk)
return DefaultSolver(type=type, risk=risk)
class DefaultSolver(Solver): class DefaultSolver(Solver):
@autowired @autowired
def __init__(self, risk: PortfoliosRisk, type: PortfoliosType, assets: AssetPool = None, navs: Navs = None, def __init__(self, type: PortfoliosType, risk: PortfoliosRisk, assets: AssetPool = None, navs: Navs = None,
datum: Datum = None): datum: Datum = None):
self._category = None self._category = None
self._transfer_type = None self._transfer_type = None
...@@ -284,3 +298,159 @@ class DefaultSolver(Solver): ...@@ -284,3 +298,159 @@ class DefaultSolver(Solver):
'port_CVaR': self.calc_port_cvar(model) 'port_CVaR': self.calc_port_cvar(model)
}) })
logger.debug('-------------------------------') logger.debug('-------------------------------')
class ARCSolver(DefaultSolver):
    '''
    MPT solver variant that adds Asset-Risk-Category (ARC) allocation bounds.
    On top of the base MPT constraints it can constrain, per risk category,
    the total weight allocated to the funds of that category (see create_model).
    '''

    def __init__(self, type: PortfoliosType, risk: PortfoliosRisk, assets: AssetPool = None, navs: Navs = None,
                 datum: Datum = None):
        # assets/navs/datum are accepted only for signature compatibility with
        # the factory; DefaultSolver.__init__ is @autowired and injects them itself.
        super().__init__(type, risk)
        self.__date = None  # set by reset_navs; the (weekend-filtered) as-of date

    @property
    def date(self):
        '''As-of date of the last reset_navs call (None before the first call).'''
        return self.__date

    def calc_port_weight(self, model):
        '''
        Extract the solved weights from the pyomo model as {fund_id: weight},
        dropping assets whose weight solved to zero.
        NOTE(review): reads the private Var attribute `_value`; presumably
        equivalent to pyomo's public `value()` accessor — confirm on upgrade.
        '''
        id_list = self.navs.columns
        # w[i] * z[i]: z is the binary selection variable, so unselected assets become 0
        weight_list = [model.w[i]._value * model.z[i]._value for i in model.indices]
        df_w = pd.DataFrame(data=weight_list, index=id_list, columns=['weight'])
        df_w.replace(0, math.nan, inplace=True)
        df_w.dropna(axis=0, inplace=True)
        # format_weight (project helper) normalizes/rounds the raw solver weights
        df_w['weight'] = pd.Series(format_weight(dict(df_w['weight'])))
        dict_w = df_w.to_dict()['weight']
        return dict_w

    @property
    def max_count(self):
        # 'asset-count' config may be a scalar or a [min, max] pair
        count = self.get_config('asset-count')
        return count[1] if isinstance(count, list) else count

    @property
    def min_count(self):
        # never require more assets than are actually available in the pool
        count = self.get_config('asset-count')
        return min(count[0] if isinstance(count, list) else count, len(self.rtn_annualized))

    def create_model(self):
        '''
        Build the pyomo ConcreteModel: weight variables w, selection binaries z,
        sum-to-one, asset-count and per-asset weight-bound constraints, plus
        optional per-category (ARC) total-weight bounds when config 'arc' is truthy.
        '''
        low_weight = self.get_config('mpt.low-weight')
        high_weight = self.get_config('mpt.high-weight')
        if isinstance(high_weight, list):
            # pick the bound matching how many assets can effectively be selected
            high_weight = high_weight[min(len(self.navs.columns), self.min_count, len(high_weight)) - 1]
        model = ConcreteModel()
        model.indices = range(0, len(self.navs.columns))
        model.w = Var(model.indices, domain=NonNegativeReals)
        model.z = Var(model.indices, domain=Binary)
        model.cons_sum_weight = Constraint(expr=sum([model.w[i] for i in model.indices]) == 1)
        model.cons_num_asset = Constraint(
            expr=inequality(self.min_count, sum([model.z[i] for i in model.indices]), self.max_count, strict=False))
        # if an asset is selected (z=1) its weight must lie in [low_weight, high_weight];
        # if not selected (z=0) its weight is forced to 0
        model.cons_bounds_low = Constraint(model.indices, rule=lambda m, i: m.z[i] * low_weight <= m.w[i])
        model.cons_bounds_up = Constraint(model.indices, rule=lambda m, i: m.z[i] * high_weight >= m.w[i])
        if self._config['arc']:
            LARC = self._config['LARC']  # per-category lower total-weight bounds
            UARC = self._config['UARC']  # per-category upper total-weight bounds
            numARC = len(LARC)  # this is the M in the doc
            numAsset = len(self.navs.columns)
            # category membership comes from the fund datum's 'customType'
            datums = self._datum.get_datums(type=DatumType.FUND, datum_ids=list(self.navs.columns))
            AssetARC = np.array([x['customType'] for x in datums], dtype=int)
            # A[c, j] = 1 iff asset j belongs to category c (customType is 1-based)
            A = np.zeros((numARC, numAsset), dtype=int)
            for i in range(numAsset):
                A[AssetARC[i] - 1, i] = 1
            model.cons_arc_low = Constraint(range(numARC),
                                            rule=lambda m, i: LARC[i] <= sum([A[i, j] * m.w[j] for j in m.indices]))
            model.cons_arc_up = Constraint(range(numARC),
                                           rule=lambda m, i: UARC[i] >= sum([A[i, j] * m.w[j] for j in m.indices]))
        return model

    def reset_navs(self, day):
        '''
        Load and clean the NAV matrix (rows = dates, cols = fund ids) as of *day*:
        restrict to the asset pool, drop excluded asset types, drop assets/days
        with too many NaNs (config 'navs.max-nan.*'), then forward/backward fill.
        '''
        self.__date = filter_weekend(day)
        asset_ids = self._assets.get_pool(self.date)
        asset_risk = self.get_config('navs.risk')
        datum = self._datum.get_datums(type=DatumType.FUND, datum_ids=asset_ids, risk=asset_risk)
        exclude = self.get_config('navs.exclude-asset-type') or []
        asset_ids = list(set(asset_ids) & set([x['id'] for x in datum if x['assetType'] not in exclude]))
        min_date = self.date - relativedelta(**self.get_config('navs.range'))
        navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_ids, max_date=self.date, min_date=min_date))
        navs = navs[navs['nav_date'].dt.day_of_week < 5]  # keep Mon-Fri only
        navs['nav_date'] = pd.to_datetime(navs['nav_date'])
        navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
        navs = navs.sort_index()
        navs_nan = navs.isna().sum()
        # drop funds with too many missing NAVs
        navs.drop(columns=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.asset')],
                  inplace=True)
        # drop days where too large a fraction of funds is missing
        navs_nan = navs.apply(lambda r: r.isna().sum() / len(r), axis=1)
        navs.drop(index=[x for x in navs_nan.index if navs_nan.loc[x] >= self.get_config('navs.max-nan.day')],
                  inplace=True)
        navs.fillna(method='ffill', inplace=True)
        if navs.iloc[0].isna().sum() > 0:
            navs.fillna(method='bfill', inplace=True)
        self.set_navs(navs)
class PRRSolver(ARCSolver):
    '''
    ARC solver variant that additionally enforces Product-Risk-Rating (PRR)
    constraints: the weighted portfolio rating must not exceed the target
    rating TRR, a minimum share must sit in funds rated <= TRR, and (for
    TRR < 5) rating-5 funds may only appear alongside mid-rating funds.
    '''

    def __init__(self, type: PortfoliosType, risk: PortfoliosRisk, assets: AssetPool = None, navs: Navs = None,
                 datum: Datum = None):
        # extra DI params accepted for factory-signature compatibility; the
        # parent __init__ is @autowired and injects its own collaborators.
        super().__init__(type, risk)
        self.__risk = None  # {fund_id: risk rating}, filled by reset_navs

    def create_model(self):
        '''
        Extend the ARC model with the PRR constraints.
        :return: the pyomo model with cons_TRR / cons_RR_LE_TRR (/ cons_RR_in_1_5) added
        '''
        model = super(PRRSolver, self).create_model()
        # Build the rating vector in the exact order of the model's assets.
        # model.indices follows self.navs.columns, so index the id->rating map
        # by column id instead of relying on dict iteration order (the old code
        # iterated self.risks.items(), which silently assumed get_datums returns
        # funds in navs-column order).
        RR = [self.risks[fund_id] for fund_id in self.navs.columns]
        minRRweightWithinTRR = 0.7 + self._config['brr']  # min share of weight in funds rated <= TRR
        TRR = self._config['trr']  # target (maximum) portfolio risk rating
        # weighted-average rating must not exceed the target rating
        model.cons_TRR = Constraint(expr=sum([model.w[i] * RR[i] for i in model.indices]) <= TRR)
        # indicator vectors over the asset universe
        RR_LE_TRR = np.zeros(len(self.navs.columns), dtype=int)  # rating <= TRR
        RR_in_1_5 = np.zeros(len(self.navs.columns), dtype=int)  # 1 < rating < 5
        RR_EQ_5 = np.zeros(len(self.navs.columns), dtype=int)    # rating == 5
        for i in range(len(self.navs.columns)):
            if RR[i] <= TRR:
                RR_LE_TRR[i] = 1
            if 1 < RR[i] < 5:
                RR_in_1_5[i] = 1
            elif RR[i] == 5:
                RR_EQ_5[i] = 1
        model.cons_RR_LE_TRR = Constraint(
            expr=sum([model.w[i] * RR_LE_TRR[i] for i in model.indices]) >= minRRweightWithinTRR)
        if TRR < 5:
            # a rating-5 fund may only be selected together with mid-rating funds
            model.cons_RR_in_1_5 = Constraint(
                expr=sum([model.z[i] * (RR_in_1_5[i] * self.max_count - RR_EQ_5[i]) for i in model.indices]) >= 0)
        return model

    def reset_navs(self, day):
        '''Refresh the NAV matrix, then cache the fund_id -> risk-rating map for it.'''
        super(PRRSolver, self).reset_navs(day=day)
        datums = self._datum.get_datums(type=DatumType.FUND, datum_ids=list(self.navs.columns))
        self.__risk = {x['id']: x['risk'] for x in datums}

    @property
    def risks(self):
        '''Mapping of fund id to its product risk rating (None before reset_navs).'''
        return self.__risk
import json
from abc import ABC
from datetime import datetime as dt
from datetime import timedelta
from functools import reduce
from typing import List
import pandas as pd
from py_jftech import component, autowired, get_config, prev_workday
from py_jftech import is_workday
from api import PortfoliosBuilder
from api import (
PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder,
RoboReportor, Datum, DatumType
)
from rebalance.dao import robo_rebalance_signal as rrs
@component(bean_name='base-signal')
class BaseRebalanceSignal(RebalanceSignal, ABC):
    '''
    Periodic (NORMAL) rebalance signal: fires on the last workday of each
    warehouse-frequency period and persists the portfolio built for that day.
    '''

    @autowired
    def __init__(self, builder: PortfoliosBuilder = None):
        self._builder = builder

    def get_signal(self, day, risk: PortfoliosRisk):
        '''Return the signal row for *day*/*risk*, creating one when the day triggers a rebalance.'''
        existing = rrs.get_one(type=self.signal_type, risk=risk, date=day)
        if existing:
            return existing
        if not self.need_rebalance(day, risk):
            return None
        p_type = self.portfolio_type
        weights = self._builder.get_portfolios(day, risk, p_type)
        signal_id = rrs.insert({
            'date': day,
            'type': self.signal_type,
            'risk': risk,
            'portfolio_type': p_type,
            'portfolio': weights,
            'effective': 1
        })
        return rrs.get_by_id(signal_id)

    def need_rebalance(self, day, risk: PortfoliosRisk) -> bool:
        '''
        True when *day* is the last workday of the current warehouse period,
        when it equals the last recorded signal date, or when no signal exists
        yet (first run uses the given day as the initial build date).
        '''
        last = rrs.get_last_one(day, risk, SignalType.NORMAL, effective=None)
        if not last:
            return True
        months = get_config('portfolios')['holder']['warehouse-frequency']
        # last calendar day of the period starting at the 1st of *day*'s month
        period_end = pd.to_datetime(day.replace(day=1)) + pd.DateOffset(months=months) - timedelta(days=1)
        if not is_workday(period_end):
            period_end = prev_workday(period_end)
        return period_end == day or last['date'] == day

    @property
    def portfolio_type(self):
        return self.signal_type.p_type

    @property
    def signal_type(self) -> SignalType:
        return SignalType.NORMAL

    def get_last_signal(self, day, risk: PortfoliosRisk):
        '''Most recent effective signal on or before *day* for *risk*.'''
        return rrs.get_last_one(max_date=day, risk=risk, effective=True)

    def clear(self, min_date=None, risk: PortfoliosRisk = None):
        '''Delete persisted signals, optionally restricted by start date and risk.'''
        rrs.delete(min_date=min_date, risk=risk)
@component(bean_name='signal-report')
class SignalReportor(RoboReportor):
    '''
    Report of effective rebalance signals, one row per (signal, fund) with the
    actual rebalance date resolved through the holdings service.
    '''

    @autowired
    def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None):
        self._hold = hold
        self._datum = datum

    @property
    def report_name(self) -> str:
        return '调仓信号'

    def load_report(self, max_date=None, min_date=None) -> List[dict]:
        '''
        :param max_date: inclusive upper bound; defaults to *today resolved at call
            time* — the old default ``dt.today()`` was evaluated once at import and
            went stale in a long-running process.
        :param min_date: inclusive lower bound (passed through prev_workday;
            NOTE(review): assumes prev_workday tolerates None — confirm).
        '''
        if max_date is None:
            max_date = dt.today()
        result = []
        datums = {str(x['id']): x for x in self._datum.get_datums(type=DatumType.FUND, exclude=False)}
        for signal in rrs.get_list(max_date=max_date, min_date=prev_workday(min_date), effective=True):
            rebalance_date = self._hold.get_rebalance_date_by_signal(signal['id'])
            if not rebalance_date:
                # signal never materialized into an actual rebalance — skip
                continue
            for fund_id, weight in json.loads(signal['portfolio']).items():
                result.append({
                    'risk': PortfoliosRisk(signal['risk']).name,
                    'type': SignalType(signal['type']).name,
                    'signal_date': signal['date'],
                    'rebalance_date': rebalance_date,
                    'portfolio_type': PortfoliosType(signal['portfolio_type']).name,
                    'ft_ticker': datums[fund_id]['ftTicker'],
                    # key kept as-is: 'blooberg_ticker' looks like a typo of
                    # 'bloomberg_ticker', but downstream consumers may rely on it
                    'blooberg_ticker': datums[fund_id]['bloombergTicker'],
                    'fund_name': datums[fund_id]['chineseName'],
                    'weight': weight
                })
        return result
@component(bean_name='daily-signal-report')
class DailySignalReportor(RoboReportor):
    '''
    Report of the signals generated exactly on *max_date*, exploded to one row
    per fund with formatted weight and fund identifiers.
    '''

    @autowired
    def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None):
        self._hold = hold
        self._datum = datum

    @property
    def report_name(self) -> str:
        return '每日调仓信号'

    def load_report(self, max_date=None, min_date=None) -> List[dict]:
        '''
        :param max_date: the report day; defaults to *the previous workday of today,
            resolved at call time* — the old default ``prev_workday(dt.today())``
            was evaluated once at import and went stale in a long-running process.
        :return: list of per-fund rows, empty when no signal fired on max_date
        '''
        if max_date is None:
            max_date = prev_workday(dt.today())
        signals = pd.DataFrame(rrs.get_list(max_date=max_date, min_date=min_date))
        if signals.empty:
            # guard: an empty frame has no 'date' column — the old code raised KeyError here
            return []
        signals = signals[(signals['date'].dt.date == max_date.date())]
        if signals.empty:
            return []
        # union of all fund ids appearing in any of today's signal portfolios
        datum_ids = reduce(lambda x, y: x | y, signals['portfolio'].apply(lambda x: set(json.loads(x).keys())))
        datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=datum_ids))
        datums.set_index('id', inplace=True)
        signals['risk'] = signals.apply(lambda row: PortfoliosRisk(row['risk']).name, axis=1)
        signals['rebalance_type'] = signals.apply(lambda row: SignalType(row['type']).name, axis=1)
        signals['portfolio_type'] = signals.apply(lambda row: PortfoliosType(row['portfolio_type']).name, axis=1)
        # one row per (signal, fund): explode the portfolio dict into (id, weight) pairs
        signals['portfolio'] = signals.apply(lambda row: [x for x in json.loads(row['portfolio']).items()], axis=1)
        signals = signals.explode('portfolio', ignore_index=True)
        signals['weight'] = signals.apply(lambda row: format(row['portfolio'][1], '.0%'), axis=1)
        signals['asset_ids'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['ftTicker'], axis=1)
        signals['name'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['chineseName'], axis=1)
        signals['lipper_id'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['lipperKey'], axis=1)
        signals = signals[['lipper_id', 'asset_ids', 'name', 'weight', 'risk', 'date', 'rebalance_type']]
        return signals.to_dict('records')
-- Rebalance signal table: one row per generated rebalance signal,
-- written/read by rebalance.dao.robo_rebalance_signal.
CREATE TABLE IF NOT EXISTS robo_rebalance_signal
(
    rrs_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    rrs_date DATETIME NOT NULL COMMENT '信号日期',
    rrs_type TINYINT NOT NULL COMMENT '信号类型',
    rrs_risk TINYINT NOT NULL COMMENT '风险等级',
    rrs_p_type VARCHAR(255) DEFAULT NULL COMMENT '投组类型',
    rrs_p_weight JSON DEFAULT NULL COMMENT '投组信息',
    rrs_effective TINYINT NOT NULL DEFAULT 0 COMMENT '是否生效',
    rrs_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    rrs_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (rrs_id),
    INDEX (rrs_date),
    INDEX (rrs_type),
    INDEX (rrs_risk)
) ENGINE = InnoDB
  AUTO_INCREMENT = 0
  DEFAULT CHARSET = utf8mb4 COMMENT '再平衡信号表';
from py_jftech import read, write, where, format_date, mapper_columns, to_tuple
from api import SignalType, PortfoliosRisk
# Column mapping for table `robo_rebalance_signal`: physical column name ->
# logical field name exposed by this DAO's query results and accepted by
# insert()/update() through mapper_columns.
__COLUMNS__ = {
    'rrs_id': 'id',
    'rrs_date': 'date',
    'rrs_type': 'type',
    'rrs_risk': 'risk',
    'rrs_p_type': 'portfolio_type',
    'rrs_p_weight': 'portfolio',
    'rrs_effective': 'effective',
}
@read
def get_list(min_date=None, max_date=None, risk: PortfoliosRisk = None, type: SignalType = None, effective: bool = None):
    '''List signals in [min_date, max_date], optionally filtered, ordered by risk then date.'''
    date_filters = []
    if min_date:
        date_filters.append(f"rrs_date >= '{format_date(min_date)}'")
    if max_date:
        date_filters.append(f"rrs_date <= '{format_date(max_date)}'")
    columns = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    return f'''
    select {columns} from robo_rebalance_signal
    {where(*date_filters, rrs_risk=risk, rrs_type=type, rrs_effective=effective)} order by rrs_risk, rrs_date
    '''
@read
def get_by_ids(ids):
    '''Load every signal whose primary key is contained in *ids*.'''
    columns = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    return f'''select {columns} from robo_rebalance_signal {where(rrs_id=to_tuple(ids))}'''
@read(one=True)
def get_by_id(id):
    '''Load the single signal row with primary key *id* (None when missing).'''
    return f'''select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=id)}'''
@read(one=True)
def get_one(type: SignalType, risk: PortfoliosRisk, date):
    '''Load the single signal matching (date, type, risk), if any.'''
    columns = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    return f'''
    select {columns} from robo_rebalance_signal
    {where(rrs_date=date, rrs_type=type, rrs_risk=risk)}
    '''
@read(one=True)
def get_first_after(type: SignalType, risk: PortfoliosRisk, min_date, effective=None):
    '''Earliest signal on/after *min_date* for the given type/risk, optionally filtered by effective.'''
    columns = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    date_filter = f"rrs_date >= '{format_date(min_date)}'"
    return f'''
    select {columns} from robo_rebalance_signal
    {where(date_filter, rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date limit 1
    '''
@read(one=True)
def get_last_one(max_date, risk: PortfoliosRisk, type: SignalType = None, effective=None):
    '''Most recent signal on/before *max_date* for *risk*, optionally filtered by type/effective.'''
    columns = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    date_filter = f"rrs_date <= '{format_date(max_date)}'"
    return f'''
    select {columns} from robo_rebalance_signal
    {where(date_filter, rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date desc limit 1
    '''
def get_count(risk: PortfoliosRisk = None, day=None, effective=None):
    '''Count the signals matching the optional risk/day/effective filters.'''
    # inner function renamed from `exec`, which shadowed the builtin
    @read(one=True)
    def query_count():
        return f"select count(*) as `count` from robo_rebalance_signal {where(rrs_risk=risk, rrs_date=day, rrs_effective=effective)}"
    return query_count()['count']
@write
def insert(datas):
    '''Insert one signal row (logical field names mapped via __COLUMNS__); returns the new id.'''
    datas = mapper_columns(datas=datas, columns=__COLUMNS__)
    # keys() and values() iterate the same dict in the same order, so columns and values stay aligned.
    # NOTE(review): values are single-quoted into the SQL string; relies on internal,
    # pre-sanitized data — confirm mapper_columns escapes if inputs can contain quotes.
    cols = ','.join(datas.keys())
    vals = ','.join(f"'{v}'" for v in datas.values())
    return f'''
    insert into robo_rebalance_signal({cols})
    values ({vals})
    '''
@write
def update(id, datas):
    '''Update the signal row *id* with the mapped fields in *datas*.'''
    datas = mapper_columns(datas=datas, columns=__COLUMNS__)
    assignments = ','.join(f"{col} = '{val}'" for col, val in datas.items())
    return f'''
    update robo_rebalance_signal
    set {assignments}
    where rrs_id = {id}
    '''
@write
def delete_by_id(id):
    '''Delete the single signal row with primary key *id*.'''
    return f"delete from robo_rebalance_signal where rrs_id = {id}"
@write
def delete(min_date=None, risk: PortfoliosRisk = None):
    '''Delete signals from *min_date* on for *risk*; with no filters, truncate the whole table.'''
    if min_date is None and risk is None:
        return 'truncate table robo_rebalance_signal'
    date_filter = f"rrs_date >= '{format_date(min_date)}'" if min_date else None
    return f"delete from robo_rebalance_signal {where(date_filter, rrs_risk=risk)}"
...@@ -23,7 +23,7 @@ class DivAlligamComboDatasReportor(RoboReportor): ...@@ -23,7 +23,7 @@ class DivAlligamComboDatasReportor(RoboReportor):
holds = pd.DataFrame(self._hold_reportor.load_report(max_date=max_date, min_date=min_date)) holds = pd.DataFrame(self._hold_reportor.load_report(max_date=max_date, min_date=min_date))
if not holds.empty: if not holds.empty:
holds.set_index('date', inplace=True) holds.set_index('date', inplace=True)
holds = holds[['real_av', 'acc_av', 'nav']] holds = holds[['real_av', 'acc_av', 'nav', 'fund_nav']]
holds.rename(columns={'real_av': 'av', 'acc_av': 'acc'}, inplace=True) holds.rename(columns={'real_av': 'av', 'acc_av': 'acc'}, inplace=True)
benchmark = pd.DataFrame(self._benchmark.load_report(max_date=max_date, min_date=min_date)) benchmark = pd.DataFrame(self._benchmark.load_report(max_date=max_date, min_date=min_date))
......
...@@ -9,7 +9,7 @@ openpyxl==3.0.10 ...@@ -9,7 +9,7 @@ openpyxl==3.0.10
pandas==1.5.1 pandas==1.5.1
pandas-datareader==0.10.0 pandas-datareader==0.10.0
ply==3.11 ply==3.11
PyJFTech==1.2.0 PyJFTech==1.3.0
PyMySQL==1.0.2 PyMySQL==1.0.2
Pyomo==6.4.3 Pyomo==6.4.3
python-dateutil==2.8.2 python-dateutil==2.8.2
...@@ -20,3 +20,15 @@ requests==2.28.1 ...@@ -20,3 +20,15 @@ requests==2.28.1
scipy==1.9.3 scipy==1.9.3
six==1.16.0 six==1.16.0
urllib3==1.26.12 urllib3==1.26.12
fastapi==0.100.0
uvicorn==0.23.1
apscheduler==3.10.1
sklearn
finta
keras
tensorflow
matplotlib
lightgbm
\ No newline at end of file
import logging import logging
import sys import sys
from concurrent.futures import wait from concurrent.futures import wait
from datetime import datetime as dt, timedelta from datetime import datetime as dt
from typing import List from typing import List
import pandas as pd import pandas as pd
...@@ -12,7 +12,7 @@ from py_jftech import ( ...@@ -12,7 +12,7 @@ from py_jftech import (
from api import ( from api import (
RoboExecutor, Datum, AssetPool, PortfoliosBuilder, RoboExecutor, Datum, AssetPool, PortfoliosBuilder,
PortfoliosRisk, PortfoliosHolder, DataSync, RoboExportor, BacktestStep PortfoliosRisk, PortfoliosHolder, DataSync, RoboExportor, BacktestStep, RebalanceSignal
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -21,32 +21,31 @@ logger = logging.getLogger(__name__) ...@@ -21,32 +21,31 @@ logger = logging.getLogger(__name__)
@component(bean_name='backtest') @component(bean_name='backtest')
class BacktestExecutor(RoboExecutor): class BacktestExecutor(RoboExecutor):
@autowired(names={'datum': 'hk-datum'}) @autowired
def __init__(self, datum: Datum = None, pool: AssetPool = None, def __init__(self, datum: Datum = None, pool: AssetPool = None,
syncs: List[DataSync] = None, export: RoboExportor = None, syncs: List[DataSync] = None, export: RoboExportor = None,
builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None): builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None,
signal: RebalanceSignal = None):
self._datum = datum self._datum = datum
self._pool = pool self._pool = pool
self._builder = builder self._builder = builder
self._hold = hold self._hold = hold
self._syncs = syncs self._syncs = syncs
self._export = export self._export = export
self._signal = signal
self._config = get_config(__name__)['backtest'] self._config = get_config(__name__)['backtest']
@staticmethod @staticmethod
def get_first_business_day(start_date, end_date): def get_last_business_day(start_date, end_date):
start_date = prev_workday(start_date)
# 生成日期范围并转换为DataFrame # 生成日期范围并转换为DataFrame
dates = pd.date_range(start_date, end_date, freq='MS') dates = pd.date_range(start_date, end_date, freq='M')
dates = dates.insert(0, start_date) if dates[0] != start_date:
dates = dates.insert(0, start_date)
df = pd.DataFrame({'dates': dates}) df = pd.DataFrame({'dates': dates})
# 提取每个月的第一个工作日
df['first_business_day'] = df['dates'].apply(
lambda x: pd.date_range(start=x, end=x + pd.offsets.MonthEnd(0), freq='B')[0]
)
# 每隔n个月提取第一个工作日
result = [] result = []
for i in range(0, len(df), get_config('portfolios')['holder']['warehouse-frequency']): for i in range(0, len(df), get_config('portfolios')['holder']['warehouse-frequency']):
result.append(df.iloc[i]['first_business_day']) result.append(df.iloc[i]['dates'])
delta = workday_range(result[0], result[1]) delta = workday_range(result[0], result[1])
period = get_config(__name__)['backtest']['sealing-period'] period = get_config(__name__)['backtest']['sealing-period']
if len(delta) <= period: if len(delta) <= period:
...@@ -85,6 +84,7 @@ class BacktestExecutor(RoboExecutor): ...@@ -85,6 +84,7 @@ class BacktestExecutor(RoboExecutor):
BacktestStep.NORMAL_PORTFOLIO): BacktestStep.NORMAL_PORTFOLIO):
logger.info('start to clear normal portfolios'.center(50, '-')) logger.info('start to clear normal portfolios'.center(50, '-'))
self._builder.clear() self._builder.clear()
self._signal.clear()
if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO): if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO):
logger.info('start to clear hold portfolios'.center(50, '-')) logger.info('start to clear hold portfolios'.center(50, '-'))
self._hold.clear() self._hold.clear()
...@@ -98,7 +98,7 @@ class BacktestExecutor(RoboExecutor): ...@@ -98,7 +98,7 @@ class BacktestExecutor(RoboExecutor):
if self.start_step.within(BacktestStep.ASSET_POOL) and self.end_step.without(BacktestStep.ASSET_POOL): if self.start_step.within(BacktestStep.ASSET_POOL) and self.end_step.without(BacktestStep.ASSET_POOL):
logger.info("start to build asset pool".center(50, '-')) logger.info("start to build asset pool".center(50, '-'))
now = dt.now() now = dt.now()
workdays = self.get_first_business_day(self.start_date, self.end_date) workdays = self.get_last_business_day(self.start_date, self.end_date)
for date in workdays: for date in workdays:
self._pool.get_pool(date) self._pool.get_pool(date)
logger.info(f"build asset pool success, use[{(dt.now() - now).seconds}s]") logger.info(f"build asset pool success, use[{(dt.now() - now).seconds}s]")
...@@ -107,7 +107,7 @@ class BacktestExecutor(RoboExecutor): ...@@ -107,7 +107,7 @@ class BacktestExecutor(RoboExecutor):
logger.info("start to build normal portfolios".center(50, '-')) logger.info("start to build normal portfolios".center(50, '-'))
now = dt.now() now = dt.now()
wait([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in wait([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in
self.get_first_business_day(self.start_date, self.end_date)]) self.get_last_business_day(self.start_date, self.end_date)])
logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]") logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO): if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO):
logger.info("start to build hold portfolios".center(50, '-')) logger.info("start to build hold portfolios".center(50, '-'))
...@@ -137,7 +137,8 @@ class RealExecutor(RoboExecutor): ...@@ -137,7 +137,8 @@ class RealExecutor(RoboExecutor):
@autowired(names={'daily_export': 'daily-real-export', 'monitor_export': 'daily-monitor-export'}) @autowired(names={'daily_export': 'daily-real-export', 'monitor_export': 'daily-monitor-export'})
def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, syncs: List[DataSync] = None, def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, syncs: List[DataSync] = None,
daily_export: RoboExportor = None, monitor_export: RoboExportor = None, pool: AssetPool = None, ): daily_export: RoboExportor = None, monitor_export: RoboExportor = None, pool: AssetPool = None,
signal: RebalanceSignal = None):
self._builder = builder self._builder = builder
self._pool = pool self._pool = pool
self._hold = hold self._hold = hold
...@@ -145,6 +146,7 @@ class RealExecutor(RoboExecutor): ...@@ -145,6 +146,7 @@ class RealExecutor(RoboExecutor):
self._daily_export = daily_export self._daily_export = daily_export
self._monitor_export = monitor_export self._monitor_export = monitor_export
self._config = get_config(__name__)['real'] self._config = get_config(__name__)['real']
self._signal = signal
@property @property
def start_date(self): def start_date(self):
...@@ -181,18 +183,11 @@ class RealExecutor(RoboExecutor): ...@@ -181,18 +183,11 @@ class RealExecutor(RoboExecutor):
for risk in PortfoliosRisk: for risk in PortfoliosRisk:
logger.info(f"start to build risk[{risk.name}] real for date[{format_date(date)}]".center(50, '-')) logger.info(f"start to build risk[{risk.name}] real for date[{format_date(date)}]".center(50, '-'))
now = dt.now() now = dt.now()
first_day = date.replace(day=1) # 因为每天都必须有NORMAL最优投组,不管用不用
prev_month = first_day - timedelta(days=1) self._builder.get_portfolios(date, risk)
prev_month.replace(day=prev_month.day) self._signal.get_signal(date, risk)
prev_month = prev_month if is_workday(prev_month) else prev_workday(prev_month) # 更新持仓
self._pool.get_pool(prev_month) self._hold.build_hold_portfolio(date, risk)
self._builder.get_portfolios(prev_month, risk)
next_month = date.replace(day=28) + timedelta(days=4)
prev_month = next_month.replace(day=1) - timedelta(days=1)
prev_month = prev_month if is_workday(prev_month) else prev_workday(prev_month)
if date.day == prev_month.day:
self._pool.get_pool(date)
self._builder.get_portfolios(date, risk)
logger.info( logger.info(
f"build risk[{risk.name}] real for date[{format_date(date)}] success, use[{(dt.now() - now).seconds}s]") f"build risk[{risk.name}] real for date[{format_date(date)}] success, use[{(dt.now() - now).seconds}s]")
if self.export: if self.export:
......
import datetime as dt
import json
from multiprocessing import Process
import uvicorn
from apscheduler.schedulers.blocking import BlockingScheduler
from fastapi import FastAPI
from py_jftech import prev_workday, filter_weekend, autowired
import main
from api import DatumType, PortfoliosRisk, Datum
app = FastAPI()

# Fixed recommendation GUID attached to every /recommend response.
REC_GID = 'E3886FBA-123B-7890-123E-123456BEEED'
def get_today_rec():
    '''Return today's NORMAL/FT3 MPT portfolio row (as of the previous workday), or None.'''
    # imports kept local so the module can be imported without the full DI context
    from portfolios.dao import robo_mpt_portfolios as rmp
    from api import PortfoliosType, PortfoliosRisk
    target_day = prev_workday(filter_weekend(dt.date.today()))
    return rmp.get_one(target_day, PortfoliosType.NORMAL, PortfoliosRisk.FT3)
def get_last_signal():
    '''Return the most recent effective FT3 rebalance signal on or before the previous workday.'''
    from rebalance.dao import robo_rebalance_signal as rrs
    target_day = prev_workday(filter_weekend(dt.date.today()))
    return rrs.get_last_one(max_date=target_day, risk=PortfoliosRisk.FT3, effective=True)
@autowired
def get_fund_infos(datum: Datum = None):
    '''Return all fund datum records (datum is injected by @autowired).'''
    return datum.get_datums(DatumType.FUND)
@app.get("/recommend")
async def root():
portfolio = get_today_rec()
if portfolio:
fund_infos = get_fund_infos()
id_ticker_map = {str(info['id']): info for info in fund_infos}
funds = json.loads(portfolio['portfolio'])
rec_list = []
portfolios = {'recomm_guid': REC_GID}
data = {'recomm_guid': REC_GID}
data['data_date'] = portfolio['date'].strftime('%Y-%m-%d')
data['funds'] = [{'weight': round(weight * 100), 'fund_id': id_ticker_map[key]['ftTicker']} for key, weight in
funds.items()]
data['creat_date'] = portfolio['create_time'].strftime('%Y-%m-%d %H:%M:%S')
# todo 补全
# returns = round(datas.pct_change(), 5)
# data['cp'] = sharpe_ratio(returns, risk_free=0, period='daily', annualization=None),
data['rr'] = 0.81
data['roi'] = 0.81
data['risk'] = round(sum([id_ticker_map[key]['risk'] * weight for key, weight in funds.items()]), 2)
note = {}
sig = get_last_signal()
note['last_reg_reb'] = sig['date'].strftime('%Y-%m-%d')
data['note'] = json.dumps(note)
portfolios['data'] = data
rec_list.append(portfolios)
return rec_list
else:
return {'msg': '当日投组未产生,待10:00后获取'}
def start_robo():
    '''Run the real-trade robo flow: catch up today's run if needed, then schedule daily runs.'''
    # If the process is (re)started after the scheduled time, run today's
    # portfolio immediately so an outage does not skip the day.
    current_time = dt.datetime.now()
    target_time = dt.time(10, 0)
    if current_time.time() > target_time:
        main.start()
    # Blocking scheduler: execute the real-trade flow every weekday at 10:00.
    scheduler = BlockingScheduler()
    scheduler.add_job(main.start, 'cron', day_of_week='0-4', hour=10, minute=00)
    scheduler.start()
def start_web():
    '''Serve the FastAPI app on port 8080; reload=True is a dev setting — TODO confirm for production.'''
    uvicorn.run("robo_controller:app", reload=True, port=8080)
if __name__ == "__main__":
# 开启一个进程执行start_robo()
p1 = Process(target=start_robo)
p1.start()
# 启动进程2
p2 = Process(target=start_web)
p2.start()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment