import base64
import hashlib
import logging
from abc import ABC, abstractmethod
from datetime import datetime as dt, timedelta
from typing import List
from urllib.parse import quote

import pytz
import requests
from dateutil.relativedelta import relativedelta
from py_jftech import format_date, is_workday, component, autowired, get_config, filter_weekend, next_workday

from api import DatumType, DataSync, Datum
from basic.dao import robo_index_datas as rid, robo_eco_datas as red, robo_fund_navs as rfn, robo_exrate as re
from basic.sp500 import sync_sp500

logger = logging.getLogger(__name__)

# Timezone used to interpret the millisecond epoch timestamps returned by JDC.
_TZ_SHANGHAI = pytz.timezone('Asia/Shanghai')


class CollectError(Exception):
    """Raised when an upstream data-collection request fails.

    NOTE: previously raised in FundNavSync.find_jdc_querys without being
    defined anywhere, which turned any upstream failure into a NameError.
    """


class JDCDataSync(DataSync, ABC):
    """Base class for syncs that page through the JDC HTTP API.

    Subclasses supply the datum type, the per-datum start date, the page
    URL, and the persistence step; `do_sync` drives the paging loop.
    """

    @autowired
    def __init__(self, datum: Datum = None):
        self._datum = datum
        self._config = get_config(__name__)

    @property
    def start_date(self):
        # Configured global start date, shifted off weekends.
        return filter_weekend(self._config['start-date'])

    @abstractmethod
    def datum_start_date(self, datum_id):
        """Return the first date to fetch for the given datum id."""

    @abstractmethod
    def build_urls(self, datum, start_date, page=0) -> str:
        """Return the URL for one page of data, or None to skip this datum."""

    @property
    @abstractmethod
    def datum_type(self) -> DatumType:
        """The DatumType this sync handles."""

    @abstractmethod
    def store_date(self, datumid, datas: List[dict]):
        """Persist one page of rows for the given datum id."""

    def do_sync(self, max_date=None):
        """Sync every datum of `datum_type`, page by page, until the last page.

        `max_date` defaults to "now" at call time. (The previous default
        `max_date=dt.today()` was evaluated once at import time, so a
        long-running process kept using a stale date.)
        """
        max_date = dt.today() if max_date is None else max_date
        logger.info(f'start sync datas for type[{self.datum_type}]')
        for datum in self._datum.get_datums(type=self.datum_type):
            logger.debug(f'start sync ticker[{datum["bloombergTicker"]}]')
            page = 0
            start_date = self.datum_start_date(datum['id'])
            while True:
                url = self.build_urls(datum=datum, page=page, start_date=start_date)
                if url is None:
                    break
                response = requests.get(url).json()
                if not response['success']:
                    raise Exception(f'''request indictor failed: {response['status']}''')
                try:
                    self.store_date(datum['id'], response['body']['content'])
                except Exception as e:
                    logger.exception(f'url[{url}] store data failed')
                    raise e
                if response['body']['last']:
                    break
                else:
                    page += 1


class TWDataSync(DataSync, ABC):
    """Base class for Taiwan fund syncs that fetch one bulk payload and
    then store the slice belonging to each datum."""

    @autowired
    def __init__(self, datum: Datum = None):
        self._datum = datum
        self._config = get_config(__name__)

    def do_sync(self, max_date=None):
        """Fetch all data once, then store the rows for each datum."""
        max_date = dt.today() if max_date is None else max_date
        logger.info(f'start sync datas for type[{self.datum_type}]')
        response = self.get_all_data(self.start_date)
        for datum in self._datum.get_datums(type=self.datum_type):
            logger.debug(f'start sync ticker[{datum["ftTicker"]}]')
            try:
                self.store_date(datum['id'], datum["ftTicker"], response)
            except Exception as e:
                logger.exception(f'''{datum['id']} store data failed''')
                raise e

    @property
    def start_date(self):
        # Look back five days (weekend-adjusted) for incremental fetches.
        return filter_weekend(dt.today() - timedelta(days=5))

    @abstractmethod
    def last_datum(self, datum_id):
        """Return the most recently stored row for the datum, or None."""

    @abstractmethod
    def get_all_data(self, start_date=None):
        """Fetch the full upstream payload starting at `start_date`."""

    @property
    @abstractmethod
    def datum_type(self) -> DatumType:
        """The DatumType this sync handles."""

    @abstractmethod
    def store_date(self, datumid, ft_ticker, datas: List[dict]):
        """Persist the rows of `datas` that belong to `ft_ticker`."""


@component(bean_name='navs-sync')
class TWFundNavSync(TWDataSync):
    """Syncs Taiwan fund NAVs from the FTDM endpoint.

    NOTE(review): bean name 'navs-sync' is also used by FundNavSync below;
    only one of the two can win component registration — confirm intent.
    """

    def get_all_data(self, start_date=None):
        """Fetch all NAV rows modified since `start_date`.

        Retries up to 30 times; returns None if every attempt fails —
        callers receive None instead of an exception in that case.
        """
        start_date = dt.today() if start_date is None else start_date
        # Daily auth token: sha256 of a fixed credential + today's date.
        authori = 'chifufund' + dt.today().strftime('%Y%m%d') + 'FTDMNAVDATE'
        authdest = base64.b64encode(hashlib.sha256(authori.encode()).digest()).decode()
        req = requests.session()
        req.headers['user'] = 'chifufund'
        req.headers['Authorization'] = 'Basic ' + authdest
        for i in range(30):
            try:
                resp = req.get(
                    'http://210.202.243.106:1688/api/public/NAV?parameter.modifyDate='
                    + start_date.strftime('%Y-%m-%d'),
                    timeout=30)
                return resp.json()
            except Exception as e:
                logger.error(str(e))

    def last_datum(self, datum_id):
        return rfn.get_last_one(fund_id=datum_id)

    @property
    def datum_type(self) -> DatumType:
        return DatumType.FUND

    def store_date(self, datumid, ft_ticker, datas: List[dict]):
        """Insert new NAV rows for `ft_ticker`, chaining nav_cal off the
        previously stored row.

        NOTE(review): if no prior row exists, `last` is None and the
        `last['av']` access below raises TypeError — the fallback values
        for a first-time sync are not defined here; confirm upstream.
        """
        last = self.last_datum(datum_id=datumid)
        start_date = next_workday(last['nav_date']) if last else self.start_date
        save_navs = []
        last_av = last['av']
        last_nav_cal = last['nav_cal']
        for data in datas:
            nav_date = dt.strptime(data['Nav_Date'], "%Y-%m-%dT%H:%M:%S")
            if nav_date >= start_date and data['Fund_Id'] == ft_ticker:
                nav = {
                    'fund_id': datumid,
                    'nav_date': nav_date,
                    'av': data['Nav_P'],
                    'div': data['Nav_T_Div'],
                    'split': data['Nav_Spilt'],
                    'accrue_split': data['Nav_Spilt'],
                    'av_p': data['Nav_P'],
                    'div_p': data['Nav_T_Div'],
                    # current day / previous day
                    'nav_cal': round(((data['Nav_P'] + data['Nav_T_Div']) * data['Nav_Unit']) / (
                            last_av * data['Nav_Unit']) * last_nav_cal, 4)
                }
                last_av = nav['av']
                last_nav_cal = nav['nav_cal']
                save_navs.append(nav)
        if save_navs:
            rfn.batch_insert(save_navs)


@component(bean_name='index-sync')
class IndexSync(JDCDataSync):
    """Syncs index OHLC/valuation data from JDC."""

    @property
    def start_date(self):
        # Indexes need four extra years of history before the configured start.
        return super(IndexSync, self).start_date - relativedelta(years=4)

    @property
    def datum_type(self) -> DatumType:
        return DatumType.INDEX

    def datum_start_date(self, datum_id):
        last = rid.get_last_one(index_id=datum_id)
        return next_workday(last['date']) if last else self.start_date

    def build_urls(self, datum, start_date, page=0) -> str:
        # Prefer the Bloomberg ticker; fall back to the THS ticker.
        bbg = quote(datum["bloombergTicker"])
        if bbg:
            source_code, source_type = bbg, 'BLOOMBERG'
        else:
            source_code, source_type = quote(datum["thsTicker"]), 'THS'
        return f'http://jdcprod.thiztech.com/api/datas/index-value?page={page}&size=200&sourceCode={source_code}&sourceType={source_type}&startDate={format_date(start_date)}'

    def store_date(self, datumid, datas: List[dict]):
        # add frdpe,frdpes,erp,pc
        save_datas = [{
            'index_id': datumid,
            'date': dt.fromtimestamp(x['date'] / 1000, tz=_TZ_SHANGHAI).strftime('%Y-%m-%d'),
            'close': x['close'],
            'open': x.get('open'),
            'high': x.get('high'),
            'low': x.get('low'),
            'pe': x.get('peRatio'),
            'pb': x.get('pbRatio'),
            'volume': x.get('volume'),
            'frdpe': x.get('forwardPe'),
            'frdpes': x.get('forwardEps'),
            'erp': x.get('erp'),
            'pc': x.get('pcRatio'),
        } for x in datas
            if is_workday(dt.fromtimestamp(x['date'] / 1000, tz=_TZ_SHANGHAI)) and 'close' in x]
        if save_datas:
            rid.batch_insert(save_datas)


@component(bean_name='eco-sync')
class EcoSync(JDCDataSync):
    """Syncs economic indicator values from JDC (Bloomberg-sourced only)."""

    @property
    def start_date(self):
        # Economic series need four extra years of history as well.
        return super().start_date - relativedelta(years=4)

    @property
    def datum_type(self) -> DatumType:
        return DatumType.ECO

    def datum_start_date(self, datum_id):
        last = red.get_last_one(eco_id=datum_id)
        return next_workday(last['date']) if last else self.start_date

    def build_urls(self, datum, start_date, page=0) -> str:
        # "calculating" series are produced locally (see EcoCalculatingSync).
        if datum.get("source") == "calculating":
            return None
        return f'http://jdcprod.thiztech.com/api/datas/eco-value?page={page}&size=200&sourceCode={quote(datum["bloombergTicker"])}&sourceType=BLOOMBERG&startDate={format_date(start_date)}'

    def store_date(self, datumid, datas: List[dict]):
        # Rows without a releaseDate are not yet published — skip them.
        save_datas = [{
            'eco_id': datumid,
            'date': dt.fromtimestamp(x['date'] / 1000, tz=_TZ_SHANGHAI),
            'indicator': x['close'],
            'release_date': dt.fromtimestamp(x['releaseDate'] / 1000, tz=_TZ_SHANGHAI),
        } for x in datas if 'releaseDate' in x]
        if save_datas:
            red.batch_insert(save_datas)


@component(bean_name='eco-sync-calculating')
class EcoCalculatingSync(EcoSync):
    """Syncs locally calculated economic series (e.g. S&P 500 via sync_sp500).

    Renamed from `EcoSync`: the original definition reused the parent's
    class name, shadowing it in this module. Component lookup is by
    bean_name, which is unchanged.
    """

    def datum_start_date(self, datum_id):
        last = red.get_last_one(eco_id=datum_id)
        # None tells sync_sp500 to start from the beginning of its series.
        return next_workday(last['release_date']) if last else None

    def do_sync(self, max_date=None):
        """Pull calculated series in batches until sync_sp500 returns nothing."""
        max_date = dt.today() if max_date is None else max_date
        logger.info(f'start sync datas for type[{self.datum_type}]')
        for datum in self._datum.get_datums(type=self.datum_type):
            if datum.get("source") == "calculating":
                logger.debug(f'start sync ticker[{datum["bloombergTicker"]}]')
                while True:
                    # Each store advances the last release_date, so the
                    # start date progresses between iterations.
                    datas = sync_sp500(datum.get("bloombergTicker"), self.datum_start_date(datum['id']))
                    if datas:
                        self.store_date(datum['id'], datas)
                    else:
                        break

    def store_date(self, datumid, datas: List[dict]):
        save_datas = [{
            'eco_id': datumid,
            'date': x['date'],
            'indicator': x['close'],
            'release_date': x['releaseDate'],
        } for x in datas]
        if save_datas:
            red.batch_insert(save_datas)


@component(bean_name='navs-sync')
class FundNavSync(JDCDataSync):
    """Syncs fund NAVs from JDC, resolving per-fund query parameters once
    at construction time.

    NOTE(review): bean name 'navs-sync' duplicates TWFundNavSync above;
    only one of the two can win component registration — confirm intent.
    """

    def __init__(self):
        super(FundNavSync, self).__init__()
        # NOTE: performs an HTTP request during construction.
        self._jdc_querys = self.find_jdc_querys()

    @property
    def datum_type(self) -> DatumType:
        return DatumType.FUND

    def datum_start_date(self, datum_id):
        last = rfn.get_last_one(fund_id=datum_id)
        return next_workday(last['nav_date']) if last else self.start_date

    def build_urls(self, datum, start_date, page=0) -> str:
        if datum['id'] not in self._jdc_querys:
            return None
        querys = self._jdc_querys[datum['id']]
        query_str = '&'.join([f'{x[0]}={quote(str(x[1]).encode())}' for x in querys.items()])
        return f'http://jdcprod.thiztech.com/api/datas/asset-value?page={page}&size=200&startDate={format_date(start_date)}&{query_str}'

    def find_jdc_querys(self):
        """Map each fund's datum id to the JDC query parameters for it.

        Funds with only a Bloomberg ticker query by sourceCode; funds with
        an FT ticker are resolved to JDC subject keys via the subject API.
        """
        funds = self._datum.get_datums(type=DatumType.FUND, exclude=False)
        urls = {x['id']: {
            'sourceCode': x['bloombergTicker'],
            'sourceType': 'BLOOMBERG'
        } for x in funds if 'ftTicker' not in x and 'bloombergTicker' in x}
        ft_tickers = {x['ftTicker']: x for x in funds if 'ftTicker' in x}
        response = requests.get('http://jdcprod.thiztech.com/api/subject?busiField=TW&sourceType=TW&subjectType=FUND')
        response = response.json()
        if not response['success']:
            # CollectError is defined at module top (was previously undefined).
            raise CollectError(f'''find fund subject failed: {response['status']}''')
        return {**urls, **{
            ft_tickers[x['fundId']]['id']: {
                'subjectKeys': x['key'],
                'sourceType': 'TW'
            } for x in response['body']['content'] if x['fundId'] in ft_tickers
        }}

    def store_date(self, datumid, datas: List[dict]):
        save_navs = [{
            'fund_id': datumid,
            'nav_date': dt.fromtimestamp(x['date'] / 1000, tz=_TZ_SHANGHAI),
            'av': x['originValue'],
            'div': x.get('dividend', 0),
            'split': x.get('split', 1),
            'accrue_split': x.get('totalSplit', 1),
            'av_p': x['postValue'],
            'div_p': x.get('postDividend', 0),
            'nav_cal': x['calibrateValue']
        } for x in datas
            if is_workday(dt.fromtimestamp(x['date'] / 1000, tz=_TZ_SHANGHAI))]
        if save_navs:
            rfn.batch_insert(save_navs)


@component(bean_name='exrate-sync')
class ExrateSync(DataSync):
    """Syncs exchange-rate closes from JDC for the configured tickers."""

    def __init__(self):
        self._config = get_config(__name__)

    @property
    def start_date(self):
        return filter_weekend(self._config['start-date'])

    @property
    def exrate_tickers(self):
        navs_config = get_config('basic.navs')
        return [x['ticker'] for x in navs_config['exrate']] if 'exrate' in navs_config else []

    def build_url(self, ticker, start_date, page=0):
        return f'http://jdcprod.thiztech.com/api/datas/exrate-value?page={page}&size=200&sourceCode={quote(ticker)}&sourceType=BLOOMBERG&startDate={format_date(start_date)}'

    def do_sync(self, max_date=None):
        """Page through exrate values for each ticker and batch-insert them.

        `max_date` defaults to "now" at call time (previously frozen at
        import time) and bounds the last-row lookup. Note: `re` here is the
        robo_exrate DAO alias, not the regex module.
        """
        max_date = dt.today() if max_date is None else max_date
        logger.info(f'start sync datas for type[EXRATE]')
        for ticker in self.exrate_tickers:
            last_one = re.get_last_one(ticker=ticker, max_date=max_date)
            start_date = next_workday(last_one['date']) if last_one else self.start_date
            page = 0
            while True:
                url = self.build_url(ticker=ticker, start_date=start_date, page=page)
                response = requests.get(url).json()
                if not response['success']:
                    raise Exception(f'''request indictor failed: {response['status']}''')
                try:
                    save_dates = [{
                        'ticker': ticker,
                        'date': dt.fromtimestamp(x['date'] / 1000, tz=_TZ_SHANGHAI),
                        'close': x['close'],
                    } for x in response['body']['content']]
                    if save_dates:
                        re.batch_insert(save_dates)
                except Exception as e:
                    logger.exception(f'url[{url}] store data failed')
                    raise e
                if response['body']['last']:
                    break
                else:
                    page += 1