from typing import List

from py_jftech import autowired

from ai.dao.robo_datas import get_base_info
from ai.data_access import DataAccess
from ai.model_trainer import ModelTrainer
from ai.noticer import upload_predict
from ai.training_data_builder import TrainingDataBuilder
from api import DataSync

# Cutoff date handed to DataAccess below.
# max_date = None
max_date = '2024-01-05'

# Ids of the instruments to predict.
PREDICT_LIST = [67, 121, 122, 123]
# PREDICT_LIST = [67, 121, 122, 123, 155, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 168, 169, 170, 171, 174, 175]

# Id groups handed to DataAccess / TrainingDataBuilder (economic indicators,
# indexes and funds respectively, going by the variable names).
eco = [65, 66, 74, 134]
index = [67, 68, 69, 70, 71, 72, 73, 75, 116, 117, 138, 139, 142, 143, 140, 141, 144, 145, 146]
fund = [121, 122, 123, 155, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 168, 169, 170, 171, 174, 175]


@autowired
def sync(syncs: List[DataSync] = None):
    """Call ``do_sync()`` on every ``DataSync`` in ``syncs``.

    The script invokes this without arguments; presumably ``@autowired``
    injects the list of registered ``DataSync`` implementations -- verify
    against py_jftech.
    """
    for s in syncs:
        # if isinstance(s, (IndexSync, EcoSync)):
        s.do_sync()


def predictionFromMoel(the_model, scaledX_forecast, predict_item, indexDict: dict,
                       forecast_day=None, num_forecast_days=None):
    """Predict UP/DOWN for one instrument, print the result and upload it.

    Args:
        the_model: Fitted model exposing ``predict``.
        scaledX_forecast: Feature input passed straight to ``the_model.predict``.
        predict_item: Display name of the instrument; must be one of the
            values of ``indexDict``.
        indexDict: Mapping of instrument id to display name. It is searched
            in reverse to recover the id(s) for ``predict_item``.
        forecast_day: Day the forecast is made for; needs a ``strftime``
            method. Defaults to the module-level ``forecastDay``.
        num_forecast_days: Forecast horizon shown in the message. Defaults
            to the module-level ``numForecastDays``.

    Returns:
        Whatever ``the_model.predict(scaledX_forecast)`` returned.

    Raises:
        NameError: If a fallback global is needed but has not been defined
            (they are only bound when this file runs as a script).
    """
    # Historically this function read ``forecastDay`` / ``numForecastDays``
    # directly from module globals. Explicit arguments remove that hidden
    # coupling; the fallback keeps old four-argument callers working.
    if forecast_day is None:
        forecast_day = forecastDay
    if num_forecast_days is None:
        num_forecast_days = numForecastDays

    prediction = the_model.predict(scaledX_forecast)
    predictionStr = 'UP' if prediction > 0.5 else 'DOWN'

    content = (
        f'\n On day {forecast_day.strftime("%m/%d/%Y")}, the model predicts {predict_item}'
        f' to be {predictionStr} in {num_forecast_days} business days. \n\n'
    )
    print(content)

    # Upload the prediction result.
    # NOTE(review): if ``predict_item`` is not a value of ``indexDict`` the id
    # list is empty; behaviour of get_base_info([]) is not visible here.
    key = [k for k, v in indexDict.items() if v == predict_item]
    index_info = get_base_info(key)[0]
    upload_predict(index_info['ticker'], forecast_day, predictionStr)
    # send(content)
    return prediction


########################################
if __name__ == '__main__':
    sync()
    toForecast = True  # False means test, True means forecast

    # define some parameters
    win1W = 5  # 1 week
    win1M = 21  # 1 Month
    win1Q = 63  # 1 Quarter
    numForecastDays = 21  # business days, 21 business days means one month
    theThreshold = 0.0

    ids = set(PREDICT_LIST) | set(eco) | set(index) | set(fund)
    infos = get_base_info(ids)
    # id -> ticker with the ' Index' / ' Equity' suffix dropped and spaces
    # replaced by underscores.
    indexDict = {
        info['id']: info['ticker'].replace(' Index', '').replace(' Equity', '').replace(' ', '_')
        for info in infos
    }

    ###################
    # Step 1: Prepare X and y (features and labels)
    # Prepare the base data.
    data_access = DataAccess(index, eco, fund, max_date, indexDict)
    indexData = data_access.get_index_datas()
    ecoData = data_access.get_eco_datas()
    fundData = data_access.get_fund_datas()
    # Index data preparation.
    vixData = data_access.get_vix(indexData)
    indexOtherData = data_access.get_other_index(indexData)
    # Economic indicator data preparation.
    cpiData = data_access.get_cpi(ecoData)
    FDTRData = data_access.get_fdtr(ecoData)
    # Newly added indicator NAPMPMI: US ISM manufacturing index (monthly).
    NAPMPMIData = data_access.get_napmpmi(ecoData)

    builder = TrainingDataBuilder(index, eco, fund, indexDict, toForecast, win1W, win1M, win1Q,
                                  numForecastDays, theThreshold)

    for pid in PREDICT_LIST:
        print(f'{indexDict[pid]} start '.center(50, '='))
        # Ids listed in ``index`` train on index data, everything else on fund data.
        t_data = indexData if pid in index else fundData
        X_train, X_test, y_train, y_test, scaledX_forecast, forecastDay = (
            builder.build_train_test(pid, t_data, vixData, indexOtherData, cpiData, FDTRData,
                                     NAPMPMIData)
        )

        trainer = ModelTrainer(toForecast)
        rf_model = trainer.train_random_forest(X_train, y_train, X_test, y_test)
        gbt_model = trainer.train_GBT(X_train, y_train, X_test, y_test)
        svc_model = trainer.train_SVC(X_train, y_train, X_test, y_test)
        ensemble_model = trainer.ensemble_model(rf_model, gbt_model, svc_model,
                                                X_train, y_train, X_test, y_test)

        print(f'forest predict{rf_model.predict(scaledX_forecast)}'.center(60, '+'))
        print(f'gbt predict{gbt_model.predict(scaledX_forecast)}'.center(60, '+'))
        print(f'svc predict{svc_model.predict(scaledX_forecast)}'.center(60, '+'))
        print(f'ensemble predict{ensemble_model.predict(scaledX_forecast)}'.center(60, '+'))

        if toForecast:
            # Pass the forecast context explicitly instead of relying on the
            # function picking it up from module globals.
            predictionFromMoel(ensemble_model, scaledX_forecast, indexDict[pid], indexDict,
                               forecast_day=forecastDay, num_forecast_days=numForecastDays)