Commit 3bcbe8b1 authored by jichao's avatar jichao

再平衡规则调整完毕

parent bf40eee2
......@@ -125,6 +125,15 @@ class Navs(ABC):
'''
pass
@abstractmethod
def get_nav_start_date(self, fund_ids = None):
'''
获取指定id资产的净值开始时间
:param fund_ids: 指定id资产,如果为None,则返回全部资产的开始时间
:return: 资产的开始时间字典
'''
pass
@abstractmethod
def get_index_close(self, datum_ids=None, min_date=None, max_date=None, ticker=None):
'''
......@@ -350,6 +359,26 @@ class PortfoliosHolder(ABC):
'''
pass
@abstractmethod
def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None, signal_id=None):
'''
获取最后一次实际调仓的时间
:param risk: 持仓风险等级类型,必须
:param max_date: 指定日期之前的最后一次,可选
:param signal_id: 指定信号的最后一次调仓,可选
:return: 最后一次实际调仓的日期
'''
pass
@property
@abstractmethod
def interval_days(self):
'''
返回实际交易的最小间隔交易日数
:return: 实际交易的最小间隔交易日数
'''
pass
class DriftSolver(ABC):
'''
......@@ -402,10 +431,9 @@ class RebalanceRuler(ABC):
pass
@abstractmethod
def cancel_signal(self, sign_id):
def commit_signal(self, sign_id):
'''
取消信号ID为已消费状态,即设置信号为未消费状态
提交信号ID为已消费状态
:param sign_id: 信号ID
:return 取消成功则返回True, 否则返回False
'''
pass
import json
from abc import ABC, abstractmethod
from datetime import datetime as dt
from datetime import datetime as dt, timedelta
import pandas as pd
import numpy as np
from dateutil.relativedelta import relativedelta
from empyrical import sortino_ratio
from api import AssetOptimize, Navs, Datum, AssetPoolType
from asset_pool.dao import robo_assets_pool as rop
from framework import filter_weekend, dict_remove, get_config, component, autowired, get_quarter_start
from framework import filter_weekend, dict_remove, get_config, component, autowired, get_quarter_start, next_workday, is_workday
class SortinoAssetOptimize(AssetOptimize, ABC):
......@@ -20,15 +21,22 @@ class SortinoAssetOptimize(AssetOptimize, ABC):
'name': [f"sortino_{y[1]}_{y[0]}" for y in x.items() if y[0] != 'weight'][0]
} for x in optimize_config['sortino-weight']] if 'sortino-weight' in optimize_config else []
@property
def delta_kwargs(self):
result = []
for item in self._config:
delta_kwargs = item.copy()
del delta_kwargs['weight'], delta_kwargs['name']
result.append(delta_kwargs)
return result
def find_optimize(self, fund_ids, day):
assert self._config, "find optimize, but not found sortino config."
pct_change = pd.DataFrame(self.get_pct_change(fund_ids, day))
pct_change.set_index('date', inplace=True)
sortino = pd.DataFrame()
for item in self._config:
delta_kwargs = item.copy()
del delta_kwargs['weight'], delta_kwargs['name']
ratio = dict(sortino_ratio(pct_change.truncate(before=(day - relativedelta(**delta_kwargs)))))
ratio = dict(sortino_ratio(pct_change.truncate(before=(day - relativedelta(**dict_remove(item, ('weight', 'name')))))))
sortino = pd.concat([sortino, pd.DataFrame([ratio], index=[item['name']])])
sortino = sortino.T
sortino['score'] = sortino.apply(lambda r: sum([x['weight'] * r[x['name']] for x in self._config]), axis=1)
......@@ -38,17 +46,30 @@ class SortinoAssetOptimize(AssetOptimize, ABC):
def get_optimize_pool(self, day):
last_one = rop.get_last_one(day=day, type=AssetPoolType.OPTIMIZE)
start = get_quarter_start(day or dt.today())
if not last_one or start > last_one['date']:
if not last_one or start > last_one['date'] or self.has_incept_asset(last_one['date'] + timedelta(1), day):
pool = []
min_dates = self.nav_min_dates
max_incept_date = sorted([(day - relativedelta(**x)) for x in self.delta_kwargs])[0]
max_incept_date = max_incept_date if is_workday(max_incept_date) else next_workday(max_incept_date)
for fund_group in self.get_groups():
fund_group = [x for x in fund_group if min_dates[x] <= max_incept_date]
if len(fund_group) > 1:
pool.append(self.find_optimize(fund_group, day))
else:
pool.append(self.find_optimize(tuple(fund_group), day))
elif len(fund_group) == 1:
pool.append(fund_group[0])
rop.insert(day, AssetPoolType.OPTIMIZE, sorted(pool))
last_one = rop.get_last_one(day=day, type=AssetPoolType.OPTIMIZE)
return json.loads(last_one['asset_ids'])
@abstractmethod
def has_incept_asset(self, start_date, end_date):
pass
@property
@abstractmethod
def nav_min_dates(self) -> dict:
pass
@abstractmethod
def get_groups(self):
'''
......@@ -79,8 +100,18 @@ class FundSortinoAssetOptimize(SortinoAssetOptimize):
self._navs = navs
self._datum = datum
@property
def nav_min_dates(self) -> dict:
return self._navs.get_nav_start_date()
def has_incept_asset(self, start_date, end_date):
start_date = sorted([(start_date - relativedelta(**x)) for x in self.delta_kwargs])[0]
end_date = sorted([(end_date - relativedelta(**x)) for x in self.delta_kwargs])[0]
return len([x for x in self.nav_min_dates.items() if start_date <= x[1] <= end_date]) > 0
def get_groups(self):
funds = pd.DataFrame(self._datum.get_fund_datums())
min_dates = self._navs.get_nav_start_date()
result = []
for (category, asset_type), fund_group in funds.groupby(by=['category', 'assetType']):
result.append(tuple(fund_group['id']))
......@@ -92,10 +123,12 @@ class FundSortinoAssetOptimize(SortinoAssetOptimize):
start = filter_weekend(
sorted([day - relativedelta(days=1, **dict_remove(x, ('weight', 'name'))) for x in self._config])[0])
fund_navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(fund_ids), min_date=start, max_date=day))
fund_navs.sort_values('nav_date', inplace=True)
fund_navs = fund_navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
fund_navs.fillna(method='ffill', inplace=True)
result = round(fund_navs.pct_change().dropna(), 4)
result.reset_index(inplace=True)
result.rename(columns={'nav_date': 'date'}, inplace=True)
return result.to_dict('records')
if not fund_navs.empty:
fund_navs.sort_values('nav_date', inplace=True)
fund_navs = fund_navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
fund_navs.fillna(method='ffill', inplace=True)
result = round(fund_navs.pct_change().dropna(), 4)
result.reset_index(inplace=True)
result.rename(columns={'nav_date': 'date'}, inplace=True)
return result.to_dict('records')
return []
import json
import logging
from datetime import datetime as dt
import pandas as pd
......@@ -8,9 +7,9 @@ from scipy.stats import norm
from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType
from asset_pool.dao import asset_risk_dates as ard, asset_ewma_value as aev, robo_assets_pool as rap
from framework import component, autowired, get_config, format_date, block_execute
from framework import component, autowired, get_config, format_date, block_execute, get_logger
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
def get_risk_start_date():
......@@ -44,18 +43,20 @@ class CvarEwmaAssetRisk(AssetRisk):
asset_pool = rap.get_one(day, AssetPoolType.RISK)
if asset_pool:
return id in json.loads(asset_pool['asset_ids'])
self.build_risk_date(id, day)
last = ard.get_last_one(id, day)
return DateType(last['type']) is DateType.START_DATE if last else False
last = ard.get_last_one(fund_id=id)
if last and last['date'] < day:
self.build_risk_date(id, day)
result = ard.get_last_one(id, day)
return DateType(result['type']) is DateType.START_DATE if result else True
def build_risk_date(self, asset_id, day=dt.today()):
risk_date = not None
try:
logger.info(f"start build risk date for asset[{asset_id}] to date[{format_date(day)}]")
logger.debug(f"start build risk date for asset[{asset_id}] to date[{format_date(day)}]")
while risk_date is not None:
risk_date = self.get_next_date(asset_id, day=day)
except Exception as e:
logger.exception(f"build risk date for asset[{asset_id}] after date[{risk_date}] error", e)
logger.exception(f"build risk date for asset[{asset_id}] after date[{risk_date}] to date[{format_date(day)}] error", e)
def get_next_date(self, asset_id, day=dt.today()):
last = ard.get_last_one(asset_id, day)
......@@ -120,10 +121,12 @@ class CvarEwmaAssetRisk(AssetRisk):
def get_income_return(self, asset_id, min_date=None, max_date=None):
fund_navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=asset_id, max_date=max_date))
fund_navs['rtn'] = fund_navs['nav_cal'] / fund_navs['nav_cal'].shift(self._config['rtn-days']) - 1
fund_navs.dropna(inplace=True)
if min_date:
fund_navs = fund_navs[fund_navs.nav_date >= pd.to_datetime(min_date)]
fund_navs.rename(columns={'nav_date': 'date'}, inplace=True)
fund_navs = fund_navs[['date', 'rtn']]
return fund_navs.to_dict('records')
if not fund_navs.empty:
fund_navs['rtn'] = fund_navs['nav_cal'] / fund_navs['nav_cal'].shift(self._config['rtn-days']) - 1
fund_navs.dropna(inplace=True)
if min_date:
fund_navs = fund_navs[fund_navs.nav_date >= pd.to_datetime(min_date)]
fund_navs.rename(columns={'nav_date': 'date'}, inplace=True)
fund_navs = fund_navs[['date', 'rtn']]
return fund_navs.to_dict('records')
return []
......@@ -18,7 +18,7 @@ def insert(asset_id, type: DateType, date):
@read(one=True)
def get_last_one(fund_id, date, type: DateType = None):
def get_last_one(fund_id, date=None, type: DateType = None):
kwargs = {
'ard_asset_id': fund_id,
'ard_type': type.value if type is not None else None
......
......@@ -18,3 +18,11 @@ def get_navs(fund_id=None, min_date=None, max_date=None):
select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from robo_fund_navs
{where(*sqls, rfn_fund_id=to_tuple(fund_id))} order by rfn_fund_id, rfn_date
'''
@read
def get_min_dates(fund_ids=None):
return f'''
select rfn_fund_id as fund_id, min(rfn_date) as min_date from robo_fund_navs {where(rfn_fund_id=to_tuple(fund_ids))}
group by rfn_fund_id
'''
......@@ -2,16 +2,23 @@ import json
from api import DatumType, Datum, PortfoliosRisk
from basic.dao import robo_base_datum as rbd
from framework import component, parse_date
from framework import component, parse_date, get_config
@component
class DefaultDatum(Datum):
def __init__(self):
self._config = get_config(__name__)
@property
def excludes(self):
return self._config['excludes'] if 'excludes' in self._config else []
def get_fund_datums(self, crncy=None, risk=None, fund_ids=None):
result = rbd.get_base_datums(type=DatumType.FUND, crncy=crncy, risk=risk, datum_ids=fund_ids)
result = [{**json.loads(x['datas']), 'id': x['id']} for x in result]
return [{**x, 'inceptDate': parse_date(x['inceptDate'])} for x in result]
return [{**x, 'inceptDate': parse_date(x['inceptDate'])} for x in result if x['bloombergTicker'] not in self.excludes]
def get_index_datums(self, ticker=None, index_ids=None):
result = rbd.get_base_datums(type=DatumType.INDEX, ticker=ticker, datum_ids=index_ids)
......
......@@ -15,7 +15,7 @@ class DefaultNavs(Navs):
def get_fund_navs(self, fund_ids=None, min_date=None, max_date=None):
navs = rfn.get_navs(fund_id=fund_ids, min_date=min_date, max_date=max_date)
if 'exrate' in self._config:
if navs and 'exrate' in self._config:
navs = pd.DataFrame(navs)
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
for exrate_config in self._config['exrate']:
......@@ -33,6 +33,9 @@ class DefaultNavs(Navs):
navs = navs.to_dict('records')
return navs
def get_nav_start_date(self, fund_ids = None):
return {x['fund_id']: x['min_date'] for x in rfn.get_min_dates(fund_ids=fund_ids)}
def get_index_close(self, datum_ids=None, min_date=None, max_date=None, ticker=None):
datum_ids = to_tuple(datum_ids)
if ticker:
......
......@@ -46,8 +46,18 @@ framework:
level: INFO
handlers: [ console ]
main:
start-date: 2022-09-01
start-date: 2008-01-02
basic:
datum:
excludes:
- 'FKUQX US Equity'
- 'FTAAUSH LX Equity'
- 'FTJAPAU LX Equity'
- 'TEGAUH1 LX Equity'
- 'TMEEAAU LX Equity'
- 'TEUSAAU LX Equity'
- 'FTEAUH1 LX Equity'
- 'TFIAAUS LX Equity'
navs:
exrate:
- from: EUR
......@@ -98,17 +108,20 @@ portfolios:
poem:
cvar-scale-factor: 0.1
right_side:
asset-count: [3, 5]
navs:
risk: [1, 2]
exclude-asset-type: ['STOCK', 'BALANCED']
mpt:
quantile: 0.3
crisis_1:
asset-count: [3, 5]
navs:
risk: [1, 2]
mpt:
quantile: 0.1
crisis_2:
asset-count: [3, 5]
navs:
risk: [ 1, 2 ]
mpt:
......@@ -121,7 +134,7 @@ rebalance:
high-weight:
coef: 0.2
ruler:
disable-period: #自然日
disable-period:
normal: 10
crisis_1: 15
crisis_2: 15
......@@ -131,7 +144,7 @@ rebalance:
date: 2022-09-01
crisis-signal:
exp-years: 3
exp-init: 2022-03-04
exp-init: 2008-01-01
inversion-years: 1
inversion-threshold: 0.3
crisis-1:
......@@ -145,6 +158,7 @@ rebalance:
rtn-days: 5
min-threshold: -0.05
coef: 0.95
cvar-min-volume: 30
curve-drift:
diff-threshold: 0.4
init-factor: 0.000000002
......
from .date_utils import *
from .base import *
from .database import read, write, transaction, where, to_columns
from .database import read, write, transaction, where, mapper_columns
from .env_config import config, get_config
from .logs import build_logger, get_logger
from .injectable import component, autowired, get_instance, init_injectable as _init_injectable
......
import functools
import json
import threading
from enum import Enum
......@@ -154,5 +155,12 @@ def where(*args, **kwargs) -> str:
return f"where {' and '.join(result)}" if result else ''
def to_columns(columns: dict, datas: dict) -> dict:
return dict([(x[0], datas[x[1]]) for x in columns.items()])
def mapper_columns(datas: dict, columns: dict) -> dict:
datas = {x[0]: datas[x[1]] for x in columns.items() if x[1] in datas and datas[x[1]] is not None}
return {
**datas,
**{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)},
**{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)},
**{x[0]: json.dumps(x[1]) for x in datas.items() if isinstance(x[1], dict)},
**{x[0]: (1 if x[1] else 0) for x in datas.items() if isinstance(x[1], bool)}
}
import calendar
from datetime import timedelta, datetime, date
import pandas as pd
def filter_weekend(day):
while calendar.weekday(day.year, day.month, day.day) in [5, 6]:
day = day - timedelta(1)
return day
return pd.to_datetime(day)
def next_workday(day):
......@@ -15,10 +17,21 @@ def next_workday(day):
return result
def prev_workday(day):
result = day
while result == day or result.weekday() in [5, 6]:
result = result - timedelta(1)
return result
def is_workday(day):
return day.weekday() in range(5)
def workday_range(start, end):
return [datetime.combine(x.date(), datetime.min.time()) for x in pd.date_range(start, end) if is_workday(x)]
def format_date(date, has_time=False):
return date.strftime('%Y-%m-%d %H:%M:%S' if has_time else '%Y-%m-%d')
......@@ -33,4 +46,4 @@ def get_quarter_start(today=datetime.today()):
if __name__ == '__main__':
print(get_quarter_start())
print(workday_range(datetime.today() - timedelta(1), datetime.today()))
......@@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS robo_hold_portfolios
rhp_rrs_id BIGINT UNSIGNED DEFAULT NULL COMMENT '调仓信号id',
rhp_rebalance TINYINT NOT NULL DEFAULT 0 COMMENT '是否调仓',
rhp_portfolios JSON NOT NULL COMMENT '投组信息',
rhp_nav DOUBLE NOT NULL COMMENT '资产值',
rhp_nav DOUBLE(12, 4) NOT NULL COMMENT '资产值',
rhp_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rhp_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rhp_id),
......
from framework import read, where, write
from framework import read, where, write, format_date, mapper_columns
from api import PortfoliosRisk
__COLUMNS__ = {
......@@ -17,9 +17,29 @@ def get_one(day, risk: PortfoliosRisk):
return f'''select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios {where(rhp_date=day, rhp_risk=risk)}'''
@read(one=True)
def get_last_one(risk: PortfoliosRisk, max_date=None, rebalance: bool = None, signal_id=None):
sql = "rhp_date <= '{format_date(max_date)}'" if max_date else None
return f'''
select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios
{where(sql, rhp_risk=risk, rhp_rrs_id=signal_id, rhp_rebalance=rebalance)}
order by rhp_date desc limit 1
'''
def get_count(risk: PortfoliosRisk = None):
@read(one=True)
def exec():
return f'''select count(*) as `count` from robo_hold_portfolios {where(rhp_risk=risk)}'''
result = exec()
return result['count']
@write
def insert(datas):
datas = mapper_columns(datas=datas, columns=__COLUMNS__)
return f'''
insert into robo_hold_portfolios({','.join([x for x in datas.keys()])})
values ({','.join([f"'{x[1]}'" for x in datas.items()])})
'''
......@@ -2,7 +2,7 @@ from datetime import datetime
from enum import Enum
from api import PortfoliosRisk, PortfoliosType
from framework import read, write, where, format_date
from framework import read, write, where, format_date, mapper_columns
__COLUMNS__ = {
'rmp_id': 'id',
......@@ -17,12 +17,7 @@ __COLUMNS__ = {
@write
def insert(datas):
datas = {x[0]: datas[x[1]] for x in __COLUMNS__.items() if x[1] in datas and datas[x[1]] is not None}
datas = {
**datas,
**{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)},
**{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)}
}
datas = mapper_columns(datas=datas, columns=__COLUMNS__)
return f'''
insert into robo_mpt_portfolios({','.join([x for x in datas.keys()])})
values ({','.join([f"'{x[1]}'" for x in datas.items()])})
......
from framework import component, autowired
from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler
from portfolios.dao import robo_hold_portfolios as rhp
import json
import pandas as pd
from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler, Navs, SignalType
from framework import (
component, autowired, get_config, next_workday, filter_weekend,
prev_workday, transaction, workday_range, format_date, get_logger
)
from portfolios.dao import robo_hold_portfolios as rhp
from portfolios.utils import format_weight
logger = get_logger(__name__)
def get_start_date():
config = get_config('main')
return pd.to_datetime(filter_weekend(config['start-date']))
@component(bean_name='next-re')
class NextReblanceHolder(PortfoliosHolder):
@autowired
def __init__(self, rebalance: RebalanceRuler):
self._rebalance = rebalance
def __init__(self, rule: RebalanceRuler, navs: Navs = None):
self._rule = rule
self._navs = navs
self._config = get_config(__name__)
def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None, signal_id=None):
assert risk, f"get last rebalance date, risk can not be none"
last = rhp.get_last_one(max_date=max_date, risk=risk, signal_id=signal_id, rebalance=True)
return last['date'] if last else None
def get_portfolios_weight(self, day, risk: PortfoliosRisk):
hold = rhp.get_one(day, risk)
if hold:
result = json.loads(hold['portfolios']['weight'])
result = json.loads(hold['portfolios'])['weight']
return {int(x[0]): x[1] for x in result.items()}
return None
......@@ -22,5 +43,78 @@ class NextReblanceHolder(PortfoliosHolder):
return rhp.get_count(risk=risk) > 0
def build_hold_portfolio(self, day, risk: PortfoliosRisk):
last_nav = rhp.get_last_one(max_date=day, risk=risk)
start = next_workday(last_nav['date'] if last_nav else get_start_date())
while start <= day:
logger.info(f"start to build hold portfolio for date[{format_date(start)}]")
signal = None
if last_nav:
last_re = rhp.get_last_one(max_date=start, risk=risk, rebalance=True)
if len(workday_range(last_re['date'], start)) > self.interval_days:
signal = self._rule.take_next_signal(prev_workday(start), risk)
else:
signal = self._rule.take_next_signal(prev_workday(start), risk)
if signal and not signal['effective']:
logger.info(f"start to rebalance hold portfolio for date[{format_date(start)}] "
f"with signal[{SignalType(signal['type']).name}]")
self.do_rebalance(start, risk, signal, last_nav)
elif last_nav and signal is None:
self.no_rebalance(start, risk, last_nav)
start = next_workday(start)
last_nav = rhp.get_last_one(max_date=day, risk=risk)
@transaction
def do_rebalance(self, day, risk: PortfoliosRisk, signal, last_nav):
weight = {int(x[0]): x[1] for x in json.loads(signal['portfolio']).items()}
if last_nav:
share = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share'].items()}
navs = self.get_navs(fund_ids=tuple(set(weight) | set(share)), day=day)
nav = round(sum([navs[x] * y for x, y in share.items()]), 4)
else:
nav = self.init_nav
navs = self.get_navs(fund_ids=tuple(weight), day=day)
share = {x: nav * w / navs[x] for x, w in weight.items()}
rhp.insert({
'date': day,
'risk': risk,
'signal_id': signal['id'],
'rebalance': True,
'portfolios': {
'weight': weight,
'share': share,
},
'nav': nav,
})
self._rule.commit_signal(signal['id'])
def no_rebalance(self, day, risk: PortfoliosRisk, last_nav):
share = {int(x): y for x, y in json.loads(last_nav['portfolios'])['share'].items()}
navs = self.get_navs(fund_ids=tuple(share), day=day)
nav = round(sum([navs[x] * y for x, y in share.items()]), 4)
weight = {x: round(y * navs[x] / nav, 2) for x, y in share.items()}
weight = format_weight(weight)
rhp.insert({
'date': day,
'risk': risk,
'signal_id': last_nav['signal_id'],
'rebalance': False,
'portfolios': {
'weight': weight,
'share': share,
},
'nav': nav,
})
def get_navs(self, day, fund_ids):
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=fund_ids, max_date=day))
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
navs.fillna(method='ffill', inplace=True)
return dict(navs.iloc[-1])
@property
def interval_days(self):
return self._config['min-interval-days']
pass
@property
def init_nav(self):
return self._config['init-nav']
......@@ -9,6 +9,7 @@ from pyomo.environ import *
from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum
from framework import component, autowired, get_config, get_logger
from portfolios.utils import format_weight
logger = get_logger(__name__)
......@@ -164,22 +165,10 @@ class DefaultSolver(Solver):
df_w = pd.DataFrame(data=weight_list, index=id_list, columns=['weight'])
df_w.replace(0, NAN, inplace=True)
df_w.dropna(axis=0, inplace=True)
df_w['weight'] = self.format_weight(df_w['weight'])
df_w['weight'] = pd.Series(format_weight(dict(df_w['weight'])))
dict_w = df_w.to_dict()['weight']
return dict_w
@staticmethod
def format_weight(weight_series):
weight_series = weight_series.fillna(0)
minidx = weight_series[weight_series > 0].idxmin()
maxidx = weight_series.idxmax()
weight_series = weight_series.apply(lambda x: round(x, 2))
if weight_series.sum() < 1:
weight_series[minidx] += 1 - weight_series.sum()
elif weight_series.sum() > 1:
weight_series[maxidx] += 1 - weight_series.sum()
return weight_series.apply(lambda x: round(float(x), 2))
def calc_port_rtn(self, model):
return sum([model.w[i]._value * self.rtn_annualized[i] for i in model.indices])
......@@ -199,6 +188,7 @@ class DefaultSolver(Solver):
count = self.get_config('asset-count')
min_count = count[0] if isinstance(count, list) else count
max_count = count[1] if isinstance(count, list) else count
min_count = min(min_count, len(self.rtn_annualized))
low_weight = self.get_config('mpt.low-weight')
high_weight = self.get_config('mpt.high-weight')
......
......@@ -27,6 +27,7 @@ class PortfoliosTest(unittest.TestCase):
@autowired(names={'hold': 'next-re'})
def test_build_hold(self, hold: PortfoliosHolder = None):
hold.build_hold_portfolio(parse_date('2009-01-01'), PortfoliosRisk.FT9)
pass
......
import pandas as pd
def format_weight(weight: dict) -> dict:
weight_series = pd.Series(weight)
weight_series = weight_series.fillna(0)
minidx = weight_series[weight_series > 0].idxmin()
maxidx = weight_series.idxmax()
weight_series = weight_series.apply(lambda x: round(x, 2))
if weight_series.sum() == 1:
return dict(weight_series)
elif weight_series.sum() < 1:
weight_series[minidx] += 1 - weight_series.sum()
elif weight_series.sum() > 1:
weight_series[maxidx] += 1 - weight_series.sum()
return dict(weight_series.apply(lambda x: round(float(x), 2)))
if __name__ == '__main__':
print(format_weight({19: 0.13, 27: 0.17, 31: 0.35, 56: 0.36}))
......@@ -12,6 +12,9 @@ class BaseRebalanceSignal(RebalanceSignal, ABC):
self._builder = builder
def get_signal(self, day, risk: PortfoliosRisk):
signal = rrs.get_one(type=self.signal_type, risk=risk, date=day)
if signal:
return signal
trigger = self.is_trigger(day, risk)
if trigger:
portfolio = self._builder.get_portfolios(day, risk, self.signal_type.p_type)
......
from framework import read, write, where, format_date
from framework import read, write, where, format_date, mapper_columns
from api import SignalType, PortfoliosRisk
import json
from datetime import datetime
......@@ -21,6 +21,14 @@ def get_by_id(id):
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=id)}'''
@read(one=True)
def get_one(type: SignalType, risk: PortfoliosRisk, date):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
{where(rrs_date=date, rrs_type=type, rrs_risk=risk)}
'''
@read(one=True)
def get_first_after(type: SignalType, risk: PortfoliosRisk, min_date, effective=None):
return f'''
......@@ -33,7 +41,7 @@ def get_first_after(type: SignalType, risk: PortfoliosRisk, min_date, effective=
def get_last_one(max_date, risk: PortfoliosRisk, type: SignalType = None, effective=None):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
{where(f"rrs_date <= {format_date(max_date)}", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date desc limit 1
{where(f"rrs_date <= '{format_date(max_date)}'", rrs_type=type, rrs_risk=risk, rrs_effective=effective)} order by rrs_date desc limit 1
'''
......@@ -45,20 +53,9 @@ def get_count(risk: PortfoliosRisk = None, day=None, effective=None):
return result['count']
def format_datas(datas):
datas = {x[0]: datas[x[1]] for x in __COLUMNS__.items() if x[1] in datas and datas[x[1]] is not None}
return {
**datas,
**{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)},
**{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)},
**{x[0]: json.dumps(x[1]) for x in datas.items() if isinstance(x[1], dict)},
**{x[0]: (1 if x[1] else 0) for x in datas.items() if isinstance(x[1], bool)}
}
@write
def insert(datas):
datas = format_datas(datas)
datas = mapper_columns(datas=datas, columns=__COLUMNS__)
return f'''
insert into robo_rebalance_signal({','.join([x for x in datas.keys()])})
values ({','.join([f"'{x[1]}'" for x in datas.items()])})
......@@ -66,8 +63,8 @@ def insert(datas):
@write
def update(id, dates):
datas = format_datas(dates)
def update(id, datas):
datas = mapper_columns(datas=datas, columns=__COLUMNS__)
return f'''
update robo_rebalance_signal
set {','.join([f"{x[0]} = '{x[1]}'" for x in datas.items()])}
......
from framework import read, write, where, format_date
from framework import read, write, where, format_date, mapper_columns
from api import PortfoliosRisk
__COLUMNS__ = {
......@@ -23,17 +23,9 @@ def get_last_one(max_date, risk: PortfoliosRisk):
'''
@write
def insert(datas):
datas = {x[0]: datas[x[1]] for x in __COLUMNS__.items() if x[1] in datas and datas[x[1]] is not None}
datas = {
**datas,
**{x[0]: format_date(x[1]) for x in datas.items() if isinstance(x[1], datetime)},
**{x[0]: x[1].value for x in datas.items() if isinstance(x[1], Enum)}
}
datas = mapper_columns(datas=datas, columns=__COLUMNS__)
return f'''
insert into robo_weight_drift({','.join([x for x in datas.keys()])})
values ({','.join([f"'{x[1]}'" for x in datas.items()])})
......
from api import DriftSolver, PortfoliosRisk
from framework import component, autowired, get_config
from rebalance.dao import robo_rebalance_signal as rrs
from api import DriftSolver, PortfoliosRisk, PortfoliosBuilder, Datum
from framework import component, autowired, get_config, workday_range, filter_weekend, next_workday
from rebalance.dao import robo_rebalance_signal as rrs, robo_weight_drift as rwd
def get_start_date():
......@@ -48,10 +48,10 @@ class PortfolioHighWeight(DriftSolver):
last_one = rwd.get_last_one(max_date=day, risk=risk)
start = (next_workday(last_one['date'])) if last_one else get_start_date()
last_drift = last_one['drift'] if last_one else 0
for date in [x for x in pd.date_range(start, day, freq='D') if is_workday(x)]:
for date in workday_range(start, day):
portfolio = self._builder.get_portfolios(date, risk)
weight = round(sum([x[1] for x in portfolio.items() if x[0] in datum_ids]), 2)
last_drift = (weight * self.drift_coef + (1 - self.drift_coef) * last_drift) if last_drift else weight
last_drift = round((weight * self.drift_coef + (1 - self.drift_coef) * last_drift) if last_drift else weight, 2)
rwd.insert({
'date': date,
'risk': risk,
......
from framework import component, autowired, get_config
from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType
from framework import component, autowired, get_config, workday_range, next_workday
from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder
from typing import List
from rebalance.dao import robo_rebalance_signal as rrs
......@@ -22,8 +22,9 @@ class LevelRebalanceRuler(RebalanceRuler):
'''
@autowired
def __init__(self, signals: List[RebalanceSignal] = None):
def __init__(self, signals: List[RebalanceSignal] = None, hold: PortfoliosHolder = None):
self._signals = signals
self._hold = hold
self._config = get_config(__name__)
@property
......@@ -33,17 +34,33 @@ class LevelRebalanceRuler(RebalanceRuler):
def take_next_signal(self, day, risk: PortfoliosRisk):
last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True)
if last_re and last_re['date'] == day:
return last_re
if last_re:
disable_period = self.disable_period[PortfoliosType(last_re['p_type'])]
signals = [x.get_signal(day, risk) for x in self._signals]
signals = sorted([x for x in signals if x is not None], key=lambda x: SignalType(x['type']).level)
if signals:
use_signal = signals[0]
rrs.update(use_signal['id'], {'effective': True})
return use_signal
if not last_re:
builder = [x for x in self._signals if x.signal_type is SignalType.INIT][0]
return builder.get_signal(day, risk)
long_signals = [x for x in self._signals if x.signal_type.p_type is not PortfoliosType.NORMAL and x.signal_type.level < SignalType(last_re['type']).level]
for long_signal in sorted(long_signals, key=lambda x: x.signal_type.level):
workdays = workday_range(next_workday(last_re['date']), day)
if len(workdays) <= self._hold.interval_days:
for date in workdays:
signal = long_signal.get_signal(date, risk)
if signal:
return signal
else:
signal = long_signal.get_signal(day, risk)
if signal:
return signal
if SignalType(last_re['type']).p_type in self.disable_period:
re_date = self._hold.get_last_rebalance_date(risk=risk, max_date=day)
if re_date:
workdays = workday_range(re_date, day)
if len(workdays) < self.disable_period[SignalType(last_re['type']).p_type]:
return None
for temp_signal in sorted([x for x in self._signals if x.signal_type.p_type is PortfoliosType.NORMAL], key=lambda x: x.signal_type.level):
signal = temp_signal.get_signal(day, risk)
if signal:
return signal
return None
def cancel_signal(self, sign_id):
pass
def commit_signal(self, sign_id):
rrs.update(sign_id, {'effective': True})
......@@ -28,13 +28,13 @@ class CurveDrift(BaseRebalanceSignal):
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True)
if last_re is None or SignalType(last_re['type']) in self.exclude_last_type:
return None
return False
hr_datums = self._datum.get_high_risk_datums(risk)
datum_ids = [x['id'] for x in hr_datums]
normal_portfolio = self._builder.get_portfolios(day, risk)
normal_weight = round(sum([x[1] for x in normal_portfolio.items() if x[0] in datum_ids]), 2)
hold_portfolio = self._hold.get_portfolios_weight(day, risk)
hold_weight = round(sum([x[1] for x in normal_portfolio.items() if x[0] in datum_ids]), 2)
hold_weight = round(sum([x[1] for x in hold_portfolio.items() if x[0] in datum_ids]), 2)
return normal_weight - hold_weight >= self._solver.get_drift(day, risk)
@property
......
......@@ -21,7 +21,8 @@ class HighBuySignal(BaseRebalanceSignal):
SignalType.CRISIS_ONE,
SignalType.CRISIS_TWO,
SignalType.MARKET_RIGHT,
SignalType.LOW_BUY
SignalType.LOW_BUY,
SignalType.INIT
]
@property
......@@ -40,7 +41,7 @@ class HighBuySignal(BaseRebalanceSignal):
return False
drift = self._solver.get_drift(day, risk)
threshold = self.get_threshold(risk)
return drift > threshold[1]
return drift >= threshold[1]
@component(bean_name='low-buy')
......@@ -51,7 +52,8 @@ class LowBuySignal(HighBuySignal):
return [
SignalType.CRISIS_ONE,
SignalType.CRISIS_TWO,
SignalType.MARKET_RIGHT
SignalType.MARKET_RIGHT,
SignalType.INIT
]
@property
......@@ -64,4 +66,4 @@ class LowBuySignal(HighBuySignal):
return False
drift = self._solver.get_drift(day, risk)
threshold = self.get_threshold(risk)
return threshold[0] < drift < threshold[1]
return threshold[0] <= drift < threshold[1]
from api import PortfoliosRisk, SignalType
from framework import component
from framework import component, filter_weekend, get_config
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs
def get_start_date():
config = get_config('main')
return filter_weekend(config['start-date'])
@component(bean_name='init')
class InitSignalBuilder(BaseRebalanceSignal):
......@@ -11,5 +16,11 @@ class InitSignalBuilder(BaseRebalanceSignal):
def signal_type(self) -> SignalType:
return SignalType.INIT
def get_signal(self, day, risk: PortfoliosRisk):
signal = rrs.get_last_one(max_date=day, risk=risk, type=SignalType.INIT)
if signal:
return None if signal['effective'] else signal
return super().get_signal(get_start_date(), risk)
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
return rrs.get_count(risk=risk) == 0
return True
......@@ -32,9 +32,13 @@ class MarketRight(BaseRebalanceSignal):
def signal_type(self) -> SignalType:
return SignalType.MARKET_RIGHT
@property
def cvar_min_volume(self):
return self._config['cvar-min-volume']
def is_trigger(self, day, risk: PortfoliosRisk) -> bool:
last_re = rrs.get_last_one(risk=risk, max_date=day, effective=True)
if last_re is not None and SignalType(last_re['type']) in [SignalType.CRISIS_ONE, SignalType.CRISIS_TWO, SignalType.MARKET_RIGHT]:
if last_re is not None and SignalType(last_re['type']) in [SignalType.CRISIS_ONE, SignalType.CRISIS_TWO, SignalType.MARKET_RIGHT, SignalType.INIT]:
return False
spx = self.load_spx_close_rtns(day)
if spx[-1]['rtn'] > self.min_threshold:
......@@ -48,11 +52,12 @@ class MarketRight(BaseRebalanceSignal):
start_date = self.find_cvar_start_date(day, risk, spx=spx)
if start_date:
spx = pd.DataFrame(spx)
spx = spx[(pd_spx.date >= start_date) & (spx.date <= day)]
alpha = 1 - self.coef
mean = spx.rtn.mean()
std = spx.rtn.std()
return mean - std * norm.pdf(norm.ppf(alpha)) / alpha
spx = spx[(spx.date >= start_date) & (spx.date <= day)]
if len(spx) >= self.cvar_min_volume:
alpha = round(1 - self.coef, 2)
mean = spx.rtn.mean()
std = spx.rtn.std()
return mean - std * norm.pdf(norm.ppf(alpha)) / alpha
return None
def find_cvar_start_date(self, day, risk: PortfoliosRisk, spx = None):
......@@ -63,7 +68,7 @@ class MarketRight(BaseRebalanceSignal):
last_buy = rrs.get_last_one(type=(SignalType.LOW_BUY, SignalType.HIGH_BUY), max_date=day, risk=risk, effective=True)
if not last_buy or not last_right or last_buy['date'] <= last_right['date']:
return None
spx = spx[(spx['date'] >= last_right['date']) & (spx['date'] <= last_buy)]
spx = spx[(spx['date'] >= last_right['date']) & (spx['date'] <= last_buy['date'])]
if not spx.empty and len(spx) > 2:
return spx.loc[spx.close.idxmin()].date
return None
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment