Commit 18c7d36d authored by wenwen.tang's avatar wenwen.tang 😕

ai 模块对X和Y列增添数据

parent f02ff4be
This diff is collapsed.
......@@ -25,3 +25,23 @@ def get_eco_list(eco_ids=None, min_date=None, max_date=None):
select * from robo_eco_datas
{where(*sqls, red_eco_id=to_tuple(eco_ids))} order by red_eco_id, red_date
'''
@read
def get_fund_list(fund_ids=None, min_date=None, max_date=None):
sqls = []
if min_date:
sqls.append(f"rfn_date >= '{min_date}'")
if max_date:
sqls.append(f"rfn_date <= '{max_date}'")
return f'''
select * from robo_fund_navs
{where(*sqls, rfn_fund_id=to_tuple(fund_ids))} order by rfn_fund_id, rfn_date
'''
@read
def get_base_info(ids=None):
sqls = []
return f"""
SELECT rbd_id id,v_rbd_bloomberg_ticker ticker,v_rbd_type type FROM `robo_base_datum`
{where(*sqls,rbd_id=to_tuple(ids))}
"""
\ No newline at end of file
from abc import ABC
import pandas as pd
from ai.dao.robo_datas import get_eco_list, get_fund_list, get_index_list
class DataAccess(ABC):
def __init__(self, index, eco, fund, max_date, indexDict) -> None:
super().__init__()
self._index = index
self._eco = eco
self._fund = fund
self._max_date = max_date
self._indexDict = indexDict
def get_index_datas(self):
indexData = pd.DataFrame(
get_index_list(index_ids=self._index, max_date=self._max_date))
# todo erp 没有数据 "rid_erp",
indexData = indexData[
["rid_index_id", "rid_date", "rid_high", "rid_open", "rid_low", "rid_close", "rid_pe", "rid_pb",
"rid_volume", "rid_frdpe", "rid_frdpes", "rid_pc"]]
indexData.rename(columns={"rid_date": 'date'}, inplace=True) # please use 'date'
indexData["rid_index_id"] = indexData["rid_index_id"].map(self._indexDict)
indexData['rid_frdpe'].ffill(inplace=True)
return indexData
def get_eco_datas(self):
ecoData = pd.DataFrame(
get_eco_list(eco_ids=self._eco, max_date=self._max_date))
ecoData = ecoData[["red_eco_id", "red_date", "red_indicator"]]
ecoData.rename(columns={"red_date": 'date'}, inplace=True) # please use 'date'
ecoData["red_eco_id"] = ecoData["red_eco_id"].map(self._indexDict)
return ecoData
def get_fund_datas(self):
fundData = pd.DataFrame(
get_fund_list(fund_ids=self._fund, max_date=self._max_date))
fundData = fundData[["rfn_fund_id", "rfn_date", "rfn_nav_cal"]]
fundData.rename(columns={"rfn_date": 'date'}, inplace=True) # please use 'date'
fundData["rfn_fund_id"] = fundData["rfn_fund_id"].map(self._indexDict)
return fundData
def get_vix(self, indexData):
# VIX:芝加哥期权交易所SPX波动率指
vixData = indexData[indexData['rid_index_id'] == "VIX"].copy()
vixData = vixData[["date", "rid_high", "rid_open", "rid_low", "rid_close"]]
vixData.rename(
columns={"rid_high": 'vix_high', 'rid_open': 'vix_open', "rid_low": 'vix_low', "rid_close": 'vix_close'},
inplace=True)
vixData.set_index('date', inplace=True)
vixData.index = pd.to_datetime(vixData.index)
return vixData
def get_other_index(self, indexData):
other_index = ["USGG10YR", "USGG2YR", "CCMP", "US0001M", "US0012M", "COI TOTL", "LEI TOTL", "MID",
"OE4EKLAC", "OEA5KLAC", "OECNKLAC", "OEJPKLAC", "OEOTGTAC", "OEUSKLAC", "USRINDEX", "SPX"]
cols = ['date', 'rid_close', 'rid_pe', 'rid_pb', 'rid_volume', 'rid_frdpe', 'rid_frdpes', 'rid_pc']
indexOtherData = pd.DataFrame()
idxs = [self._indexDict[i] for i in self._index]
for idx in other_index:
if idx in idxs:
idx_data = indexData[indexData['rid_index_id'] == idx].copy()
idx_data = idx_data[cols]
idx_data.rename(
columns={"rid_close": f'{idx}_close', 'rid_pe': f'{idx}_pe', 'rid_pb': f'{idx}_pb',
'rid_volume': f'{idx}_volume', 'rid_frdpe': f'{idx}_frdpe', 'rid_frdpes': f'{idx}_frdpes',
'rid_pc': f'{idx}_pc'},
inplace=True)
idx_data.set_index('date', inplace=True)
idx_data.index = pd.to_datetime(idx_data.index)
if indexOtherData.size > 0:
indexOtherData = pd.merge(indexOtherData, idx_data, how='outer', on='date')
else:
indexOtherData = idx_data
indexOtherData.ffill(inplace=True)
indexOtherData.bfill(inplace=True)
indexOtherData = indexOtherData.dropna(axis=1)
return indexOtherData
def get_cpi(self, ecoData):
# CPI_YOY:美国城镇消费物价指数同比未经季 CPURNSA:美国消费者物价指数未经季调
cpiData = ecoData[(ecoData['red_eco_id'] == "CPI_YOY") | (ecoData['red_eco_id'] == "CPURNSA")].copy()
cpiData = cpiData.pivot(index='date', columns='red_eco_id', values='red_indicator')
cpiData['CPI_MOM'] = (cpiData['CPURNSA'] / cpiData['CPURNSA'].shift(
1) - 1.0) * 100 * 12 # Annualized Percentage
cpiData['CPI_MOM_Diff'] = cpiData['CPURNSA'] - cpiData['CPURNSA'].shift(1)
cpiData.index = pd.to_datetime(cpiData.index)
return cpiData
def get_fdtr(self, ecoData):
# FDTR 美国联邦基金目标利率
FDTRData = ecoData[ecoData['red_eco_id'] == "FDTR"].copy()
del (FDTRData['red_eco_id'])
FDTRData.rename(columns={"red_indicator": 'FDTR'}, inplace=True)
FDTRData.set_index('date', inplace=True)
FDTRData.index = pd.to_datetime(FDTRData.index)
return FDTRData
def get_napmpmi(self, ecoData):
# 新增指标 NAPMPMI :美國的ISM製造業指數 (Monthly)
NAPMPMIData = ecoData[ecoData['red_eco_id'] == "NAPMPMI"].copy()
del (NAPMPMIData['red_eco_id'])
NAPMPMIData.rename(columns={"red_indicator": 'NAPMPMI'}, inplace=True)
NAPMPMIData.set_index('date', inplace=True)
NAPMPMIData.index = pd.to_datetime(NAPMPMIData.index)
return NAPMPMIData
from abc import ABC
import matplotlib.pyplot as plt
from lightgbm import LGBMClassifier
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, accuracy_score
class ModelTrainer(ABC):
"""
模型训练类
"""
def __init__(self, toForecast) -> None:
super().__init__()
self._toForecast = toForecast
###################
# Step 3: Train the model
def test_model(self, strMethod, classifier, X_test, y_test):
print(strMethod + " ====== test results ======")
y_pred = classifier.predict(X_test)
result0 = confusion_matrix(y_test, y_pred)
print(strMethod + " Confusion Matrix:")
print(result0)
result1 = classification_report(y_test, y_pred, zero_division=1.0)
print(strMethod + " Classification Report:")
print(result1)
result2 = accuracy_score(y_test, y_pred)
print(strMethod + " Accuracy:", result2)
cm_display = ConfusionMatrixDisplay(confusion_matrix=result0, display_labels=['Down', 'Up'])
cm_display.plot()
plt.title(strMethod + ' Accuracy: ' + f'{result2:.0%}')
plt.show()
def train_random_forest(self, X_train, y_train, X_test, y_test):
classifier = RandomForestClassifier()
classifier.fit(X_train, y_train)
if not self._toForecast:
self.test_model('Random Forest', classifier, X_test, y_test)
return classifier
def train_GBT(self, X_train, y_train, X_test, y_test):
# Gradient Boosted Tree
classifierGBT = LGBMClassifier()
classifierGBT.fit(X_train, y_train)
if not self._toForecast:
self.test_model('Gradient Boosted Tree', classifierGBT, X_test, y_test)
return classifierGBT
def train_SVC(self, X_train, y_train, X_test, y_test):
# Support Vector Machines
classifierSVC = svm.SVC()
classifierSVC.fit(X_train, y_train)
if not self._toForecast:
self.test_model('Support Vector Machines', classifierSVC, X_test, y_test)
return classifierSVC
def ensemble_model(self, rf_model, gbt_model, svc_model, X_train, y_train, X_test, y_test):
# Create a dictionary of our models
estimators = [('rf', rf_model), ('gbt', gbt_model), ('svc', svc_model)]
# Create our voting classifier, inputting our models
ensemble = VotingClassifier(estimators, voting='hard')
# fit model to training data
ensemble.fit(X_train, y_train)
if not self._toForecast:
self.test_model('Ensemble Model', ensemble, X_test, y_test)
return ensemble
from datetime import datetime
import requests
from py_jftech import sendmail, format_date
# 预测发送邮箱
email = ['wenwen.tang@thizgroup.com']
jrp_domain = 'https://jrp.jfquant.com/api/v1.0'
# jrp_domain = 'http://localhost:7090/jrp'
def send(content):
receives = email
subject = '预测_{today}'.format(today=format_date(datetime.today()))
sendmail(receives=receives, copies=[], attach_paths=[], subject=subject, content=content)
def upload_predict(ticker, predictDate, predict):
predict_data = {
"aiPredict": {
"predictDate": format_date(predictDate),
"predict": 1 if predict == 'UP' else -1
},
"bloombergTicker": ticker
}
headers = {"X-AUTH-token": "rt7297LwQvyAYTke2iD8Vg"}
response = requests.post(url=f'{jrp_domain}/ai/predict', json=predict_data, headers=headers)
if response.status_code != 200:
print("上传ai预测结果失败,请重试")
from abc import ABC
import numpy as np
import pandas as pd
from finta import TA
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
def imp():
print(TA)
class TrainingDataBuilder(ABC):
def __init__(self, index, eco, fund, indexDict, toForecast, win1W, win1M, win1Q, numForecastDays,
theThreshold) -> None:
super().__init__()
self._index = index
self._eco = eco
self._fund = fund
self._indexDict = indexDict
self._toForecast = toForecast
self._win1W = win1W # 1 week
self._win1M = win1M # 1 Month
self._win1Q = win1Q # 1 Quarter
self._numForecastDays = numForecastDays # business days, 21 business days means one month
self._theThreshold = theThreshold
# List of symbols for technical indicators
# INDICATORS = ['RSI', 'MACD', 'STOCH','ADL', 'ATR', 'MOM', 'MFI', 'ROC', 'OBV', 'CCI', 'EMV', 'VORTEX']
# Note that '14 period MFI' and '14 period EMV' is not available for forecast
self.INDICATORS = ['RSI', 'MACD', 'STOCH', 'ADL', 'ATR', 'MOM', 'ROC', 'OBV', 'CCI', 'VORTEX']
self.FUND_INDICATORS = []
def get_indicator_data(self, data, pid):
"""
Function that uses the finta API to calculate technical indicators used as the features
"""
def indicator_calcu(data, indicators):
"""
指数和基金不同,基金只有收盘价,生成指标会变少
@param data:
@param indicators:
@return:
"""
for indicator in indicators:
ind_data = eval('TA.' + indicator + '(data)')
if not isinstance(ind_data, pd.DataFrame):
ind_data = ind_data.to_frame()
data = data.merge(ind_data, left_index=True, right_index=True)
return data
if pid in self._index:
data = indicator_calcu(data, self.INDICATORS)
# Instead of using the actual volume value (which changes over time), we normalize it with a moving volume average
data['normVol'] = data['volume'] / data['volume'].ewm(5).mean()
# get relative values
data['relativeOpen'] = data['open'] / data['close'].shift(1)
data['relativeHigh'] = data['high'] / data['close'].shift(1)
data['relativeLow'] = data['low'] / data['close'].shift(1)
# Remove columns that won't be used as features
# data['close'] are still needed and will be deleted later
data.drop(['open', 'high', 'low', 'volume'], axis=1, inplace=True)
elif pid in self._fund:
indicator_calcu(data, self.FUND_INDICATORS)
# Also calculate moving averages for features
data['ema50'] = data['close'] / data['close'].ewm(50).mean()
data['ema21'] = data['close'] / data['close'].ewm(21).mean()
data['ema15'] = data['close'] / data['close'].ewm(15).mean()
data['ema5'] = data['close'] / data['close'].ewm(5).mean()
data['relativeClose'] = data['close'] / data['close'].shift(1)
return data
def build_predict_data(self, indexData, pid):
"""
@param pid: 需要预测的指数或基金id
@return:
"""
if pid in self._index:
###### get individual data from raw data
predictData = indexData[indexData['rid_index_id'] == self._indexDict[pid]].copy()
del (predictData['rid_index_id'])
###### Additional preparing SPX Data
# finta expects properly formated ohlc DataFrame, with column names in lowercase:
# ["open", "high", "low", close"] and ["volume"] for indicators that expect ohlcv input.
predictData.rename(
columns={"rid_high": 'high', 'rid_open': 'open', "rid_low": 'low', "rid_close": 'close',
'rid_volume': 'volume',
"rid_pe": "SPX_pe", "rid_pb": "SPX_pb"},
inplace=True)
elif pid in self._fund:
predictData = indexData[indexData['rfn_fund_id'] == self._indexDict[pid]].copy()
del (predictData['rfn_fund_id'])
predictData.rename(columns={"rfn_nav_cal": 'close'}, inplace=True)
predictData.set_index('date', inplace=True)
predictData.index = pd.to_datetime(predictData.index)
predictData.sort_index(inplace=True)
predictData.reset_index(inplace=True)
# Calculate the indicator data
predictData = self.get_indicator_data(predictData, pid)
# Calculate Historical Return and Volatility
predictData['R1W'] = np.log(predictData['close'] / predictData['close'].shift(self._win1W))
predictData['R1M'] = np.log(predictData['close'] / predictData['close'].shift(self._win1M))
predictData['R1Q'] = np.log(predictData['close'] / predictData['close'].shift(self._win1Q))
price_list = predictData['close']
rollist = price_list.rolling(self._win1W)
predictData['Vol_1W'] = rollist.std(ddof=0)
rollist = price_list.rolling(self._win1M)
predictData['Vol_1M'] = rollist.std(ddof=0)
rollist = price_list.rolling(self._win1Q)
predictData['Vol_1Q'] = rollist.std(ddof=0)
# The following uses future info for the y label, to be deleted later
predictData['futureR'] = np.log(predictData['close'].shift(-self._numForecastDays) / predictData['close'])
# predictData = predictData[predictData['futureR'].notna()]
predictData['yLabel'] = (predictData['futureR'] >= self._theThreshold).astype(int)
spxDataCloseSave = predictData[['date', 'close']]
del (predictData['close'])
return predictData
def build_train_test(self, pid, indexData, vixData, indexOtherData, cpiData, FDTRData, NAPMPMIData):
###### Merge Data to one table
predictData = self.build_predict_data(indexData, pid)
if (self._toForecast):
forecastDay = predictData['date'].iloc[-1]
DataAll = pd.merge(predictData, vixData, how='outer', on='date')
DataAll = pd.merge(DataAll, indexOtherData, how='outer', on='date')
DataAll = pd.merge(DataAll, cpiData, how='outer', on='date')
DataAll = pd.merge(DataAll, FDTRData, how='outer', on='date')
DataAll = pd.merge(DataAll, NAPMPMIData, how='outer', on='date')
DataAll.set_index('date', inplace=True)
DataAll.sort_index(inplace=True)
DataAll.reset_index(inplace=True)
###### fill eco data
for col in ['CPI_YOY', 'CPURNSA', 'CPI_MOM', 'CPI_MOM_Diff']:
DataAll[col].bfill(inplace=True)
for col in ['FDTR']:
DataAll[col].ffill(inplace=True)
# 新增指数NAPMPMI :美國的ISM製造業指數 (Monthly)
for col in ['NAPMPMI']:
DataAll[col].bfill(inplace=True)
DataAll[col].ffill(inplace=True)
if (self._toForecast):
# 处理CPI_YOY:美国城镇消费物价指数同比未经季 CPURNSA:美国消费者物价指数未经季调
DataAllCopy = DataAll.copy()
for col in ['CPI_YOY', 'CPURNSA']:
DataAllCopy[col].ffill(inplace=True)
for col in ['CPI_MOM', 'CPI_MOM_Diff']:
DataAllCopy[col] = DataAllCopy[col].fillna(0)
DataAllCopy.drop(['futureR', 'yLabel'], axis=1, inplace=True)
forecastDayIndex = DataAllCopy.index[DataAllCopy['date'] == forecastDay]
forecastData = DataAllCopy.iloc[forecastDayIndex.to_list(), 1:]
X_forecast = forecastData.to_numpy()
del DataAllCopy
###### clean NaN
DataAll.dropna(inplace=True)
DataAll.reset_index(inplace=True, drop=True)
###### get X and y
y = DataAll['yLabel'].to_numpy(copy=True)
# delete future information
DataAll.drop(['futureR', 'yLabel'], axis=1, inplace=True)
X = DataAll.iloc[:, 1:].values
###################
# scale data
scaler = MinMaxScaler(feature_range=(0, 1))
# scaledX = scaler.fit_transform(X)
DataScaler = scaler.fit(X)
scaledX = DataScaler.transform(X)
if (self._toForecast):
scaledX_forecast = DataScaler.transform(X_forecast)
X_train = scaledX
y_train = y
X_test = []
y_test = []
else:
# Step 2: Split data into train set and test set
X_train, X_test, y_train, y_test = train_test_split(scaledX, y, test_size=0.02, shuffle=False)
# To avoid data leak, test set should start from numForecastDays later
X_test = X_test[self._numForecastDays:]
y_test = y_test[self._numForecastDays:]
return X_train, X_test, y_train, y_test, scaledX_forecast, forecastDay
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment