Commit e71c080d authored by wenwen.tang's avatar wenwen.tang 😕

bugfix

parent 582e5dac
from py_jftech import read, write, where, mapper_columns, format_date
from py_jftech import read, write, where, mapper_columns, format_date,to_tuple
__COLUMNS__ = {
'red_eco_id': 'eco_id',
......@@ -24,7 +24,7 @@ def get_list(eco_ids=None, min_date=None, max_date=None):
sqls.append(f"red_date <= '{format_date(max_date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_eco_datas
{where(*sqls, red_eco_id=to_tuple(index_ids))} order by red_eco_id, red_date
{where(*sqls, red_eco_id=to_tuple(eco_ids))} order by red_eco_id, red_date
'''
......
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import get_config, component, autowired, to_tuple
......@@ -72,7 +69,7 @@ class DefaultNavs(Navs):
def get_eco_values(self, datum_ids=None, min_date=None, max_date=None, ticker=None, by_release_date=False):
datum_ids = to_tuple(datum_ids)
if ticker:
datums = self._datum.get_datums(type=DatumType.INDEX, ticker=ticker)
datums = self._datum.get_datums(type=DatumType.ECO, ticker=ticker)
datum_ids = tuple(set(list(datum_ids or []) | {x['id'] for x in datums}))
return red.get_list(eco_ids=datum_ids, min_date=min_date, max_date=max_date, by_release_date=by_release_date)
......
......@@ -6,6 +6,7 @@ from datetime import datetime as dt, timedelta
from typing import List
from urllib.parse import quote
import pytz
import requests
from dateutil.relativedelta import relativedelta
from py_jftech import format_date, is_workday, component, autowired, get_config, filter_weekend, next_workday
......@@ -107,7 +108,7 @@ class TWDataSync(DataSync, ABC):
pass
@component(bean_name='tw-navs-sync')
@component(bean_name='navs-sync')
class TWFundNavSync(TWDataSync):
def get_all_data(self, start_date=dt.today()):
......@@ -181,7 +182,7 @@ class IndexSync(JDCDataSync):
def store_date(self, datumid, datas: List[dict]):
save_datas = [{
'index_id': datumid,
'date': dt.fromtimestamp(x['date'] / 1000).strftime('%Y-%m-%d'),
'date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d'),
'close': x['close'],
'open': x['open'] if 'open' in x else None,
'high': x['high'] if 'high' in x else None,
......@@ -189,7 +190,7 @@ class IndexSync(JDCDataSync):
'pe': x['peRatio'] if 'peRatio' in x else None,
'pb': x['pbRatio'] if 'pbRatio' in x else None,
'volume': x['volume'] if 'volume' in x else None,
} for x in datas if is_workday(dt.fromtimestamp(x['date'] / 1000)) and 'close' in x]
} for x in datas if is_workday(dt.fromtimestamp(x['date'] / 1000), tz=pytz.timezone('Asia/Shanghai')) and 'close' in x]
if save_datas:
rid.batch_insert(save_datas)
......@@ -215,9 +216,9 @@ class EcoSync(JDCDataSync):
def store_date(self, datumid, datas: List[dict]):
save_datas = [{
'eco_id': datumid,
'date': dt.fromtimestamp(x['date'] / 1000),
'date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
'indicator': x['close'],
'release_date': dt.fromtimestamp(x['releaseDate'] / 1000),
'release_date': dt.fromtimestamp(x['releaseDate'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
} for x in datas if 'releaseDate' in x]
if save_datas:
red.batch_insert(save_datas)
......@@ -245,18 +246,19 @@ class FundNavSync(JDCDataSync):
return f'http://jdcprod.thiztech.com/api/datas/asset-value?subjectKeys={key}&page={page}&size=200&sourceType=TW&startDate={format_date(start_date)}'
def find_jdc_subject_key(self):
funds = self._datum.get_datums(type=DatumType.FUND)
funds = {x['ftTicker']: x for x in self._datum.get_datums(type=DatumType.FUND)}
funds = {x['isin']: x for x in self._datum.get_datums(type=DatumType.FUND)}
response = requests.get('http://jdcprod.thiztech.com/api/subject?busiField=TW&sourceType=TW&subjectType=FUND')
response = response.json()
if not response['success']:
raise CollectError(f'''find fund subject failed: {response['status']}''')
return {funds[x['fundId']]['id']: x['key'] for x in response['body']['content'] if x['fundId'] in funds}
content = response['body']['content']
content = [x for x in content if x.get('isin')]
return {funds[x['isin']]['id']: x['key'] for x in content if x['isin'] in funds}
def store_date(self, datumid, datas: List[dict]):
save_navs = [{
'fund_id': datumid,
'nav_date': dt.fromtimestamp(x['date'] / 1000),
'nav_date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
'av': x['originValue'],
'div': x['dividend'] if 'dividend' in x else 0,
'split': x['split'] if 'split' in x else 1,
......@@ -264,7 +266,7 @@ class FundNavSync(JDCDataSync):
'av_p': x['postValue'],
'div_p': x['postDividend'] if 'postDividend' in x else 0,
'nav_cal': x['calibrateValue']
} for x in datas if is_workday(dt.fromtimestamp(x['date'] / 1000))]
} for x in datas if is_workday(dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')))]
if save_navs:
rfn.batch_insert(save_navs)
......@@ -301,7 +303,7 @@ class ExrateSync(DataSync):
try:
save_dates = [{
'ticker': ticker,
'date': dt.fromtimestamp(x['date'] / 1000),
'date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
'close': x['close'],
} for x in response['body']['content']]
if save_dates:
......
......@@ -42,6 +42,7 @@ py-jftech:
hold-report: portfolios.holder.DivHoldReportor
mpt: portfolios.builder.PoemPortfoliosBuilder
dividend-holder: portfolios.holder.DividendPortfoliosHolder
navs-sync: basic.sync.FundNavSync
email:
server: smtphz.qiye.163.com
user: jft-ra@thizgroup.com
......
......@@ -42,6 +42,7 @@ py-jftech:
hold-report: portfolios.holder.DivHoldReportor
mpt: portfolios.builder.PoemPortfoliosBuilder
dividend-holder: portfolios.holder.DividendPortfoliosHolder
navs-sync: basic.sync.FundNavSync
email:
server: smtphz.qiye.163.com
user: jft-ra@thizgroup.com
......
......@@ -42,6 +42,7 @@ py-jftech:
hold-report: portfolios.holder.DivHoldReportor
mpt: portfolios.builder.PoemARCPortfoliosBuilder
dividend-holder: portfolios.holder.InvTrustPortfoliosHolder
navs-sync: basic.sync.FundNavSync
email:
server: smtphz.qiye.163.com
user: jft-ra@thizgroup.com
......@@ -231,7 +232,7 @@ reports: # 报告模块相关
content: "Dear All: 附件是今天生成的监测数据,請驗收,謝謝! 注>:該郵件為自動發送,如有問題請聯繫矽谷團隊 telan_qian@chifufund.com"
robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:backtest} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据
sync-data: ${SYNC_DATA:on} # 是否开启同步资料数据
backtest: # 回测执行器相关
start-date: 2022-02-16 # 回测起始日期
end-date: 2023-01-03 # 回测截止日期
......
......@@ -23,3 +23,12 @@ urllib3==1.26.12
fastapi==0.100.0
uvicorn==0.23.1
apscheduler==3.10.1
sklearn
finta
keras
tensorflow
matplotlib
lightgbm
\ No newline at end of file
import datetime as dt
import json
from multiprocessing import Process
import uvicorn
from apscheduler.schedulers.blocking import BlockingScheduler
from fastapi import FastAPI
from py_jftech import prev_workday, filter_weekend
import main
from api import DatumType, PortfoliosRisk
from basic.datum import DefaultDatum
app = FastAPI()
REC_GID = 'E3886FBA-123B-7890-123E-123456BEEED'
def get_today_rec():
from portfolios.dao import robo_mpt_portfolios as rmp
from api import PortfoliosType, PortfoliosRisk
day = prev_workday(filter_weekend(dt.date.today()))
portfolio = rmp.get_one(day, PortfoliosType.NORMAL, PortfoliosRisk.FT3)
return portfolio
def get_last_signal():
from rebalance.dao import robo_rebalance_signal as rrs
day = prev_workday(filter_weekend(dt.date.today()))
last_re = rrs.get_last_one(max_date=day, risk=PortfoliosRisk.FT3, effective=True)
return last_re
@app.get("/recommend")
async def root():
portfolio = get_today_rec()
if portfolio:
fund_infos = DefaultDatum().get_datums(DatumType.FUND)
id_ticker_map = {str(info['id']): info for info in fund_infos}
funds = json.loads(portfolio['portfolio'])
rec_list = []
portfolios = {'recomm_guid': REC_GID}
data = {'recomm_guid': REC_GID}
data['data_date'] = portfolio['date'].strftime('%Y-%m-%d')
data['funds'] = [{'weight': round(weight * 100), 'fund_id': id_ticker_map[key]['ftTicker']} for key, weight in
funds.items()]
data['creat_date'] = portfolio['create_time'].strftime('%Y-%m-%d %H:%M:%S')
# todo 补全
data['cp'] = 0.81
data['rr'] = 0.81
data['roi'] = 0.81
data['risk'] = round(sum([id_ticker_map[key]['risk'] * weight for key, weight in funds.items()]), 2)
note = {}
sig = get_last_signal()
note['last_reg_reb'] = sig['date'].strftime('%Y-%m-%d')
data['note'] = json.dumps(note)
portfolios['data'] = data
rec_list.append(portfolios)
return rec_list
else:
return {'msg': '当日投组未产生,待10:00后获取'}
def start_robo():
# 异常情况可以重启跑当天投组
current_time = dt.datetime.now()
target_time = dt.time(10, 0)
if current_time.time() > target_time:
main.start()
# 开启定时任务,执行实盘
scheduler = BlockingScheduler()
scheduler.add_job(main.start, 'cron', day_of_week='0-4', hour=10, minute=00)
scheduler.start()
def start_web():
uvicorn.run("robo_controller:app", reload=True, port=8080)
if __name__ == "__main__":
# 开启一个进程执行start_robo()
p1 = Process(target=start_robo)
p1.start()
# 启动进程2
p2 = Process(target=start_web)
p2.start()
# import datetime as dt
# import json
# from multiprocessing import Process
#
# import uvicorn
# from apscheduler.schedulers.blocking import BlockingScheduler
# from fastapi import FastAPI
# from py_jftech import prev_workday, filter_weekend
#
# import main
# from api import DatumType, PortfoliosRisk
# from basic.datum import DefaultDatum
#
# app = FastAPI()
#
# REC_GID = 'E3886FBA-123B-7890-123E-123456BEEED'
#
#
# def get_today_rec():
# from portfolios.dao import robo_mpt_portfolios as rmp
# from api import PortfoliosType, PortfoliosRisk
# day = prev_workday(filter_weekend(dt.date.today()))
# portfolio = rmp.get_one(day, PortfoliosType.NORMAL, PortfoliosRisk.FT3)
# return portfolio
#
#
# def get_last_signal():
# from rebalance.dao import robo_rebalance_signal as rrs
#
# day = prev_workday(filter_weekend(dt.date.today()))
# last_re = rrs.get_last_one(max_date=day, risk=PortfoliosRisk.FT3, effective=True)
# return last_re
#
#
# @app.get("/recommend")
# async def root():
# portfolio = get_today_rec()
# if portfolio:
# fund_infos = DefaultDatum().get_datums(DatumType.FUND)
# id_ticker_map = {str(info['id']): info for info in fund_infos}
# funds = json.loads(portfolio['portfolio'])
# rec_list = []
# portfolios = {'recomm_guid': REC_GID}
# data = {'recomm_guid': REC_GID}
# data['data_date'] = portfolio['date'].strftime('%Y-%m-%d')
# data['funds'] = [{'weight': round(weight * 100), 'fund_id': id_ticker_map[key]['ftTicker']} for key, weight in
# funds.items()]
# data['creat_date'] = portfolio['create_time'].strftime('%Y-%m-%d %H:%M:%S')
# # todo 补全
# data['cp'] = 0.81
# data['rr'] = 0.81
# data['roi'] = 0.81
# data['risk'] = round(sum([id_ticker_map[key]['risk'] * weight for key, weight in funds.items()]), 2)
# note = {}
# sig = get_last_signal()
# note['last_reg_reb'] = sig['date'].strftime('%Y-%m-%d')
# data['note'] = json.dumps(note)
# portfolios['data'] = data
# rec_list.append(portfolios)
# return rec_list
# else:
# return {'msg': '当日投组未产生,待10:00后获取'}
#
#
# def start_robo():
# # 异常情况可以重启跑当天投组
# current_time = dt.datetime.now()
# target_time = dt.time(10, 0)
# if current_time.time() > target_time:
# main.start()
#
# # 开启定时任务,执行实盘
# scheduler = BlockingScheduler()
# scheduler.add_job(main.start, 'cron', day_of_week='0-4', hour=10, minute=00)
# scheduler.start()
#
#
# def start_web():
# uvicorn.run("robo_controller:app", reload=True, port=8080)
#
#
# if __name__ == "__main__":
# # 开启一个进程执行start_robo()
# p1 = Process(target=start_robo)
# p1.start()
#
# # 启动进程2
# p2 = Process(target=start_web)
# p2.start()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment