Commit 71c9d2a6 authored by wenwen.tang's avatar wenwen.tang 😕

适配新数据源

parent 24827f2c
import base64
import hashlib
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime as dt, timedelta
from typing import List
from urllib.parse import quote
import pandas as pd
import pytz
import requests
from dateutil.relativedelta import relativedelta
......@@ -140,7 +142,7 @@ class TWFundNavSync(TWDataSync):
'accrue_split': 1,
'av_p': data['nav_P'],
'div_p': div,
'nav_cal': round(data['nav_P']*data['nav_Unit'], 4)
'nav_cal': round(data['nav_P'] * data['nav_Unit'], 4)
}
save_navs.append(nav)
if save_navs:
......@@ -188,13 +190,27 @@ class IndexSync(JDCDataSync):
if save_datas:
rid.batch_insert(save_datas)
def ths_token():
# Token accessToken 及权限校验机制
getAccessTokenUrl = 'https://quantapi.51ifind.com/api/v1/get_access_token'
refreshtoken = 'eyJzaWduX3RpbWUiOiIyMDI1LTA1LTE5IDE5OjE5OjM5In0=.eyJ1aWQiOiI3NzE0ODI3NzAiLCJ1c2VyIjp7ImFjY291bnQiOiJzaGpmc3kwMDEiLCJhdXRoVXNlckluZm8iOnsiU0ZUU0UiOnRydWUsIlNVU0FJbmRleENvZGUiOnRydWUsIlNEQ0UiOnRydWUsIlNMTUUiOnRydWUsIlNDSUNDIjp0cnVlLCJhcGlGb3JtYWwiOiIxIn0sImNvZGVDU0kiOltdLCJjb2RlWnpBdXRoIjpbXSwiaGFzQUlQcmVkaWN0IjpmYWxzZSwiaGFzQUlUYWxrIjpmYWxzZSwiaGFzQ0lDQyI6dHJ1ZSwiaGFzQ1NJIjpmYWxzZSwiaGFzRXZlbnREcml2ZSI6ZmFsc2UsImhhc0ZUU0UiOnRydWUsImhhc0Zhc3QiOmZhbHNlLCJoYXNGdW5kVmFsdWF0aW9uIjpmYWxzZSwiaGFzSEsiOnRydWUsImhhc0xNRSI6dHJ1ZSwiaGFzTGV2ZWwyIjpmYWxzZSwiaGFzUmVhbENNRSI6ZmFsc2UsImhhc1RyYW5zZmVyIjpmYWxzZSwiaGFzVVMiOmZhbHNlLCJoYXNVU0FJbmRleCI6dHJ1ZSwiaGFzVVNERUJUIjpmYWxzZSwibWFya2V0QXV0aCI6eyJEQ0UiOmZhbHNlfSwibWFya2V0Q29kZSI6IjE2OzMyOzE0NDsxNzY7MTEyOzg4OzQ4OzEyODsxNjgtMTsxODQ7MjAwOzIxNjsxMDQ7MTIwOzEzNjsyMzI7NTY7OTY7MTYwOzY0OyIsIm1heE9uTGluZSI6MSwibm9EaXNrIjpmYWxzZSwicHJvZHVjdFR5cGUiOiJTVVBFUkNPTU1BTkRQUk9EVUNUIiwicmVmcmVzaFRva2VuRXhwaXJlZFRpbWUiOiIyMDI4LTAzLTMxIDE5OjExOjQzIiwic2Vzc3Npb24iOiI1MTgzMzQxNDM2YWUxMTA3N2M3OGQwODYwZDhkODY5MCIsInNpZEluZm8iOnt9LCJ0cmFuc0F1dGgiOmZhbHNlLCJ1aWQiOiI3NzE0ODI3NzAiLCJ1c2VyVHlwZSI6Ik9GRklDSUFMIiwid2lmaW5kTGltaXRNYXAiOnt9fX0=.51B86B4E892B5B90BB134429C30D0E1F7FBA86258898D1030017C0CA1184FBA1'
getAccessTokenHeader = {"Content-Type": "application/json", "refresh_token": refreshtoken}
getAccessTokenResponse = requests.post(url=getAccessTokenUrl, headers=getAccessTokenHeader)
accessToken = json.loads(getAccessTokenResponse.content)['data']['access_token']
print(accessToken)
return accessToken
@component(bean_name='eco-sync')
class EcoSync(JDCDataSync):
class EcoSync(DataSync):
@autowired
def __init__(self, datum: Datum = None):
self._datum = datum
self._config = get_config(__name__)
@property
def start_date(self):
return super(EcoSync, self).start_date - relativedelta(years=4)
return filter_weekend(self._config['start-date'])
@property
def datum_type(self) -> DatumType:
......@@ -204,19 +220,53 @@ class EcoSync(JDCDataSync):
last = red.get_last_one(eco_id=datum_id)
return next_workday(last['date']) if last else self.start_date
def build_urls(self, datum, start_date, page=0) -> str:
return f'http://jdcprod.thiztech.com/api/datas/eco-value?page={page}&size=200&sourceCode={quote(datum["bloombergTicker"])}&sourceType=BLOOMBERG&startDate={format_date(start_date)}'
def do_sync(self, max_date=dt.today()):
logger.info(f'start sync datas for type[{self.datum_type}]')
for datum in self._datum.get_datums(type=self.datum_type):
logger.debug(f'start sync ticker[{datum["bloombergTicker"]}]')
start_date = self.datum_start_date(datum['id'])
response = self.ths_eco(code=datum['thsTicker'],start_date=start_date)
try:
self.store_date(datum['id'], response['tables'][0])
except Exception as e:
logger.exception(f'''datumid[{datum['id']}] store data failed''')
raise e
def store_date(self, datumid, datas: List[dict]):
def store_date(self, datumid, datas):
keys = ['time', 'value','rtime']
values = [datas[k] for k in keys]
datas = [dict(zip(keys, row)) for row in zip(*values)]
save_datas = [{
'eco_id': datumid,
'date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
'indicator': x['close'],
'release_date': dt.fromtimestamp(x['releaseDate'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
} for x in datas if 'releaseDate' in x]
'date': x['time'],
'indicator': x['value'],
'release_date': x['rtime'],
} for x in datas]
if save_datas:
red.batch_insert(save_datas)
def ths_eco(self, code,start_date):
"""
经济数据库(EDB)-中国经济-社会消费品零售总额:当月同比-iFinD数据接口
requestMethod:POST
requestURL:https://quantapi.51ifind.com/api/v1/edb_service
requestHeaders:{"Content-Type":"application/json","access_token":"81573051b7978789229c4ae00d31c00c678248d5.signs_NzcxNDgyNzcw","ifindlang":"cn"}
formData:{"indicators":"M001625520","startdate":"2025-01-23","enddate":"2026-01-23"}
"""
url = "https://quantapi.51ifind.com/api/v1/edb_service"
headers = {
"Content-Type": "application/json",
"access_token": ths_token(),
"ifindlang": "cn"
}
formData = {
"indicators": code,
"startdate": start_date.strftime('%Y-%m-%d'),
"enddate": dt.today().strftime('%Y-%m-%d')
}
response = requests.post(url, json=formData, headers=headers)
return response.json()
@component(bean_name='navs-sync')
class FundNavSync(JDCDataSync):
......@@ -290,32 +340,40 @@ class ExrateSync(DataSync):
navs_config = get_config('basic.navs')
return [x['ticker'] for x in navs_config['exrate']] if 'exrate' in navs_config else []
def build_url(self, ticker, start_date, page=0):
return f'http://jdcprod.thiztech.com/api/datas/exrate-value?page={page}&size=200&sourceCode={quote(ticker)}&sourceType=BLOOMBERG&startDate={format_date(start_date)}'
def do_sync(self, max_date=dt.today()):
logger.info(f'start sync datas for type[EXRATE]')
for ticker in self.exrate_tickers:
last_one = re.get_last_one(ticker=ticker)
start_date = next_workday(last_one['date']) if last_one else self.start_date
page = 0
while True:
url = self.build_url(ticker=ticker, start_date=start_date, page=page)
response = requests.get(url).json()
if not response['success']:
raise Exception(f'''request indictor failed: {response['status']}''')
try:
response = self.exrate_ths(code=ticker, start_date=start_date)
response = response['tables'][0]
save_dates = [{
'ticker': ticker,
'date': dt.fromtimestamp(x['date'] / 1000, tz=pytz.timezone('Asia/Shanghai')),
'close': x['close'],
} for x in response['body']['content']]
'date': date,
'close': close,
} for date,close in zip(response['time'], response['table']['close'])]
if save_dates:
re.batch_insert(save_dates)
except Exception as e:
logger.exception(f'url[{url}] store data failed')
raise e
if response['body']['last']:
break
else:
page += 1
def exrate_ths(self, code, start_date):
"""
历史行情-外汇-收盘价-iFinD数据接口
requestMethod:POST
requestURL:https://quantapi.51ifind.com/api/v1/cmd_history_quotation
requestHeaders:{"Content-Type":"application/json","access_token":"81573051b7978789229c4ae00d31c00c678248d5.signs_NzcxNDgyNzcw","ifindlang":"cn"}
formData:{"codes":"EURUSD.FX","indicators":"close","startdate":"2025-01-20","enddate":"2026-01-23"}
"""
url = "https://quantapi.51ifind.com/api/v1/cmd_history_quotation"
headers = {
"Content-Type": "application/json",
"access_token": ths_token(),
"ifindlang": "cn"
}
formData = {
"codes": code,
"indicators": "close",
"startdate": start_date.strftime('%Y-%m-%d'),
"enddate": dt.today().strftime('%Y-%m-%d')
}
response = requests.post(url, json=formData, headers=headers)
return response.json()
......@@ -80,7 +80,7 @@ basic: # 基础信息模块
navs: # 净值模块
exrate: # 汇率,如果不开启,整个这块注释掉
- from: EUR # 需要转换的货币类型
ticker: EURUSD BGN Curncy # 汇率值的彭博ticker
ticker: EURUSD.FX # 汇率值的彭博ticker
asset-pool: # 资产池模块
asset-optimize: # 资产优选模块
sortino-weight: # sortino计算需要的权重,下面每一条为一次计算,e.g. months: 3, weight: 0.5 表示 3个月数据使用权重0.5来计算分值
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment