Commit 55052eb1 authored by stephen.wang

Merge remote-tracking branch 'origin/dev-dividend' into dev-dividend

parents bef905ae c418191a
import json
from typing import List
import numpy as np
from py_jftech import autowired, parse_date, prev_workday, format_date
from ai.config import LABEL_RANGE, LABEL_TAG
from ai.dao import robo_predict
from ai.dao.robo_datas import get_base_info, get_index_list, get_fund_list
from ai.data_access import DataAccess
from ai.model_trainer import ModelTrainer
......@@ -11,24 +14,23 @@ from ai.training_data_builder import TrainingDataBuilder
from api import DataSync
# 截止日期
# max_date = None
max_date = '2024-03-01'
max_date = None
# max_date = '2024-03-20'
# max_date = '2024-01-11'
toForecast = True # False means test, True means forecast
syncData = True # 开启会同步数据库指数及基金数据
uploadData = False # 开启会上传预测结果
uploadData = True # 开启会上传预测结果
doReport = True # 开启会生成Excel报告
# 待预测指数
# PREDICT_LIST = [67, 121, 122, 123]
PREDICT_LIST = [67, 121, 122, 123, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171,
174, 175, 177, 178]
PREDICT_LIST = [67, 121, 122, 123, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163,
164, 165, 166, 167, 168, 169, 170, 171, 174, 175, 177, 178]
eco = [65, 66, 74, 134]
index = [67, 68, 69, 70, 71, 72, 73, 75, 76, 77, 105, 106, 116, 117, 138, 139, 142, 143, 140, 141, 144, 145, 146]
fund = [121, 122, 123, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 174, 175,
177,
178]
fund = [121, 122, 123, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165,
166, 167, 168, 169, 170, 171, 174, 175, 177, 178]
@autowired
......@@ -38,22 +40,21 @@ def sync(syncs: List[DataSync] = None):
s.do_sync()
def predictionFromMoel(the_model, scaledX_forecast, predict_item, indexDict: dict):
prediction = the_model.predict(scaledX_forecast)
predictionStr = 'DOWN'
if (prediction > 0.5):
predictionStr = 'UP'
def report_prediction(label, predict_item, indexDict: dict):
# NOTE(review): this span is merge/diff residue — both the old flag-driven
# upload path (uploadData / doReport) and the new LABEL_RANGE branch appear
# in sequence, and `key` is assigned twice; confirm against the repository
# which version is authoritative before relying on this text.
# label: numeric class label produced by the models; mapped to its display
# tag (e.g. 'UP', 'DOWNDOWN') via LABEL_TAG.
prediction = label
predictionStr = LABEL_TAG.get(prediction)
# forecastDay / numForecastDays are module-level globals — presumably set by
# the main loop before this is called; verify against the caller.
content = f"""\n On day {forecastDay.strftime("%m/%d/%Y")}, the model predicts {predict_item} to be {predictionStr} in {str(numForecastDays)} business days. \n"""
print(content)
# Upload the prediction result
# Reverse-lookup the base-datum id whose display name equals predict_item.
key = [k for k, v in indexDict.items() if v == predict_item]
key = [k for k, v in indexDict.items() if v == predict_item][0]
index_info = get_base_info(key)[0]
if uploadData:
from ai.noticer import upload_predict
upload_predict(index_info['ticker'], forecastDay, predictionStr)
if doReport:
from ai.reporter import do_reporter
do_reporter()
# Multi-class label set (more than plain UP/DOWN): persist to the
# robo_predict table; otherwise push the prediction to the notifier.
if len(LABEL_RANGE) > 2:
data = {"rbd_id": key, "date": forecastDay, "predict": prediction}
robo_predict.insert(data)
else:
from ai.noticer import upload_predict
upload_predict(index_info['ticker'], forecastDay, predictionStr)
send(content)
return prediction
......@@ -130,12 +131,30 @@ if __name__ == '__main__':
rf_model = trainer.train_random_forest(X_train, y_train, X_test, y_test)
gbt_model = trainer.train_GBT(X_train, y_train, X_test, y_test)
svc_model = trainer.train_SVC(X_train, y_train, X_test, y_test)
ensemble_model = trainer.ensemble_model(rf_model, gbt_model, svc_model, X_train, y_train, X_test, y_test)
knn_model = trainer.train_nearest_neighbors(X_train, y_train, X_test, y_test)
ada_model = trainer.train_AdaBoost(X_train, y_train, X_test, y_test)
ensemble_model = trainer.ensemble_model(rf_model, gbt_model, svc_model,
knn_model, ada_model, X_train, y_train, X_test, y_test)
model_predict = {'forest': rf_model.predict(scaledX_forecast),
'gbt': gbt_model.predict(scaledX_forecast),
'svc': svc_model.predict(scaledX_forecast),
'knn': knn_model.predict(scaledX_forecast),
'adaboost': ada_model.predict(scaledX_forecast),
'ensemble': ensemble_model.predict(scaledX_forecast)}
print(f'预测结果:{model_predict}'.center(60, '+'))
judgement(pid, infos_type[pid], model_predict)
if toForecast:
predictionFromMoel(ensemble_model, scaledX_forecast, indexDict[pid], indexDict)
if len(LABEL_RANGE) > 2:
average = round(np.mean(list(model_predict.values())))
report_prediction(average, indexDict[pid], indexDict)
else:
report_prediction(ensemble_model.predict(scaledX_forecast), indexDict[pid], indexDict)
if doReport:
if len(LABEL_RANGE) > 2:
from ai.reporter import do_reporter2
do_reporter2()
else:
from ai.reporter import do_reporter
do_reporter()
# Prediction labels
from math import inf
# LABEL_RANGE maps class label -> [lower, upper) log-return bounds used to
# bucket `futureR` into labels (see map_to_label: `lower <= ret < upper`).
LABEL_RANGE = {2: [0.05, inf], 1: [0.02, 0.05], 0: [-0.02, 0.02], -1: [-0.05, -0.02], -2: [-inf, -0.05]}
# Human-readable tag for each class label, used in reports and notifications.
LABEL_TAG = {2: 'UPUP', 1: 'UP', 0: 'NEUTRAL', -1: 'DOWN', -2: 'DOWNDOWN'}
......@@ -27,6 +27,7 @@ def get_eco_list(eco_ids=None, min_date=None, max_date=None):
{where(*sqls, red_eco_id=to_tuple(eco_ids))} order by red_eco_id, red_date
'''
@read
def get_fund_list(fund_ids=None, min_date=None, max_date=None, limit=None):
limit_sql = f'limit {limit}' if limit else ''
......@@ -40,10 +41,11 @@ def get_fund_list(fund_ids=None, min_date=None, max_date=None, limit=None):
{where(*sqls, rfn_fund_id=to_tuple(fund_ids))} order by rfn_fund_id, rfn_date {limit_sql}
'''
@read
def get_base_info(ids=None):
sqls = []
return f"""
SELECT rbd_id id,v_rbd_bloomberg_ticker ticker,v_rbd_type type, rbd_datas datas FROM `robo_base_datum`
{where(*sqls,rbd_id=to_tuple(ids))}
"""
\ No newline at end of file
{where(*sqls, rbd_id=to_tuple(ids))}
"""
from py_jftech import read, write, format_date, to_tuple, where, mapper_columns
# Mapping of physical robo_predict column names to the logical field names
# used by callers (insert() input keys and get_list() result aliases).
__COLUMNS__ = {
'rp_rbd_id': 'rbd_id',  # base-datum (index/fund) id
'rp_date': 'date',  # prediction date
'rp_predict': 'predict',  # numeric class label (int)
'rp_remark': 'remark',  # free-form JSON remark
'rp_create_time': 'create_time'  # row creation timestamp
}
@write
def insert(datas):
    """Build a REPLACE INTO statement persisting one robo_predict row.

    `datas` uses the logical field names from ``__COLUMNS__`` (rbd_id, date,
    predict, ...); they are mapped back to physical column names first.
    NOTE: values are interpolated with naive single-quoting — acceptable for
    internal, trusted data only, not for untrusted input.
    """
    row = mapper_columns(datas=datas, columns=__COLUMNS__)
    column_sql = ','.join(row.keys())
    value_sql = ','.join(f"'{value}'" for value in row.values())
    return f'''
        replace into robo_predict({column_sql})
        values ({value_sql})
    '''
@read
def get_list(index_ids: object = None, min_date: object = None, max_date: object = None) -> object:
    """Query robo_predict rows, optionally bounded by date and base-datum ids.

    Returns rows aliased to the logical names in ``__COLUMNS__``, ordered by
    id then date.
    """
    sqls = []
    if min_date:
        sqls.append(f"rp_date >= '{format_date(min_date)}'")
    if max_date:
        sqls.append(f"rp_date <= '{format_date(max_date)}'")
    select_cols = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    return f'''
        select {select_cols} from robo_predict
        {where(*sqls, rp_rbd_id=to_tuple(index_ids))} order by rp_rbd_id, rp_date
    '''
    # Fix: the filter previously used `rid_index_id`, a column that does not
    # exist in the robo_predict table (its id column is rp_rbd_id), so any
    # call passing index_ids would have produced invalid SQL.
-- Storage for model predictions: one row per (base datum, forecast date),
-- written by ai.dao.robo_predict.insert via REPLACE INTO (hence no PK; the
-- composite index below serves lookups by id + date).
CREATE TABLE IF NOT EXISTS robo_predict (
`rp_rbd_id` bigint(20) NOT NULL,
`rp_date` datetime NOT NULL,
`rp_predict` int(11) NOT NULL,
`rp_remark` json NULL,
`rp_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`rp_update_time` datetime NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
INDEX `rp_rbd_id`(`rp_rbd_id`, `rp_date`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
\ No newline at end of file
......@@ -3,8 +3,12 @@ from abc import ABC
import matplotlib.pyplot as plt
from lightgbm import LGBMClassifier
from sklearn import svm
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, accuracy_score
from sklearn.neighbors import KNeighborsClassifier
from ai.config import LABEL_RANGE
class ModelTrainer(ABC):
......@@ -23,7 +27,8 @@ class ModelTrainer(ABC):
print(strMethod + " ====== test results ======")
y_pred = classifier.predict(X_test)
result0 = confusion_matrix(y_test, y_pred, labels=[0, 1])
labels = list(LABEL_RANGE.keys())[::-1]
result0 = confusion_matrix(y_test, y_pred, labels=labels)
print(strMethod + " Confusion Matrix:")
print(result0)
......@@ -33,7 +38,7 @@ class ModelTrainer(ABC):
result2 = accuracy_score(y_test, y_pred)
print(strMethod + " Accuracy:", result2)
cm_display = ConfusionMatrixDisplay(confusion_matrix=result0, display_labels=['Down', 'Up'])
cm_display = ConfusionMatrixDisplay(confusion_matrix=result0, display_labels=labels)
cm_display.plot()
plt.title(strMethod + ' Accuracy: ' + f'{result2:.0%}')
plt.show()
......@@ -61,9 +66,25 @@ class ModelTrainer(ABC):
self.test_model('Support Vector Machines', classifierSVC, X_test, y_test)
return classifierSVC
def ensemble_model(self, rf_model, gbt_model, svc_model, X_train, y_train, X_test, y_test):
def train_nearest_neighbors(self, X_train, y_train, X_test, y_test):
    """Fit a K-Nearest Neighbors classifier on the training split.

    Evaluation on the test split is skipped in forecast mode
    (``self._toForecast``).
    """
    model = KNeighborsClassifier()
    model.fit(X_train, y_train)
    if self._toForecast:
        return model
    self.test_model('K-Nearest Neighbors', model, X_test, y_test)
    return model
def train_AdaBoost(self, X_train, y_train, X_test, y_test):
    """Fit an AdaBoost classifier on the training split.

    Evaluation on the test split is skipped in forecast mode
    (``self._toForecast``).
    """
    booster = AdaBoostClassifier()
    booster.fit(X_train, y_train)
    if self._toForecast:
        return booster
    self.test_model('AdaBoost', booster, X_test, y_test)
    return booster
def ensemble_model(self, rf_model, gbt_model, svc_model, knn_model,
ada_model, X_train, y_train, X_test, y_test):
# Create a dictionary of our models
estimators = [('rf', rf_model), ('gbt', gbt_model), ('svc', svc_model)]
estimators = [('rf', rf_model), ('gbt', gbt_model), ('svc', svc_model),
('knn', knn_model), ('AdaBoost', ada_model)]
# Create our voting classifier, inputting our models
ensemble = VotingClassifier(estimators, voting='hard')
# fit model to training data
......
......@@ -25,4 +25,5 @@ def upload_predict(ticker, predictDate, predict):
headers = {"X-AUTH-token": "rt7297LwQvyAYTke2iD8Vg"}
response = requests.post(url=f'{jrp_domain}/ai/predict', json=predict_data, headers=headers)
if response.status_code != 200:
print(response.text)
print("上传ai预测结果失败,请重试")
......@@ -4,6 +4,10 @@ import json
import pandas as pd
import requests
from ai.config import LABEL_TAG
from ai.dao import robo_predict
from ai.dao.robo_datas import get_base_info
symbols = ['ACWI', 'EWJ', 'MCHI', 'EEM', 'BKF', 'INDA', 'AAXJ', 'VGK', 'QQQ', 'SPY', 'SPX', 'IWN',
'IUSG', 'IWD', 'DON', 'GDX', 'TOLZ', 'XLU', 'XBI', 'ESGD', 'IGE', 'EMLC', 'IGAA',
'LQD', 'HYG', 'SHY', 'IEI', 'IEF', 'GLD', 'IYR', 'UUP', 'CEW', 'TLT']
......@@ -29,5 +33,24 @@ def do_reporter(start='2023-10-01', end=datetime.date.today()):
pf.to_excel("Forcast_Report.xlsx", index=False)
def do_reporter2():
    """Export the stored multi-label predictions to Forcast_Report.xlsx.

    Joins robo_predict rows with base-datum info, translates numeric labels
    to display tags, and orders rows by the position of their ticker in the
    module-level ``symbols`` list.
    """
    info_by_id = {row['id']: row for row in get_base_info()}
    symbol_rank = {sym: pos for pos, sym in enumerate(symbols)}
    rows = []
    for rec in robo_predict.get_list():
        base = info_by_id.get(rec['rbd_id'])
        rows.append({
            'Forcast On Date': rec['date'],
            'Ticker': base['ticker'].replace(' Index', '').replace(' Equity', ''),
            'In 21 business days': LABEL_TAG.get(rec['predict']),
            'Ticker Name': json.loads(base['datas'])['chineseName'],
        })
    # Sort by the ticker's position in the canonical symbols list.
    rows.sort(key=lambda row: symbol_rank[row['Ticker'].split(' ')[0]])
    pd.DataFrame(rows).to_excel("Forcast_Report.xlsx", index=False)
if __name__ == '__main__':
do_reporter()
do_reporter2()
......@@ -6,6 +6,8 @@ from finta import TA
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from ai.config import LABEL_RANGE
def imp():
print(TA)
......@@ -73,6 +75,10 @@ class TrainingDataBuilder(ABC):
return data
def build_predict_data(self, indexData, pid):
def map_to_label(ret):
for label, (lower, upper) in LABEL_RANGE.items():
if float(lower) <= ret < float(upper):
return label
"""
@param pid: 需要预测的指数或基金id
@return:
......@@ -114,8 +120,7 @@ class TrainingDataBuilder(ABC):
# The following uses future info for the y label, to be deleted later
predictData['futureR'] = np.log(predictData['close'].shift(-self._numForecastDays) / predictData['close'])
# predictData = predictData[predictData['futureR'].notna()]
predictData['yLabel'] = (predictData['futureR'] >= self._theThreshold).astype(int)
spxDataCloseSave = predictData[['date', 'close']]
predictData['yLabel'] = predictData['futureR'].apply(lambda r: map_to_label(r))
del (predictData['close'])
return predictData
......@@ -178,7 +183,8 @@ class TrainingDataBuilder(ABC):
###################
# scale data
scaler = MinMaxScaler(feature_range=(0, 1))
labels = list(LABEL_RANGE.keys())
scaler = MinMaxScaler(feature_range=(labels[-1], labels[0]))
# scaledX = scaler.fit_transform(X)
DataScaler = scaler.fit(X)
scaledX = DataScaler.transform(X)
......
......@@ -248,8 +248,8 @@ reports: # 报告模块相关
subject: "SVROBO6-实盘版-每日监测_{today}"
content: "Dear All: 附件是今天生成的监测数据,請驗收,謝謝! 注>:該郵件為自動發送,如有問題請聯繫矽谷團隊 telan_qian@chifufund.com"
robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:backtest} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据
use: ${ROBO_EXECUTOR:real} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:on} # 是否开启同步资料数据
backtest: # 回测执行器相关
start-date: 2024-03-02 # 回测起始日期
end-date: 2024-04-11 # 回测截止日期
......@@ -258,7 +258,7 @@ robo-executor: # 执行器相关
end-step: ${BACKTEST_END_STEP:3} # 回测从哪一步执行完成后结束执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
clean-up: on
real: # 实盘执行器
export: ${EXPORT_ENABLE:off} # 是否开启报告
export: ${EXPORT_ENABLE:on} # 是否开启报告
start-date: 2023-05-08 # 实盘开始时间
include-date: []
......
......@@ -52,7 +52,7 @@ class BaseRebalanceSignal(RebalanceSignal, ABC):
date = pd.to_datetime(signal['date'].replace(day=transfer_date))
# 说明发生了跨月份问题
if signal['date'].day > transfer_date:
if rrs.get_count(risk=PortfoliosRisk.FT3, effective=True) > 1:
if rrs.get_count(risk=PortfoliosRisk.FT3, effective=True) > 0:
date = date + pd.DateOffset(months=1)
date = date + pd.DateOffset(months=frequency)
date = date - timedelta(days=1)
......
......@@ -10,6 +10,7 @@ __COLUMNS__ = {
'rrs_p_type': 'portfolio_type',
'rrs_p_weight': 'portfolio',
'rrs_effective': 'effective',
'rrs_create_time': 'create_time',
}
......
import datetime as dt
import json
from multiprocessing import Process
import logging
from statistics import pstdev
import pandas as pd
import uvicorn
from apscheduler.schedulers.blocking import BlockingScheduler
from fastapi import FastAPI
from py_jftech import prev_workday, filter_weekend, autowired
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger
from empyrical import sharpe_ratio, annual_volatility
from fastapi import FastAPI, Request
from py_jftech import prev_workday, filter_weekend, autowired, next_workday
from starlette.responses import JSONResponse
import main
from api import DatumType, PortfoliosRisk, Datum
from api import DatumType, PortfoliosRisk, Datum, RoboReportor
app = FastAPI()
# 创建 AsyncIOScheduler 实例
scheduler = AsyncIOScheduler()
REC_GID = 'E3886FBA-123B-7890-123E-123456BEEED'
fund_infos, cp, roi, risk = None, None, None, None
def get_today_rec():
......@@ -33,32 +41,41 @@ def get_last_signal():
@autowired
def get_fund_infos(datum: Datum = None):
return datum.get_datums(DatumType.FUND)
@app.get("/recommend")
async def root():
portfolio = get_today_rec()
if portfolio:
fund_infos = get_fund_infos()
global fund_infos
fund_infos = datum.get_datums(DatumType.FUND)
@autowired(names={'combo': 'hold-report'})
def load_report(max_date=None, min_date=None, combo: RoboReportor = None):
    """Refresh the module-level cp / roi / risk metrics from the hold report.

    max_date: report cut-off; defaults to the previous workday of *today*,
        computed per call. (Fix: the old default was evaluated once at import
        time, so it went stale in a long-running service.)
    min_date: optional report start date.
    combo: injected by @autowired with the 'hold-report' reporter.
    Returns the (cp, roi, risk) tuple it also stores in globals.
    """
    global cp, roi, risk
    if max_date is None:
        max_date = prev_workday(dt.date.today())
    frame = pd.DataFrame(combo.load_report(max_date=max_date, min_date=min_date))
    frame.set_index('date', inplace=True)
    series = frame['acc_av']
    returns = round(series.pct_change(), 5)
    # Daily Sharpe ratio with zero risk-free rate.
    cp = round(sharpe_ratio(returns, risk_free=0, period='daily', annualization=None), 2)
    # NOTE(review): 'roi' actually stores the annualized volatility of the
    # account-value series — confirm the field naming against the API contract.
    roi = round(annual_volatility(series), 1)
    risk = round(pstdev(series), 1)
    return cp, roi, risk
@app.get("/franklin/prediction_data")
async def recommend():
sig = get_last_signal()
if sig:
if not fund_infos:
get_fund_infos()
id_ticker_map = {str(info['id']): info for info in fund_infos}
funds = json.loads(portfolio['portfolio'])
funds = json.loads(sig['portfolio'])
rec_list = []
portfolios = {'recomm_guid': REC_GID}
data = {'recomm_guid': REC_GID}
data['data_date'] = portfolio['date'].strftime('%Y-%m-%d')
data['funds'] = [{'weight': round(weight * 100), 'fund_id': id_ticker_map[key]['ftTicker']} for key, weight in
funds.items()]
data['creat_date'] = portfolio['create_time'].strftime('%Y-%m-%d %H:%M:%S')
# todo 补全
# returns = round(datas.pct_change(), 5)
# data['cp'] = sharpe_ratio(returns, risk_free=0, period='daily', annualization=None),
data['rr'] = 0.81
data['roi'] = 0.81
data['risk'] = round(sum([id_ticker_map[key]['risk'] * weight for key, weight in funds.items()]), 2)
note = {}
sig = get_last_signal()
note['last_reg_reb'] = sig['date'].strftime('%Y-%m-%d')
load_report(min_date=dt.date.today() - dt.timedelta(365))
data = {'recomm_guid': REC_GID, 'data_date': sig['date'].strftime('%Y-%m-%d'),
'funds': [{'weight': round(weight * 100), 'fund_id': id_ticker_map[key]['ftTicker']} for key, weight in
funds.items()], 'creat_date': sig['create_time'].strftime('%Y-%m-%d %H:%M:%S'),
'risk': risk,
'rr': round(sum([id_ticker_map[key]['risk'] * weight for key, weight in funds.items()]), 2), 'cp': cp,
'roi': roi}
note = {'last_rec': next_workday(sig['date']).strftime('%Y%m%d')}
data['note'] = json.dumps(note)
portfolios['data'] = data
rec_list.append(portfolios)
......@@ -67,28 +84,29 @@ async def root():
return {'msg': '当日投组未产生,待10:00后获取'}
def start_robo():
# 其他异常处理程序
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Catch-all FastAPI handler: log the failure and answer with a JSON 500."""
    logging.error(f"请求 {request.url} 发生未知错误: {str(exc)}")
    payload = {"errorCode": "500", "errorMsg": str(exc)}
    return JSONResponse(status_code=500, content=payload)
# 定义应用启动事件
@app.on_event("startup")
async def startup_event():
# 异常情况可以重启跑当天投组
current_time = dt.datetime.now()
target_time = dt.time(10, 0)
if current_time.time() > target_time:
main.start()
scheduler.add_job(main.start, trigger=DateTrigger(run_date=current_time))
# 开启定时任务,执行实盘
scheduler = BlockingScheduler()
scheduler.add_job(main.start, 'cron', day_of_week='0-4', hour=10, minute=00)
scheduler.start()
def start_web():
# Serve the FastAPI app with uvicorn on port 8080. reload=True enables
# dev-style auto-restart on code change — NOTE(review): likely unwanted in
# production; confirm deployment expectations.
uvicorn.run("robo_controller:app", reload=True, port=8080)
if __name__ == "__main__":
# 开启一个进程执行start_robo()
p1 = Process(target=start_robo)
p1.start()
# 启动进程2
p2 = Process(target=start_web)
p2.start()
uvicorn.run("robo_controller:app", host="0.0.0.0", port=8080)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment