Commit 80567726 authored by 吕先亚's avatar 吕先亚

ai

parent bfec2eee
......@@ -15,22 +15,21 @@ from api import DataSync
# 截止日期
max_date = None
# max_date = '2024-03-20'
# max_date = '2024-01-11'
toForecast = True # False means test, True means forecast
syncData = True # 开启会同步数据库指数及基金数据
uploadData = True # 开启会上传预测结果
doReport = True # 开启会生成Excel报告
syncData = False # 开启会同步数据库指数及基金数据
uploadData = False # 开启会上传预测结果
doReport = False # 开启会生成Excel报告
# 待预测指数
# PREDICT_LIST = [67, 121, 122, 123]
PREDICT_LIST = [67, 121, 122, 123, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163,
164, 165, 166, 167, 168, 169, 170, 171, 174, 175, 177, 178]
eco = [65, 66, 74, 134]
PREDICT_LIST = [156]
# PREDICT_LIST = [67, 121, 122, 123, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163,
# 164, 165, 166, 167, 168, 169, 170, 171, 174, 175, 177, 178]
eco = [65, 66, 74, 134, 191]
index = [67, 68, 69, 70, 71, 72, 73, 75, 76, 77, 105, 106, 116, 117, 138, 139, 142, 143, 140, 141, 144, 145, 146]
fund = [121, 122, 123, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165,
166, 167, 168, 169, 170, 171, 174, 175, 177, 178]
# fund = [121, 122, 123, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165,
# 166, 167, 168, 169, 170, 171, 174, 175, 177, 178]
fund = [156]
@autowired
......@@ -119,6 +118,7 @@ if __name__ == '__main__':
FDTRData = data_access.get_fdtr(ecoData)
# 新增指标 NAPMPMI :美國的ISM製造業指數 (Monthly)
NAPMPMIData = data_access.get_napmpmi(ecoData)
TTM = data_access.get_jifu_spx_opeps_currq_ttm(ecoData)
builder = TrainingDataBuilder(index, eco, fund, indexDict, toForecast, win1W, win1M, win1Q, numForecastDays,
theThreshold)
......@@ -126,7 +126,7 @@ if __name__ == '__main__':
print(f'{indexDict[pid]} start '.center(50, '='))
t_data = indexData if pid in index else fundData
X_train, X_test, y_train, y_test, scaledX_forecast, forecastDay = \
builder.build_train_test(pid, t_data, vixData, indexOtherData, cpiData, FDTRData, NAPMPMIData)
builder.build_train_test(pid, t_data, vixData, indexOtherData, cpiData, FDTRData, NAPMPMIData,TTM)
trainer = ModelTrainer(toForecast)
rf_model = trainer.train_random_forest(X_train, y_train, X_test, y_test)
gbt_model = trainer.train_GBT(X_train, y_train, X_test, y_test)
......
......@@ -36,8 +36,8 @@ class DataAccess(ABC):
def get_eco_datas(self):
ecoData = pd.DataFrame(
get_eco_list(eco_ids=self._eco, max_date=self._max_date))
ecoData = ecoData[["red_eco_id", "red_date", "red_indicator"]]
ecoData.rename(columns={"red_date": 'date'}, inplace=True) # please use 'date'
ecoData = ecoData[["red_eco_id", "red_release_date", "red_indicator"]]
ecoData.rename(columns={"red_release_date": 'date'}, inplace=True) # please use 'date'
ecoData["red_eco_id"] = ecoData["red_eco_id"].map(self._indexDict)
return ecoData
......@@ -118,3 +118,12 @@ class DataAccess(ABC):
NAPMPMIData.set_index('date', inplace=True)
NAPMPMIData.index = pd.to_datetime(NAPMPMIData.index)
return NAPMPMIData
def get_jifu_spx_opeps_currq_ttm(self, ecoData):
# 新增指标 SP500 Operating EPS Current Quarter TTM
ttm = ecoData[ecoData['red_eco_id'] == "JIFU_SPX_OPEPS_CURRQ_TTM"].copy()
del (ttm['red_eco_id'])
ttm.rename(columns={"red_indicator": 'JIFU_SPX_OPEPS_CURRQ_TTM'}, inplace=True)
ttm.set_index('date', inplace=True)
ttm.index = pd.to_datetime(ttm.index)
return ttm
......@@ -124,7 +124,7 @@ class TrainingDataBuilder(ABC):
del (predictData['close'])
return predictData
def build_train_test(self, pid, indexData, vixData, indexOtherData, cpiData, FDTRData, NAPMPMIData):
def build_train_test(self, pid, indexData, vixData, indexOtherData, cpiData, FDTRData, NAPMPMIData,TTM):
###### Merge Data to one table
predictData = self.build_predict_data(indexData, pid)
forecastDay = None
......@@ -135,26 +135,12 @@ class TrainingDataBuilder(ABC):
DataAll = pd.merge(DataAll, cpiData, how='outer', on='date')
DataAll = pd.merge(DataAll, FDTRData, how='outer', on='date')
DataAll = pd.merge(DataAll, NAPMPMIData, how='outer', on='date')
DataAll = pd.merge(DataAll, TTM, how='outer', on='date')
DataAll.set_index('date', inplace=True)
DataAll.sort_index(inplace=True)
DataAll.reset_index(inplace=True)
###### fill eco data
for col in ['CPI_YOY', 'CPURNSA', 'CPI_MOM', 'CPI_MOM_Diff']:
DataAll[col].bfill(inplace=True)
for col in ['FDTR']:
DataAll[col].ffill(inplace=True)
# 新增指数NAPMPMI :美國的ISM製造業指數 (Monthly)
for col in ['NAPMPMI']:
DataAll[col].bfill(inplace=True)
DataAll[col].ffill(inplace=True)
for col in DataAll.columns:
if col not in ['CPI_YOY', 'CPURNSA', 'CPI_MOM', 'CPI_MOM_Diff', 'futureR', 'yLabel']:
DataAll[col].ffill(inplace=True)
DataAll.ffill(inplace=True)
if (self._toForecast):
# 处理CPI_YOY:美国城镇消费物价指数同比未经季 CPURNSA:美国消费者物价指数未经季调
DataAllCopy = DataAll.copy()
......
This diff is collapsed.
This diff is collapsed.
CREATE TABLE IF NOT EXISTS robo_assets_pool
(
rap_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rap_date DATETIME NOT NULL COMMENT '数据日期',
rap_type TINYINT NOT NULL COMMENT '资产池类别',
rap_asset_ids JSON DEFAULT NULL COMMENT '基金ID',
rap_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rap_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rap_id),
UNIQUE INDEX (rap_date, rap_type),
INDEX (rap_type)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '资产池';
CREATE TABLE IF NOT EXISTS robo_indicator
(
`ri_rbd_id` bigint(20) NOT NULL,
`ri_date` datetime NOT NULL,
`ri_annual` double NOT NULL,
`ri_sortino` json NULL,
`ri_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`ri_update_time` datetime NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
UNIQUE INDEX `ri_rbd_id`(`ri_rbd_id`, `ri_date`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
\ No newline at end of file
import json
from py_jftech import read, write, where, format_date
from api import AssetPoolType
__COLUMNS__ = {
'rap_id': 'id',
'rap_date': 'date',
'rap_type': 'type',
'rap_asset_ids': 'asset_ids',
}
@read
def get_list(max_date=None, min_date=None, type: AssetPoolType = None):
    """Build SQL listing asset-pool rows ordered by type then date.

    :param max_date: inclusive upper bound on rap_date
    :param min_date: inclusive lower bound on rap_date
    :param type: optional AssetPoolType filter
    """
    sqls = []
    if max_date:
        sqls.append(f"rap_date <= '{format_date(max_date)}'")
    if min_date:
        sqls.append(f"rap_date >= '{format_date(min_date)}'")
    # @read executes the returned SQL; column aliases come from __COLUMNS__
    return f'''
    select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from robo_assets_pool
    {where(*sqls, rap_type=type)} order by rap_type, rap_date
    '''
@read(one=True)
def get_one(day, type: AssetPoolType):
    # Fetch the single pool row for an exact (rap_date, rap_type) pair.
    return f'''select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from robo_assets_pool {where(rap_date=day, rap_type=type)}'''
@read(one=True)
def get_last_one(type: AssetPoolType = None, day=None):
    # Latest pool at or before `day`; overall latest when day is None.
    sql = f"rap_date <= '{format_date(day)}'" if day else None
    return f'''select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from robo_assets_pool {where(sql, rap_type=type)} order by rap_date desc limit 1'''
@write
def insert(day, type: AssetPoolType, pool: list):
    # Persist one pool snapshot; `pool` (list of asset ids) is stored as JSON.
    return f'''
    insert into robo_assets_pool(rap_date, rap_type, rap_asset_ids)
    values ('{format_date(day)}', {type.value},'{json.dumps(pool)}')
    '''
@write
def delete(day=None):
    # Delete pools dated `day` or later; with no day, truncate the whole table.
    if day:
        return f"delete from robo_assets_pool where rap_date >= '{format_date(day)}'"
    else:
        return 'truncate table robo_assets_pool'
from py_jftech import write, mapper_columns
__COLUMNS__ = {
'ri_rbd_id': 'id',
'ri_date': 'date',
'ri_annual': 'annual',
'ri_sortino': 'sortino',
}
@write
def insert(datas):
    """Bulk-insert indicator rows; missing/None values become SQL null."""
    datas = [mapper_columns(datas=x, columns=__COLUMNS__, ignore_none=False) for x in datas]
    # One quoted tuple per row, column order fixed by __COLUMNS__.
    values = ','.join(
        [f'''({','.join([(f"'{x[j]}'" if j in x and x[j] is not None else 'null') for j in __COLUMNS__.keys()])})''' for
         x in datas])
    return f'''insert into robo_indicator({','.join(__COLUMNS__.keys())}) values {values}'''
@write
def update_sortino(id, date, sortino):
    # Overwrite the sortino JSON for one (ri_rbd_id, ri_date) row.
    return f'''update robo_indicator set ri_sortino='{sortino}' where ri_rbd_id={id} and ri_date='{date}' '''
@write
def clear():
    # Remove all indicator rows.
    return 'TRUNCATE robo_indicator'
from datetime import datetime as dt
from py_jftech import component, autowired
from api import AssetPool, AssetOptimize
from asset_pool.dao import robo_assets_pool as rap, robo_indicator
@component
class FundAssetPool(AssetPool):
    """Asset pool that delegates pool selection to an AssetOptimize strategy."""

    @autowired
    def __init__(self, optimize: AssetOptimize = None):
        # `optimize` is injected by the py_jftech container.
        self._optimize = optimize

    def get_pool(self, day=None):
        """Return the optimized asset pool for `day`.

        BUG FIX: the original signature was `day=dt.today()`, which evaluates
        the default once at import time — in a long-running scheduler every
        later call silently used the start-up date. A None sentinel restores
        per-call "today".
        """
        if day is None:
            day = dt.today()
        return self._optimize.get_optimize_pool(day)

    def clear(self, day=None):
        """Delete pool rows from `day` onward (all when None) and clear indicators."""
        rap.delete(day)
        robo_indicator.clear()
import logging
import unittest
from py_jftech import autowired, parse_date
from api import AssetOptimize
logger = logging.getLogger(__name__)
class AssetPoolTest(unittest.TestCase):
    """Smoke test: builds the dividend-optimize pool for a fixed date."""

    @autowired(names={'asset': 'dividend'})
    def test_dividend_asset_optimize(self, asset: AssetOptimize = None):
        # No assertion — passes as long as the optimize run does not raise.
        asset.get_optimize_pool(parse_date('2023-03-01'))
if __name__ == '__main__':
unittest.main()
import os
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
import pytz
import requests
from openpyxl.reader.excel import load_workbook
def is_dst():
    """Return True when US Eastern time is currently observing daylight saving."""
    eastern_now = datetime.now(pytz.timezone('America/New_York'))
    # dst() is a zero timedelta outside the DST window.
    return eastern_now.dst() != timedelta(0)
def usa_close_day():
    """Return today's America/New_York date as 'YYYYMMDD', rolled to the next
    day once the US market close has passed.

    NOTE(review): the cutoffs are `hour > 16` (DST) / `hour > 17` (standard),
    i.e. the roll happens from 17:00 / 18:00 local — confirm whether `>=`
    was intended for a 16:00 close.
    """
    tz = pytz.timezone('America/New_York')
    now = datetime.now(tz)
    if is_dst():
        # Daylight saving time
        if now.hour > 16:
            now = now + timedelta(1)
    else:
        # Standard (winter) time
        if now.hour > 17:
            now = now + timedelta(1)
    return now.strftime("%Y%m%d")
def get_quarter_end_date(date=None):
    """Return the last day of the quarter containing `date`.

    :param date: datetime to inspect; defaults to now (evaluated per call)
    :return: datetime of the quarter's final day (time component midnight)

    BUG FIX: the original computed
    ``datetime(y, m, 1) + timedelta(31) - timedelta(days=datetime(y, m, 1).day)``
    — it subtracted the *first* day's ``.day`` (always 1), i.e. always added
    30 days. That returned Jul 1 / Oct 1 instead of Jun 30 / Sep 30 for the
    30-day quarter-end months, breaking the date comparison in sync_sp500.
    """
    if date is None:
        date = datetime.now()
    # Last month of the quarter: 3, 6, 9 or 12.
    quarter_end_month = 3 * ((date.month - 1) // 3 + 1)
    if quarter_end_month == 12:
        return datetime(date.year, 12, 31)
    # First day of the following month minus one day == last day of quarter.
    return datetime(date.year, quarter_end_month + 1, 1) - timedelta(days=1)
def list_files_sorted_by_name(directory, max_day=None):
    """Recursively list files under `directory`, sorted lexicographically.

    :param directory: root directory to walk
    :param max_day: optional datetime; when given, only files whose embedded
        date stamp (path characters [-13:-5], 'YYYYMMDD') is on/after it are
        kept
    :return: sorted list of file paths
    """
    files = []
    for root, dirs, filenames in os.walk(directory):
        for filename in filenames:
            files.append(os.path.join(root, filename))
    files.sort()  # lexicographic == chronological for the date-stamped names
    if max_day:
        # NOTE(review): files[:-1] drops the newest file before filtering, and
        # the comparison keeps dates >= max_day although the original Chinese
        # docstring said "less than max_day" — confirm intended semantics.
        files = [f for f in files[:-1] if str(f)[-13:-5] >= max_day.strftime("%Y%m%d")]
    return files
def fetch_sp500():
    """Download S&P's sp-500-eps-est workbook into resources/sp-500.xlsx.

    Failures to download are reported via print (best-effort, matching the
    original behavior); a successful response overwrites the local file.
    """
    temp_file = Path(__file__).parent / 'resources/sp-500.xlsx'
    # FIX: requests.get without a timeout can hang the scheduler job forever
    # on a stalled connection; 60s is generous for a single spreadsheet.
    response = requests.get(
        "https://www.spglobal.com/spdji/en/documents/additional-material/sp-500-eps-est.xlsx",
        timeout=60)
    # Only persist on success
    if response.status_code == 200:
        with open(temp_file, 'wb') as f:
            f.write(response.content)
    else:
        print(f"Failed to retrieve file: {response.status_code}")
def save_sp500():
    """Fetch the latest sp-500 workbook and archive it under a dated name when
    its report date differs from the previously saved copy."""
    fetch_sp500()
    # Compare the two most recent files: previous archive + fresh download.
    files = list_files_sorted_by_name(Path(__file__).parent/'resources')[-2:]
    compare_day = None
    for file in files:
        # Load with openpyxl; data_only=True reads cached formula values.
        wb = load_workbook(filename=file, data_only=True)
        ws = wb['ESTIMATES&PEs']
        # Cell A2 holds the sheet's report date.
        report_day = ws['A2'].value
        if compare_day is None:
            compare_day = report_day
        else:
            if compare_day != report_day:
                # New report: archive the download stamped with the US close day.
                wb.save(Path(__file__).parent/f'resources/sp-500-eps-est_USA{usa_close_day()}.xlsx')
        # Close the workbook
        wb.close()
def sync_sp500(day):
    """Parse EPS estimates/actuals from an archived sp-500 workbook.

    :param day: when given, read the earliest archive dated after `day` and
        return only the latest record; when None, read the fixed baseline file
        and return every record.
    :return: list of dicts with keys date / eps / releaseDate / close, where
        `close` is the rolling 4-quarter EPS sum (TTM).
    """
    file = Path(__file__).parent/'resources/sp-500-eps-est_USA20241014.xlsx'
    if day:
        files = list_files_sorted_by_name(Path(__file__).parent / 'resources', day)
        if files:
            file = files[-1]
        else:
            return []
    wb = load_workbook(filename=file, data_only=True)
    ws = wb['ESTIMATES&PEs']
    estimates = "ESTIMATES"
    estimates_row = 0
    actuals = "ACTUALS"
    actuals_row = 0
    datas = []
    # Scan column A for the ESTIMATES / ACTUALS section headers.
    for row in range(100, 300):
        cell_value = ws[f'A{row}'].value
        if cell_value and estimates == str(cell_value):
            estimates_row = row
        if cell_value and actuals == str(cell_value):
            actuals_row = row
            break
    report_day = ws['A2'].value
    # ESTIMATES section: cells may hold '%m/%d/%Y' strings or datetimes.
    for i in range(estimates_row + 1, actuals_row):
        if ws[f'A{i}'].value is None:
            break
        date_value = datetime.strptime(str(ws[f'A{i}'].value).split(' ')[0].strip(), '%m/%d/%Y') if type(
            ws[f'A{i}'].value) == str else ws[f'A{i}'].value
        if date_value < report_day:
            # Past quarter: release assumed one day after quarter end.
            data = {'date': date_value,
                    'eps': ws[f'C{i}'].value}
            data["releaseDate"] = data['date'] + timedelta(days=1)
            datas.append(data)
        elif date_value == get_quarter_end_date(report_day):
            # Current quarter estimate: release date taken from the archive
            # file name's 'YYYYMMDD' stamp.
            data = {'date': report_day,
                    'eps': ws[f'C{i}'].value,
                    'releaseDate': datetime.strptime(str(file)[-13:-5], "%Y%m%d")}
            datas.append(data)
    # ACTUALS section: same layout, dates always in the past.
    for i in range(actuals_row + 1, ws.max_row):
        if ws[f'A{i}'].value is None:
            break
        data = {'date': datetime.strptime(str(ws[f'A{i}'].value).strip(), '%m/%d/%Y') if type(
            ws[f'A{i}'].value) == str else ws[f'A{i}'].value,
                'eps': ws[f'C{i}'].value}
        data["releaseDate"] = data['date'] + timedelta(days=1)
        datas.append(data)
    wb.close()
    # Rows were collected newest-first; reverse to chronological order, then
    # compute the trailing-four-quarter EPS sum as 'close'.
    datas = pd.DataFrame(datas[::-1])
    datas['close'] = datas['eps'].rolling(window=4).sum().round(2)
    datas.dropna(inplace=True)
    return datas.to_dict(orient="records")[-1::] if day else datas.to_dict(orient="records")
if __name__ == '__main__':
# print(list_files_sorted_by_name(Path(__file__).parent / 'resources'))
# save_sp500()
sync_sp500(day=None)
......@@ -13,6 +13,7 @@ from py_jftech import format_date, is_workday, component, autowired, get_config,
from api import DatumType, DataSync, Datum
from basic.dao import robo_index_datas as rid, robo_eco_datas as red, robo_fund_navs as rfn, robo_exrate as re
from basic.sp500 import sync_sp500
logger = logging.getLogger(__name__)
......@@ -208,7 +209,7 @@ class EcoSync(JDCDataSync):
@property
def start_date(self):
return super(EcoSync, self).start_date - relativedelta(years=4)
return super().start_date - relativedelta(years=4)
@property
def datum_type(self) -> DatumType:
......@@ -219,6 +220,8 @@ class EcoSync(JDCDataSync):
return next_workday(last['date']) if last else self.start_date
def build_urls(self, datum, start_date, page=0) -> str:
if datum.get("source") == "calculating":
return None
return f'http://jdcprod.thiztech.com/api/datas/eco-value?page={page}&size=200&sourceCode={quote(datum["bloombergTicker"])}&sourceType=BLOOMBERG&startDate={format_date(start_date)}'
def store_date(self, datumid, datas: List[dict]):
......@@ -232,6 +235,33 @@ class EcoSync(JDCDataSync):
red.batch_insert(save_datas)
@component(bean_name='eco-sync-calculating')
class EcoSync(EcoSync):
def datum_start_date(self, datum_id):
last = red.get_last_one(eco_id=datum_id)
return next_workday(last['release_date']) if last else None
def do_sync(self, max_date=dt.today()):
logger.info(f'start sync datas for type[{self.datum_type}]')
for datum in self._datum.get_datums(type=self.datum_type):
if datum.get("source") == "calculating":
logger.debug(f'start sync ticker[{datum["bloombergTicker"]}]')
start_date = self.datum_start_date(datum['id'])
datas = sync_sp500(start_date)
self.store_date(datum['id'], datas)
def store_date(self, datumid, datas: List[dict]):
save_datas = [{
'eco_id': datumid,
'date': x['date'],
'indicator': x['close'],
'release_date': x['releaseDate'],
} for x in datas]
if save_datas:
red.batch_insert(save_datas)
@component(bean_name='navs-sync')
class FundNavSync(JDCDataSync):
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from typing import List
from apscheduler.schedulers.blocking import BlockingScheduler
from py_jftech import autowired
from api import RoboExecutor
from api import DataSync
from basic.sp500 import save_sp500
@autowired(names={'executor': RoboExecutor.use_name()})
def start(executor: RoboExecutor = None):
    # Run the configured robo executor once.
    executor.start_exec()


@autowired
def sync(syncs: List[DataSync] = None):
    # Run every registered data-sync job in turn.
    for s in syncs:
        s.do_sync()


if __name__ == '__main__':
    start()
    sync()
    scheduler = BlockingScheduler()
    # Daily cron jobs: grab sp500 data several times around the US market
    # close (repeated on purpose as retries), then full data sync at 08:00.
    scheduler.add_job(save_sp500, 'cron', day_of_week='0-6', hour=3, minute=55)
    scheduler.add_job(save_sp500, 'cron', day_of_week='0-6', hour=4, minute=00)
    scheduler.add_job(save_sp500, 'cron', day_of_week='0-6', hour=4, minute=55)
    scheduler.add_job(save_sp500, 'cron', day_of_week='0-6', hour=5, minute=00)
    scheduler.add_job(sync, 'cron', day_of_week='0-6', hour=8, minute=00)
    scheduler.start()
import json
import logging
from py_jftech import component, autowired, format_date
from pymysql import IntegrityError, constants
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory, \
PortfoliosChecker
from portfolios.dao import robo_mpt_portfolios as rmp
logger = logging.getLogger(__name__)
@component(bean_name='mpt')
class MptPortfoliosBuilder(PortfoliosBuilder):
    """Builds and caches MPT (mean-variance/CVaR) optimal portfolios."""

    @autowired
    def __init__(self, assets: AssetPool = None, navs: Navs = None, datum: Datum = None, factory: SolverFactory = None,
                 checker: PortfoliosChecker = None):
        self._assets = assets
        self._navs = navs
        self._datum = datum
        self._factory = factory
        self._checker = checker

    def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL):
        """Return {fund_id: weight} for (day, risk, type), building and
        persisting it on first request; None when the solve was INFEASIBLE.

        :raises Exception: re-raises any build/persist failure after logging.
        """
        try:
            portfolio = rmp.get_one(day, type, risk)
            if not portfolio:
                result = self.build_portfolio(day, type)
                for build_risk, datas in result.items():
                    # Post-process weights (e.g. company-diversity rules).
                    datas['portfolio'] = self._checker.check(day, json.loads(datas['portfolio']))
                    try:
                        rmp.insert({
                            **datas,
                            'risk': build_risk,
                            'type': type,
                            'date': day
                        })
                    except IntegrityError as e:
                        # Concurrent builder already stored this row — tolerate
                        # duplicate-key, re-raise anything else.
                        code, msg = e.args
                        if code != constants.ER.DUP_ENTRY:
                            raise e
                # Re-read so the returned row reflects what is persisted.
                portfolio = rmp.get_one(day, type, risk)
            if SolveType(portfolio['solve']) is not SolveType.INFEASIBLE:
                result = json.loads(portfolio['portfolio'])
                # JSON keys are strings; callers expect int fund ids.
                return {int(x[0]): x[1] for x in result.items()}
            return None
        except Exception as e:
            logger.exception(
                f"build portfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}] failure.", e)
            raise e

    def build_portfolio(self, day, type: PortfoliosType):
        """Solve MPT per risk level; returns {risk: {'solve', 'portfolio'}}.

        NOTE(review): `portfolios` is initialized outside the risk loop, so
        weights accumulate across risk levels and categories — confirm this
        carry-over is intended rather than a per-risk reset.
        """
        result = {}
        portfolios = {}
        for risk in PortfoliosRisk:
            logger.info(
                f"start to build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}]")
            solver = self._factory.create_solver(risk, type)
            navs_group = solver.reset_navs(day)
            for category, navs in navs_group.items():
                # count = solver.get_config('asset-count')[0]
                # nav_count = len(navs.columns)
                # if count <= nav_count:
                #     pass
                solver.set_navs(navs)
                solver.set_category(category)
                logger.debug({
                    'Khist': len(solver.rtn_history),
                    'beta': solver.get_config('mpt.cvar-beta'),
                    'Kbeta': solver.k_beta,
                })
                # Bracket the feasible return range, then solve MPT within it.
                max_rtn, max_var, minCVaR_whenMaxR = solver.solve_max_rtn()
                min_rtn, min_var, maxCVaR_whenMinV = solver.solve_min_rtn()
                portfolio, cvar = solver.solve_mpt(min_rtn, max_rtn)
                portfolios = {**portfolios, **portfolio}
            result[risk] = {
                'solve': SolveType.MPT,
                'portfolio': json.dumps(portfolios),
            } if portfolios else {
                'solve': SolveType.INFEASIBLE
            }
        return result

    def clear(self, day=None, risk: PortfoliosRisk = None):
        """Delete cached portfolios from `day` onward, optionally by risk."""
        rmp.delete(min_date=day, risk=risk)
@component(bean_name='mpt')
class PoemPortfoliosBuilder(MptPortfoliosBuilder):
    """POEM variant: solves MPT first, then tightens with a POEM solve,
    falling back to the MPT weights when POEM finds no portfolio.

    NOTE(review): bean_name 'mpt' duplicates the parent's registration and
    the test suite autowires a 'poem' bean — confirm this should not be
    bean_name='poem'.
    """

    def build_portfolio(self, day, type: PortfoliosType):
        result = {}
        # Accumulates across risks/categories, mirroring the parent class.
        portfolios = {}
        for risk in PortfoliosRisk:
            solver = self._factory.create_solver(risk, type)
            navs_group = solver.reset_navs(day)
            for category, navs in navs_group.items():
                solver.set_navs(navs)
                solver.set_category(category)
                max_rtn, max_var, minCVaR_whenMaxR = solver.solve_max_rtn()
                min_rtn, min_var, maxCVaR_whenMinV = solver.solve_min_rtn()
                mpt_portfolio, mpt_cvar = solver.solve_mpt(min_rtn, max_rtn)
                portfolio, cvar = solver.solve_poem(min_rtn, max_rtn, mpt_cvar, maxCVaR_whenMinV)
                if not portfolio:
                    # POEM infeasible: fall back to the plain MPT solution.
                    portfolio = mpt_portfolio
                portfolios = {**portfolios, **portfolio}
            if portfolios:
                result[risk] = {
                    'solve': SolveType.POEM,
                    'portfolio': json.dumps(portfolios),
                }
        return result
@component(bean_name='mpt')
class MptARCPortfoliosBuilder(MptPortfoliosBuilder):
    """ARC variant: single fixed risk level (FT3); build_portfolio also
    returns solver diagnostics used by the POEM subclass."""

    def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL):
        """Same contract as the parent, but build_portfolio here returns a
        (result, detail) pair; only `result` is persisted."""
        try:
            portfolio = rmp.get_one(day, type, risk)
            if not portfolio:
                result, detail = self.build_portfolio(day, type)
                for build_risk, datas in result.items():
                    datas['portfolio'] = self._checker.check(day, json.loads(datas['portfolio']))
                    try:
                        rmp.insert({
                            **datas,
                            'risk': build_risk,
                            'type': type,
                            'date': day
                        })
                    except IntegrityError as e:
                        # Tolerate concurrent duplicate insert only.
                        code, msg = e.args
                        if code != constants.ER.DUP_ENTRY:
                            raise e
                portfolio = rmp.get_one(day, type, risk)
            if SolveType(portfolio['solve']) is not SolveType.INFEASIBLE:
                result = json.loads(portfolio['portfolio'])
                return {int(x[0]): x[1] for x in result.items()}
            return None
        except Exception as e:
            logger.exception(
                f"build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}] failure.",
                exc_info=e)
            raise e

    def build_portfolio(self, day, type: PortfoliosType):
        """Solve MPT for FT3 only.

        :return: (result, detail) — result holds solve/portfolio/cvar,
            detail holds the return/variance/CVaR bounds per risk.
        """
        result = {}
        detail = {}
        risk = PortfoliosRisk.FT3
        logger.info(
            f"start to build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}]")
        solver = self._factory.create_solver(risk, type)
        solver.reset_navs(day)
        logger.debug({
            'Khist': len(solver.rtn_history),
            'beta': solver.get_config('mpt.cvar-beta'),
            'Kbeta': solver.k_beta,
        })
        max_rtn, max_var, minCVaR_whenMaxR = solver.solve_max_rtn()
        min_rtn, min_var, maxCVaR_whenMinV = solver.solve_min_rtn()
        portfolio, cvar = solver.solve_mpt(min_rtn, max_rtn)
        result[risk] = {
            'solve': SolveType.MPT,
            'portfolio': json.dumps(portfolio),
            'cvar': cvar
        } if portfolio else {
            'solve': SolveType.INFEASIBLE
        }
        detail[risk] = {
            'max_rtn': max_rtn,
            'max_var': max_var,
            'minCVaR_whenMaxR': minCVaR_whenMaxR,
            'min_rtn': min_rtn,
            'min_var': min_var,
            'maxCVaR_whenMinV': maxCVaR_whenMinV,
        }
        return result, detail
@component(bean_name='mpt')
class PoemARCPortfoliosBuilder(MptARCPortfoliosBuilder):
    """ARC + POEM: refine the parent's MPT solution with a POEM solve,
    keeping the MPT result when POEM is infeasible."""

    def build_portfolio(self, day, type: PortfoliosType):
        # Start from the parent's MPT solution and its diagnostics.
        result, detail = super(PoemARCPortfoliosBuilder, self).build_portfolio(day, type)
        risk = PortfoliosRisk.FT3
        # if result[risk]['solve'] is SolveType.INFEASIBLE:
        #     continue
        solver = self._factory.create_solver(risk, type)
        solver.reset_navs(day)
        min_rtn = detail[risk]['min_rtn']
        max_rtn = detail[risk]['max_rtn']
        mpt_cvar = result[risk]['cvar']
        maxCVaR_whenMinV = detail[risk]['maxCVaR_whenMinV']
        portfolio, cvar = solver.solve_poem(min_rtn, max_rtn, mpt_cvar, maxCVaR_whenMinV)
        if portfolio:
            # POEM succeeded — replace the MPT entry.
            result[risk] = {
                'solve': SolveType.POEM,
                'portfolio': json.dumps(portfolio),
                'cvar': cvar
            }
        detail[risk]['mpt_cvar'] = mpt_cvar
        return result, detail
@component(bean_name='mpt')
class RiskParityARCPortfoliosBuilder(MptPortfoliosBuilder):
    """ARC risk-parity variant: single FT3 risk level, solved by the
    solver's risk-parity routine instead of MPT."""

    def build_portfolio(self, day, type: PortfoliosType):
        result = {}
        risk = PortfoliosRisk.FT3
        logger.info(
            f"start to build protfolio of type[{type.name}] and risk[{risk.name}] with date[{format_date(day)}]")
        solver = self._factory.create_solver(risk, type)
        solver.reset_navs(day)
        portfolio = solver.solve_risk_parity()
        result[risk] = {
            'solve': SolveType.RISK_PARITY,
            'portfolio': json.dumps(portfolio),
        } if portfolio else {
            'solve': SolveType.INFEASIBLE
        }
        return result
import logging
from py_jftech import autowired, component, get_config
from api import AssetOptimize, PortfoliosChecker, Datum, Navs, DatumType
logger = logging.getLogger(__name__)
@component(bean_name='checker')
class DefaultPortfoliosChecker(PortfoliosChecker):
    """Post-solve check that avoids a portfolio made up of funds from a
    single company by swapping one holding for a better-scored alternative."""

    @autowired
    def __init__(self, asset: AssetOptimize = None, navs: Navs = None, datum: Datum = None):
        self._asset = asset
        self._navs = navs
        self._datum = datum
        self._config = get_config(__name__)

    def check(self, day=None, portfolios: dict = None):
        """Return `portfolios`, possibly with one holding replaced.

        :param day: valuation date used for scoring candidates
        :param portfolios: {fund_id(str): weight}
        """
        # Feature toggle from config.
        if not self._config.get('switch'):
            return portfolios
        funds = self._datum.get_datums(type=DatumType.FUND)
        company = {f"{fund['id']}": fund['companyType'] for fund in funds}
        customType = {f"{fund['id']}": fund['customType'] for fund in funds}
        companies = set(company[key] for key in portfolios.keys())
        # Only act when every holding comes from the same company.
        if len(companies) == 1:
            # Step 1: check the portfolio's customType, in the configured
            # priority order (list), stopping at the first level with a match.
            priority = self._config.get('custom-type-priority')
            for p in priority:
                # Funds in the portfolio at this priority level.
                keys = [key for key in portfolios.keys() if customType[key] == p]
                # If any match, replace and break out.
                if len(keys) > 0:
                    # Candidates: different company, risk <= the lowest risk
                    # among the matched holdings.
                    min_risk = min(fund['risk'] for fund in funds if str(fund['id']) in keys)
                    ids = [fund['id'] for fund in funds if fund['companyType'] != list(companies)[0] and
                           fund['risk'] <= min_risk]
                    if len(ids) == 0:
                        continue
                    best = self.find_highest_score(ids, day)
                    # Exactly one match: replace it directly.
                    if len(keys) == 1:
                        portfolios[best] = portfolios[keys[0]]
                        # Drop the original key.
                        del portfolios[keys[0]]
                    else:
                        # Several matches: score them and replace the one with
                        # the lowest weight-adjusted score.
                        scores = self.do_score(keys, day)
                        weight_scores = {key: scores[key] * portfolios[key] for key in keys}
                        lowest = min(scores, key=lambda k: weight_scores[k])
                        portfolios[best] = portfolios[lowest]
                        # Drop the original key.
                        del portfolios[lowest]
                    break
        return portfolios

    def do_score(self, ids, day):
        """Score `ids` via the optimizer; returns {fund_id(str): score}."""
        optimize = self._asset.find_optimize(fund_ids=ids, day=day)
        scores = optimize[1].to_dict()
        id_score = {}
        # NOTE(review): keys of optimize[1] are assumed to be positional
        # indices into `ids` — confirm against AssetOptimize.find_optimize.
        for k, v in scores.items():
            id_score[f'{ids[k]}'] = v
        return id_score

    def find_highest_score(self, ids, day):
        """Return the id of the best-scoring fund among `ids` on `day`."""
        optimize = self._asset.find_optimize(fund_ids=ids, day=day)
        return optimize[0][0]
CREATE TABLE IF NOT EXISTS robo_mpt_portfolios
(
rmp_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rmp_date DATETIME NOT NULL COMMENT '日期',
rmp_risk TINYINT NOT NULL COMMENT '风险等级',
rmp_type VARCHAR(255) NOT NULL COMMENT '投组类型',
rmp_rolve TINYINT NOT NULL COMMENT '求解方式',
rmp_portfolio JSON DEFAULT NULL COMMENT '投组权重',
rmp_cvar DOUBLE DEFAULT NULL COMMENT '投组cvar',
rmp_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rmp_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rmp_id),
UNIQUE INDEX (rmp_date, rmp_risk, rmp_type),
INDEX (rmp_risk),
INDEX (rmp_type)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '最优投组表';
CREATE TABLE IF NOT EXISTS robo_hold_portfolios
(
`rhp_id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT,
`rhp_date` datetime NOT NULL COMMENT '日期',
`rhp_risk` tinyint(4) NOT NULL COMMENT '风险等级',
`rhp_rrs_id` bigint(20) UNSIGNED NULL DEFAULT NULL COMMENT '调仓信号id',
`rhp_rebalance` tinyint(4) NOT NULL DEFAULT 0 COMMENT '是否调仓',
`rhp_portfolios` json NOT NULL COMMENT '投组信息',
`rhp_fund_av` double(12, 4) NOT NULL COMMENT '投组原始净值,sum(个股原始净值*对应份额)',
`rhp_fund_nav` double(12, 4) NOT NULL DEFAULT 0.0000 COMMENT '基金被动配息做配股',
`rhp_nav` double(12, 4) NOT NULL COMMENT '复权净值',
`rhp_asset_nav` double(12, 4) NOT NULL COMMENT '产品净值,投顾模式:fund_av',
`rhp_div_forecast` double(12, 4) NOT NULL DEFAULT 0.0000 COMMENT '预配息金额',
`rhp_div_acc` double(12, 4) NOT NULL COMMENT '累计配息金额,投顾:acc(port_div + fund_div)',
`rhp_port_div` double(12, 4) NOT NULL COMMENT '主动配息',
`rhp_cash` double(12, 4) NOT NULL DEFAULT 0.0000 COMMENT '现金(产品的现金账户)',
`rhp_fund_div` double(12, 4) NOT NULL COMMENT '持有基金配息sum(个股每股配息*对应份额)',
`rhp_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`rhp_update_time` datetime NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
`v_nav_div_acc` double(12, 4) GENERATED ALWAYS AS ((`rhp_asset_nav` + `rhp_div_acc`)) VIRTUAL COMMENT '产品累计净值 asset_nav+ acc_div' NOT NULL,
PRIMARY KEY (`rhp_id`) USING BTREE,
UNIQUE INDEX `rhp_date`(`rhp_date`, `rhp_risk`) USING BTREE,
INDEX `rhp_risk`(`rhp_risk`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '持仓投组表' ROW_FORMAT = Dynamic;
\ No newline at end of file
from py_jftech import read, where, write, format_date, mapper_columns
from api import PortfoliosRisk
__COLUMNS__ = {
'rhp_id': 'id',
'rhp_date': 'date',
'rhp_risk': 'risk',
'rhp_div_acc': 'div_acc',
'rhp_rrs_id': 'signal_id',
'rhp_rebalance': 'rebalance',
'rhp_portfolios': 'portfolios',
'rhp_nav': 'nav',
'rhp_cash': 'cash',
'rhp_fund_av': 'fund_av',
'rhp_fund_nav': 'fund_nav',
'rhp_fund_div': 'fund_div',
'rhp_div_forecast': 'div_forecast',
'rhp_asset_nav': 'asset_nav',
'rhp_port_div': 'port_div',
'v_nav_div_acc': 'acc_av',
}
@read
def get_list(risk: PortfoliosRisk = None, min_date=None, max_date=None, rebalance: bool = None):
    # List hold-portfolio rows, optionally bounded by date and filtered by
    # risk / rebalance flag; ordered by risk then date.
    sqls = []
    if min_date:
        sqls.append(f"rhp_date >= '{format_date(min_date)}'")
    if max_date:
        sqls.append(f"rhp_date <= '{format_date(max_date)}'")
    return f'''
    select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios
    {where(*sqls, rhp_risk=risk, rhp_rebalance=rebalance)} order by rhp_risk, rhp_date
    '''
@read(one=True)
def get_one(day, risk: PortfoliosRisk):
    # Fetch the single hold row for an exact (rhp_date, rhp_risk) pair.
    return f'''select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios {where(rhp_date=day, rhp_risk=risk)}'''
@read(one=True)
def get_last_one(risk: PortfoliosRisk = None, max_date=None, rebalance: bool = None, signal_id=None):
    # Latest hold row at or before max_date, with optional risk / rebalance /
    # signal-id filters.
    sql = f"rhp_date <= '{format_date(max_date)}'" if max_date else None
    return f'''
    select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios
    {where(sql, rhp_risk=risk, rhp_rrs_id=signal_id, rhp_rebalance=rebalance)}
    order by rhp_date desc limit 1
    '''
def get_count(risk: PortfoliosRisk = None):
    """Return the number of hold-portfolio rows, optionally filtered by risk.

    The query function is defined inline so the @read decorator can capture
    `risk` from the closure. FIX: the inner function was named ``exec``,
    shadowing the Python builtin; renamed to ``_count`` (interface of
    get_count itself is unchanged).
    """

    @read(one=True)
    def _count():
        return f'''select count(*) as `count` from robo_hold_portfolios {where(rhp_risk=risk)}'''

    result = _count()
    return result['count']
@write
def insert(datas):
    # Insert one hold-portfolio row; keys are remapped via __COLUMNS__ and
    # every value is quoted as a string literal.
    datas = mapper_columns(datas=datas, columns=__COLUMNS__)
    return f'''
    insert into robo_hold_portfolios({','.join([x for x in datas.keys()])})
    values ({','.join([f"'{x[1]}'" for x in datas.items()])})
    '''
@write
def delete(min_date=None, risk: PortfoliosRisk = None):
    # No filters at all: truncate; otherwise delete from min_date onward
    # (optionally restricted to one risk level).
    if min_date is None and risk is None:
        return 'truncate table robo_hold_portfolios'
    else:
        sql = f"rhp_date >= '{format_date(min_date)}'" if min_date else None
        return f"delete from robo_hold_portfolios {where(sql, rhp_risk=risk)}"
from py_jftech import read, write, where, format_date, mapper_columns
from api import PortfoliosRisk, PortfoliosType
__COLUMNS__ = {
'rmp_id': 'id',
'rmp_date': 'date',
'rmp_risk': 'risk',
'rmp_type': 'type',
'rmp_rolve': 'solve',
'rmp_portfolio': 'portfolio',
'rmp_cvar': 'cvar',
'rmp_create_time': 'create_time'
}
@write
def insert(datas):
    # Insert one MPT-portfolio row; keys remapped via __COLUMNS__, values quoted.
    datas = mapper_columns(datas=datas, columns=__COLUMNS__)
    return f'''
    insert into robo_mpt_portfolios({','.join([x for x in datas.keys()])})
    values ({','.join([f"'{x[1]}'" for x in datas.items()])})
    '''
@write
def delete(min_date=None, risk: PortfoliosRisk = None):
    # No filters: truncate; otherwise delete from min_date onward, optionally
    # restricted to one risk level.
    if min_date is None and risk is None:
        return 'truncate table robo_mpt_portfolios'
    else:
        sql = f"rmp_date >= '{format_date(min_date)}'" if min_date else None
        return f"delete from robo_mpt_portfolios {where(sql, rmp_risk=risk)}"
@read(one=True)
def get_one(day, type: PortfoliosType, risk: PortfoliosRisk):
    # Fetch the single row for an exact (rmp_date, rmp_risk, rmp_type) triple.
    return f'''
    select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_mpt_portfolios
    {where(rmp_date=day, rmp_risk=risk, rmp_type=type)}
    '''
@read
def get_list(max_date=None, min_date=None, type: PortfoliosType = None, risk: PortfoliosRisk = None):
    # List MPT-portfolio rows within an optional date range, filtered by
    # type/risk, ordered chronologically.
    sqls = []
    if max_date:
        sqls.append(f"rmp_date <= '{format_date(max_date)}'")
    if min_date:
        sqls.append(f"rmp_date >= '{format_date(min_date)}'")
    return f'''
    select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_mpt_portfolios
    {where(*sqls, rmp_risk=risk, rmp_type=type)}
    order by rmp_date
    '''
@read(one=True)
def get_last_one(date=None, type: PortfoliosType = None, risk: PortfoliosRisk = None):
    # Latest MPT-portfolio row at or before `date` (overall latest when None).
    sqls = []
    if date:
        sqls.append(f"rmp_date <= '{format_date(date)}'")
    return f'''
    select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_mpt_portfolios
    {where(*sqls, rmp_risk=risk, rmp_type=type)}
    order by rmp_date desc limit 1
    '''
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
import logging
import unittest
from py_jftech import autowired, parse_date
from api import PortfoliosBuilder, PortfoliosType, PortfoliosRisk, PortfoliosHolder
class PortfoliosTest(unittest.TestCase):
    """Manual/integration tests for portfolio builders and holders; they rely
    on a configured py_jftech container and database, not on assertions."""

    logger = logging.getLogger(__name__)

    @autowired(names={'builder': 'poem'})
    def test_poem_build_portfolio(self, builder: PortfoliosBuilder = None):
        # Build once and log every risk's portfolio plus solver diagnostics.
        result, detail = builder.build_portfolio(parse_date('2008-01-21'), PortfoliosType.NORMAL)
        self.logger.info("portfolios: ")
        for risk, portfolio in result.items():
            self.logger.info(risk.name)
            self.logger.info(portfolio)
            self.logger.info(detail[risk])

    @autowired(names={'builder': 'poem'})
    def test_poem_get_portfolio(self, builder: PortfoliosBuilder = None):
        portfolio = builder.get_portfolios(parse_date('2022-11-07'), PortfoliosRisk.FT9)
        self.logger.info(portfolio)

    @autowired(names={'hold': 'dividend-holder'})
    def test_has_hold(self, hold: PortfoliosHolder = None):
        self.logger.info(hold.has_hold(PortfoliosRisk.FT3))

    @autowired(names={'hold': 'dividend-holder'})
    def test_build_hold(self, hold: PortfoliosHolder = None):
        hold.build_hold_portfolio(parse_date('2023-02-23'), PortfoliosRisk.FT9)

    @autowired(names={'hold': 'dividend-holder'})
    def test_clear(self, hold: PortfoliosHolder = None):
        hold.clear()
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()
import pandas as pd
from py_jftech import autowired, get_config
from api import DatumType, Datum
# Module-level cache mapping fund id -> fund risk, filled lazily by build_risk_dict().
risk_dict = {}


@autowired
def build_risk_dict(datum: Datum = None):
    """Populate the module-level fund-id -> risk cache on first use.

    Subsequent calls are no-ops while the cache is non-empty. The `datum`
    dependency is injected by `@autowired`.
    """
    global risk_dict
    # Fix: original used `if risk_dict: pass else: ...`; inverted to the
    # idiomatic guard with identical behavior.
    if not risk_dict:
        funds = datum.get_datums(type=DatumType.FUND)
        risk_dict = {fund['id']: fund['risk'] for fund in funds}
def format_weight(weight: dict, to=1) -> dict:
    """
    Round each weight to two decimals and adjust so the rounded weights sum to `to`.

    Any rounding residue is assigned to a single fund chosen by risk order:
    when the rounded sum falls short, the lowest-risk fund whose weight is below
    the configured high bound absorbs the difference; when it overshoots, the
    highest-risk fund whose weight is above the configured low bound is trimmed.

    @param weight: mapping of fund id (str) -> raw weight
    @param to: target total weight (default 1)
    @return: mapping of fund id -> rounded weight summing to `to`
    """
    # Ensure the fund-id -> risk cache is available for risk-ordered selection.
    build_risk_dict()
    weight_series = pd.Series(weight).fillna(0).apply(lambda x: round(x, 2))
    # NOTE(review): exact float equality; relies on two-decimal rounding keeping
    # sums representable — confirm acceptable.
    if weight_series.sum() == to:
        return dict(weight_series)
    # Fund ids ordered from lowest to highest risk.
    id_sort = sorted(weight_series.to_dict().keys(), key=lambda x: risk_dict.get(int(x)))
    low = get_config('portfolios.solver.mpt.low-weight')
    high = get_config('portfolios.solver.mpt.high-weight')[0]
    # Lowest-risk fund with room to grow (below the high bound).
    minidx = [i for i in id_sort if weight_series[i] < high][0]
    # Highest-risk fund that can be trimmed (above the low bound).
    maxidx = [i for i in id_sort if weight_series[i] > low][-1]
    if weight_series.sum() < to:
        weight_series[minidx] += to - weight_series.sum()
    elif weight_series.sum() > to:
        weight_series[maxidx] += to - weight_series.sum()
    return dict(weight_series.apply(lambda x: round(float(x), 2)))
# Ad-hoc manual check: exercises format_weight with a sample allocation.
if __name__ == '__main__':
    format_weight({"5": 0.35, "6": 0.35, "10": 0.1, "11": 0.16, "22": 0.05})
import json
from abc import ABC
from datetime import datetime as dt
from datetime import timedelta
from functools import reduce
from typing import List
import pandas as pd
from py_jftech import component, autowired, get_config, prev_workday, workday_range
from py_jftech import is_workday
from api import PortfoliosBuilder
from api import (
PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder,
RoboReportor, Datum, DatumType
)
from rebalance.dao import robo_rebalance_signal as rrs
@component(bean_name='base-signal')
class BaseRebalanceSignal(RebalanceSignal, ABC):
    """Base rebalance signal that fires on a fixed warehouse schedule.

    A signal row is persisted the first time a rebalance is due for a given
    (day, risk); later calls for the same day return the stored row.
    """
    @autowired
    def __init__(self, builder: PortfoliosBuilder = None):
        # Portfolio builder used to materialize weights when a signal fires.
        self._builder = builder
    def get_signal(self, day, risk: PortfoliosRisk):
        """Return the stored signal for (day, risk), creating one if a rebalance is due; else None."""
        signal = rrs.get_one(type=self.signal_type, risk=risk, date=day)
        if signal:
            return signal
        trigger = self.need_rebalance(day, risk)
        if trigger:
            portfolio_type = self.portfolio_type
            portfolio = self._builder.get_portfolios(day, risk, portfolio_type)
            id = rrs.insert({
                'date': day,
                'type': self.signal_type,
                'risk': risk,
                'portfolio_type': portfolio_type,
                'portfolio': portfolio,
                'effective': 1
            })
            return rrs.get_by_id(id)
        return None
    def need_rebalance(self, day, risk: PortfoliosRisk) -> bool:
        """Decide whether `day` is a rebalance day for `risk`, based on the last NORMAL signal."""
        # If no record exists, treat the given day as the initial date and build.
        signal = rrs.get_last_one(day, risk, SignalType.NORMAL, effective=None)
        if signal:
            frequency = get_config('portfolios')['holder']['warehouse-frequency']
            transfer_date = get_config('portfolios')['holder']['warehouse-transfer-date']
            date = pd.to_datetime(signal['date'].replace(day=transfer_date))
            # The last signal fell after the transfer date, i.e. it crossed into the next month.
            if signal['date'].day > transfer_date:
                # NOTE(review): risk is hard-coded to FT3 here instead of using the
                # `risk` parameter — confirm this is intentional.
                if rrs.get_count(risk=PortfoliosRisk.FT3, effective=True) > 0:
                    date = date + pd.DateOffset(months=1)
            date = date + pd.DateOffset(months=frequency)
            date = date - timedelta(days=1)
            # Snap to the last workday at the end of the period.
            date = date if is_workday(date) else prev_workday(date)
            if date == day:
                return True
            elif signal['date'] == day:
                return True
            else:
                return False
        else:
            return True
    @property
    def portfolio_type(self):
        # By default the portfolio type is derived from the signal type.
        return self.signal_type.p_type
    @property
    def signal_type(self) -> SignalType:
        return SignalType.NORMAL
    def get_last_signal(self, day, risk: PortfoliosRisk):
        """Return the most recent effective signal on or before `day`."""
        last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True)
        return last_re
    def clear(self, min_date=None, risk: PortfoliosRisk = None):
        """Remove stored signals from `min_date` on (all when None), optionally per risk."""
        rrs.delete(min_date=min_date, risk=risk)
@component(bean_name='signal-report')
class SignalReportor(RoboReportor):
    """Reports effective rebalance signals, one row per fund in each signal's portfolio."""

    @autowired
    def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None):
        self._hold = hold
        self._datum = datum

    @property
    def report_name(self) -> str:
        return '調倉信號'

    def load_report(self, max_date=None, min_date=None) -> List[dict]:
        """Flatten effective signals up to `max_date` into per-fund report rows.

        Fix: the former default `max_date=dt.today()` was evaluated once at
        import time and went stale in long-running processes; it is now
        computed per call (None means "today").
        """
        if max_date is None:
            max_date = dt.today()
        result = []
        datums = {str(x['id']): x for x in self._datum.get_datums(type=DatumType.FUND, exclude=False)}
        for signal in rrs.get_list(max_date=max_date, min_date=prev_workday(min_date), effective=True):
            # Only include signals that actually led to a rebalance.
            rebalance_date = self._hold.get_rebalance_date_by_signal(signal['id'])
            if rebalance_date:
                for fund_id, weight in json.loads(signal['portfolio']).items():
                    result.append({
                        'risk': PortfoliosRisk(signal['risk']).name,
                        'type': SignalType(signal['type']).name,
                        'signal_date': signal['date'],
                        'rebalance_date': rebalance_date,
                        'portfolio_type': PortfoliosType(signal['portfolio_type']).name,
                        'ft_ticker': datums[fund_id]['ftTicker'],
                        'bloomberg_ticker': datums[fund_id]['bloombergTicker'],
                        'fund_name': datums[fund_id]['chineseName'],
                        'weight': weight
                    })
        return result
@component(bean_name='daily-signal-report')
class DailySignalReportor(RoboReportor):
    """Reports the signals emitted exactly on the report date, one row per fund."""

    @autowired
    def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None):
        self._hold = hold
        self._datum = datum

    @property
    def report_name(self) -> str:
        return '每月調倉信號'

    def load_report(self, max_date=None, min_date=None) -> List[dict]:
        """Return per-fund rows for signals dated exactly `max_date`.

        Fixes: the former default `max_date=prev_workday(dt.today())` was
        evaluated once at import time; and an empty query result raised
        KeyError on the 'date' column — both handled here.
        """
        if max_date is None:
            max_date = prev_workday(dt.today())
        signals = pd.DataFrame(rrs.get_list(max_date=max_date, min_date=min_date))
        if signals.empty:
            return []
        signals = signals[(signals['date'].dt.date == max_date.date())]
        if signals.empty:
            return []
        datum_ids = reduce(lambda x, y: x | y, signals['portfolio'].apply(lambda x: set(json.loads(x).keys())))
        datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=datum_ids))
        datums.set_index('id', inplace=True)
        signals['risk'] = signals.apply(lambda row: PortfoliosRisk(row['risk']).name, axis=1)
        signals['rebalance_type'] = signals.apply(lambda row: SignalType(row['type']).name, axis=1)
        signals['portfolio_type'] = signals.apply(lambda row: PortfoliosType(row['portfolio_type']).name, axis=1)
        # Explode the JSON portfolio into one row per (fund id, weight) pair.
        signals['portfolio'] = signals.apply(lambda row: [x for x in json.loads(row['portfolio']).items()], axis=1)
        signals = signals.explode('portfolio', ignore_index=True)
        signals['weight'] = signals.apply(lambda row: format(row['portfolio'][1], '.0%'), axis=1)
        signals['asset_ids'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['ftTicker'], axis=1)
        signals['name'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['chineseName'], axis=1)
        signals['lipper_id'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['lipperKey'], axis=1)
        signals = signals[['lipper_id', 'asset_ids', 'name', 'weight', 'risk', 'date', 'rebalance_type']]
        return signals.to_dict('records')
-- Rebalance signal storage: one row per (date, type, risk) signal, carrying the
-- proposed portfolio weights as JSON and a flag marking whether it took effect.
CREATE TABLE IF NOT EXISTS robo_rebalance_signal
(
    rrs_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    rrs_date DATETIME NOT NULL COMMENT '信号日期',
    rrs_type TINYINT NOT NULL COMMENT '信号类型',
    rrs_risk TINYINT NOT NULL COMMENT '风险等级',
    rrs_p_type VARCHAR(255) DEFAULT NULL COMMENT '投组类型',
    rrs_p_weight JSON DEFAULT NULL COMMENT '投组信息',
    rrs_effective TINYINT NOT NULL DEFAULT 0 COMMENT '是否生效',
    rrs_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    rrs_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (rrs_id),
    INDEX (rrs_date),
    INDEX (rrs_type),
    INDEX (rrs_risk)
) ENGINE = InnoDB
  AUTO_INCREMENT = 0
  DEFAULT CHARSET = utf8mb4 COMMENT '再平衡信号表';
from py_jftech import read, write, where, format_date, mapper_columns, to_tuple
from api import SignalType, PortfoliosRisk
# Maps robo_rebalance_signal physical column names to the dict keys exposed by
# this DAO's read/write helpers.
__COLUMNS__ = {
    'rrs_id': 'id',
    'rrs_date': 'date',
    'rrs_type': 'type',
    'rrs_risk': 'risk',
    'rrs_p_type': 'portfolio_type',
    'rrs_p_weight': 'portfolio',
    'rrs_effective': 'effective',
    'rrs_create_time': 'create_time',
}
@read
def get_list(min_date=None, max_date=None, risk: PortfoliosRisk = None, type: SignalType = None, effective: bool = None):
    """List signals filtered by date range, risk, type and effectiveness, ordered by risk then date."""
    conditions = []
    if min_date:
        conditions.append(f"rrs_date >= '{format_date(min_date)}'")
    if max_date:
        conditions.append(f"rrs_date <= '{format_date(max_date)}'")
    cols = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    return f'''
    select {cols} from robo_rebalance_signal
    {where(*conditions, rrs_risk=risk, rrs_type=type, rrs_effective=effective)} order by rrs_risk, rrs_date
    '''
@read
def get_by_ids(ids):
    """Fetch all signal rows whose primary keys are in `ids`."""
    cols = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    return f'''select {cols} from robo_rebalance_signal {where(rrs_id=to_tuple(ids))}'''
@read(one=True)
def get_by_id(id):
    """Fetch a single signal row by primary key."""
    return f'''select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=id)}'''
@read(one=True)
def get_one(type: SignalType, risk: PortfoliosRisk, date):
    """Fetch the single signal row matching the exact date, type and risk."""
    cols = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    return f'''
    select {cols} from robo_rebalance_signal
    {where(rrs_date=date, rrs_type=type, rrs_risk=risk)}
    '''
@read(one=True)
def get_first_after(type: SignalType, risk: PortfoliosRisk, min_date, effective=None):
    """Fetch the earliest signal on or after `min_date` for the given type/risk."""
    cols = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    date_cond = f"rrs_date >= '{format_date(min_date)}'"
    return f'''
    select {cols} from robo_rebalance_signal
    {where(date_cond, rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date limit 1
    '''
@read(one=True)
def get_last_one(max_date, risk: PortfoliosRisk, type: SignalType = None, effective=None):
    """Fetch the latest signal on or before `max_date` for the given risk (and optional type/effectiveness)."""
    cols = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    date_cond = f"rrs_date <= '{format_date(max_date)}'"
    return f'''
    select {cols} from robo_rebalance_signal
    {where(date_cond, rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date desc limit 1
    '''
def get_count(risk: PortfoliosRisk = None, day=None, effective=None):
    """Count signal rows matching the optional risk/date/effective filters.

    A nested `@read(one=True)` helper routes the count query through the standard
    read pipeline while this function returns a plain int.
    """
    # Fix: the inner helper was named `exec`, shadowing the Python builtin.
    @read(one=True)
    def count_query():
        return f"select count(*) as `count` from robo_rebalance_signal {where(rrs_risk=risk, rrs_date=day, rrs_effective=effective)}"
    return count_query()['count']
@write
def insert(datas):
    """Build the insert statement for one signal row (keys mapped via __COLUMNS__)."""
    mapped = mapper_columns(datas=datas, columns=__COLUMNS__)
    cols = ','.join(mapped.keys())
    vals = ','.join(f"'{v}'" for v in mapped.values())
    return f'''
    insert into robo_rebalance_signal({cols})
    values ({vals})
    '''
@write
def update(id, datas):
    """Build the update statement for the signal row with the given primary key."""
    mapped = mapper_columns(datas=datas, columns=__COLUMNS__)
    assignments = ','.join(f"{col} = '{val}'" for col, val in mapped.items())
    return f'''
    update robo_rebalance_signal
    set {assignments}
    where rrs_id = {id}
    '''
@write
def delete_by_id(id):
    """Delete a single signal row by primary key."""
    return f"delete from robo_rebalance_signal where rrs_id = {id}"
@write
def delete(min_date=None, risk: PortfoliosRisk = None):
    """Delete signals from `min_date` onward for `risk`; with no filters at all, truncate the table."""
    if min_date is None and risk is None:
        return 'truncate table robo_rebalance_signal'
    date_cond = f"rrs_date >= '{format_date(min_date)}'" if min_date else None
    return f"delete from robo_rebalance_signal {where(date_cond, rrs_risk=risk)}"
import json
from datetime import datetime as dt
from typing import List
from urllib.parse import urlencode
import pandas as pd
import requests
from py_jftech import component, filter_weekend, next_workday, get_config, format_date
from api import RoboReportor
from reports.dao import robo_benckmark as rb
config = get_config(__name__)
@component(bean_name='benckmark-report')
class BenchmarkAlligamReportor(RoboReportor):
    """Syncs the ALLIGAM benchmark series from the JDC service into robo_benchmark
    and reports it with derived accumulated values."""

    @property
    def report_name(self) -> str:
        return 'BENCHMARK_ALLIGAM'

    @property
    def module_name(self) -> str:
        return 'divrobo'

    @property
    def risk(self):
        return 'alligam'

    @property
    def base_params(self):
        # subjectKeys identifies the benchmark series on the JDC side.
        return {
            'subjectKeys': 879,
            'size': 200,
            'sourceType': 'BLOOMBERG'
        }

    def sync_benchmark(self, start_date=None):
        """Page through the JDC asset-value API and insert rows into robo_benchmark.

        Raises a generic Exception when the service reports failure.
        """
        params = {
            **self.base_params,
            'page': 0
        }
        if start_date:
            params['startDate'] = format_date(start_date)
        while True:
            # NOTE(review): no request timeout — a hung connection blocks forever;
            # consider requests.get(..., timeout=...).
            response = requests.get(f'http://jdcprod.thiztech.com/api/datas/asset-value?{urlencode(params)}').json()
            if not response['success']:
                raise Exception(f'''request jdc alligam failed: {response['status']}''')
            rb.batch_insert([{
                'date': dt.fromtimestamp(x['date'] / 1000),
                'module': self.module_name,
                'risk': self.risk,
                'nav': x['calibrateValue'],
                'remarks': json.dumps({
                    'av': x['originValue'],
                    'div': x['dividend'] if 'dividend' in x else 0
                }, ensure_ascii=False)
            } for x in response['body']['content']])
            if response['body']['last']:
                break
            else:
                params = {**params, 'page': params['page'] + 1}

    def load_report(self, max_date=None, min_date=None) -> List[dict]:
        """Return the benchmark series up to `max_date`, syncing missing rows first.

        Fixes: the former default `max_date=dt.today()` was evaluated once at
        import time (stale in long-running processes); an empty result set no
        longer raises KeyError on the 'remarks' column.
        """
        if max_date is None:
            max_date = dt.today()
        max_date = filter_weekend(max_date)
        min_date = filter_weekend(min_date) if min_date else None
        last = rb.get_last_one(module=self.module_name, risk=self.risk, max_date=max_date)
        if not last or last['date'] < max_date:
            self.sync_benchmark(start_date=next_workday(last['date']) if last else None)
        result = pd.DataFrame(rb.get_list(max_date=max_date, min_date=min_date))
        if result.empty:
            return []
        result['av'] = result['remarks'].apply(lambda x: json.loads(x)['av'])
        result['div'] = result['remarks'].apply(lambda x: json.loads(x)['div'])
        # Accumulated value: running dividend sum up to the row's date plus its av.
        result['acc'] = result.apply(lambda row: result[result['date'] <= row['date']]['div'].sum() + row['av'], axis=1)
        result = result[['date', 'av', 'div', 'acc', 'nav']]
        result.rename(columns={'nav': f'{self.risk}_nav', 'av': f'{self.risk}_av', 'div': f'{self.risk}_div', 'acc': f'{self.risk}_acc'}, inplace=True)
        return result.to_dict('records')
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired
from api import RoboReportor
@component(bean_name='combo-report')
class DivAlligamComboDatasReportor(RoboReportor):
    """Joins the holding report with the ALLIGAM benchmark into one comparison table."""

    @autowired(names={'hold_reportor': 'hold-report', 'benchmark': 'benckmark-report'})
    def __init__(self, hold_reportor: RoboReportor = None, benchmark: RoboReportor = None):
        self._hold_reportor = hold_reportor
        self._benchmark = benchmark

    @property
    def report_name(self) -> str:
        return '对比报告'

    def load_report(self, max_date=None, min_date=None) -> List[dict]:
        """Return joined hold/benchmark records; empty list when there are no holdings.

        Fixes: the former default `max_date=dt.today()` was evaluated once at
        import time; deprecated `fillna(method='ffill')` replaced with `ffill()`.
        """
        if max_date is None:
            max_date = dt.today()
        holds = pd.DataFrame(self._hold_reportor.load_report(max_date=max_date, min_date=min_date))
        if holds.empty:
            return []
        holds.set_index('date', inplace=True)
        holds = holds[['real_av', 'acc_av', 'nav', 'fund_nav']]
        holds.rename(columns={'real_av': 'av', 'acc_av': 'acc'}, inplace=True)
        benchmark = pd.DataFrame(self._benchmark.load_report(max_date=max_date, min_date=min_date))
        benchmark.set_index('date', inplace=True)
        benchmark = benchmark[['alligam_av', 'alligam_acc', 'alligam_nav']]
        datas = holds.join(benchmark)
        # Forward-fill benchmark gaps, then drop rows that still lack data.
        datas = datas.ffill()
        datas.dropna(inplace=True)
        datas.reset_index(inplace=True)
        return datas.to_dict('records')
import math
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired, filter_weekend, prev_workday
from api import RoboReportor, PortfoliosRisk, PortfoliosHolder, Datum, DatumType, Navs, RoboExecutor
@component(bean_name='contribution-report')
class ContributionReportor(RoboReportor):
@autowired
def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None, navs: Navs = None, exec: RoboExecutor = None):
self._hold = hold
self._datum = datum
self._navs = navs
self._exec = exec
@property
def report_name(self) -> str:
return '贡献率'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
max_date = filter_weekend(max_date)
min_date = filter_weekend(min_date) if min_date is not None else self._exec.start_date
result = pd.DataFrame()
for risk in PortfoliosRisk.values():
buy_date = None
sell_date = max_date
while buy_date is None or sell_date > min_date:
last_date = sell_date if sell_date == max_date else prev_workday(sell_date)
buy_date = self._hold.get_last_rebalance_date(risk=risk, max_date=last_date)
weight = self._hold.get_portfolios_weight(day=last_date, risk=risk)
datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=tuple(weight.keys())))
datums = datums[['id', 'ftTicker', 'bloombergTicker', 'chineseName']]
datums.columns = ['id', 'ft_ticker', 'bloomberg_ticker', 'name']
datums['ratio'] = datums.apply(lambda row: weight[row.id], axis=1)
datums['hold'] = (sell_date - buy_date).days
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(weight.keys()), max_date=sell_date, min_date=buy_date))
navs = navs.pivot_table(columns='fund_id', index='nav_date', values='nav_cal')
rtns = navs.iloc[-1] / navs.iloc[0] - 1
rtns.name = 'rtns'
datums = datums.join(rtns, on='id')
datums['risk'] = risk.name
datums['buy_date'] = buy_date
datums['sell_date'] = sell_date if sell_date != max_date else math.nan
datums.drop('id', axis=1, inplace=True)
result = pd.concat([result, datums], ignore_index=True)
sell_date = buy_date if buy_date < sell_date else prev_workday(buy_date)
return result.to_dict('records') if not result.empty else []
-- Benchmark nav storage, keyed uniquely by (module, date, risk); rb_remarks
-- carries auxiliary JSON (original asset value, dividend, ...).
DROP TABLE IF EXISTS robo_benchmark;
CREATE TABLE IF NOT EXISTS robo_benchmark
(
    rb_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    rb_module VARCHAR(255) NOT NULL COMMENT '模块',
    rb_date DATETIME NOT NULL COMMENT '日期',
    rb_risk VARCHAR(255) NOT NULL COMMENT '风险等级',
    rb_nav DOUBLE(16, 4) NOT NULL COMMENT '资产值',
    rb_remarks JSON DEFAULT NULL COMMENT '其他信息',
    rb_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    rb_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (rb_id),
    UNIQUE INDEX (rb_module, rb_date, rb_risk),
    INDEX (rb_date, rb_risk),
    INDEX (rb_risk)
) ENGINE = InnoDB
  AUTO_INCREMENT = 0
  DEFAULT CHARSET = utf8mb4 COMMENT 'BENCHMARK数据表';
-- Migration history: a generated column exposing rb_remarks->>'$.re' (plus its
-- index) was added and later removed again.
ALTER TABLE robo_benchmark ADD COLUMN v_rb_re TINYINT GENERATED ALWAYS AS (IF(rb_remarks->>'$.re' = 'true', 1, 0)) COMMENT '是否再分配' AFTER rb_remarks;
ALTER TABLE robo_benchmark ADD INDEX v_rb_re(`v_rb_re`);
ALTER TABLE robo_benchmark DROP INDEX v_rb_re;
ALTER TABLE robo_benchmark DROP COLUMN v_rb_re;
-- Data-log storage: one JSON payload per (date, risk, type), written by the
-- data-logger component.
DROP TABLE IF EXISTS robo_data_logger;
CREATE TABLE IF NOT EXISTS robo_data_logger
(
    rdl_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    rdl_date DATETIME NOT NULL COMMENT '日期',
    rdl_risk VARCHAR(255) NOT NULL COMMENT '风险等级',
    rdl_type VARCHAR(255) NOT NULL COMMENT '数据类别',
    rdl_datas JSON NOT NULL COMMENT '日志数据',
    rdl_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    rdl_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (rdl_id),
    UNIQUE INDEX (rdl_date, rdl_risk, rdl_type),
    INDEX (rdl_risk, rdl_type),
    INDEX (rdl_type)
) ENGINE = InnoDB
  AUTO_INCREMENT = 0
  DEFAULT CHARSET = utf8mb4 COMMENT '数据日志表';
\ No newline at end of file
from py_jftech import read, write, where, mapper_columns, format_date
# Maps robo_benchmark physical column names to the dict keys exposed by this
# DAO's read/write helpers.
__COLUMNS__ = {
    'rb_id': 'id',
    'rb_module': 'module',
    'rb_date': 'date',
    'rb_risk': 'risk',
    'rb_nav': 'nav',
    'rb_remarks': 'remarks',
}
@write
def batch_insert(datas):
    """Build a multi-row insert for robo_benchmark (rb_id is auto-generated)."""
    cols = [c for c in __COLUMNS__ if c != 'rb_id']
    rows = [mapper_columns(x, __COLUMNS__) for x in datas]

    def render(row):
        # Quote present, non-null values; emit SQL null otherwise.
        cells = [(f"'{row[c]}'" if c in row and row[c] is not None else 'null') for c in cols]
        return f"({','.join(cells)})"

    values = ','.join(render(r) for r in rows)
    return f'''insert into robo_benchmark({','.join(cols)}) values {values}'''
@read(one=True)
def get_last_one(module=None, max_date=None, risk=None, re: bool = None):
    """Fetch the latest benchmark row on or before `max_date` for the given module/risk."""
    conditions = []
    if max_date:
        conditions.append(f"rb_date <= '{format_date(max_date)}'")
    cols = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    return f'''
    select {cols} from robo_benchmark
    {where(*conditions, rb_module=module, rb_risk=risk, v_rb_re=re)} order by rb_date desc limit 1
    '''
@read
def get_list(max_date=None, min_date=None, module=None, risk=None, re: bool = None):
    """List benchmark rows bounded by date range and filtered by module/risk, ordered by risk then date."""
    conditions = []
    if max_date:
        conditions.append(f"rb_date <= '{format_date(max_date)}'")
    if min_date:
        conditions.append(f"rb_date >= '{format_date(min_date)}'")
    cols = ','.join(f"{col} as {alias}" for col, alias in __COLUMNS__.items())
    return f'''
    select {cols} from robo_benchmark
    {where(*conditions, rb_module=module, rb_risk=risk, v_rb_re=re)} order by rb_risk, rb_date
    '''
This diff is collapsed.
import json
from datetime import datetime as dt
from py_jftech import component
from api import DataLogger, LoggerType, PortfoliosRisk, Cleanable, BacktestStep
from reports.dao import robo_data_logger as rdl
@component(bean_name='data-logger')
class DatabaseLogger(DataLogger, Cleanable):
    """Persists per-(date, risk, type) log payloads to the robo_data_logger table."""

    def save_record(self, date: dt, risk: PortfoliosRisk, type: LoggerType, datas: dict, exist_merge=True):
        """Insert or update one log record; merges into an existing row when exist_merge is True."""
        assert date is not None, "save record, date cannot be null"
        assert risk is not None, "save record, risk cannot be null"
        assert type is not None, "save record, type cannot be null"
        assert datas is not None, "save record, dates cannot be null"
        existing = rdl.get_one(date=date, risk=risk, type=type)
        if not existing:
            rdl.insert({
                'date': date,
                'risk': risk,
                'type': type,
                'datas': datas
            })
            return
        merged = {**json.loads(existing['datas']), **datas} if exist_merge else datas
        rdl.update(existing['id'], merged)

    def load_records(self, max_date=None, min_date=None, risk: PortfoliosRisk = None, type: LoggerType = None):
        """Return matching records with their JSON payloads decoded."""
        records = rdl.get_list(max_date=max_date, min_date=min_date, risk=risk, type=type, like_type=True)
        return [{**record, 'datas': json.loads(record['datas'])} for record in records]

    def clean_up(self, min_date=None, risk: PortfoliosRisk = None):
        """Delete records from `min_date` on (all when None), optionally per risk."""
        rdl.delete(min_date=min_date, risk=risk)

    @property
    def clean_step(self):
        return BacktestStep.HOLD_PORTFOLIO

    @property
    def clean_name(self):
        return 'data logger'
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment