import json from abc import ABC, abstractmethod from datetime import datetime as dt import pandas as pd from dateutil.relativedelta import relativedelta from empyrical import sortino_ratio from api import AssetOptimize, Navs, BusinessException, Datum, AssetPoolType from asset_pool.dao import robo_assets_pool as rop from framework import filter_weekend, dict_remove, get_config, component, autowired, get_quarter_start class SortinoAssetOptimize(AssetOptimize, ABC): def __init__(self): optimize_config = get_config(__name__) self._config = [{ **x, 'name': [f"sortino_{y[1]}_{y[0]}" for y in x.items() if y[0] != 'weight'][0] } for x in optimize_config['sortino-weight']] if 'sortino-weight' in optimize_config else [] def find_optimize(self, fund_ids, day): if not self._config: raise BusinessException(f"find optimize, but not found sortino config.") pct_change = pd.DataFrame(self.get_pct_change(fund_ids, day)) pct_change.set_index('date', inplace=True) sortino = pd.DataFrame() for item in self._config: delta_kwargs = item.copy() del delta_kwargs['weight'], delta_kwargs['name'] ratio = dict(sortino_ratio(pct_change.truncate(before=(day - relativedelta(**delta_kwargs))))) sortino = pd.concat([sortino, pd.DataFrame([ratio], index=[item['name']])]) sortino = sortino.T sortino['score'] = sortino.apply(lambda r: sum([x['weight'] * r[x['name']] for x in self._config]), axis=1) sortino.sort_values('score', ascending=False, inplace=True) return pct_change.columns[sortino.index[0]] def get_optimize_pool(self, day): last_one = rop.get_last_one(day=day, type=AssetPoolType.OPTIMIZE) start = get_quarter_start(day or dt.today()) if not last_one or start > last_one['date']: pool = [] for fund_group in self.get_groups(): if len(fund_group) > 1: pool.append(self.find_optimize(fund_group, day)) else: pool.append(fund_group[0]) rop.insert(day, AssetPoolType.OPTIMIZE, sorted(pool)) last_one = rop.get_last_one(day=day, type=AssetPoolType.OPTIMIZE) return json.loads(last_one['asset_ids']) @abstractmethod def get_groups(self): ''' :return: 返回待处理的id数组 ''' pass @abstractmethod def get_pct_change(self, fund_ids, day): ''' 根据id数组,返回指定日期的收益率 :param fund_ids: id数组 :param day: 指定的日期 :return: 收益率 ''' pass @component class FundSortinoAssetOptimize(SortinoAssetOptimize): ''' 根据索提诺比率计算基金优选的优选实现 ''' @autowired def __init__(self, navs: Navs = None, datum: Datum = None): super().__init__() self._navs = navs self._datum = datum def get_groups(self): funds = pd.DataFrame(self._datum.get_fund_datums()) result = [] for (category, asset_type), fund_group in funds.groupby(by=['category', 'assetType']): result.append(tuple(fund_group['id'])) return result def get_pct_change(self, fund_ids, day): if not self._config: raise BusinessException(f"find optimize, but not found sortino config.") start = filter_weekend( sorted([day - relativedelta(days=1, **dict_remove(x, ('weight', 'name'))) for x in self._config])[0]) fund_navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(fund_ids), min_date=start, max_date=day)) fund_navs.sort_values('nav_date', inplace=True) fund_navs = fund_navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal') fund_navs.fillna(method='ffill', inplace=True) result = round(fund_navs.pct_change().dropna(), 4) result.reset_index(inplace=True) result.rename(columns={'nav_date': 'date'}, inplace=True) return result.to_dict('records')