import logging
from abc import ABC, abstractmethod
from datetime import datetime as dt
from typing import List, Optional
from urllib.parse import quote

import requests
from py_jftech import (
    format_date,
    is_workday,
    component,
    autowired,
    get_config,
    filter_weekend,
    next_workday,
)

from api import DatumType, DataSync, Datum
from basic.dao import robo_index_datas as rid, robo_eco_datas as red, robo_fund_navs as rfn

logger = logging.getLogger(__name__)

# Base address of the remote data service queried by every syncer below.
_JDC_API = 'http://jdcprod.thiztech.com/api'

# Seconds to wait on the remote service before giving up. Without a timeout
# `requests` blocks indefinitely on a stalled connection.
_REQUEST_TIMEOUT = 60


def _from_millis(millis):
    """Convert an epoch timestamp in milliseconds to a naive ``datetime``.

    NOTE(review): ``datetime.fromtimestamp`` without ``tz`` yields local time,
    so the resulting calendar date depends on the host timezone -- confirm
    this matches what the remote service intends.
    """
    return dt.fromtimestamp(millis / 1000)


class JDCDataSync(DataSync, ABC):
    """Template for paging data out of the remote service into local storage.

    Subclasses supply the datum type, the per-datum start date, the page URL
    and the persistence step; :meth:`do_sync` drives the paging loop.
    """

    @autowired
    def __init__(self, datum: Datum = None):
        self._datum = datum
        self._config = get_config(__name__)

    @property
    def start_date(self):
        """Configured ``start-date`` after being passed through ``filter_weekend``."""
        return filter_weekend(self._config['start-date'])

    @abstractmethod
    def datum_start_date(self, datum_id):
        """Return the date from which data for ``datum_id`` should be fetched."""

    @abstractmethod
    def build_urls(self, datum, start_date, page=0) -> Optional[str]:
        """Return the URL for ``page`` of ``datum``'s data, or ``None`` to skip it."""

    @property
    @abstractmethod
    def datum_type(self) -> DatumType:
        """The :class:`DatumType` this syncer handles."""

    @abstractmethod
    def store_date(self, datumid, datas: List[dict]):
        """Persist one page of records (``datas``) for the datum ``datumid``."""

    def do_sync(self, max_date=None):
        """Fetch and store every page of data for each datum of ``datum_type``.

        :param max_date: accepted for interface compatibility; this
            implementation does not use it. The default is ``None`` rather
            than ``dt.today()`` because a call in the signature would be
            evaluated only once, at import time.
        :raises Exception: if the service reports ``success`` as false, or if
            storing a page fails (the original error is logged and re-raised).
        """
        logger.info(f'start sync datas for type[{self.datum_type}]')
        for datum in self._datum.get_datums(type=self.datum_type):
            logger.debug(f'start sync ticker[{datum["bloombergTicker"]}]')
            page = 0
            start_date = self.datum_start_date(datum['id'])
            while True:
                url = self.build_urls(datum=datum, page=page, start_date=start_date)
                if url is None:
                    # The subclass has no remote counterpart for this datum.
                    break
                response = requests.get(url, timeout=_REQUEST_TIMEOUT).json()
                if not response['success']:
                    raise Exception(f'''request indictor failed: {response['status']}''')
                try:
                    self.store_date(datum['id'], response['body']['content'])
                except Exception:
                    logger.exception(f'url[{url}] store data failed')
                    raise
                if response['body']['last']:
                    break
                page += 1


@component(bean_name='index-sync')
class IndexSync(JDCDataSync):
    """Syncs index values into ``robo_index_datas``."""

    @property
    def datum_type(self) -> DatumType:
        return DatumType.INDEX

    def datum_start_date(self, datum_id):
        """Resume after the last stored row, else use the configured start date."""
        last = rid.get_last_one(index_id=datum_id)
        return next_workday(last['date']) if last else self.start_date

    def build_urls(self, datum, start_date, page=0) -> Optional[str]:
        return (
            f'{_JDC_API}/datas/index-value?page={page}&size=200'
            f'&sourceCode={quote(datum["bloombergTicker"])}'
            f'&sourceType=BLOOMBERG&startDate={format_date(start_date)}'
        )

    def store_date(self, datumid, datas: List[dict]):
        """Insert workday records that carry a ``close``; other fields are optional."""
        save_datas = [{
            'index_id': datumid,
            'date': _from_millis(x['date']).strftime('%Y-%m-%d'),
            'close': x['close'],
            'open': x.get('open'),
            'high': x.get('high'),
            'low': x.get('low'),
            'pe': x.get('peRatio'),
            'pb': x.get('pbRatio'),
            'volume': x.get('volume'),
        } for x in datas if is_workday(_from_millis(x['date'])) and 'close' in x]
        if save_datas:
            rid.batch_insert(save_datas)


@component(bean_name='eco-sync')
class EcoSync(JDCDataSync):
    """Syncs economic indicator values into ``robo_eco_datas``."""

    @property
    def datum_type(self) -> DatumType:
        return DatumType.ECO

    def datum_start_date(self, datum_id):
        """Resume after the last stored row, else use the configured start date."""
        last = red.get_last_one(eco_id=datum_id)
        return next_workday(last['date']) if last else self.start_date

    def build_urls(self, datum, start_date, page=0) -> Optional[str]:
        return (
            f'{_JDC_API}/datas/eco-value?page={page}&size=200'
            f'&sourceCode={quote(datum["bloombergTicker"])}'
            f'&sourceType=BLOOMBERG&startDate={format_date(start_date)}'
        )

    def store_date(self, datumid, datas: List[dict]):
        """Insert the records that carry a ``releaseDate``; the rest are skipped."""
        save_datas = [{
            'eco_id': datumid,
            'date': _from_millis(x['date']),
            'indicator': x['close'],
            'release_date': _from_millis(x['releaseDate']),
        } for x in datas if 'releaseDate' in x]
        if save_datas:
            red.batch_insert(save_datas)


@component(bean_name='navs-sync')
class FundNavSync(JDCDataSync):
    """Syncs fund net asset values into ``robo_fund_navs``."""

    def __init__(self):
        super().__init__()
        # Maps local fund id -> remote subject key; fetched once at construction.
        self._subject_keys = self.find_jdc_subject_key()

    @property
    def datum_type(self) -> DatumType:
        return DatumType.FUND

    def datum_start_date(self, datum_id):
        """Resume after the last stored NAV, else use the configured start date."""
        last = rfn.get_last_one(fund_id=datum_id)
        return next_workday(last['nav_date']) if last else self.start_date

    def build_urls(self, datum, start_date, page=0) -> Optional[str]:
        """Return the page URL, or ``None`` when the fund has no remote subject key."""
        if datum['id'] not in self._subject_keys:
            return None
        key = self._subject_keys[datum['id']]
        return (
            f'{_JDC_API}/datas/asset-value?subjectKeys={key}&page={page}&size=200'
            f'&sourceType=TW&startDate={format_date(start_date)}'
        )

    def find_jdc_subject_key(self):
        """Build the mapping from local fund id to the remote subject key.

        Remote subjects are matched to local funds by comparing the remote
        ``fundId`` with the local ``ftTicker``; unmatched subjects are ignored.

        :raises Exception: if the service reports ``success`` as false.
        """
        funds = {x['ftTicker']: x for x in self._datum.get_datums(type=DatumType.FUND)}
        response = requests.get(
            f'{_JDC_API}/subject?busiField=TW&sourceType=TW&subjectType=FUND',
            timeout=_REQUEST_TIMEOUT,
        ).json()
        if not response['success']:
            # Same exception type as do_sync; the previously referenced
            # CollectError was never imported in this module.
            raise Exception(f'''find fund subject failed: {response['status']}''')
        return {
            funds[x['fundId']]['id']: x['key']
            for x in response['body']['content']
            if x['fundId'] in funds
        }

    def store_date(self, datumid, datas: List[dict]):
        """Insert workday NAV records, defaulting absent dividend/split fields."""
        save_navs = [{
            'fund_id': datumid,
            'nav_date': _from_millis(x['date']),
            'av': x['originValue'],
            'div': x.get('dividend', 0),
            'split': x.get('split', 1),
            'accrue_split': x.get('totalSplit', 1),
            'av_p': x['postValue'],
            'div_p': x.get('postDividend', 0),
            'nav_cal': x['calibrateValue'],
        } for x in datas if is_workday(_from_millis(x['date']))]
        if save_navs:
            rfn.batch_insert(save_navs)