import json from typing import List from py_jftech import autowired, parse_date, prev_workday, format_date from ai.dao.robo_datas import get_base_info, get_index_list, get_fund_list from ai.data_access import DataAccess from ai.model_trainer import ModelTrainer from ai.noticer import send from ai.training_data_builder import TrainingDataBuilder from api import DataSync # 截止日期 # max_date = None max_date = '2024-03-01' # max_date = '2024-01-11' toForecast = True # False means test, True means forecast syncData = True # 开启会同步数据库指数及基金数据 uploadData = False # 开启会上传预测结果 doReport = True # 开启会生成Excel报告 # 待预测指数 # PREDICT_LIST = [67, 121, 122, 123] PREDICT_LIST = [67, 121, 122, 123, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 174, 175, 177, 178] eco = [65, 66, 74, 134] index = [67, 68, 69, 70, 71, 72, 73, 75, 76, 77, 105, 106, 116, 117, 138, 139, 142, 143, 140, 141, 144, 145, 146] fund = [121, 122, 123, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 174, 175, 177, 178] @autowired def sync(syncs: List[DataSync] = None): for s in syncs: # if isinstance(s, (IndexSync, EcoSync)): s.do_sync() def predictionFromMoel(the_model, scaledX_forecast, predict_item, indexDict: dict): prediction = the_model.predict(scaledX_forecast) predictionStr = 'DOWN' if (prediction > 0.5): predictionStr = 'UP' content = f"""\n On day {forecastDay.strftime("%m/%d/%Y")}, the model predicts {predict_item} to be {predictionStr} in {str(numForecastDays)} business days. \n""" print(content) # 上传预测结果 key = [k for k, v in indexDict.items() if v == predict_item] index_info = get_base_info(key)[0] if uploadData: from ai.noticer import upload_predict upload_predict(index_info['ticker'], forecastDay, predictionStr) if doReport: from ai.reporter import do_reporter do_reporter() send(content) return prediction def judgement(id, type, predict): from datetime import datetime predict_term = 21 start = parse_date(max_date) if max_date else prev_workday(datetime.today()) navs = [] if type == 'INDEX': navs = get_index_list(index_ids=id, min_date=start, limit=predict_term) navs = [nav['rid_close'] for nav in navs] elif type == 'FUND': navs = get_fund_list(fund_ids=id, min_date=start, limit=predict_term) navs = [nav['rfn_nav_cal'] for nav in navs] if len(navs) == predict_term: upper = True if navs[-1] >= navs[0] else False result = {} for k, v, in predict.items(): pred = True if v[0] > 0 else False if upper == pred: result[k] = True else: result[k] = False j = { 'id': id, 'date': format_date(start), 'result': result } with open('predict.txt', 'a+') as file: file.write(json.dumps(j)) file.write('\n') ######################################## if __name__ == '__main__': if syncData: sync() # define some parameters win1W = 5 # 1 week win1M = 21 # 1 Month win1Q = 63 # 1 Quarter numForecastDays = 21 # business days, 21 business days means one month theThreshold = 0.0 ids = set(PREDICT_LIST) | set(eco) | set(index) | set(fund) infos = get_base_info(ids) infos_type = {info['id']: info['type'] for info in infos} indexDict = {info['id']: info['ticker'].replace(' Index', '').replace(' Equity', '').replace(' ', '_') for info in infos} ################### # Step 1: Prepare X and y (features and labels) # 准备基础数据 data_access = DataAccess(index, eco, fund, max_date, indexDict) indexData = data_access.get_index_datas() ecoData = data_access.get_eco_datas() fundData = data_access.get_fund_datas() # 指数数据准备 vixData = data_access.get_vix(indexData) indexOtherData = data_access.get_other_index(indexData) # 经济指标数据准备 cpiData = data_access.get_cpi(ecoData) FDTRData = data_access.get_fdtr(ecoData) # 新增指标 NAPMPMI :美國的ISM製造業指數 (Monthly) NAPMPMIData = data_access.get_napmpmi(ecoData) builder = TrainingDataBuilder(index, eco, fund, indexDict, toForecast, win1W, win1M, win1Q, numForecastDays, theThreshold) for pid in PREDICT_LIST: print(f'{indexDict[pid]} start '.center(50, '=')) t_data = indexData if pid in index else fundData X_train, X_test, y_train, y_test, scaledX_forecast, forecastDay = \ builder.build_train_test(pid, t_data, vixData, indexOtherData, cpiData, FDTRData, NAPMPMIData) trainer = ModelTrainer(toForecast) rf_model = trainer.train_random_forest(X_train, y_train, X_test, y_test) gbt_model = trainer.train_GBT(X_train, y_train, X_test, y_test) svc_model = trainer.train_SVC(X_train, y_train, X_test, y_test) ensemble_model = trainer.ensemble_model(rf_model, gbt_model, svc_model, X_train, y_train, X_test, y_test) model_predict = {'forest': rf_model.predict(scaledX_forecast), 'gbt': gbt_model.predict(scaledX_forecast), 'svc': svc_model.predict(scaledX_forecast), 'ensemble': ensemble_model.predict(scaledX_forecast)} print(f'预测结果:{model_predict}'.center(60, '+')) judgement(pid, infos_type[pid], model_predict) if toForecast: predictionFromMoel(ensemble_model, scaledX_forecast, indexDict[pid], indexDict)