Commit f6fff7b4 authored by stephen.wang's avatar stephen.wang

Merge remote-tracking branch 'origin/dev-dividend' into dev-dividend

parents 1a0b6814 4a5dbfe8
from typing import List
from py_jftech import autowired
from ai.dao.robo_datas import get_base_info
from ai.data_access import DataAccess
from ai.model_trainer import ModelTrainer
from ai.noticer import upload_predict
from ai.training_data_builder import TrainingDataBuilder
from api import DataSync
# Cut-off date for loaded data (None means "up to today")
max_date = None
# max_date = '2023-12-01'
# Ids of the indexes/funds to predict
# PREDICT_LIST = [67]
PREDICT_LIST = [67, 121, 122, 123]
# Economic-indicator ids
eco = [65, 66, 74, 134]
# Index ids
index = [67, 68, 69, 70, 71, 72, 73, 75, 116, 117, 138, 139, 142, 143, 140, 141, 144, 145, 146]
# Fund ids
fund = [121, 122, 123]
@autowired
def sync(syncs: List[DataSync] = None):
    """Run every injected :class:`DataSync` implementation sequentially."""
    for sync_job in syncs:
        sync_job.do_sync()
def predictionFromMoel(the_model, scaledX_forecast, predict_item, indexDict: dict):
    """Predict the up/down direction for one index/fund and print the result.

    NOTE(review): the name looks like a typo for ``predictionFromModel``; kept
    as-is because callers reference it by this name.

    :param the_model: fitted sklearn-style classifier exposing ``predict``
    :param scaledX_forecast: scaled feature row(s) for the forecast day
    :param predict_item: display name of the index/fund being predicted
    :param indexDict: id -> name mapping (only used by the commented-out upload code)
    :return: the raw model prediction

    NOTE(review): reads the module-level globals ``forecastDay`` and
    ``numForecastDays`` assigned in the ``__main__`` section — confirm they are
    always set before this is called.
    """
    prediction = the_model.predict(scaledX_forecast)
    predictionStr = 'DOWN'
    # anything above 0.5 is treated as an "UP" prediction
    if (prediction > 0.5):
        predictionStr = 'UP'
    content = f"""\n On day {forecastDay.strftime("%m/%d/%Y")}, the model predicts {predict_item} to be {predictionStr} in {str(numForecastDays)} business days. \n"""
    print(content)
    # Upload the prediction result (currently disabled)
    # key = [k for k, v in indexDict.items() if v == predict_item]
    # index_info = get_base_info(key)[0]
    # upload_predict(index_info['ticker'], forecastDay, predictionStr)
    # send(content)
    return prediction
########################################
# Entry point: sync raw data, build features, train the models per target and
# (when toForecast is on) print/upload the prediction.
if __name__ == '__main__':
    sync()
    toForecast = False  # False means test, True means forecast
    # define some parameters
    win1W = 5  # 1 week
    win1M = 21  # 1 Month
    win1Q = 63  # 1 Quarter
    numForecastDays = 21  # business days, 21 business days means one month
    theThreshold = 0.0
    # id -> symbolic name for every indicator/index/fund used below
    indexDict = {
        65: "CPI_YOY",
        66: "FDTR",
        67: "SPX",
        68: "USGG10YR",
        69: "USGG2YR",
        70: "MXWO",  # not use now
        71: "MXWD",  # not use now
        72: "CCMP",
        73: "TWSE",  # not use now
        74: "CPURNSA",
        75: "VIX",
        76: "US0001M",
        77: "US0012M",
        # FUND
        121: "IEF_US",
        122: "TLT_US",
        123: "UUP_US",
        139: "COI_TOTL",
        138: "LEI_TOTL",
        116: "MID",
        134: "NAPMPMI",
        142: "OE4EKLAC",
        143: "OEA5KLAC",
        146: "OECNKLAC",
        145: "OEJPKLAC",
        141: "OEOTGTAC",
        144: "OEUSKLAC",
        117: "SML",
        140: "USRINDEX"
    }
    ###################
    # Step 1: Prepare X and y (features and labels)
    # Load the raw base data sets
    data_access = DataAccess(index, eco, fund, max_date, indexDict)
    indexData = data_access.get_index_datas()
    ecoData = data_access.get_eco_datas()
    fundData = data_access.get_fund_datas()
    # Index-derived feature frames
    vixData = data_access.get_vix(indexData)
    indexOtherData = data_access.get_other_index(indexData)
    # Economic-indicator feature frames
    cpiData = data_access.get_cpi(ecoData)
    FDTRData = data_access.get_fdtr(ecoData)
    # New indicator NAPMPMI: US ISM manufacturing index (monthly)
    NAPMPMIData = data_access.get_napmpmi(ecoData)
    builder = TrainingDataBuilder(index, eco, fund, indexDict, toForecast, win1W, win1M, win1Q, numForecastDays,
                                  theThreshold)
    for pid in PREDICT_LIST:
        print(f'{indexDict[pid]} start '.center(50, '='))
        # indexes use quote data, funds use nav data
        t_data = indexData if pid in index else fundData
        X_train, X_test, y_train, y_test, scaledX_forecast, forecastDay = \
            builder.build_train_test(pid, t_data, vixData, indexOtherData, cpiData, FDTRData, NAPMPMIData)
        trainer = ModelTrainer(toForecast)
        rf_model = trainer.train_random_forest(X_train, y_train, X_test, y_test)
        gbt_model = trainer.train_GBT(X_train, y_train, X_test, y_test)
        svc_model = trainer.train_SVC(X_train, y_train, X_test, y_test)
        ensemble_model = trainer.ensemble_model(rf_model, gbt_model, svc_model, X_train, y_train, X_test, y_test)
        if (toForecast):
            predictionFromMoel(ensemble_model, scaledX_forecast, indexDict[pid], indexDict)
from py_jftech import read, to_tuple, where
@read
def get_index_list(index_ids=None, min_date=None, max_date=None):
    """Build the SQL selecting index quote rows, optionally bounded by date."""
    bounds = []
    if min_date:
        bounds.append(f"rid_date >= '{min_date}'")
    if max_date:
        bounds.append(f"rid_date <= '{max_date}'")
    return f'''
        select * from robo_index_datas
        {where(*bounds, rid_index_id=to_tuple(index_ids))} order by rid_index_id, rid_date
    '''
@read
def get_eco_list(eco_ids=None, min_date=None, max_date=None):
    """Build the SQL selecting economic-indicator rows, optionally bounded by date."""
    conditions = [
        f"red_date >= '{min_date}'" if min_date else None,
        f"red_date <= '{max_date}'" if max_date else None,
    ]
    conditions = [c for c in conditions if c is not None]
    return f'''
        select * from robo_eco_datas
        {where(*conditions, red_eco_id=to_tuple(eco_ids))} order by red_eco_id, red_date
    '''
@read
def get_fund_list(fund_ids=None, min_date=None, max_date=None):
    """Build the SQL selecting fund nav rows, optionally bounded by date."""
    bounds = []
    for prefix, value in (("rfn_date >=", min_date), ("rfn_date <=", max_date)):
        if value:
            bounds.append(f"{prefix} '{value}'")
    return f'''
        select * from robo_fund_navs
        {where(*bounds, rfn_fund_id=to_tuple(fund_ids))} order by rfn_fund_id, rfn_date
    '''
@read
def get_base_info(ids=None):
    """Build the SQL selecting id / bloomberg ticker / type from the base-datum table."""
    return f"""
        SELECT rbd_id id,v_rbd_bloomberg_ticker ticker,v_rbd_type type FROM `robo_base_datum`
        {where(rbd_id=to_tuple(ids))}
    """
\ No newline at end of file
from abc import ABC
import pandas as pd
from ai.dao.robo_datas import get_eco_list, get_fund_list, get_index_list
class DataAccess(ABC):
    """Loads raw index / economic-indicator / fund data through the dao layer
    and reshapes each series into the frames used to build model features."""

    def __init__(self, index, eco, fund, max_date, indexDict) -> None:
        """
        :param index: index ids to load
        :param eco: economic-indicator ids to load
        :param fund: fund ids to load
        :param max_date: inclusive upper bound for data dates (None = no bound)
        :param indexDict: id -> symbolic name mapping used to relabel id columns
        """
        super().__init__()
        self._index = index
        self._eco = eco
        self._fund = fund
        self._max_date = max_date
        self._indexDict = indexDict

    def get_index_datas(self):
        """Return index quotes with symbolic ids and a 'date' column."""
        indexData = pd.DataFrame(
            get_index_list(index_ids=self._index, max_date=self._max_date))
        # todo erp has no data yet: "rid_erp",
        indexData = indexData[
            ["rid_index_id", "rid_date", "rid_high", "rid_open", "rid_low", "rid_close", "rid_pe", "rid_pb",
             "rid_volume", "rid_frdpe", "rid_frdpes", "rid_pc"]]
        indexData.rename(columns={"rid_date": 'date'}, inplace=True)  # please use 'date'
        indexData["rid_index_id"] = indexData["rid_index_id"].map(self._indexDict)
        # Fix: assign the filled series back instead of the chained
        # `Series.ffill(inplace=True)`, which is deprecated and does not
        # reliably write through under pandas copy-on-write.
        indexData['rid_frdpe'] = indexData['rid_frdpe'].ffill()
        return indexData

    def get_eco_datas(self):
        """Return economic-indicator rows with symbolic ids and a 'date' column."""
        ecoData = pd.DataFrame(
            get_eco_list(eco_ids=self._eco, max_date=self._max_date))
        ecoData = ecoData[["red_eco_id", "red_date", "red_indicator"]]
        ecoData.rename(columns={"red_date": 'date'}, inplace=True)  # please use 'date'
        ecoData["red_eco_id"] = ecoData["red_eco_id"].map(self._indexDict)
        return ecoData

    def get_fund_datas(self):
        """Return fund navs with symbolic ids and a 'date' column."""
        fundData = pd.DataFrame(
            get_fund_list(fund_ids=self._fund, max_date=self._max_date))
        fundData = fundData[["rfn_fund_id", "rfn_date", "rfn_nav_cal"]]
        fundData.rename(columns={"rfn_date": 'date'}, inplace=True)  # please use 'date'
        fundData["rfn_fund_id"] = fundData["rfn_fund_id"].map(self._indexDict)
        return fundData

    def get_vix(self, indexData):
        """Extract the VIX (CBOE SPX volatility index) OHLC series, indexed by date."""
        vixData = indexData[indexData['rid_index_id'] == "VIX"].copy()
        vixData = vixData[["date", "rid_high", "rid_open", "rid_low", "rid_close"]]
        vixData.rename(
            columns={"rid_high": 'vix_high', 'rid_open': 'vix_open', "rid_low": 'vix_low', "rid_close": 'vix_close'},
            inplace=True)
        vixData.set_index('date', inplace=True)
        vixData.index = pd.to_datetime(vixData.index)
        return vixData

    def get_other_index(self, indexData):
        """Join the valuation columns of the secondary indexes into one wide
        frame, outer-joined on date, then forward/back-filled."""
        other_index = ["USGG10YR", "USGG2YR", "CCMP", "US0001M", "US0012M", "COI_TOTL", "LEI_TOTL", "MID",
                       "OE4EKLAC", "OEA5KLAC", "OECNKLAC", "OEJPKLAC", "OEOTGTAC", "OEUSKLAC", "USRINDEX", "SPX"]
        cols = ['date', 'rid_close', 'rid_pe', 'rid_pb', 'rid_volume', 'rid_frdpe', 'rid_frdpes', 'rid_pc']
        indexOtherData = pd.DataFrame()
        idxs = [self._indexDict[i] for i in self._index]
        for idx in other_index:
            if idx in idxs:
                idx_data = indexData[indexData['rid_index_id'] == idx].copy()
                idx_data = idx_data[cols]
                # prefix every value column with the index name
                idx_data.rename(
                    columns={"rid_close": f'{idx}_close', 'rid_pe': f'{idx}_pe', 'rid_pb': f'{idx}_pb',
                             'rid_volume': f'{idx}_volume', 'rid_frdpe': f'{idx}_frdpe', 'rid_frdpes': f'{idx}_frdpes',
                             'rid_pc': f'{idx}_pc'},
                    inplace=True)
                idx_data.set_index('date', inplace=True)
                idx_data.index = pd.to_datetime(idx_data.index)
                if indexOtherData.size > 0:
                    indexOtherData = pd.merge(indexOtherData, idx_data, how='outer', on='date')
                else:
                    indexOtherData = idx_data
        indexOtherData.ffill(inplace=True)
        indexOtherData.bfill(inplace=True)
        # drop columns still entirely NaN after filling
        indexOtherData = indexOtherData.dropna(axis=1)
        return indexOtherData

    def get_cpi(self, ecoData):
        """Build the CPI frame.

        CPI_YOY: US urban CPI year-over-year (NSA); CPURNSA: US CPI (NSA).
        Adds the annualized month-over-month change and its absolute diff.
        """
        cpiData = ecoData[(ecoData['red_eco_id'] == "CPI_YOY") | (ecoData['red_eco_id'] == "CPURNSA")].copy()
        cpiData = cpiData.pivot(index='date', columns='red_eco_id', values='red_indicator')
        cpiData['CPI_MOM'] = (cpiData['CPURNSA'] / cpiData['CPURNSA'].shift(
            1) - 1.0) * 100 * 12  # Annualized Percentage
        cpiData['CPI_MOM_Diff'] = cpiData['CPURNSA'] - cpiData['CPURNSA'].shift(1)
        cpiData.index = pd.to_datetime(cpiData.index)
        return cpiData

    def get_fdtr(self, ecoData):
        """Extract the FDTR (US federal funds target rate) series, indexed by date."""
        FDTRData = ecoData[ecoData['red_eco_id'] == "FDTR"].copy()
        del (FDTRData['red_eco_id'])
        FDTRData.rename(columns={"red_indicator": 'FDTR'}, inplace=True)
        FDTRData.set_index('date', inplace=True)
        FDTRData.index = pd.to_datetime(FDTRData.index)
        return FDTRData

    def get_napmpmi(self, ecoData):
        """Extract the NAPMPMI (US ISM manufacturing index, monthly) series."""
        NAPMPMIData = ecoData[ecoData['red_eco_id'] == "NAPMPMI"].copy()
        del (NAPMPMIData['red_eco_id'])
        NAPMPMIData.rename(columns={"red_indicator": 'NAPMPMI'}, inplace=True)
        NAPMPMIData.set_index('date', inplace=True)
        NAPMPMIData.index = pd.to_datetime(NAPMPMIData.index)
        return NAPMPMIData
from abc import ABC
import matplotlib.pyplot as plt
from lightgbm import LGBMClassifier
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, accuracy_score
class ModelTrainer(ABC):
    """Trains the individual classifiers and the hard-voting ensemble,
    printing evaluation reports when running in back-test mode."""

    def __init__(self, toForecast) -> None:
        super().__init__()
        self._toForecast = toForecast

    ###################
    # Step 3: Train the model
    def test_model(self, strMethod, classifier, X_test, y_test):
        """Print confusion matrix, classification report and accuracy for
        *classifier* on the test set, then render the confusion-matrix plot."""
        print(strMethod + " ====== test results ======")
        predicted = classifier.predict(X_test)
        matrix = confusion_matrix(y_test, predicted, labels=[0, 1])
        print(strMethod + " Confusion Matrix:")
        print(matrix)
        report = classification_report(y_test, predicted, zero_division=1.0)
        print(strMethod + " Classification Report:")
        print(report)
        accuracy = accuracy_score(y_test, predicted)
        print(strMethod + " Accuracy:", accuracy)
        display = ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=['Down', 'Up'])
        display.plot()
        plt.title(strMethod + ' Accuracy: ' + f'{accuracy:.0%}')
        plt.show()

    def train_random_forest(self, X_train, y_train, X_test, y_test):
        """Fit a random forest; evaluate it unless we are forecasting."""
        model = RandomForestClassifier()
        model.fit(X_train, y_train)
        if not self._toForecast:
            self.test_model('Random Forest', model, X_test, y_test)
        return model

    def train_GBT(self, X_train, y_train, X_test, y_test):
        """Fit a gradient-boosted tree (LightGBM); evaluate unless forecasting."""
        model = LGBMClassifier()
        model.fit(X_train, y_train)
        if not self._toForecast:
            self.test_model('Gradient Boosted Tree', model, X_test, y_test)
        return model

    def train_SVC(self, X_train, y_train, X_test, y_test):
        """Fit a support-vector classifier; evaluate unless forecasting."""
        model = svm.SVC()
        model.fit(X_train, y_train)
        if not self._toForecast:
            self.test_model('Support Vector Machines', model, X_test, y_test)
        return model

    def ensemble_model(self, rf_model, gbt_model, svc_model, X_train, y_train, X_test, y_test):
        """Combine the three fitted models into a hard-voting ensemble."""
        voters = [('rf', rf_model), ('gbt', gbt_model), ('svc', svc_model)]
        ensemble = VotingClassifier(voters, voting='hard')
        ensemble.fit(X_train, y_train)
        if not self._toForecast:
            self.test_model('Ensemble Model', ensemble, X_test, y_test)
        return ensemble
from datetime import datetime
import requests
from py_jftech import sendmail, format_date
# Recipients of the prediction notification e-mail
email = ['wenwen.tang@thizgroup.com']
# Base URL of the JRP service that receives uploaded predictions
jrp_domain = 'https://jrp.jfquant.com/api/v1.0'
# jrp_domain = 'http://localhost:7090/jrp'
def send(content):
    """E-mail *content* to the configured recipients, subject dated today."""
    subject = '预测_{today}'.format(today=format_date(datetime.today()))
    sendmail(receives=email, copies=[], attach_paths=[], subject=subject, content=content)
def upload_predict(ticker, predictDate, predict):
    """Push one prediction to the JRP service.

    :param ticker: bloomberg ticker of the predicted instrument
    :param predictDate: date the prediction refers to
    :param predict: direction string, 'UP' or 'DOWN' (anything else counts as DOWN)
    """
    predict_data = {
        "aiPredict": {
            "predictDate": format_date(predictDate),
            "predict": 1 if predict == 'UP' else -1
        },
        "bloombergTicker": ticker
    }
    # NOTE(review): auth token is hard-coded; consider moving it to configuration.
    headers = {"X-AUTH-token": "rt7297LwQvyAYTke2iD8Vg"}
    # Fix: add a timeout so an unresponsive service cannot hang the pipeline forever.
    response = requests.post(url=f'{jrp_domain}/ai/predict', json=predict_data, headers=headers, timeout=30)
    if response.status_code != 200:
        print("上传ai预测结果失败,请重试")
from abc import ABC
import numpy as np
import pandas as pd
from finta import TA
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
def imp():
    # Smoke check: confirms the finta ``TA`` import is present and usable.
    print(TA)
class TrainingDataBuilder(ABC):
    """Builds the scaled train/test matrices (and, when forecasting, the
    forecast-day row) for one index or fund from the merged data frames."""

    def __init__(self, index, eco, fund, indexDict, toForecast, win1W, win1M, win1Q, numForecastDays,
                 theThreshold) -> None:
        """
        :param index: index ids handled by the pipeline
        :param eco: economic-indicator ids
        :param fund: fund ids
        :param indexDict: id -> symbolic name mapping
        :param toForecast: True = build the forecast row, False = back-test split
        :param win1W: rolling window of one week (5 business days)
        :param win1M: rolling window of one month (21 business days)
        :param win1Q: rolling window of one quarter (63 business days)
        :param numForecastDays: prediction horizon in business days
        :param theThreshold: future-return threshold separating UP (1) from DOWN (0)
        """
        super().__init__()
        self._index = index
        self._eco = eco
        self._fund = fund
        self._indexDict = indexDict
        self._toForecast = toForecast
        self._win1W = win1W  # 1 week
        self._win1M = win1M  # 1 Month
        self._win1Q = win1Q  # 1 Quarter
        self._numForecastDays = numForecastDays  # business days, 21 business days means one month
        self._theThreshold = theThreshold
        # List of symbols for technical indicators
        # INDICATORS = ['RSI', 'MACD', 'STOCH','ADL', 'ATR', 'MOM', 'MFI', 'ROC', 'OBV', 'CCI', 'EMV', 'VORTEX']
        # Note that '14 period MFI' and '14 period EMV' is not available for forecast
        self.INDICATORS = ['RSI', 'MACD', 'STOCH', 'ADL', 'ATR', 'MOM', 'ROC', 'OBV', 'CCI', 'VORTEX']
        self.FUND_INDICATORS = []

    def get_indicator_data(self, data, pid):
        """
        Function that uses the finta API to calculate technical indicators used as the features
        """

        def indicator_calcu(data, indicators):
            """Merge each requested finta indicator into *data*.

            Funds only have a close price, so fewer indicators apply to them.
            """
            for indicator in indicators:
                # Fix: resolve the indicator with getattr instead of eval() on a
                # built string — same behavior without arbitrary-code evaluation.
                ind_data = getattr(TA, indicator)(data)
                if not isinstance(ind_data, pd.DataFrame):
                    ind_data = ind_data.to_frame()
                data = data.merge(ind_data, left_index=True, right_index=True)
            return data

        if pid in self._index:
            data = indicator_calcu(data, self.INDICATORS)
            # Instead of using the actual volume value (which changes over time), we normalize it with a moving volume average
            data['normVol'] = data['volume'] / data['volume'].ewm(5).mean()
            # get relative values
            data['relativeOpen'] = data['open'] / data['close'].shift(1)
            data['relativeHigh'] = data['high'] / data['close'].shift(1)
            data['relativeLow'] = data['low'] / data['close'].shift(1)
            # Remove columns that won't be used as features
            # data['close'] are still needed and will be deleted later
            data.drop(['open', 'high', 'low', 'volume'], axis=1, inplace=True)
        elif pid in self._fund:
            # Fix: keep the return value; the original discarded it, which only
            # happened to work because FUND_INDICATORS is currently empty.
            data = indicator_calcu(data, self.FUND_INDICATORS)
        # Also calculate moving averages for features
        data['ema50'] = data['close'] / data['close'].ewm(50).mean()
        data['ema21'] = data['close'] / data['close'].ewm(21).mean()
        data['ema15'] = data['close'] / data['close'].ewm(15).mean()
        data['ema5'] = data['close'] / data['close'].ewm(5).mean()
        data['relativeClose'] = data['close'] / data['close'].shift(1)
        return data

    def build_predict_data(self, indexData, pid):
        """Build the per-target feature frame, including the future-looking label.

        :param indexData: raw index frame (for indexes) or fund-nav frame (for funds)
        :param pid: id of the index or fund to predict
        :return: feature DataFrame with a leading 'date' column, 'futureR' and 'yLabel'
        """
        if pid in self._index:
            ###### get individual data from raw data
            predictData = indexData[indexData['rid_index_id'] == self._indexDict[pid]].copy()
            del (predictData['rid_index_id'])
            ###### Additional preparing SPX Data
            # finta expects properly formated ohlc DataFrame, with column names in lowercase:
            # ["open", "high", "low", close"] and ["volume"] for indicators that expect ohlcv input.
            predictData.rename(
                columns={"rid_high": 'high', 'rid_open': 'open', "rid_low": 'low', "rid_close": 'close',
                         'rid_volume': 'volume',
                         "rid_pe": "SPX_pe", "rid_pb": "SPX_pb"},
                inplace=True)
        elif pid in self._fund:
            predictData = indexData[indexData['rfn_fund_id'] == self._indexDict[pid]].copy()
            del (predictData['rfn_fund_id'])
            predictData.rename(columns={"rfn_nav_cal": 'close'}, inplace=True)
        # normalize to a datetime-sorted frame with 'date' as the first column
        predictData.set_index('date', inplace=True)
        predictData.index = pd.to_datetime(predictData.index)
        predictData.sort_index(inplace=True)
        predictData.reset_index(inplace=True)
        # Calculate the indicator data
        predictData = self.get_indicator_data(predictData, pid)
        # Calculate Historical Return and Volatility
        predictData['R1W'] = np.log(predictData['close'] / predictData['close'].shift(self._win1W))
        predictData['R1M'] = np.log(predictData['close'] / predictData['close'].shift(self._win1M))
        predictData['R1Q'] = np.log(predictData['close'] / predictData['close'].shift(self._win1Q))
        price_list = predictData['close']
        rollist = price_list.rolling(self._win1W)
        predictData['Vol_1W'] = rollist.std(ddof=0)
        rollist = price_list.rolling(self._win1M)
        predictData['Vol_1M'] = rollist.std(ddof=0)
        rollist = price_list.rolling(self._win1Q)
        predictData['Vol_1Q'] = rollist.std(ddof=0)
        # The following uses future info for the y label, to be deleted later
        predictData['futureR'] = np.log(predictData['close'].shift(-self._numForecastDays) / predictData['close'])
        # predictData = predictData[predictData['futureR'].notna()]
        predictData['yLabel'] = (predictData['futureR'] >= self._theThreshold).astype(int)
        # (removed an unused `spxDataCloseSave` snapshot of date/close)
        del (predictData['close'])
        return predictData

    def build_train_test(self, pid, indexData, vixData, indexOtherData, cpiData, FDTRData, NAPMPMIData):
        """Merge every feature source and produce train/test (or forecast) arrays.

        :return: X_train, X_test, y_train, y_test, scaledX_forecast, forecastDay
        """
        ###### Merge Data to one table
        predictData = self.build_predict_data(indexData, pid)
        forecastDay = None
        if (self._toForecast):
            forecastDay = predictData['date'].iloc[-1]
        DataAll = pd.merge(predictData, vixData, how='outer', on='date')
        DataAll = pd.merge(DataAll, indexOtherData, how='outer', on='date')
        DataAll = pd.merge(DataAll, cpiData, how='outer', on='date')
        DataAll = pd.merge(DataAll, FDTRData, how='outer', on='date')
        DataAll = pd.merge(DataAll, NAPMPMIData, how='outer', on='date')
        DataAll.set_index('date', inplace=True)
        DataAll.sort_index(inplace=True)
        DataAll.reset_index(inplace=True)
        ###### fill eco data
        # CPI series are monthly: back-fill so every trading day carries a value.
        # Fix (all fills below): assign the result instead of the chained
        # `Series.bfill/ffill(inplace=True)`, which is deprecated and does not
        # reliably write through under pandas copy-on-write.
        for col in ['CPI_YOY', 'CPURNSA', 'CPI_MOM', 'CPI_MOM_Diff']:
            DataAll[col] = DataAll[col].bfill()
        for col in ['FDTR']:
            DataAll[col] = DataAll[col].ffill()
        # NAPMPMI: US ISM manufacturing index (monthly)
        for col in ['NAPMPMI']:
            DataAll[col] = DataAll[col].bfill()
            DataAll[col] = DataAll[col].ffill()
        if (self._toForecast):
            # CPI values are not yet published for the forecast day: carry the
            # last known ones forward and zero the derived MOM columns.
            DataAllCopy = DataAll.copy()
            for col in ['CPI_YOY', 'CPURNSA']:
                DataAllCopy[col] = DataAllCopy[col].ffill()
            for col in ['CPI_MOM', 'CPI_MOM_Diff']:
                DataAllCopy[col] = DataAllCopy[col].fillna(0)
            DataAllCopy.drop(['futureR', 'yLabel'], axis=1, inplace=True)
            forecastDayIndex = DataAllCopy.index[DataAllCopy['date'] == forecastDay]
            forecastData = DataAllCopy.iloc[forecastDayIndex.to_list(), 1:]
            X_forecast = forecastData.to_numpy()
            del DataAllCopy
        ###### clean NaN
        DataAll.dropna(inplace=True)
        DataAll.reset_index(inplace=True, drop=True)
        ###### get X and y
        y = DataAll['yLabel'].to_numpy(copy=True)
        # delete future information
        DataAll.drop(['futureR', 'yLabel'], axis=1, inplace=True)
        X = DataAll.iloc[:, 1:].values
        ###################
        # scale data
        scaler = MinMaxScaler(feature_range=(0, 1))
        DataScaler = scaler.fit(X)
        scaledX = DataScaler.transform(X)
        scaledX_forecast = None
        if (self._toForecast):
            scaledX_forecast = DataScaler.transform(X_forecast)
            X_train = scaledX
            y_train = y
            X_test = []
            y_test = []
        else:
            # Step 2: Split data into train set and test set
            X_train, X_test, y_train, y_test = train_test_split(scaledX, y, test_size=0.02, shuffle=False)
            # To avoid data leak, test set should start from numForecastDays later
            X_test = X_test[self._numForecastDays:]
            y_test = y_test[self._numForecastDays:]
        return X_train, X_test, y_train, y_test, scaledX_forecast, forecastDay
......@@ -227,10 +227,10 @@ class AssetOptimize(ABC):
'''
@abstractmethod
def find_optimize(self, ids, day):
def find_optimize(self, fund_ids, day):
'''
从多id中,选出指定日期最优的id
:param ids: 待选id列表
:param fund_ids: 待选id列表
:param day: 指定日期
:return: 最优的id
'''
......@@ -305,6 +305,22 @@ class PortfoliosBuilder(ABC):
pass
class PortfoliosChecker(ABC):
'''
投组组合检测器
'''
@abstractmethod
def check(self, day=None, portfolios=None):
"""
检测避免出现最优投组同时出现全部是ft或美盛基金的情况,增加一步替换动作。
@param day:
@param portfolios:
@return:
"""
pass
class Solver(ABC):
'''
解算器
......
......@@ -110,7 +110,7 @@ class FundDividendSortinoAssetOptimize(SortinoAssetOptimize):
sortino['score'] = sortino.apply(lambda r: sum([x['weight'] * r[x['name']] for x in self._config]), axis=1)
sortino.sort_values('score', ascending=False, inplace=True)
# 取得分数高的前optimize_count个
return pct_change.columns[sortino.index[0:self.optimize_count]].values
return pct_change.columns[sortino.index[0:self.optimize_count]].values,sortino['score']
def get_optimize_pool(self, day):
opt_pool = rop.get_one(day=day, type=AssetPoolType.OPTIMIZE)
......@@ -125,7 +125,7 @@ class FundDividendSortinoAssetOptimize(SortinoAssetOptimize):
for fund_group in self.get_groups():
fund_group = [x for x in fund_group if min_dates[x] <= max_incept_date]
if len(fund_group) > self.optimize_count:
pool.extend(self.find_optimize(tuple(fund_group), day))
pool.extend(self.find_optimize(tuple(fund_group), day)[0])
elif len(fund_group) <= self.optimize_count:
pool.extend(fund_group)
rop.insert(day, AssetPoolType.OPTIMIZE, sorted(pool))
......
......@@ -58,21 +58,24 @@ CREATE TABLE IF NOT EXISTS `robo_exrate`
CREATE TABLE IF NOT EXISTS `robo_index_datas`
(
`rid_index_id` BIGINT UNSIGNED NOT NULL COMMENT '指标id',
`rid_date` DATETIME NOT NULL COMMENT '指标数据日期',
`rid_high` DOUBLE DEFAULT NULL COMMENT '最高价',
`rid_open` DOUBLE DEFAULT NULL COMMENT '最高价',
`rid_low` DOUBLE DEFAULT NULL COMMENT '最高价',
`rid_close` DOUBLE NOT NULL COMMENT '收盘价',
`rid_pe` DOUBLE DEFAULT NULL COMMENT '市盈率',
`rid_pb` DOUBLE DEFAULT NULL COMMENT '市净率',
`rid_volume` DOUBLE DEFAULT NULL COMMENT '成交量',
`rid_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`rid_update_time` DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`rid_index_id`, `rid_date`),
INDEX (`rid_date`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT '指标数据表';
`rid_index_id` bigint(20) UNSIGNED NOT NULL COMMENT '指标id',
`rid_date` datetime NOT NULL COMMENT '指标数据日期',
`rid_high` double NULL DEFAULT NULL COMMENT '最高价',
`rid_open` double NULL DEFAULT NULL COMMENT '开盘价',
`rid_low` double NULL DEFAULT NULL COMMENT '最低价',
`rid_close` double NOT NULL COMMENT '收盘价',
`rid_pe` double NULL DEFAULT NULL COMMENT '市盈率',
`rid_pb` double NULL DEFAULT NULL COMMENT '市净率',
`rid_volume` double NULL DEFAULT NULL COMMENT '成交量',
`rid_frdpe` double NULL DEFAULT NULL COMMENT '预期P/E',
`rid_frdpes` double NULL DEFAULT NULL COMMENT '预期EPS',
`rid_erp` double NULL DEFAULT NULL COMMENT '股票风险溢价',
`rid_pc` double NULL DEFAULT NULL COMMENT '涨跌期权比率',
`rid_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`rid_update_time` datetime NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`rid_index_id`, `rid_date`) USING BTREE,
INDEX `rid_date`(`rid_date`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '指标数据表' ROW_FORMAT = Dynamic;
CREATE TABLE IF NOT EXISTS `robo_eco_datas`
......
......@@ -10,6 +10,11 @@ __COLUMNS__ = {
'rid_pe': 'pe',
'rid_pb': 'pb',
'rid_volume': 'volume',
'rid_frdpe': 'frdpe',
'rid_frdpes': 'frdpes',
'rid_erp': 'erp',
'rid_pc': 'pc',
}
......
......@@ -54,7 +54,7 @@ class JDCDataSync(DataSync, ABC):
while True:
url = self.build_urls(datum=datum, page=page, start_date=start_date)
if url is None:
raise Exception(f'''request data {datum['id']} not exist!''')
break
response = requests.get(url).json()
if not response['success']:
raise Exception(f'''request indictor failed: {response['status']}''')
......@@ -180,6 +180,7 @@ class IndexSync(JDCDataSync):
return f'http://jdcprod.thiztech.com/api/datas/index-value?page={page}&size=200&sourceCode={quote(datum["bloombergTicker"])}&sourceType=BLOOMBERG&startDate={format_date(start_date)}'
def store_date(self, datumid, datas: List[dict]):
# add frdpe,frdpes,erp,pc
save_datas = [{
'index_id': datumid,
'date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d'),
......@@ -190,6 +191,10 @@ class IndexSync(JDCDataSync):
'pe': x['peRatio'] if 'peRatio' in x else None,
'pb': x['pbRatio'] if 'pbRatio' in x else None,
'volume': x['volume'] if 'volume' in x else None,
'frdpe': x['forwardPe'] if 'forwardPe' in x else None,
'frdpes': x['forwardEps'] if 'forwardEps' in x else None,
'erp': x['erp'] if 'erp' in x else None,
'pc': x['pcRatio'] if 'pcRatio' in x else None,
} for x in datas if is_workday(dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai'))) and 'close' in x]
if save_datas:
rid.batch_insert(save_datas)
......@@ -229,7 +234,7 @@ class FundNavSync(JDCDataSync):
def __init__(self):
super(FundNavSync, self).__init__()
self._subject_keys = self.find_jdc_subject_key()
self._jdc_querys = self.find_jdc_querys()
@property
def datum_type(self) -> DatumType:
......@@ -240,20 +245,31 @@ class FundNavSync(JDCDataSync):
return next_workday(last['nav_date']) if last else self.start_date
def build_urls(self, datum, start_date, page=0) -> str:
if datum['id'] not in self._subject_keys:
if datum['id'] not in self._jdc_querys:
return None
key = self._subject_keys[datum['id']]
return f'http://jdcprod.thiztech.com/api/datas/asset-value?subjectKeys={key}&page={page}&size=200&sourceType=TW&startDate={format_date(start_date)}'
def find_jdc_subject_key(self):
funds = {x['isin']: x for x in self._datum.get_datums(type=DatumType.FUND)}
querys = self._jdc_querys[datum['id']]
query_str = '&'.join([f'{x[0]}={quote(str(x[1]).encode())}' for x in querys.items()])
return f'http://jdcprod.thiztech.com/api/datas/asset-value?page={page}&size=200&startDate={format_date(start_date)}&{query_str}'
def find_jdc_querys(self):
funds = self._datum.get_datums(type=DatumType.FUND, exclude=False)
urls = {x['id']: {
'sourceCode': x['bloombergTicker'],
'sourceType': 'BLOOMBERG'
} for x in funds if 'ftTicker' not in x and 'bloombergTicker' in x}
ft_tickers = {x['ftTicker']: x for x in funds if 'ftTicker' in x}
response = requests.get('http://jdcprod.thiztech.com/api/subject?busiField=TW&sourceType=TW&subjectType=FUND')
response = response.json()
if not response['success']:
raise CollectError(f'''find fund subject failed: {response['status']}''')
content = response['body']['content']
content = [x for x in content if x.get('isin')]
return {funds[x['isin']]['id']: x['key'] for x in content if x['isin'] in funds}
return {**urls, **{
ft_tickers[x['fundId']]['id']: {
'subjectKeys': x['key'],
'sourceType': 'TW'
} for x in response['body']['content'] if x['fundId'] in ft_tickers
}}
def store_date(self, datumid, datas: List[dict]):
save_navs = [{
......
......@@ -107,6 +107,9 @@ portfolios: # 投组模块
# high-weight: [ 1 ] # 最高权重比例,可给一个值,也可以给多个值,当多个值时,第一个表示只有一个资产时权重,第二个表示只有两个资产时权重,以此类推,最后一个表示其他资产个数时的权重
poem: # poem相关
cvar-scale-factor: 0.1 # 计算时用到的系数
checker: #投组检测模块
switch: off #是否开启检查
custom-type-priority: [ 3,2,1,4 ] # 检测优先级
reports: # 报告模块相关
navs:
type: FUND
......
......@@ -86,6 +86,7 @@ portfolios: # 投组模块
dividend-date: 15 #配息日,每月15号
dividend-adjust-day: [1,4,7,10] #每年的首个季度调整配息
warehouse-frequency: 1 #每隔1个月调一次仓
warehouse-transfer-date: 1 #调仓日
solver: # 解算器相关
tol: 1E-10 # 误差满足条件
navs: # 净值要求
......@@ -111,6 +112,9 @@ portfolios: # 投组模块
# high-weight: [ 1 ] # 最高权重比例,可给一个值,也可以给多个值,当多个值时,第一个表示只有一个资产时权重,第二个表示只有两个资产时权重,以此类推,最后一个表示其他资产个数时的权重
poem: # poem相关
cvar-scale-factor: 0.1 # 计算时用到的系数
checker: #投组检测模块
switch: off #是否开启检查
custom-type-priority: [ 3,2,1,4 ] # 检测优先级
reports: # 报告模块相关
navs:
type: FUND
......
......@@ -51,7 +51,7 @@ py-jftech:
max-workers: ${MAX_PROCESS:4}
basic: # 基础信息模块
sync:
start-date: 1990-01-01 # 同步数据开始日期
start-date: 2018-08-26 # 同步数据开始日期
datum: # 资料模块
change:
date: ${DATUM_CHANGE_DATE}
......@@ -85,11 +85,12 @@ portfolios: # 投组模块
dividend-date: 15 #配息日,每月15号
dividend-adjust-day: [1,4,7,10] #每年的首个季度调整配息
warehouse-frequency: 1 #每隔1个月调一次仓
warehouse-transfer-date: 1 #调仓日
redeem-list: [ 'TEUSAAU LX Equity', 'LIGTRAA ID Equity', 'TEMFHAC LX Equity', 'LUSHUAA ID Equity' ] #从持仓中的低风险资产“直接”按序赎回
solver: # 解算器相关
model: arc # 结算模型 ARC ,PRR, ~ 标准解算器
model: prr # 结算模型 ARC ,PRR, ~ 标准解算器
arc: on #是否开启ARC
brr: 0.01 #误差补偿值
brr: 0.02 #误差补偿值
trr: 3
tol: 1E-10 # 误差满足条件
navs: # 净值要求
......@@ -98,9 +99,9 @@ portfolios: # 投组模块
max-nan: # 最大缺失净值条件
asset: 8 # 单一资产最多缺少多少交易日数据,则踢出资产池
day: 0.5 # 单一交易日最多缺少百分之多少净值,则删除该交易日
risk: [] # 资产风险等级要求,可分开写也可以合并写,e.g. risk:[ 2, 3 ] 则表示 所有投组资产风险等级都是 2 或 3
LARC: [0.5, 0.1, 0.1, 0.1] #低阈值
UARC: [0.7, 0.25, 0.25, 0.25] #高阈值
risk: [ ] # 资产风险等级要求,可分开写也可以合并写,e.g. risk:[ 2, 3 ] 则表示 所有投组资产风险等级都是 2 或 3
LARC: [ 0.30, 0.00, 0.00 ] #低阈值
UARC: [ 0.70, 0.70, 0.70 ] #高阈值
matrix-rtn-days: 20 # 计算回报率矩阵时,回报率滚动天数
asset-count: [5,5] # 投组资产个数。e.g. count 或 [min, max] 分别表示 最大最小都为count 或 最小为min 最大为max,另外这里也可以类似上面给不同风险等级分别配置
mpt: # mpt计算相关
......@@ -110,6 +111,9 @@ portfolios: # 投组模块
high-weight: [ 0.35 ] # 最高权重比例,可给一个值,也可以给多个值,当多个值时,第一个表示只有一个资产时权重,第二个表示只有两个资产时权重,以此类推,最后一个表示其他资产个数时的权重
poem: # poem相关
cvar-scale-factor: 0.1 # 计算时用到的系数
checker: #投组检测模块
switch: on #是否开启检查
custom-type-priority: [ 3,2,1,4 ] # 检测优先级
reports: # 报告模块相关
navs:
type: FUND
......@@ -234,12 +238,12 @@ robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:backtest} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据
backtest: # 回测执行器相关
start-date: 2022-02-16 # 回测起始日期
end-date: 2023-01-03 # 回测截止日期
start-date: 2018-11-26 # 回测起始日期
end-date: 2019-01-13 # 回测截止日期
sealing-period: 10 #调仓封闭期
start-step: ${BACKTEST_START_STEP:3} # 回测从哪一步开始执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
start-step: ${BACKTEST_START_STEP:1} # 回测从哪一步开始执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
end-step: ${BACKTEST_END_STEP:3} # 回测从哪一步执行完成后结束执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
clean-up: on
clean-up: off
real: # 实盘执行器
export: ${EXPORT_ENABLE:off} # 是否开启报告
start-date: 2023-05-08 # 实盘开始时间
......
This diff is collapsed.
......@@ -4,7 +4,8 @@ import logging
from py_jftech import component, autowired, format_date
from pymysql import IntegrityError, constants
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory, \
PortfoliosChecker
from portfolios.dao import robo_mpt_portfolios as rmp
logger = logging.getLogger(__name__)
......@@ -14,11 +15,13 @@ logger = logging.getLogger(__name__)
class MptPortfoliosBuilder(PortfoliosBuilder):
@autowired
def __init__(self, assets: AssetPool = None, navs: Navs = None, datum: Datum = None, factory: SolverFactory = None):
def __init__(self, assets: AssetPool = None, navs: Navs = None, datum: Datum = None, factory: SolverFactory = None,
checker: PortfoliosChecker = None):
self._assets = assets
self._navs = navs
self._datum = datum
self._factory = factory
self._checker = checker
def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL):
try:
......@@ -26,6 +29,7 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
if not portfolio:
result = self.build_portfolio(day, type)
for build_risk, datas in result.items():
datas['portfolio'] = self._checker.check(day, json.loads(datas['portfolio']))
try:
rmp.insert({
**datas,
......@@ -44,7 +48,7 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
return None
except Exception as e:
logger.exception(
f"build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}] failure.", e)
f"build portfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}] failure.", e)
raise e
def build_portfolio(self, day, type: PortfoliosType):
......@@ -119,6 +123,7 @@ class MptARCPortfoliosBuilder(MptPortfoliosBuilder):
if not portfolio:
result, detail = self.build_portfolio(day, type)
for build_risk, datas in result.items():
datas['portfolio'] = self._checker.check(day, json.loads(datas['portfolio']))
try:
rmp.insert({
**datas,
......
import logging
from py_jftech import autowired, component, get_config
from api import AssetOptimize, PortfoliosChecker, Datum, Navs, DatumType
logger = logging.getLogger(__name__)
@component(bean_name='checker')
class DefaultPortfoliosChecker(PortfoliosChecker):
    """Post-processes an optimized portfolio so that its holdings do not all
    come from a single fund company (e.g. all FT or all Legg Mason funds)."""

    @autowired
    def __init__(self, asset: AssetOptimize = None, navs: Navs = None, datum: Datum = None):
        self._asset = asset
        self._navs = navs
        self._datum = datum
        self._config = get_config(__name__)

    def check(self, day=None, portfolios: dict = None):
        """Return *portfolios*, replacing one holding with a fund from another
        company when every current holding belongs to the same company.

        :param day: evaluation date used when scoring replacement candidates
        :param portfolios: mapping of fund id (as str) -> weight
        """
        # Feature toggle: pass the portfolio through untouched when disabled.
        if not self._config.get('switch'):
            return portfolios
        funds = self._datum.get_datums(type=DatumType.FUND)
        company = {f"{fund['id']}": fund['companyType'] for fund in funds}
        customType = {f"{fund['id']}": fund['customType'] for fund in funds}
        companies = set(company[key] for key in portfolios.keys())
        # All holdings belong to one company (all FT or all Legg Mason funds)
        if len(companies) == 1:
            # Step 1: walk the configured customType priority list in order.
            priority = self._config.get('custom-type-priority')
            for p in priority:
                keys = [key for key in portfolios.keys() if customType[key] == p]
                # If this priority level matches any holding, swap and stop.
                if len(keys) > 0:
                    # Replacement candidates: funds from any *other* company.
                    ids = [fund['id'] for fund in funds if fund['companyType'] != list(companies)[0]]
                    best = self.find_highest_score(ids, day)
                    # Exactly one match: replace it directly.
                    if len(keys) == 1:
                        portfolios[best] = portfolios[keys[0]]
                        # drop the replaced holding
                        del portfolios[keys[0]]
                    else:
                        # Several matches: replace the lowest weight-adjusted scorer.
                        scores = self.do_score(keys, day)
                        weight_scores = {key: scores[key]*portfolios[key] for key in keys}
                        lowest = min(scores, key=lambda k: weight_scores[k])
                        portfolios[best] = portfolios[lowest]
                        # drop the replaced holding
                        del portfolios[lowest]
                    break
        return portfolios

    def do_score(self, ids, day):
        # Score each candidate fund via the asset optimizer; returns id -> score.
        optimize = self._asset.find_optimize(fund_ids=ids, day=day)
        scores = optimize[1].to_dict()
        id_score = {}
        for k, v in scores.items():
            # NOTE(review): assumes the optimizer's score keys index positionally
            # into `ids` — confirm find_optimize preserves input order.
            id_score[f'{ids[k]}'] = v
        return id_score

    def find_highest_score(self, ids, day):
        # The optimizer returns ids ordered by score; take the top one.
        optimize = self._asset.find_optimize(fund_ids=ids, day=day)
        return optimize[0][0]
......@@ -20,23 +20,25 @@ CREATE TABLE IF NOT EXISTS robo_mpt_portfolios
-- Merge fix: the diff residue interleaved the pre-merge and post-merge column
-- sets (duplicate rhp_id/rhp_nav/rhp_asset_nav/rhp_div_acc/rhp_fund_div and two
-- table tails), which is invalid SQL. Only the merged (new) definition is kept.
CREATE TABLE IF NOT EXISTS robo_hold_portfolios
(
    `rhp_id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT,
    `rhp_date` datetime NOT NULL COMMENT '日期',
    `rhp_risk` tinyint(4) NOT NULL COMMENT '风险等级',
    `rhp_rrs_id` bigint(20) UNSIGNED NULL DEFAULT NULL COMMENT '调仓信号id',
    `rhp_rebalance` tinyint(4) NOT NULL DEFAULT 0 COMMENT '是否调仓',
    `rhp_portfolios` json NOT NULL COMMENT '投组信息',
    `rhp_fund_av` double(12, 4) NOT NULL COMMENT '投组原始净值,sum(个股原始净值*对应份额)',
    `rhp_fund_nav` double(12, 4) NOT NULL DEFAULT 0.0000 COMMENT '基金被动配息做配股',
    `rhp_nav` double(12, 4) NOT NULL COMMENT '复权净值',
    `rhp_asset_nav` double(12, 4) NOT NULL COMMENT '产品净值,投顾模式:fund_av',
    `rhp_div_forecast` double(12, 4) NOT NULL DEFAULT 0.0000 COMMENT '预配息金额',
    `rhp_div_acc` double(12, 4) NOT NULL COMMENT '累计配息金额,投顾:acc(port_div + fund_div)',
    `rhp_port_div` double(12, 4) NOT NULL COMMENT '主动配息',
    `rhp_cash` double(12, 4) NOT NULL DEFAULT 0.0000 COMMENT '现金(产品的现金账户)',
    `rhp_fund_div` double(12, 4) NOT NULL COMMENT '持有基金配息sum(个股每股配息*对应份额)',
    `rhp_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `rhp_update_time` datetime NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
    `v_nav_div_acc` double(12, 4) GENERATED ALWAYS AS ((`rhp_asset_nav` + `rhp_div_acc`)) VIRTUAL COMMENT '产品累计净值 asset_nav+ acc_div' NOT NULL,
    PRIMARY KEY (`rhp_id`) USING BTREE,
    UNIQUE INDEX `rhp_date`(`rhp_date`, `rhp_risk`) USING BTREE,
    INDEX `rhp_risk`(`rhp_risk`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '持仓投组表' ROW_FORMAT = Dynamic;
import datetime
import json
import logging
from datetime import datetime as dt, date
......@@ -199,12 +200,16 @@ class DividendPortfoliosHolder(PortfoliosHolder):
})
def get_navs_and_div(self, day, fund_ids):
    """Return the latest per-fund (av, dividend, nav_cal) values as of ``day``.

    Merge fix: the diff residue kept both the old two-tuple version and the
    new three-tuple version of this method; only the merged version remains.

    Loads roughly one month (22 calendar days) of nav history so the
    forward-fills below have data to propagate, then pivots by fund and
    returns three dicts keyed by fund_id for the most recent row.
    """
    navs = pd.DataFrame(
        self._navs.get_fund_navs(fund_ids=fund_ids, max_date=day, min_date=day - datetime.timedelta(22)))
    dividend = navs.pivot_table(index='nav_date', columns='fund_id', values='dividend')
    nav_cal = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
    navs = navs.pivot_table(index='nav_date', columns='fund_id', values='av')
    # navs/nav_cal are price levels: carry the last known value forward.
    navs.fillna(method='ffill', inplace=True)
    nav_cal.fillna(method='ffill', inplace=True)
    # Dividends are events, not levels: a missing value means "no dividend".
    dividend.fillna(value=0, inplace=True)
    # Extend the dividend calendar daily through `day` so iloc[-1] reflects
    # `day` itself even when the last nav row is older.
    dividend = dividend.reindex(pd.date_range(start=dividend.index.min(), end=day, freq='D'), fill_value=0)
    return dict(navs.iloc[-1]), dict(dividend.iloc[-1]), dict(nav_cal.iloc[-1])
def clear(self, day=None, risk: PortfoliosRisk = None):
rhp.delete(min_date=day, risk=risk)
......@@ -240,10 +245,13 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
if last_nav:
# 若非首次配息
share = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share'].items()}
# 参与配息的基金份额
share_nav = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share_nav'].items()}
share_nodiv_nav = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share_nodiv_nav'].items()}
fund_div_tuple = self.get_navs_and_div(fund_ids=tuple(set(weight) | set(share)), day=day)
navs = fund_div_tuple[0]
fund_dividend = fund_div_tuple[1]
nav_cals = fund_div_tuple[2]
fund_dividend_nav = sum(
map(lambda k: share_nav[k] * fund_dividend[k], filter(lambda k: k in fund_dividend, share_nav.keys())))
fund_dividend = sum(
......@@ -251,16 +259,19 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
dividend_acc = last_nav['div_acc'] + fund_dividend
fund_av = round(sum([navs[x] * y for x, y in share.items()]), 4)
fund_nav = round(sum([navs[x] * y for x, y in share_nav.items()]), 4)
nav = round(sum([nav_cals[x] * y for x, y in share_nodiv_nav.items()]), 4)
fund_nav += fund_dividend_nav
asset_nav = fund_av
share = {x: fund_av * w / navs[x] for x, w in weight.items()}
# 若调仓当日,有基金产生配息
share_nav = {x: fund_nav * w / navs[x] for x, w in weight.items()}
if self.is_first_workday(day):
share_nodiv_nav = {x: nav * w / nav_cals[x] for x, w in weight.items()}
if self.is_transfer_workday(day):
div_forecast = asset_nav * self.month_dividend
else:
fund_av = self.init_nav
asset_nav = self.init_nav
nav = self.init_nav
fund_div_tuple = self.get_navs_and_div(fund_ids=tuple(weight), day=day)
navs = fund_div_tuple[0]
# 首次配息金额,做记录
......@@ -268,7 +279,10 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
funds = self._datum.get_datums(type=DatumType.FUND)
funds_subscription_rate = {fund['id']: fund.get('subscriptionRate', 0) for fund in funds}
share = {x: (1 - funds_subscription_rate[x]) * (fund_av * w) / navs[x] for x, w in weight.items()}
nav_cals = fund_div_tuple[2]
share_nav = share
# 不考虑配息
share_nodiv_nav = {x: (1 - funds_subscription_rate[x]) * (fund_av * w) / nav_cals[x] for x, w in weight.items()}
# 初始买入扣手续费
fee = sum(funds_subscription_rate[x] * (fund_av * w) for x, w in weight.items())
fund_av = fund_av - fee
......@@ -285,20 +299,23 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
'portfolios': {
'weight': weight,
'weight_nav': weight,
'weight_nodiv_nav': weight,
'share': share,
'share_nav': share_nav,
'share_nodiv_nav': share_nodiv_nav
},
'fund_av': fund_av,
'fund_nav': fund_nav,
'nav': 0,
'nav': nav,
'port_div': 0,
'asset_nav': asset_nav,
})
def is_transfer_workday(self, day):
    """Return True when ``day`` is this month's rebalance ("transfer") workday.

    Merge fix: the diff residue kept the superseded ``is_first_workday``
    interleaved with this method; only the merged version (called by the
    new-side code via ``is_transfer_workday``) remains.

    The configured day-of-month ('warehouse-transfer-date') is shifted to the
    next workday when it falls on a non-working day.
    """
    transfer_day = self._config['warehouse-transfer-date']
    # 获取当月第n天的日期: build the configured day within `day`'s month.
    transfer_date = date(day.year, day.month, transfer_day)
    first_work_day = transfer_date if is_workday(transfer_date) else next_workday(transfer_date)
    return day.day == first_work_day.day
def no_rebalance(self, day, risk: PortfoliosRisk, last_nav):
......@@ -306,9 +323,11 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
dividend_acc = last_nav['div_acc']
share = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share'].items()}
share_nav = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share_nav'].items()}
share_nodiv_nav = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share_nodiv_nav'].items()}
fund_div_tuple = self.get_navs_and_div(fund_ids=tuple(share), day=day)
navs = fund_div_tuple[0]
fund_dividend = fund_div_tuple[1]
nav_cals = fund_div_tuple[2]
# 配息当天配股
for k in share_nav.keys():
if k in fund_dividend:
......@@ -324,14 +343,17 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
map(lambda k: share[k] * fund_dividend[k], filter(lambda k: k in fund_dividend, share.keys())))
dividend_acc = dividend_acc + port_div + fund_dividend
fund_av = round(sum([navs[x] * y for x, y in share.items()]), 4)
nav = round(sum([nav_cals[x] * y for x, y in share_nodiv_nav.items()]), 4)
fund_nav = round(sum([navs[x] * y for x, y in share_nav.items()]), 4)
weight = {x: round(y * navs[x] / fund_av, 2) for x, y in share.items()}
weight_nodiv_nav = {x: round(y * nav_cals[x] / nav, 2) for x, y in share_nav.items()}
weight_nav = {x: round(y * navs[x] / fund_av, 2) for x, y in share_nav.items()}
weight = format_weight(weight)
weight_nav = format_weight(weight_nav)
weight_nodiv_nav = format_weight(weight_nodiv_nav)
asset_nav = fund_av
div_forecast = last_nav['div_forecast']
if self.is_first_workday(day):
if self.is_transfer_workday(day):
div_forecast = asset_nav * self.month_dividend
rhp.insert({
'date': day,
......@@ -344,12 +366,14 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
'portfolios': {
'weight': weight,
'weight_nav': weight_nav,
'weight_nodiv_nav': weight_nodiv_nav,
'share': share,
'share_nav': share_nav
'share_nav': share_nav,
'share_nodiv_nav': share_nodiv_nav
},
'fund_av': fund_av,
'fund_nav': fund_nav,
'nav': 0,
'nav': nav,
'port_div': port_div,
'asset_nav': asset_nav,
})
......@@ -379,7 +403,6 @@ class DivHoldReportor(RoboReportor):
if not holds.empty:
holds['signal_type'] = 'INIT'
holds['real_av'] = holds['asset_nav']
holds['nav'] = holds['acc_av']
holds = holds[
['date', 'signal_type', 'fund_av', 'fund_nav', 'fund_div', 'cash', 'real_av', 'port_div', 'div_acc',
'acc_av', 'nav']]
......
......@@ -6,7 +6,7 @@ from functools import reduce
from typing import List
import pandas as pd
from py_jftech import component, autowired, get_config, prev_workday
from py_jftech import component, autowired, get_config, prev_workday, workday_range
from py_jftech import is_workday
from api import PortfoliosBuilder
......@@ -48,7 +48,13 @@ class BaseRebalanceSignal(RebalanceSignal, ABC):
signal = rrs.get_last_one(day, risk, SignalType.NORMAL, effective=None)
if signal:
frequency = get_config('portfolios')['holder']['warehouse-frequency']
date = pd.to_datetime(day.replace(day=1)) + pd.DateOffset(months=frequency)
transfer_date = get_config('portfolios')['holder']['warehouse-transfer-date']
date = pd.to_datetime(signal['date'].replace(day=transfer_date))
# 说明发生了跨月份问题
if signal['date'].day > transfer_date:
if rrs.get_count(risk=PortfoliosRisk.FT3, effective=True) > 1:
date = date + pd.DateOffset(months=1)
date = date + pd.DateOffset(months=frequency)
date = date - timedelta(days=1)
# 指定周期末的工作日
date = date if is_workday(date) else prev_workday(date)
......
......@@ -44,7 +44,7 @@ class BenchmarkAlligamReportor(RoboReportor):
if start_date:
params['startDate'] = format_date(start_date)
while True:
response = requests.get(f'https://jdcprod.thiztech.com/api/datas/asset-value?{urlencode(params)}').json()
response = requests.get(f'http://jdcprod.thiztech.com/api/datas/asset-value?{urlencode(params)}').json()
if not response['success']:
raise Exception(f'''request jdc alligam failed: {response['status']}''')
rb.batch_insert([{
......
......@@ -37,12 +37,13 @@ class BacktestExecutor(RoboExecutor):
@staticmethod
def get_last_business_day(start_date, end_date):
start_date = prev_workday(start_date)
transfer_date = get_config('portfolios')['holder']['warehouse-transfer-date']
# 生成日期范围并转换为DataFrame
dates = pd.date_range(start_date, end_date, freq='M')
if dates[0] != start_date:
dates = dates.insert(0, start_date)
dates = pd.date_range(start_date, end_date, freq='MS', closed='right')
dates = [pd.to_datetime(f"{date.year}-{date.month}-{transfer_date}") for date in dates]
dates.insert(0, start_date)
df = pd.DataFrame({'dates': dates})
df['dates'] = df['dates'].apply(lambda x: prev_workday(x))
result = []
for i in range(0, len(df), get_config('portfolios')['holder']['warehouse-frequency']):
result.append(df.iloc[i]['dates'])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment