Commit 9d4c3c33 authored by wenwen.tang's avatar wenwen.tang 😕

基金数据获取方式替换-tw

parent 45367cee
import base64
import hashlib
import logging
from abc import ABC, abstractmethod
from datetime import datetime as dt
from dateutil.relativedelta import relativedelta
from datetime import datetime as dt, timedelta
from typing import List
from urllib.parse import quote
import requests
from dateutil.relativedelta import relativedelta
from py_jftech import format_date, is_workday, component, autowired, get_config, filter_weekend, next_workday
from api import DatumType, DataSync, Datum
......@@ -66,6 +68,98 @@ class JDCDataSync(DataSync, ABC):
page += 1
class TWDataSync(DataSync, ABC):
@autowired
def __init__(self, datum: Datum = None):
self._datum = datum
self._config = get_config(__name__)
def do_sync(self, max_date=dt.today()):
logger.info(f'start sync datas for type[{self.datum_type}]')
response = self.get_all_data(self.start_date)
for datum in self._datum.get_datums(type=self.datum_type):
logger.debug(f'start sync ticker[{datum["ftTicker"]}]')
try:
self.store_date(datum['id'], datum["ftTicker"], response)
except Exception as e:
logger.exception(f'''{datum['id']} store data failed''')
raise e
@property
def start_date(self):
return filter_weekend(dt.today() - timedelta(days=5))
@abstractmethod
def last_datum(self, datum_id):
pass
@abstractmethod
def get_all_data(self, start_date=dt.today()):
pass
@property
@abstractmethod
def datum_type(self) -> DatumType:
pass
@abstractmethod
def store_date(self, datumid, ft_ticker, datas: List[dict]):
pass
@component(bean_name='tw-navs-sync')
class TWFundNavSync(TWDataSync):
def get_all_data(self, start_date=dt.today()):
authori = 'chifufund' + dt.today().strftime('%Y%m%d') + 'FTDMNAVDATE'
authdest = base64.b64encode(hashlib.sha256(authori.encode()).digest()).decode()
req = requests.session()
req.headers['user'] = 'chifufund'
req.headers['Authorization'] = 'Basic ' + authdest
for i in range(30):
try:
resp = req.get('http://210.202.243.106:1688/api/public/NAV?parameter.modifyDate=' + start_date.strftime(
'%Y-%m-%d'), timeout=30)
return resp.json()
except Exception as e:
logger.error(str(e))
def last_datum(self, datum_id):
last = rfn.get_last_one(fund_id=datum_id)
return last
@property
def datum_type(self) -> DatumType:
return DatumType.FUND
def store_date(self, datumid, ft_ticker, datas: List[dict]):
last = self.last_datum(datum_id=datumid)
start_date = next_workday(last['nav_date']) if last else self.start_date
save_navs = []
last_av = last['av']
last_nav_cal = last['nav_cal']
for data in datas:
if dt.strptime(data['Nav_Date'], "%Y-%m-%dT%H:%M:%S") >= start_date and data['Fund_Id'] == ft_ticker:
nav = {
'fund_id': datumid,
'nav_date': dt.strptime(data['Nav_Date'], "%Y-%m-%dT%H:%M:%S"),
'av': data['Nav_P'],
'div': data['Nav_T_Div'],
'split': data['Nav_Spilt'],
'accrue_split': data['Nav_Spilt'],
'av_p': data['Nav_P'],
'div_p': data['Nav_T_Div'],
# 当日/上日
'nav_cal': round(((data['Nav_P'] + data['Nav_T_Div']) * data['Nav_Unit']) / (
last_av * data['Nav_Unit']) * last_nav_cal, 4)
}
last_av = nav['av']
last_nav_cal = nav['nav_cal']
save_navs.append(nav)
if save_navs:
rfn.batch_insert(save_navs)
@component(bean_name='index-sync')
class IndexSync(JDCDataSync):
......
......@@ -232,7 +232,7 @@ reports: # 报告模块相关
content: "Dear All: 附件是今天生成的监测数据,請驗收,謝謝! 注>:該郵件為自動發送,如有問題請聯繫矽谷團隊 telan_qian@chifufund.com"
robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:real} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据
sync-data: ${SYNC_DATA:on} # 是否开启同步资料数据
backtest: # 回测执行器相关
start-date: 2022-09-30 # 回测起始日期
end-date: 2023-07-03 # 回测截止日期
......
......@@ -133,7 +133,8 @@ class BacktestExecutor(RoboExecutor):
@component(bean_name='real')
class RealExecutor(RoboExecutor):
@autowired(names={'daily_export': 'daily-real-export', 'monitor_export': 'daily-monitor-export'})
@autowired(names={'daily_export': 'daily-real-export', 'monitor_export': 'daily-monitor-export',
'syncs': 'tw-navs-sync'})
def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, syncs: List[DataSync] = None,
daily_export: RoboExportor = None, monitor_export: RoboExportor = None, pool: AssetPool = None,
signal: RebalanceSignal = None):
......@@ -146,7 +147,6 @@ class RealExecutor(RoboExecutor):
self._config = get_config(__name__)['real']
self._signal = signal
@property
def start_date(self):
return pd.to_datetime(filter_weekend(self._config['start-date']))
......@@ -174,8 +174,7 @@ class RealExecutor(RoboExecutor):
def start_exec(self):
if self.is_sync_data:
for sync in self._syncs:
sync.do_sync()
self._syncs.do_sync()
date = self.curt_date
if is_workday(date) or date in self.include_date:
date = prev_workday(filter_weekend(date))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment