Commit 7c822105 authored by wenwen.tang's avatar wenwen.tang 😕

add ai module

parent 22ebf8c5
from datetime import datetime
from typing import List
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# from finta import TA
from lightgbm import LGBMClassifier
from py_jftech import sendmail, format_date, autowired
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
# for draw confusion matrix
# import sys
# import matplotlib
# matplotlib.use('Agg')
# from sklearn import metrics
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Dense, Dropout, LSTM
from ai.dao.robo_datas import get_eco_list, get_index_list
from api import DataSync
# List of symbols for technical indicators
# INDICATORS = ['RSI', 'MACD', 'STOCH','ADL', 'ATR', 'MOM', 'MFI', 'ROC', 'OBV', 'CCI', 'EMV', 'VORTEX']
# Note that '14 period MFI' and '14 period EMV' is not available for forecast
from basic.sync import EcoSync, IndexSync
# Technical-indicator names (finta TA method names) used as model features.
# '14 period MFI' and '14 period EMV' are excluded (not available for forecast).
INDICATORS = ['RSI', 'MACD', 'STOCH', 'ADL', 'ATR', 'MOM', 'ROC', 'OBV', 'CCI', 'VORTEX']
# robo_eco_datas ids to load (CPI_YOY, FDTR, CPURNSA -- see indexDict in __main__).
eco = [65, 66, 74]
# robo_index_datas ids to load (SPX, rates, CCMP, VIX, ... -- see indexDict in __main__).
index = [67, 68, 69, 70, 71, 72, 73, 75]
# Recipients of the prediction email.
email = ['wenwen.tang@thizgroup.com']
# Data cutoff date (None = use all available data).
max_date = None
# max_date = '2023-09-01'
@autowired
def sync(syncs: List[DataSync] = None):
    """Run index/eco data synchronization before model training.

    Args:
        syncs: DataSync implementations, injected by ``@autowired``.

    Only ``IndexSync`` and ``EcoSync`` instances are relevant here; any
    other syncers the container provides are skipped.
    """
    # Guard against a missing injection: iterating None raises TypeError.
    for s in syncs or []:
        if isinstance(s, (IndexSync, EcoSync)):
            s.do_sync()
def send(content):
    """Email *content* to the configured recipients, dated with today's date."""
    subject = '预测_{today}'.format(today=format_date(datetime.today()))
    sendmail(receives=email, copies=[], attach_paths=[], subject=subject, content=content)
def _get_indicator_data(data):
    """Append technical-indicator feature columns to an OHLCV DataFrame.

    Uses the finta ``TA`` helpers named in ``INDICATORS``, plus EMA ratios,
    normalized volume and previous-close-relative OHLC columns.

    NOTE(review): ``from finta import TA`` is commented out at the top of the
    file -- it must be restored for this function to run.

    Args:
        data: DataFrame with lowercase 'open'/'high'/'low'/'close'/'volume'.

    Returns:
        The feature DataFrame; raw open/high/low/volume columns are dropped
        ('close' is kept -- the caller still needs it and deletes it later).
    """
    for indicator in INDICATORS:
        # getattr instead of eval: same dispatch, no code-injection surface.
        ind_data = getattr(TA, indicator)(data)
        if not isinstance(ind_data, pd.DataFrame):
            ind_data = ind_data.to_frame()
        data = data.merge(ind_data, left_index=True, right_index=True)
    # finta names the EMV column with a trailing dot; normalize it.
    data.rename(columns={"14 period EMV.": '14 period EMV'}, inplace=True)

    # Also calculate moving averages for features
    data['ema50'] = data['close'] / data['close'].ewm(50).mean()
    data['ema21'] = data['close'] / data['close'].ewm(21).mean()
    data['ema15'] = data['close'] / data['close'].ewm(15).mean()
    data['ema5'] = data['close'] / data['close'].ewm(5).mean()

    # Instead of using the actual volume value (which changes over time),
    # we normalize it with a moving volume average
    data['normVol'] = data['volume'] / data['volume'].ewm(5).mean()

    # get relative values
    data['relativeOpen'] = data['open'] / data['close'].shift(1)
    data['relativeHigh'] = data['high'] / data['close'].shift(1)
    data['relativeLow'] = data['low'] / data['close'].shift(1)
    data['relativeClose'] = data['close'] / data['close'].shift(1)

    # Remove columns that won't be used as features
    del (data['open'])
    del (data['high'])
    del (data['low'])
    # data['close'] are still needed and will be deleted later
    del (data['volume'])
    return data
########################################
if __name__ == '__main__':
    # Pull the latest index / eco data before building features.
    sync()

    toForecast = True  # False means test, True means forecast
    indexDataPath = r"AI_Data"  # only used by the (removed) Excel loaders

    # define some parameters
    win1W = 5  # 1 week
    win1M = 21  # 1 Month
    win1Q = 63  # 1 Quarter
    numForecastDays = 21  # business days, 21 business days means one month
    theThreshold = 0.0  # log-return threshold separating UP (1) from DOWN (0)

    # Database series id -> human-readable code.
    indexDict = {
        65: "CPI_YOY",
        66: "FDTR",
        67: "SPX",
        68: "USGG10YR",
        69: "USGG2YR",
        70: "MXWO",  # not use now
        71: "MXWD",  # not use now
        72: "CCMP",
        73: "TWSE",  # not use now
        74: "CPURNSA",
        75: "VIX",
        76: "US0001M",
        77: "US0012M"}

    ###################
    # Step 1: Prepare X and y (features and labels)

    # ----- get raw data
    indexData = pd.DataFrame(get_index_list(index_ids=index, max_date=max_date))
    indexData = indexData[
        ["rid_index_id", "rid_date", "rid_high", "rid_open", "rid_low", "rid_close", "rid_pe", "rid_pb",
         "rid_volume"]]
    indexData.rename(columns={"rid_date": 'date'}, inplace=True)  # please use 'date'
    indexData["rid_index_id"] = indexData["rid_index_id"].map(indexDict)

    ecoData = pd.DataFrame(get_eco_list(eco_ids=eco, max_date=max_date))
    ecoData = ecoData[["red_eco_id", "red_date", "red_indicator"]]
    ecoData.rename(columns={"red_date": 'date'}, inplace=True)  # please use 'date'
    ecoData["red_eco_id"] = ecoData["red_eco_id"].map(indexDict)

    # ----- get individual data from raw data
    spxData = indexData[indexData['rid_index_id'] == "SPX"].copy()
    del (spxData['rid_index_id'])
    spxData.set_index('date', inplace=True)
    spxData.index = pd.to_datetime(spxData.index)
    spxData.sort_index(inplace=True)
    spxData.reset_index(inplace=True)
    if toForecast:
        # Forecast is made from the most recent business day available.
        forecastDay = spxData['date'].iloc[-1]

    vixData = indexData[indexData['rid_index_id'] == "VIX"].copy()
    vixData = vixData[["date", "rid_high", "rid_open", "rid_low", "rid_close"]]
    vixData.rename(
        columns={"rid_high": 'vix_high', 'rid_open': 'vix_open', "rid_low": 'vix_low', "rid_close": 'vix_close'},
        inplace=True)
    vixData.set_index('date', inplace=True)
    vixData.index = pd.to_datetime(vixData.index)

    # Close prices of rates / NASDAQ / money-market series, one column per series.
    indexOtherData = indexData[(indexData['rid_index_id'] == "USGG10YR") | (indexData['rid_index_id'] == "USGG2YR") | (
            indexData['rid_index_id'] == "CCMP")
                               | (indexData['rid_index_id'] == "US0001M") | (
                                       indexData['rid_index_id'] == "US0012M")].copy()
    indexOtherData = indexOtherData[['rid_index_id', 'date', 'rid_close']]
    indexOtherData = indexOtherData.pivot(index='date', columns='rid_index_id', values='rid_close')
    indexOtherData.index = pd.to_datetime(indexOtherData.index)

    cpiData = ecoData[(ecoData['red_eco_id'] == "CPI_YOY") | (ecoData['red_eco_id'] == "CPURNSA")].copy()
    cpiData = cpiData.pivot(index='date', columns='red_eco_id', values='red_indicator')
    cpiData['CPI_MOM'] = (cpiData['CPURNSA'] / cpiData['CPURNSA'].shift(1) - 1.0) * 100 * 12  # Annualized Percentage
    cpiData['CPI_MOM_Diff'] = cpiData['CPURNSA'] - cpiData['CPURNSA'].shift(1)
    cpiData.index = pd.to_datetime(cpiData.index)

    FDTRData = ecoData[ecoData['red_eco_id'] == "FDTR"].copy()
    del (FDTRData['red_eco_id'])
    FDTRData.rename(columns={"red_indicator": 'FDTR'}, inplace=True)
    FDTRData.set_index('date', inplace=True)
    FDTRData.index = pd.to_datetime(FDTRData.index)

    # ----- Additional preparing SPX Data
    # finta expects properly formatted ohlc DataFrame, with column names in lowercase:
    # ["open", "high", "low", "close"] and ["volume"] for indicators that expect ohlcv input.
    spxData.rename(
        columns={"rid_high": 'high', 'rid_open': 'open', "rid_low": 'low', "rid_close": 'close',
                 'rid_volume': 'volume',
                 "rid_pe": "SPX_pe", "rid_pb": "SPX_pb"},
        inplace=True)

    # Calculate the indicator data
    spxData = _get_indicator_data(spxData)

    # Calculate Historical Return and Volatility
    spxData['R1W'] = np.log(spxData['close'] / spxData['close'].shift(win1W))
    spxData['R1M'] = np.log(spxData['close'] / spxData['close'].shift(win1M))
    spxData['R1Q'] = np.log(spxData['close'] / spxData['close'].shift(win1Q))
    price_list = spxData['close']
    rollist = price_list.rolling(win1W)
    spxData['Vol_1W'] = rollist.std(ddof=0)
    rollist = price_list.rolling(win1M)
    spxData['Vol_1M'] = rollist.std(ddof=0)
    rollist = price_list.rolling(win1Q)
    spxData['Vol_1Q'] = rollist.std(ddof=0)

    # The following uses future info for the y label, to be deleted later
    spxData['futureR'] = np.log(spxData['close'].shift(-numForecastDays) / spxData['close'])
    spxData['yLabel'] = (spxData['futureR'] >= theThreshold).astype(int)
    spxDataCloseSave = spxData[['date', 'close']]
    del (spxData['close'])

    # ----- Merge Data to one table
    DataAll = pd.merge(spxData, vixData, how='outer', on='date')
    DataAll = pd.merge(DataAll, indexOtherData, how='outer', on='date')
    DataAll = pd.merge(DataAll, cpiData, how='outer', on='date')
    DataAll = pd.merge(DataAll, FDTRData, how='outer', on='date')
    DataAll.set_index('date', inplace=True)
    DataAll.sort_index(inplace=True)
    DataAll.reset_index(inplace=True)

    # ----- fill eco data
    # Fix: assign the filled Series back instead of `DataAll[col].bfill(inplace=True)`.
    # Filling through a column selection is chained assignment; it is deprecated
    # in pandas 2.x and silently stops mutating the frame under copy-on-write.
    for col in ['CPI_YOY', 'CPURNSA', 'CPI_MOM', 'CPI_MOM_Diff']:
        DataAll[col] = DataAll[col].bfill()
    for col in ['FDTR']:
        DataAll[col] = DataAll[col].ffill()

    if toForecast:
        # Build the single feature row for the forecast day.
        DataAllCopy = DataAll.copy()
        for col in ['CPI_YOY', 'CPURNSA']:
            DataAllCopy[col] = DataAllCopy[col].ffill()
        for col in ['CPI_MOM', 'CPI_MOM_Diff']:
            DataAllCopy[col] = DataAllCopy[col].fillna(0)
        del (DataAllCopy['futureR'])
        del (DataAllCopy['yLabel'])
        forecastDayIndex = DataAllCopy.index[DataAllCopy['date'] == forecastDay]
        forecastData = DataAllCopy.iloc[forecastDayIndex.to_list(), 1:]
        X_forecast = forecastData.to_numpy()
        del DataAllCopy

    # ----- clean NaN
    DataAll.dropna(inplace=True)
    DataAll.reset_index(inplace=True, drop=True)

    # ----- get X and y
    y = DataAll['yLabel'].to_numpy(copy=True)
    # delete future information
    del (DataAll['futureR'])
    del (DataAll['yLabel'])
    X = DataAll.iloc[:, 1:].values  # skip the 'date' column

    ###################
    # scale data
    scaler = MinMaxScaler(feature_range=(0, 1))
    DataScaler = scaler.fit(X)
    scaledX = DataScaler.transform(X)

    if toForecast:
        # Train on everything; the forecast row is scaled with the same scaler.
        scaledX_forecast = DataScaler.transform(X_forecast)
        X_train = scaledX
        y_train = y
        X_test = []
        y_test = []
    else:
        # Step 2: Split data into train set and test set
        X_train, X_test, y_train, y_test = train_test_split(scaledX, y, test_size=0.02, shuffle=False)
        # To avoid data leak, test set should start from numForecastDays later
        X_test = X_test[numForecastDays:]
        y_test = y_test[numForecastDays:]

    def test_model(strMethod, classifier, X_test, y_test):
        """Print confusion matrix / classification report / accuracy and plot the matrix."""
        print(strMethod + " ====== test results ======")
        y_pred = classifier.predict(X_test)
        result0 = confusion_matrix(y_test, y_pred)
        print(strMethod + " Confusion Matrix:")
        print(result0)
        result1 = classification_report(y_test, y_pred, zero_division=1.0)
        print(strMethod + " Classification Report:")
        print(result1)
        result2 = accuracy_score(y_test, y_pred)
        print(strMethod + " Accuracy:", result2)
        cm_display = ConfusionMatrixDisplay(confusion_matrix=result0, display_labels=['Down', 'Up'])
        cm_display.plot()
        plt.title(strMethod + ' Accuracy: ' + f'{result2:.0%}')
        plt.show()

    ###################
    # Step 3: Train the model
    def _train_random_forest(X_train, y_train, X_test, y_test):
        classifier = RandomForestClassifier()
        classifier.fit(X_train, y_train)
        if not toForecast:
            test_model('Random Forest', classifier, X_test, y_test)
        return classifier

    rf_model = _train_random_forest(X_train, y_train, X_test, y_test)

    def _train_GBT(X_train, y_train, X_test, y_test):
        # Gradient Boosted Tree
        classifierGBT = LGBMClassifier()
        classifierGBT.fit(X_train, y_train)
        if not toForecast:
            test_model('Gradient Boosted Tree', classifierGBT, X_test, y_test)
        return classifierGBT

    gbt_model = _train_GBT(X_train, y_train, X_test, y_test)

    def _train_SVC(X_train, y_train, X_test, y_test):
        # Support Vector Machines
        classifierSVC = svm.SVC()
        classifierSVC.fit(X_train, y_train)
        if not toForecast:
            test_model('Support Vector Machines', classifierSVC, X_test, y_test)
        return classifierSVC

    svc_model = _train_SVC(X_train, y_train, X_test, y_test)

    def _ensemble_model(rf_model, gbt_model, svc_model, X_train, y_train, X_test, y_test):
        # Hard-voting ensemble over the three base classifiers.
        estimators = [('rf', rf_model), ('gbt', gbt_model), ('svc', svc_model)]
        ensemble = VotingClassifier(estimators, voting='hard')
        ensemble.fit(X_train, y_train)
        if not toForecast:
            test_model('Ensemble Model', ensemble, X_test, y_test)
        return ensemble

    ensemble_model = _ensemble_model(rf_model, gbt_model, svc_model, X_train, y_train, X_test, y_test)

    def predictionFromMoel(the_model, scaledX_forecast):
        """Predict UP/DOWN for the forecast day, print and email the result."""
        prediction = the_model.predict(scaledX_forecast)
        # Fix: predict() returns a length-1 array; index it rather than
        # truth-testing the array itself.
        predictionStr = 'UP' if prediction[0] > 0.5 else 'DOWN'
        content = "\n On day " + forecastDay.strftime(
            "%m/%d/%Y") + ", the model predicts SPX to be " + predictionStr + " in " + str(
            numForecastDays) + " business days. \n"
        print(content)
        send(content)
        return prediction

    if toForecast:
        predictionFromMoel(ensemble_model, scaledX_forecast)
from py_jftech import read, to_tuple, where
@read
def get_index_list(index_ids=None, min_date=None, max_date=None):
    """Return the SQL selecting robo_index_datas rows, ordered by id then date.

    Args:
        index_ids: single id or iterable of ids to filter on (None = all).
        min_date / max_date: inclusive date bounds for rid_date (None = open).
    """
    conditions = []
    if min_date:
        conditions.append(f"rid_date >= '{min_date}'")
    if max_date:
        conditions.append(f"rid_date <= '{max_date}'")
    # NOTE(review): dates are interpolated into the SQL text; callers pass
    # internal config values, but parameterized queries would be safer.
    return f'''
      select * from robo_index_datas
      {where(*conditions, rid_index_id=to_tuple(index_ids))} order by rid_index_id, rid_date
    '''
@read
def get_eco_list(eco_ids=None, min_date=None, max_date=None):
    """Return the SQL selecting robo_eco_datas rows, ordered by id then date.

    Args:
        eco_ids: single id or iterable of ids to filter on (None = all).
        min_date / max_date: inclusive date bounds for red_date (None = open).
    """
    conditions = []
    if min_date:
        conditions.append(f"red_date >= '{min_date}'")
    if max_date:
        conditions.append(f"red_date <= '{max_date}'")
    # NOTE(review): dates are interpolated into the SQL text; callers pass
    # internal config values, but parameterized queries would be safer.
    return f'''
      select * from robo_eco_datas
      {where(*conditions, red_eco_id=to_tuple(eco_ids))} order by red_eco_id, red_date
    '''
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment