import pandas as pd from py_jftech import get_config, component, autowired, to_tuple from api import Navs, Datum, DatumType from basic.dao import robo_exrate as re, robo_fund_navs as rfn, robo_index_datas as rid, robo_eco_datas as red @component class DefaultNavs(Navs): @autowired def __init__(self, datum: Datum = None): self._config = get_config(__name__) self._datum = datum def get_fund_navs(self, fund_ids=None, min_date=None, max_date=None): navs = rfn.get_navs(fund_id=fund_ids, min_date=min_date, max_date=max_date) if navs and 'exrate' in self._config: navs = pd.DataFrame(navs) for exrate_config in self._config['exrate']: exrate = pd.DataFrame(re.get_exrates(ticker=exrate_config['ticker'], min_date=min_date, max_date=max_date)) exrate.rename(columns={'date': 'nav_date'}, inplace=True) exrate = exrate[['nav_date', 'close']] fund_ids = [x['id'] for x in self._datum.get_datums(type=DatumType.FUND, crncy=exrate_config['from'])] merged = pd.merge(navs, exrate, on='nav_date', how='left') merged.loc[merged['fund_id'].isin(fund_ids), 'nav_cal'] *= merged['close'] merged.loc[merged['fund_id'].isin(fund_ids), 'av'] *= merged['close'] merged.loc[merged['fund_id'].isin(fund_ids), 'dividend'] *= merged['close'] merged['nav_cal'] = merged['nav_cal'].round(4) # 四舍五入保留四位小数 merged['av'] = merged['av'].round(4) # 四舍五入保留四位小数 merged['dividend'] = merged['dividend'].round(4) # 四舍五入保留四位小数 navs = merged.drop('close', axis=1) navs = navs.to_dict('records') return navs def get_nav_start_date(self, fund_ids=None): return {x['fund_id']: x['min_date'] for x in rfn.get_min_dates(fund_ids=fund_ids)} def get_index_close(self, datum_ids=None, min_date=None, max_date=None, ticker=None): datum_ids = to_tuple(datum_ids) if ticker: datums = self._datum.get_datums(type=DatumType.INDEX, ticker=ticker) datum_ids = tuple(set(datum_ids or []) | {x['id'] for x in datums}) results = rid.get_list(index_ids=datum_ids, min_date=min_date, max_date=max_date) return [{'index_id': x['index_id'], 'date': x['date'], 'close': x['close']} for x in results] def get_last_index_close(self, max_date, datum_id=None, ticker=None, count=1): if not datum_id: assert ticker, "get last index close, datum_id and ticker give at least one" datum = self._datum.get_datums(type=DatumType.INDEX, ticker=ticker) datum_id = datum[0]['id'] if datum else None assert datum_id, "get last index close, datum id is not found" assert max_date, "get last index close, start_date is not found" if count == 1: last = rid.get_last_one(index_id=datum_id, max_date=max_date) return { 'index_id': last['index_id'], 'date': last['date'], 'close': last['close'] } if last else None else: last = rid.get_last(index_id=datum_id, max_date=max_date, count=count) return [{ 'index_id': x['index_id'], 'date': x['date'], 'close': x['close'] } for x in last] if last else None def get_eco_values(self, datum_ids=None, min_date=None, max_date=None, ticker=None, by_release_date=False): datum_ids = to_tuple(datum_ids) if ticker: datums = self._datum.get_datums(type=DatumType.ECO, ticker=ticker) datum_ids = tuple(set(list(datum_ids or []) | {x['id'] for x in datums})) return red.get_list(eco_ids=datum_ids, min_date=min_date, max_date=max_date, by_release_date=by_release_date) def get_last_eco_values(self, max_date, datum_id=None, ticker=None, count=1, by_release_date=False): if not datum_id: assert ticker, "get last eco close, datum_id and ticker give at least one" datum = self._datum.get_datums(type=DatumType.ECO, ticker=ticker) datum_id = datum[0]['id'] if datum else None assert datum_id, "get last eco close, datum id is not found" assert max_date, "get last eco close, start_date is not found" if count == 1: return red.get_last_one(eco_id=datum_id, max_date=max_date, by_release_date=by_release_date) else: return red.get_last(eco_id=datum_id, max_date=max_date, count=count, by_release_date=by_release_date)