diff --git a/api.py b/api.py index 99c35d61b94449a7ae7273bba353510e4467ec8e..d74873c4b0d9a1c7a2d5a79e20e318e296864881 100644 --- a/api.py +++ b/api.py @@ -125,6 +125,15 @@ class Navs(ABC): ''' pass + @abstractmethod + def get_nav_start_date(self, fund_ids = None): + ''' + èŽ·å–æŒ‡å®šid资产的净值开始时间 + :param fund_ids: 指定id资产,如果为None,则返回全部资产的开始时间 + :return: 资产的开始时间å—å…¸ + ''' + pass + @abstractmethod def get_index_close(self, datum_ids=None, min_date=None, max_date=None, ticker=None): ''' @@ -350,6 +359,26 @@ class PortfoliosHolder(ABC): ''' pass + @abstractmethod + def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None, signal_id=None): + ''' + èŽ·å–æœ€åŽä¸€æ¬¡å®žé™…调仓的时间 + :param risk: æŒä»“风险ç‰çº§ç±»åž‹ï¼Œå¿…é¡» + :param max_date: 指定日期之å‰çš„æœ€åŽä¸€æ¬¡ï¼Œå¯é€‰ + :param signal_id: 指定信å·çš„æœ€åŽä¸€æ¬¡è°ƒä»“,å¯é€‰ + :return: 最åŽä¸€æ¬¡å®žé™…调仓的日期 + ''' + pass + + @property + @abstractmethod + def interval_days(self): + ''' + 返回实际交易的最å°é—´éš”交易日数 + :return: 实际交易的最å°é—´éš”交易日数 + ''' + pass + class DriftSolver(ABC): ''' @@ -402,10 +431,9 @@ class RebalanceRuler(ABC): pass @abstractmethod - def cancel_signal(self, sign_id): + def commit_signal(self, sign_id): ''' - å–æ¶ˆä¿¡å·ID为已消费状æ€ï¼Œå³è®¾ç½®ä¿¡å·ä¸ºæœªæ¶ˆè´¹çŠ¶æ€ + æäº¤ä¿¡å·IDä¸ºå·²æ¶ˆè´¹çŠ¶æ€ :param sign_id: ä¿¡å·ID - :return å–æ¶ˆæˆåŠŸåˆ™è¿”å›žTrue, å¦åˆ™è¿”回False ''' pass diff --git a/asset_pool/asset_optimize.py b/asset_pool/asset_optimize.py index ef29877ab0ffdd1ff33183c93bc4b584a3e98f56..e8f8fc516bd7eda016a10af30a020f27ce8c7aeb 100644 --- a/asset_pool/asset_optimize.py +++ b/asset_pool/asset_optimize.py @@ -1,14 +1,15 @@ import json from abc import ABC, abstractmethod -from datetime import datetime as dt +from datetime import datetime as dt, timedelta import pandas as pd +import numpy as np from dateutil.relativedelta import relativedelta from empyrical import sortino_ratio from api import AssetOptimize, Navs, Datum, AssetPoolType from asset_pool.dao import robo_assets_pool as rop -from framework import filter_weekend, dict_remove, get_config, component, autowired, get_quarter_start +from framework import filter_weekend, dict_remove, get_config, component, autowired, get_quarter_start, next_workday, is_workday class SortinoAssetOptimize(AssetOptimize, ABC): @@ -20,15 +21,22 @@ class SortinoAssetOptimize(AssetOptimize, ABC): 'name': [f"sortino_{y[1]}_{y[0]}" for y in x.items() if y[0] != 'weight'][0] } for x in optimize_config['sortino-weight']] if 'sortino-weight' in optimize_config else [] + @property + def delta_kwargs(self): + result = [] + for item in self._config: + delta_kwargs = item.copy() + del delta_kwargs['weight'], delta_kwargs['name'] + result.append(delta_kwargs) + return result + def find_optimize(self, fund_ids, day): assert self._config, "find optimize, but not found sortino config." pct_change = pd.DataFrame(self.get_pct_change(fund_ids, day)) pct_change.set_index('date', inplace=True) sortino = pd.DataFrame() for item in self._config: - delta_kwargs = item.copy() - del delta_kwargs['weight'], delta_kwargs['name'] - ratio = dict(sortino_ratio(pct_change.truncate(before=(day - relativedelta(**delta_kwargs))))) + ratio = dict(sortino_ratio(pct_change.truncate(before=(day - relativedelta(**dict_remove(item, ('weight', 'name'))))))) sortino = pd.concat([sortino, pd.DataFrame([ratio], index=[item['name']])]) sortino = sortino.T sortino['score'] = sortino.apply(lambda r: sum([x['weight'] * r[x['name']] for x in self._config]), axis=1) @@ -38,17 +46,30 @@ class SortinoAssetOptimize(AssetOptimize, ABC): def get_optimize_pool(self, day): last_one = rop.get_last_one(day=day, type=AssetPoolType.OPTIMIZE) start = get_quarter_start(day or dt.today()) - if not last_one or start > last_one['date']: + if not last_one or start > last_one['date'] or self.has_incept_asset(last_one['date'] + timedelta(1), day): pool = [] + min_dates = self.nav_min_dates + max_incept_date = sorted([(day - relativedelta(**x)) for x in self.delta_kwargs])[0] + max_incept_date = max_incept_date if is_workday(max_incept_date) else next_workday(max_incept_date) for fund_group in self.get_groups(): + fund_group = [x for x in fund_group if min_dates[x] <= max_incept_date] if len(fund_group) > 1: - pool.append(self.find_optimize(fund_group, day)) - else: + pool.append(self.find_optimize(tuple(fund_group), day)) + elif len(fund_group) == 1: pool.append(fund_group[0]) rop.insert(day, AssetPoolType.OPTIMIZE, sorted(pool)) last_one = rop.get_last_one(day=day, type=AssetPoolType.OPTIMIZE) return json.loads(last_one['asset_ids']) + @abstractmethod + def has_incept_asset(self, start_date, end_date): + pass + + @property + @abstractmethod + def nav_min_dates(self) -> dict: + pass + @abstractmethod def get_groups(self): ''' @@ -79,8 +100,18 @@ class FundSortinoAssetOptimize(SortinoAssetOptimize): self._navs = navs self._datum = datum + @property + def nav_min_dates(self) -> dict: + return self._navs.get_nav_start_date() + + def has_incept_asset(self, start_date, end_date): + start_date = sorted([(start_date - relativedelta(**x)) for x in self.delta_kwargs])[0] + end_date = sorted([(end_date - relativedelta(**x)) for x in self.delta_kwargs])[0] + return len([x for x in self.nav_min_dates.items() if start_date <= x[1] <= end_date]) > 0 + def get_groups(self): funds = pd.DataFrame(self._datum.get_fund_datums()) + min_dates = self._navs.get_nav_start_date() result = [] for (category, asset_type), fund_group in funds.groupby(by=['category', 'assetType']): result.append(tuple(fund_group['id'])) @@ -92,10 +123,12 @@ class FundSortinoAssetOptimize(SortinoAssetOptimize): start = filter_weekend( sorted([day - relativedelta(days=1, **dict_remove(x, ('weight', 'name'))) for x in self._config])[0]) fund_navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(fund_ids), min_date=start, max_date=day)) - fund_navs.sort_values('nav_date', inplace=True) - fund_navs = fund_navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal') - fund_navs.fillna(method='ffill', inplace=True) - result = round(fund_navs.pct_change().dropna(), 4) - result.reset_index(inplace=True) - result.rename(columns={'nav_date': 'date'}, inplace=True) - return result.to_dict('records') + if not fund_navs.empty: + fund_navs.sort_values('nav_date', inplace=True) + fund_navs = fund_navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal') + fund_navs.fillna(method='ffill', inplace=True) + result = round(fund_navs.pct_change().dropna(), 4) + result.reset_index(inplace=True) + result.rename(columns={'nav_date': 'date'}, inplace=True) + return result.to_dict('records') + return [] diff --git a/asset_pool/asset_risk.py b/asset_pool/asset_risk.py index 47f97d536d92d1ab4b4dc6872c590eca649ba6f5..fe4eed1a699cbbfeb37386c2fb89e8f35fa87d38 100644 --- a/asset_pool/asset_risk.py +++ b/asset_pool/asset_risk.py @@ -1,5 +1,4 @@ import json -import logging from datetime import datetime as dt import pandas as pd @@ -8,9 +7,9 @@ from scipy.stats import norm from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType from asset_pool.dao import asset_risk_dates as ard, asset_ewma_value as aev, robo_assets_pool as rap -from framework import component, autowired, get_config, format_date, block_execute +from framework import component, autowired, get_config, format_date, block_execute, get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) def get_risk_start_date(): @@ -44,18 +43,20 @@ class CvarEwmaAssetRisk(AssetRisk): asset_pool = rap.get_one(day, AssetPoolType.RISK) if asset_pool: return id in json.loads(asset_pool['asset_ids']) - self.build_risk_date(id, day) - last = ard.get_last_one(id, day) - return DateType(last['type']) is DateType.START_DATE if last else False + last = ard.get_last_one(fund_id=id) + if last and last['date'] < day: + self.build_risk_date(id, day) + result = ard.get_last_one(id, day) + return DateType(result['type']) is DateType.START_DATE if result else True def build_risk_date(self, asset_id, day=dt.today()): risk_date = not None try: - logger.info(f"start build risk date for asset[{asset_id}] to date[{format_date(day)}]") + logger.debug(f"start build risk date for asset[{asset_id}] to date[{format_date(day)}]") while risk_date is not None: risk_date = self.get_next_date(asset_id, day=day) except Exception as e: - logger.exception(f"build risk date for asset[{asset_id}] after date[{risk_date}] error", e) + logger.exception(f"build risk date for asset[{asset_id}] after date[{risk_date}] to date[{format_date(day)}] error", e) def get_next_date(self, asset_id, day=dt.today()): last = ard.get_last_one(asset_id, day) @@ -120,10 +121,12 @@ class CvarEwmaAssetRisk(AssetRisk): def get_income_return(self, asset_id, min_date=None, max_date=None): fund_navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_id, max_date=max_date)) - fund_navs['rtn'] = fund_navs['nav_cal'] / fund_navs['nav_cal'].shift(self._config['rtn-days']) - 1 - fund_navs.dropna(inplace=True) - if min_date: - fund_navs = fund_navs[fund_navs.nav_date >= pd.to_datetime(min_date)] - fund_navs.rename(columns={'nav_date': 'date'}, inplace=True) - fund_navs = fund_navs[['date', 'rtn']] - return fund_navs.to_dict('records') + if not fund_navs.empty: + fund_navs['rtn'] = fund_navs['nav_cal'] / fund_navs['nav_cal'].shift(self._config['rtn-days']) - 1 + fund_navs.dropna(inplace=True) + if min_date: + fund_navs = fund_navs[fund_navs.nav_date >= pd.to_datetime(min_date)] + fund_navs.rename(columns={'nav_date': 'date'}, inplace=True) + fund_navs = fund_navs[['date', 'rtn']] + return fund_navs.to_dict('records') + return [] diff --git a/asset_pool/dao/asset_risk_dates.py b/asset_pool/dao/asset_risk_dates.py index 19242ad40ac458f653f69fddf454213ee2911dbb..0843bbd1068f5c3ee90eb49d2d7afb4a68d4934a 100644 --- a/asset_pool/dao/asset_risk_dates.py +++ b/asset_pool/dao/asset_risk_dates.py @@ -18,7 +18,7 @@ def insert(asset_id, type: DateType, date): @read(one=True) -def get_last_one(fund_id, date, type: DateType = None): +def get_last_one(fund_id, date=None, type: DateType = None): kwargs = { 'ard_asset_id': fund_id, 'ard_type': type.value if type is not None else None diff --git a/basic/dao/robo_fund_navs.py b/basic/dao/robo_fund_navs.py index cbf02cc8248505e801ae655bbd1f0035ee4a018d..47f191fe6ca29c42366939d39a68e3e17181d5c9 100644 --- a/basic/dao/robo_fund_navs.py +++ b/basic/dao/robo_fund_navs.py @@ -18,3 +18,11 @@ def get_navs(fund_id=None, min_date=None, max_date=None): select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from robo_fund_navs {where(*sqls, rfn_fund_id=to_tuple(fund_id))} order by rfn_fund_id, rfn_date ''' + + +@read +def get_min_dates(fund_ids=None): + return f''' + select rfn_fund_id as fund_id, min(rfn_date) as min_date from robo_fund_navs {where(rfn_fund_id=to_tuple(fund_ids))} + group by rfn_fund_id + ''' diff --git a/basic/datum.py b/basic/datum.py index c5d9bcc35e0fb364c8c99fc71c29cacee78bd4a2..6024d759190b48c39a1c6b2e785f171b0554c335 100644 --- a/basic/datum.py +++ b/basic/datum.py @@ -2,16 +2,23 @@ import json from api import DatumType, Datum, PortfoliosRisk from basic.dao import robo_base_datum as rbd -from framework import component, parse_date +from framework import component, parse_date, get_config @component class DefaultDatum(Datum): + def __init__(self): + self._config = get_config(__name__) + + @property + def excludes(self): + return self._config['excludes'] if 'excludes' in self._config else [] + def get_fund_datums(self, crncy=None, risk=None, fund_ids=None): result = rbd.get_base_datums(type=DatumType.FUND, crncy=crncy, risk=risk, datum_ids=fund_ids) result = [{**json.loads(x['datas']), 'id': x['id']} for x in result] - return [{**x, 'inceptDate': parse_date(x['inceptDate'])} for x in result] + return [{**x, 'inceptDate': parse_date(x['inceptDate'])} for x in result if x['bloombergTicker'] not in self.excludes] def get_index_datums(self, ticker=None, index_ids=None): result = rbd.get_base_datums(type=DatumType.INDEX, ticker=ticker, datum_ids=index_ids) diff --git a/basic/navs.py b/basic/navs.py index 560ba03a2710025e65acec44ea0a905d52726164..ed13fa86c01e2d0cd7ad68b1192b351f4c2199dd 100644 --- a/basic/navs.py +++ b/basic/navs.py @@ -15,7 +15,7 @@ class DefaultNavs(Navs): def get_fund_navs(self, fund_ids=None, min_date=None, max_date=None): navs = rfn.get_navs(fund_id=fund_ids, min_date=min_date, max_date=max_date) - if 'exrate' in self._config: + if navs and 'exrate' in self._config: navs = pd.DataFrame(navs) navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal') for exrate_config in self._config['exrate']: @@ -33,6 +33,9 @@ class DefaultNavs(Navs): navs = navs.to_dict('records') return navs + def get_nav_start_date(self, fund_ids = None): + return {x['fund_id']: x['min_date'] for x in rfn.get_min_dates(fund_ids=fund_ids)} + def get_index_close(self, datum_ids=None, min_date=None, max_date=None, ticker=None): datum_ids = to_tuple(datum_ids) if ticker: diff --git a/config.yml b/config.yml index 99d138a7c485442669cda1a812e9666ffe42e615..83de1bd74df70daf9ac5a4be8329920bb5288993 100644 --- a/config.yml +++ b/config.yml @@ -46,8 +46,18 @@ framework: level: INFO handlers: [ console ] main: - start-date: 2022-09-01 + start-date: 2008-01-02 basic: + datum: + excludes: + - 'FKUQX US Equity' + - 'FTAAUSH LX Equity' + - 'FTJAPAU LX Equity' + - 'TEGAUH1 LX Equity' + - 'TMEEAAU LX Equity' + - 'TEUSAAU LX Equity' + - 'FTEAUH1 LX Equity' + - 'TFIAAUS LX Equity' navs: exrate: - from: EUR @@ -98,17 +108,20 @@ portfolios: poem: cvar-scale-factor: 0.1 right_side: + asset-count: [3, 5] navs: risk: [1, 2] exclude-asset-type: ['STOCK', 'BALANCED'] mpt: quantile: 0.3 crisis_1: + asset-count: [3, 5] navs: risk: [1, 2] mpt: quantile: 0.1 crisis_2: + asset-count: [3, 5] navs: risk: [ 1, 2 ] mpt: @@ -121,7 +134,7 @@ rebalance: high-weight: coef: 0.2 ruler: - disable-period: #自然日 + disable-period: normal: 10 crisis_1: 15 crisis_2: 15 @@ -131,7 +144,7 @@ rebalance: date: 2022-09-01 crisis-signal: exp-years: 3 - exp-init: 2022-03-04 + exp-init: 2008-01-01 inversion-years: 1 inversion-threshold: 0.3 crisis-1: @@ -145,6 +158,7 @@ rebalance: rtn-days: 5 min-threshold: -0.05 coef: 0.95 + cvar-min-volume: 30 curve-drift: diff-threshold: 0.4 init-factor: 0.000000002 diff --git a/framework/__init__.py b/framework/__init__.py index 7e8d261c48161be79a0a1ae9456e0dc806097aa3..c230c4bad0316208860b530ed8d2e05987eaa395 100644 --- a/framework/__init__.py +++ b/framework/__init__.py @@ -1,6 +1,6 @@ from .date_utils import * from .base import * -from .database import read, write, transaction, where, to_columns +from .database import read, write, transaction, where, mapper_columns from .env_config import config, get_config from .logs import build_logger, get_logger from .injectable import component, autowired, get_instance, init_injectable as _init_injectable diff --git a/framework/database.py b/framework/database.py index 698cf916ee683c0d03af847cec9cfdb9db6bdde1..b95dde49c147802ae79c5601451cad24a98cb709 100644 --- a/framework/database.py +++ b/framework/database.py @@ -1,4 +1,5 @@ import functools +import json import threading from enum import Enum @@ -154,5 +155,12 @@ def where(*args, **kwargs) -> str: return f"where {' and '.join(result)}" if result else '' -def to_columns(columns: dict, datas: dict) -> dict: - return dict([(x[0], datas[x[1]]) for x in columns.items()]) +def mapper_columns(datas: dict, columns: dict) -> dict: + datas = {x[0]: datas[x[1]] for x in columns.items() if x[1] in datas and datas[x[1]] is not None} + return { + **datas, + **{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)}, + **{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)}, + **{x[0]: json.dumps(x[1]) for x in datas.items() if isinstance(x[1], dict)}, + **{x[0]: (1 if x[1] else 0) for x in datas.items() if isinstance(x[1], bool)} + } diff --git a/framework/date_utils.py b/framework/date_utils.py index 1df13dcda06266a45f0765a376e18a60bec36249..ceca18e9ed207f5182e3fb5d27706263fbff41fc 100644 --- a/framework/date_utils.py +++ b/framework/date_utils.py @@ -1,11 +1,13 @@ import calendar from datetime import timedelta, datetime, date +import pandas as pd + def filter_weekend(day): while calendar.weekday(day.year, day.month, day.day) in [5, 6]: day = day - timedelta(1) - return day + return pd.to_datetime(day) def next_workday(day): @@ -15,10 +17,21 @@ def next_workday(day): return result +def prev_workday(day): + result = day + while result == day or result.weekday() in [5, 6]: + result = result - timedelta(1) + return result + + def is_workday(day): return day.weekday() in range(5) +def workday_range(start, end): + return [datetime.combine(x.date(), datetime.min.time()) for x in pd.date_range(start, end) if is_workday(x)] + + def format_date(date, has_time=False): return date.strftime('%Y-%m-%d %H:%M:%S' if has_time else '%Y-%m-%d') @@ -33,4 +46,4 @@ def get_quarter_start(today=datetime.today()): if __name__ == '__main__': - print(get_quarter_start()) + print(workday_range(datetime.today() - timedelta(1), datetime.today())) diff --git a/portfolios/dao/mysql.sql b/portfolios/dao/mysql.sql index 0e7b60265c59e6c6b4f4ca9c0adfffdf9054af6f..47abf7e433614e0fd751f30b4ca0a926715da5fa 100644 --- a/portfolios/dao/mysql.sql +++ b/portfolios/dao/mysql.sql @@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS robo_hold_portfolios rhp_rrs_id BIGINT UNSIGNED DEFAULT NULL COMMENT '调仓信å·id', rhp_rebalance TINYINT NOT NULL DEFAULT 0 COMMENT '是å¦è°ƒä»“', rhp_portfolios JSON NOT NULL COMMENT '投组信æ¯', - rhp_nav DOUBLE NOT NULL COMMENT '资产值', + rhp_nav DOUBLE(12, 4) NOT NULL COMMENT '资产值', rhp_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, rhp_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (rhp_id), diff --git a/portfolios/dao/robo_hold_portfolios.py b/portfolios/dao/robo_hold_portfolios.py index 024b3ea09755e049d631bfdbb528e073c73d5574..d728c4f1c65ae285b64189a8418e5ecc54fae012 100644 --- a/portfolios/dao/robo_hold_portfolios.py +++ b/portfolios/dao/robo_hold_portfolios.py @@ -1,4 +1,4 @@ -from framework import read, where, write +from framework import read, where, write, format_date, mapper_columns from api import PortfoliosRisk __COLUMNS__ = { @@ -17,9 +17,29 @@ def get_one(day, risk: PortfoliosRisk): return f'''select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios {where(rhp_date=day, rhp_risk=risk)}''' +@read(one=True) +def get_last_one(risk: PortfoliosRisk, max_date=None, rebalance: bool = None, signal_id=None): + sql = "rhp_date <= '{format_date(max_date)}'" if max_date else None + return f''' + select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios + {where(sql, rhp_risk=risk, rhp_rrs_id=signal_id, rhp_rebalance=rebalance)} + order by rhp_date desc limit 1 + ''' + + def get_count(risk: PortfoliosRisk = None): @read(one=True) def exec(): return f'''select count(*) as `count` from robo_hold_portfolios {where(rhp_risk=risk)}''' + result = exec() return result['count'] + + +@write +def insert(datas): + datas = mapper_columns(datas=datas, columns=__COLUMNS__) + return f''' + insert into robo_hold_portfolios({','.join([x for x in datas.keys()])}) + values ({','.join([f"'{x[1]}'" for x in datas.items()])}) + ''' diff --git a/portfolios/dao/robo_mpt_portfolios.py b/portfolios/dao/robo_mpt_portfolios.py index 40a223e70fe1915f7f36519bf47da6a040fbeb1b..49624109f4ec36488b50a1f39a47f797b5f6d1c1 100644 --- a/portfolios/dao/robo_mpt_portfolios.py +++ b/portfolios/dao/robo_mpt_portfolios.py @@ -2,7 +2,7 @@ from datetime import datetime from enum import Enum from api import PortfoliosRisk, PortfoliosType -from framework import read, write, where, format_date +from framework import read, write, where, format_date, mapper_columns __COLUMNS__ = { 'rmp_id': 'id', @@ -17,12 +17,7 @@ __COLUMNS__ = { @write def insert(datas): - datas = {x[0]: datas[x[1]] for x in __COLUMNS__.items() if x[1] in datas and datas[x[1]] is not None} - datas = { - **datas, - **{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)}, - **{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)} - } + datas = mapper_columns(datas=datas, columns=__COLUMNS__) return f''' insert into robo_mpt_portfolios({','.join([x for x in datas.keys()])}) values ({','.join([f"'{x[1]}'" for x in datas.items()])}) diff --git a/portfolios/holder.py b/portfolios/holder.py index 822f6e08e08384de4d8f3e8bf08f36c32aad730b..1362d16bd014462dc7328d5fd0be455f02f81142 100644 --- a/portfolios/holder.py +++ b/portfolios/holder.py @@ -1,20 +1,41 @@ -from framework import component, autowired -from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler -from portfolios.dao import robo_hold_portfolios as rhp import json +import pandas as pd + +from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler, Navs, SignalType +from framework import ( + component, autowired, get_config, next_workday, filter_weekend, + prev_workday, transaction, workday_range, format_date, get_logger +) +from portfolios.dao import robo_hold_portfolios as rhp +from portfolios.utils import format_weight + +logger = get_logger(__name__) + + +def get_start_date(): + config = get_config('main') + return pd.to_datetime(filter_weekend(config['start-date'])) + @component(bean_name='next-re') class NextReblanceHolder(PortfoliosHolder): @autowired - def __init__(self, rebalance: RebalanceRuler): - self._rebalance = rebalance + def __init__(self, rule: RebalanceRuler, navs: Navs = None): + self._rule = rule + self._navs = navs + self._config = get_config(__name__) + + def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None, signal_id=None): + assert risk, f"get last rebalance date, risk can not be none" + last = rhp.get_last_one(max_date=max_date, risk=risk, signal_id=signal_id, rebalance=True) + return last['date'] if last else None def get_portfolios_weight(self, day, risk: PortfoliosRisk): hold = rhp.get_one(day, risk) if hold: - result = json.loads(hold['portfolios']['weight']) + result = json.loads(hold['portfolios'])['weight'] return {int(x[0]): x[1] for x in result.items()} return None @@ -22,5 +43,78 @@ class NextReblanceHolder(PortfoliosHolder): return rhp.get_count(risk=risk) > 0 def build_hold_portfolio(self, day, risk: PortfoliosRisk): + last_nav = rhp.get_last_one(max_date=day, risk=risk) + start = next_workday(last_nav['date'] if last_nav else get_start_date()) + while start <= day: + logger.info(f"start to build hold portfolio for date[{format_date(start)}]") + signal = None + if last_nav: + last_re = rhp.get_last_one(max_date=start, risk=risk, rebalance=True) + if len(workday_range(last_re['date'], start)) > self.interval_days: + signal = self._rule.take_next_signal(prev_workday(start), risk) + else: + signal = self._rule.take_next_signal(prev_workday(start), risk) + if signal and not signal['effective']: + logger.info(f"start to rebalance hold portfolio for date[{format_date(start)}] " + f"with signal[{SignalType(signal['type']).name}]") + self.do_rebalance(start, risk, signal, last_nav) + elif last_nav and signal is None: + self.no_rebalance(start, risk, last_nav) + start = next_workday(start) + last_nav = rhp.get_last_one(max_date=day, risk=risk) + + @transaction + def do_rebalance(self, day, risk: PortfoliosRisk, signal, last_nav): + weight = {int(x[0]): x[1] for x in json.loads(signal['portfolio']).items()} + if last_nav: + share = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share'].items()} + navs = self.get_navs(fund_ids=tuple(set(weight) | set(share)), day=day) + nav = round(sum([navs[x] * y for x, y in share.items()]), 4) + else: + nav = self.init_nav + navs = self.get_navs(fund_ids=tuple(weight), day=day) + share = {x: nav * w / navs[x] for x, w in weight.items()} + rhp.insert({ + 'date': day, + 'risk': risk, + 'signal_id': signal['id'], + 'rebalance': True, + 'portfolios': { + 'weight': weight, + 'share': share, + }, + 'nav': nav, + }) + self._rule.commit_signal(signal['id']) + + def no_rebalance(self, day, risk: PortfoliosRisk, last_nav): + share = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share'].items()} + navs = self.get_navs(fund_ids=tuple(share), day=day) + nav = round(sum([navs[x] * y for x, y in share.items()]), 4) + weight = {x: round(y * navs[x] / nav, 2) for x, y in share.items()} + weight = format_weight(weight) + rhp.insert({ + 'date': day, + 'risk': risk, + 'signal_id': last_nav['signal_id'], + 'rebalance': False, + 'portfolios': { + 'weight': weight, + 'share': share, + }, + 'nav': nav, + }) + + def get_navs(self, day, fund_ids): + navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=fund_ids, max_date=day)) + navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal') + navs.fillna(method='ffill', inplace=True) + return dict(navs.iloc[-1]) + + @property + def interval_days(self): + return self._config['min-interval-days'] - pass + @property + def init_nav(self): + return self._config['init-nav'] diff --git a/portfolios/solver.py b/portfolios/solver.py index fc760236d8baae14fc992c2a337f44bf84b6e950..c6f7fcea6f96cb28e146c4aaca1aa8b1fa8fc148 100644 --- a/portfolios/solver.py +++ b/portfolios/solver.py @@ -9,6 +9,7 @@ from pyomo.environ import * from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum from framework import component, autowired, get_config, get_logger +from portfolios.utils import format_weight logger = get_logger(__name__) @@ -164,22 +165,10 @@ class DefaultSolver(Solver): df_w = pd.DataFrame(data=weight_list, index=id_list, columns=['weight']) df_w.replace(0, NAN, inplace=True) df_w.dropna(axis=0, inplace=True) - df_w['weight'] = self.format_weight(df_w['weight']) + df_w['weight'] = pd.Series(format_weight(dict(df_w['weight']))) dict_w = df_w.to_dict()['weight'] return dict_w - @staticmethod - def format_weight(weight_series): - weight_series = weight_series.fillna(0) - minidx = weight_series[weight_series > 0].idxmin() - maxidx = weight_series.idxmax() - weight_series = weight_series.apply(lambda x: round(x, 2)) - if weight_series.sum() < 1: - weight_series[minidx] += 1 - weight_series.sum() - elif weight_series.sum() > 1: - weight_series[maxidx] += 1 - weight_series.sum() - return weight_series.apply(lambda x: round(float(x), 2)) - def calc_port_rtn(self, model): return sum([model.w[i]._value * self.rtn_annualized[i] for i in model.indices]) @@ -199,6 +188,7 @@ class DefaultSolver(Solver): count = self.get_config('asset-count') min_count = count[0] if isinstance(count, list) else count max_count = count[1] if isinstance(count, list) else count + min_count = min(min_count, len(self.rtn_annualized)) low_weight = self.get_config('mpt.low-weight') high_weight = self.get_config('mpt.high-weight') diff --git a/portfolios/test_case.py b/portfolios/test_case.py index fe960c7a58e770d0c556ac3872cf6bbd79456613..2e47f2adda5b8c8893cbabe24d664c749841f1f4 100644 --- a/portfolios/test_case.py +++ b/portfolios/test_case.py @@ -27,6 +27,7 @@ class PortfoliosTest(unittest.TestCase): @autowired(names={'hold': 'next-re'}) def test_build_hold(self, hold: PortfoliosHolder = None): + hold.build_hold_portfolio(parse_date('2009-01-01'), PortfoliosRisk.FT9) pass diff --git a/portfolios/utils.py b/portfolios/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..1d1b9e3e0a2946d7a4168fbf632dc5d4705c832b --- /dev/null +++ b/portfolios/utils.py @@ -0,0 +1,20 @@ +import pandas as pd + + +def format_weight(weight: dict) -> dict: + weight_series = pd.Series(weight) + weight_series = weight_series.fillna(0) + minidx = weight_series[weight_series > 0].idxmin() + maxidx = weight_series.idxmax() + weight_series = weight_series.apply(lambda x: round(x, 2)) + if weight_series.sum() == 1: + return dict(weight_series) + elif weight_series.sum() < 1: + weight_series[minidx] += 1 - weight_series.sum() + elif weight_series.sum() > 1: + weight_series[maxidx] += 1 - weight_series.sum() + return dict(weight_series.apply(lambda x: round(float(x), 2))) + + +if __name__ == '__main__': + print(format_weight({19: 0.13, 27: 0.17, 31: 0.35, 56: 0.36})) diff --git a/rebalance/base_signal.py b/rebalance/base_signal.py index 994a41086744792f103ef0855ec81f7edd7e633c..7de7c09877120abcf651a14080a85fd83a39fe90 100644 --- a/rebalance/base_signal.py +++ b/rebalance/base_signal.py @@ -12,6 +12,9 @@ class BaseRebalanceSignal(RebalanceSignal, ABC): self._builder = builder def get_signal(self, day, risk: PortfoliosRisk): + signal = rrs.get_one(type=self.signal_type, risk=risk, date=day) + if signal: + return signal trigger = self.is_trigger(day, risk) if trigger: portfolio = self._builder.get_portfolios(day, risk, self.signal_type.p_type) diff --git a/rebalance/dao/robo_rebalance_signal.py b/rebalance/dao/robo_rebalance_signal.py index 9b68b169a92643f9d2016bf22329567145466b0c..574e87f96b0db90b074b0d49d8108b7774eb1f36 100644 --- a/rebalance/dao/robo_rebalance_signal.py +++ b/rebalance/dao/robo_rebalance_signal.py @@ -1,4 +1,4 @@ -from framework import read, write, where, format_date +from framework import read, write, where, format_date, mapper_columns from api import SignalType, PortfoliosRisk import json from datetime import datetime @@ -21,6 +21,14 @@ def get_by_id(id): select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=id)}''' +@read(one=True) +def get_one(type: SignalType, risk: PortfoliosRisk, date): + return f''' + select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal + {where(rrs_date=date, rrs_type=type, rrs_risk=risk)} + ''' + + @read(one=True) def get_first_after(type: SignalType, risk: PortfoliosRisk, min_date, effective=None): return f''' @@ -33,7 +41,7 @@ def get_first_after(type: SignalType, risk: PortfoliosRisk, min_date, effective= def get_last_one(max_date, risk: PortfoliosRisk, type: SignalType = None, effective=None): return f''' select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal - {where(f"rrs_date <= {format_date(max_date)}", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date desc limit 1 + {where(f"rrs_date <= '{format_date(max_date)}'", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date desc limit 1 ''' @@ -45,20 +53,9 @@ def get_count(risk: PortfoliosRisk = None, day=None, effective=None): return result['count'] -def format_datas(datas): - datas = {x[0]: datas[x[1]] for x in __COLUMNS__.items() if x[1] in datas and datas[x[1]] is not None} - return { - **datas, - **{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)}, - **{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)}, - **{x[0]: json.dumps(x[1]) for x in datas.items() if isinstance(x[1], dict)}, - **{x[0]: (1 if x[1] else 0) for x in datas.items() if isinstance(x[1], bool)} - } - - @write def insert(datas): - datas = format_datas(datas) + datas = mapper_columns(datas=datas, columns=__COLUMNS__) return f''' insert into robo_rebalance_signal({','.join([x for x in datas.keys()])}) values ({','.join([f"'{x[1]}'" for x in datas.items()])}) @@ -66,8 +63,8 @@ def insert(datas): @write -def update(id, dates): - datas = format_datas(dates) +def update(id, datas): + datas = mapper_columns(datas=datas, columns=__COLUMNS__) return f''' update robo_rebalance_signal set {','.join([f"{x[0]} = '{x[1]}'" for x in datas.items()])} diff --git a/rebalance/dao/robo_weight_drift.py b/rebalance/dao/robo_weight_drift.py index 3aabe82019bc1c7d68c6584cdf6851722d82e8b1..9daae49bc095320e683a6689c7aedd993b2e11da 100644 --- a/rebalance/dao/robo_weight_drift.py +++ b/rebalance/dao/robo_weight_drift.py @@ -1,4 +1,4 @@ -from framework import read, write, where, format_date +from framework import read, write, where, format_date, mapper_columns from api import PortfoliosRisk __COLUMNS__ = { @@ -23,17 +23,9 @@ def get_last_one(max_date, risk: PortfoliosRisk): ''' - - - @write def insert(datas): - datas = {x[0]: datas[x[1]] for x in __COLUMNS__.items() if x[1] in datas and datas[x[1]] is not None} - datas = { - **datas, - **{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)}, - **{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)} - } + datas = mapper_columns(datas=datas, columns=__COLUMNS__) return f''' insert into robo_weight_drift({','.join([x for x in datas.keys()])}) values ({','.join([f"'{x[1]}'" for x in datas.items()])}) diff --git a/rebalance/drift_solver.py b/rebalance/drift_solver.py index 8488774f09af8ed4eaee30248fdc386ea66c60aa..b45c26865e02cc42e372cbd72d893adabd96f28a 100644 --- a/rebalance/drift_solver.py +++ b/rebalance/drift_solver.py @@ -1,6 +1,6 @@ -from api import DriftSolver, PortfoliosRisk -from framework import component, autowired, get_config -from rebalance.dao import robo_rebalance_signal as rrs +from api import DriftSolver, PortfoliosRisk, PortfoliosBuilder, Datum +from framework import component, autowired, get_config, workday_range, filter_weekend, next_workday +from rebalance.dao import robo_rebalance_signal as rrs, robo_weight_drift as rwd def get_start_date(): @@ -48,10 +48,10 @@ class PortfolioHighWeight(DriftSolver): last_one = rwd.get_last_one(max_date=day, risk=risk) start = (next_workday(last_one['date'])) if last_one else get_start_date() last_drift = last_one['drift'] if last_one else 0 - for date in [x for x in pd.date_range(start, day, freq='D') if is_workday(x)]: + for date in workday_range(start, day): portfolio = self._builder.get_portfolios(date, risk) weight = round(sum([x[1] for x in portfolio.items() if x[0] in datum_ids]), 2) - last_drift = (weight * self.drift_coef + (1 - self.drift_coef) * last_drift) if last_drift else weight + last_drift = round((weight * self.drift_coef + (1 - self.drift_coef) * last_drift) if last_drift else weight, 2) rwd.insert({ 'date': date, 'risk': risk, diff --git a/rebalance/ruler.py b/rebalance/ruler.py index 77302fa6a4e07ce369a1b0de79b4b189c5aba172..7c473eaf0c40dc86192fdef270ee5eb607af2791 100644 --- a/rebalance/ruler.py +++ b/rebalance/ruler.py @@ -1,5 +1,5 @@ -from framework import component, autowired, get_config -from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType +from framework import component, autowired, get_config, workday_range, next_workday +from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder from typing import List from rebalance.dao import robo_rebalance_signal as rrs @@ -22,8 +22,9 @@ class LevelRebalanceRuler(RebalanceRuler): ''' @autowired - def __init__(self, signals: List[RebalanceSignal] = None): + def __init__(self, signals: List[RebalanceSignal] = None, hold: PortfoliosHolder = None): self._signals = signals + self._hold = hold self._config = get_config(__name__) @property @@ -33,17 +34,33 @@ class LevelRebalanceRuler(RebalanceRuler): def take_next_signal(self, day, risk: PortfoliosRisk): last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True) - if last_re and last_re['date'] == day: - return last_re - if last_re: - disable_period = self.disable_period[PortfoliosType(last_re['p_type'])] - signals = [x.get_signal(day, risk) for x in self._signals] - signals = sorted([x for x in signals if x is not None], key=lambda x: SignalType(x['type']).level) - if signals: - use_signal = signals[0] - rrs.update(use_signal['id'], {'effective': True}) - return use_signal + if not last_re: + builder = [x for x in self._signals if x.signal_type is SignalType.INIT][0] + return builder.get_signal(day, risk) + + long_signals = [x for x in self._signals if x.signal_type.p_type is not PortfoliosType.NORMAL and x.signal_type.level < SignalType(last_re['type']).level] + for long_signal in sorted(long_signals, key=lambda x: x.signal_type.level): + workdays = workday_range(next_workday(last_re['date']), day) + if len(workdays) <= self._hold.interval_days: + for date in workdays: + signal = long_signal.get_signal(date, risk) + if signal: + return signal + else: + signal = long_signal.get_signal(day, risk) + if signal: + return signal + if SignalType(last_re['type']).p_type in self.disable_period: + re_date = self._hold.get_last_rebalance_date(risk=risk, max_date=day) + if re_date: + workdays = workday_range(re_date, day) + if len(workdays) < self.disable_period[SignalType(last_re['type']).p_type]: + return None + for temp_signal in sorted([x for x in self._signals if x.signal_type.p_type is PortfoliosType.NORMAL], key=lambda x: x.signal_type.level): + signal = temp_signal.get_signal(day, risk) + if signal: + return signal return None - def cancel_signal(self, sign_id): - pass + def commit_signal(self, sign_id): + rrs.update(sign_id, {'effective': True}) diff --git a/rebalance/signals/curve_drift.py b/rebalance/signals/curve_drift.py index d61249966f035636a87ee905fd810153ce9cfaeb..6d9aaada8a8cde414e36bd0608b1fd95e4b7ba63 100644 --- a/rebalance/signals/curve_drift.py +++ b/rebalance/signals/curve_drift.py @@ -28,13 +28,13 @@ class CurveDrift(BaseRebalanceSignal): def is_trigger(self, day, risk: PortfoliosRisk) -> bool: last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True) if last_re is None or SignalType(last_re['type']) in self.exclude_last_type: - return None + return False hr_datums = self._datum.get_high_risk_datums(risk) datum_ids = [x['id'] for x in hr_datums] normal_portfolio = self._builder.get_portfolios(day, risk) normal_weight = round(sum([x[1] for x in normal_portfolio.items() if x[0] in datum_ids]), 2) hold_portfolio = self._hold.get_portfolios_weight(day, risk) - hold_weight = round(sum([x[1] for x in normal_portfolio.items() if x[0] in datum_ids]), 2) + hold_weight = round(sum([x[1] for x in hold_portfolio.items() if x[0] in datum_ids]), 2) return normal_weight - hold_weight >= self._solver.get_drift(day, risk) @property diff --git a/rebalance/signals/high_low_buy.py b/rebalance/signals/high_low_buy.py index f3c1f0f5c2d20ff197ffda57d99270fb7edffebd..dac02ff258a096b7194b17b39a8b35bff59883a9 100644 --- a/rebalance/signals/high_low_buy.py +++ b/rebalance/signals/high_low_buy.py @@ -21,7 +21,8 @@ class HighBuySignal(BaseRebalanceSignal): SignalType.CRISIS_ONE, SignalType.CRISIS_TWO, SignalType.MARKET_RIGHT, - SignalType.LOW_BUY + SignalType.LOW_BUY, + SignalType.INIT ] @property @@ -40,7 +41,7 @@ class HighBuySignal(BaseRebalanceSignal): return False drift = self._solver.get_drift(day, risk) threshold = self.get_threshold(risk) - return drift > threshold[1] + return drift >= threshold[1] @component(bean_name='low-buy') @@ -51,7 +52,8 @@ class LowBuySignal(HighBuySignal): return [ SignalType.CRISIS_ONE, SignalType.CRISIS_TWO, - SignalType.MARKET_RIGHT + SignalType.MARKET_RIGHT, + SignalType.INIT ] @property @@ -64,4 +66,4 @@ class LowBuySignal(HighBuySignal): return False drift = self._solver.get_drift(day, risk) threshold = self.get_threshold(risk) - return threshold[0] < drift < threshold[1] + return threshold[0] <= drift < threshold[1] diff --git a/rebalance/signals/init_signal.py b/rebalance/signals/init_signal.py index 90e1096db5c87cd3ba7a92b63457a46fd32790e1..b7130d485d7ac424875bb62115b66fb7d04de99a 100644 --- a/rebalance/signals/init_signal.py +++ b/rebalance/signals/init_signal.py @@ -1,9 +1,14 @@ from api import PortfoliosRisk, SignalType -from framework import component +from framework import component, filter_weekend, get_config from rebalance.base_signal import BaseRebalanceSignal from rebalance.dao import robo_rebalance_signal as rrs +def get_start_date(): + config = get_config('main') + return filter_weekend(config['start-date']) + + @component(bean_name='init') class InitSignalBuilder(BaseRebalanceSignal): @@ -11,5 +16,11 @@ class InitSignalBuilder(BaseRebalanceSignal): def signal_type(self) -> SignalType: return SignalType.INIT + def get_signal(self, day, risk: PortfoliosRisk): + signal = rrs.get_last_one(max_date=day, risk=risk, type=SignalType.INIT) + if signal: + return None if signal['effective'] else signal + return super().get_signal(get_start_date(), risk) + def is_trigger(self, day, risk: PortfoliosRisk) -> bool: - return rrs.get_count(risk=risk) == 0 + return True diff --git a/rebalance/signals/right_side.py b/rebalance/signals/right_side.py index a75b247ba5710df3dadd5e6480b5c4433b5779a0..298e91a1b9bd9115b5bb624a246858b63d366cf5 100644 --- a/rebalance/signals/right_side.py +++ b/rebalance/signals/right_side.py @@ -32,9 +32,13 @@ class MarketRight(BaseRebalanceSignal): def signal_type(self) -> SignalType: return SignalType.MARKET_RIGHT + @property + def cvar_min_volume(self): + return self._config['cvar-min-volume'] + def is_trigger(self, day, risk: PortfoliosRisk) -> bool: last_re = rrs.get_last_one(risk=risk, max_date=day, effective=True) - if last_re is not None and SignalType(last_re['type']) in [SignalType.CRISIS_ONE, SignalType.CRISIS_TWO, SignalType.MARKET_RIGHT]: + if last_re is not None and SignalType(last_re['type']) in [SignalType.CRISIS_ONE, SignalType.CRISIS_TWO, SignalType.MARKET_RIGHT, SignalType.INIT]: return False spx = self.load_spx_close_rtns(day) if spx[-1]['rtn'] > self.min_threshold: @@ -48,11 +52,12 @@ class MarketRight(BaseRebalanceSignal): start_date = self.find_cvar_start_date(day, risk, spx=spx) if start_date: spx = pd.DataFrame(spx) - spx = spx[(pd_spx.date >= start_date) & (spx.date <= day)] - alpha = 1 - self.coef - mean = spx.rtn.mean() - std = spx.rtn.std() - return mean - std * norm.pdf(norm.ppf(alpha)) / alpha + spx = spx[(spx.date >= start_date) & (spx.date <= day)] + if len(spx) >= self.cvar_min_volume: + alpha = round(1 - self.coef, 2) + mean = spx.rtn.mean() + std = spx.rtn.std() + return mean - std * norm.pdf(norm.ppf(alpha)) / alpha return None def find_cvar_start_date(self, day, risk: PortfoliosRisk, spx = None): @@ -63,7 +68,7 @@ class MarketRight(BaseRebalanceSignal): last_buy = rrs.get_last_one(type=(SignalType.LOW_BUY, SignalType.HIGH_BUY), max_date=day, risk=risk, effective=True) if not last_buy or not last_right or last_buy['date'] <= last_right['date']: return None - spx = spx[(spx['date'] >= last_right['date']) & (spx['date'] <= last_buy)] + spx = spx[(spx['date'] >= last_right['date']) & (spx['date'] <= last_buy['date'])] if not spx.empty and len(spx) > 2: return spx.loc[spx.close.idxmin()].date return None