import json import os import pandas as pd from py_jftech import component, parse_date, get_config, to_tuple, get_project_path, transaction from api import DatumType, Datum, PortfoliosRisk, RoboExecutor from basic.dao import robo_base_datum as rbd @component(bean_name='datum') class DefaultDatum(Datum): def __init__(self): self._config = get_config(__name__) @property def excludes(self): excludes = self._config['excludes'] if 'excludes' in self._config else [] if isinstance(excludes, dict): excludes = excludes[RoboExecutor.use_name()] if RoboExecutor.use_name() in excludes else [] return excludes @property def change_date(self): change = self._config['change'] if 'change' in self._config else {} return pd.to_datetime(change['date']) if 'date' in change else None @property def change_file(self): change = self._config['change'] if 'change' in self._config else {} if 'file' not in change or change['file'] is None: return None file_path: str = change['file'] if file_path.startswith('.'): return os.path.abspath(os.path.join(os.path.dirname(__file__), file_path)) elif file_path.startswith('/'): return os.path.abspath(file_path) return os.path.abspath(os.path.join(get_project_path(), file_path)) def format_datum(self, datum): if DatumType(datum['type']) is DatumType.FUND: return { **datum, 'inceptDate': parse_date(datum.get('inceptDate')) if datum.get('inceptDate') else None } return datum def get_datums(self, type: DatumType = None, crncy=None, risk=None, datum_ids=None, ticker=None, exclude=True): datum_ids = to_tuple(datum_ids) if ticker: datums = rbd.get_base_datums(type=type, ticker=ticker) datum_ids = tuple(set(datum_ids or []) | {x['id'] for x in datums}) result = rbd.get_base_datums(type=type, crncy=crncy, risk=risk, datum_ids=datum_ids) result = [{**json.loads(x['datas']), 'id': x['id']} for x in result] return [self.format_datum(x) for x in result if not exclude or x['id'] in (datum_ids or []) or x['bloombergTicker'] not in self.excludes] def get_high_risk_datums(self, risk: PortfoliosRisk): risk3 = self.get_datums(type=DatumType.FUND, risk=3) if risk is PortfoliosRisk.FT3: return risk3 risk3 = [x for x in risk3 if x['assetType'] in ['STOCK', 'BALANCED', 'COMMODITY']] if risk is PortfoliosRisk.FT6: return risk3 + self.get_datums(type=DatumType.FUND, risk=4) if risk is PortfoliosRisk.FT9: return risk3 + self.get_datums(type=DatumType.FUND, risk=(4, 5)) return None @transaction def update_change(self, date): if self.change_date is not None and self.change_date == pd.to_datetime(date): if self.change_file is not None: for fund in pd.read_excel(self.change_file).to_dict('records'): db_data = rbd.get_base_datums(ticker=fund['bloombergTicker']) rbd.update_datum(db_data['id'], json.dumps({ **json.loads(db_data['datas']), **fund })) return True return False @component(bean_name='datum') class HkDatum(DefaultDatum): def get_datums(self, type: DatumType = None, crncy=None, risk=None, datum_ids=None, ticker=None, exclude=True): datas = super().get_datums(type, crncy, None, datum_ids, ticker, exclude) # 香港基金risk与现有risk有差异,全查出来再筛选 datas = [d for d in datas if d.get('hk')] # 进行值的覆盖操作 datas = [d.update(d['hk']) or d for d in datas] # 进行筛选操作 datas = [d for d in datas if not risk or d['risk'] == risk] return datas