import json from datetime import datetime as dt from typing import List, Dict from py_jftech import component, autowired, get_config, workday_range, next_workday, to_tuple from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder, RoboReportor, Datum, DatumType from rebalance.dao import robo_rebalance_signal as rrs @component class LevelRebalanceRuler(RebalanceRuler): ''' 定义: 1.定义所有调仓类型为非NORMAL类型的信号为清仓信号 2.定义所有调仓类型为NORMAL类型的信号为加仓信号 3.定义持久信号为上次选用调仓的信号时间到当前时间内,该信号都有效 4.定义临时信号为仅当天有效 规则: 1.所有清仓信号为持久信号,所有加仓信号为临时信号 2.对于持久信号规则如下: 2.1 上一次选用信号到当前时间内,是否有持久信号 2.2 如果有,则看级别是否高于上一次选用信号 2.3 如果高于,则输出该信号 3.如果没有持久信号,则从临时信号中根据级别排序找出第一个,作为输出信号 ''' @autowired def __init__(self, signals: List[RebalanceSignal] = None, hold: PortfoliosHolder = None): self._signals = signals self._hold = hold self._config = get_config(__name__) @property def disable_period(self): result = self._config['disable-period'] if isinstance(result, dict): return {PortfoliosType(x[0]): x[1] for x in result.items()} else: return {t: result for t in PortfoliosType} def take_next_signal(self, day, risk: PortfoliosRisk): last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True) if not last_re: builder = [x for x in self._signals if x.signal_type is SignalType.INIT][0] return builder.get_signal(day, risk) long_signals = [x for x in self._signals if x.signal_type.p_type is not PortfoliosType.NORMAL and x.signal_type.level < SignalType(last_re['type']).level] for long_signal in sorted(long_signals, key=lambda x: x.signal_type.level): workdays = workday_range(next_workday(last_re['date']), day) if len(workdays) <= self._hold.interval_days: for date in workdays: signal = long_signal.get_signal(date, risk) if signal: return signal else: signal = long_signal.get_signal(day, risk) if signal: return signal if SignalType(last_re['type']).p_type in self.disable_period: re_date = self._hold.get_last_rebalance_date(risk=risk, max_date=day) if re_date: workdays = workday_range(re_date, day) if len(workdays) < self.disable_period[SignalType(last_re['type']).p_type]: return None for temp_signal in sorted([x for x in self._signals if x.signal_type.p_type is PortfoliosType.NORMAL], key=lambda x: x.signal_type.level): signal = temp_signal.get_signal(day, risk) if signal: return signal return None def get_signal_type(self, sign_id) -> SignalType | Dict[int, SignalType]: sign_id = to_tuple(sign_id) if len(sign_id) > 1: return {x['id']: SignalType(x['type']) for x in rrs.get_by_ids(sign_id)} else: signal = rrs.get_by_id(sign_id[0]) return SignalType(signal['type']) if signal else None def commit_signal(self, sign_id): rrs.update(sign_id, {'effective': True}) def clear_signal(self, day=None, risk: PortfoliosRisk = None): rrs.delete(min_date=day, risk=risk) @component(bean_name='signal-report') class SignalExportor(RoboReportor): @autowired def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None): self._hold = hold self._datum = datum @property def report_name(self) -> str: return '调仓信号' def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]: result = [] datums = {str(x['id']): x for x in self._datum.get_datums(type=DatumType.FUND)} for signal in rrs.get_list(max_date=max_date, min_date=min_date, effective=True): rebalance_date = self._hold.get_rebalance_date_by_signal(signal['id']) for fund_id, weight in json.loads(signal['portfolio']).items(): result.append({ 'risk': PortfoliosRisk(signal['risk']).name, 'type': SignalType(signal['type']).name, 'signal_date': signal['date'], 'rebalance_date': rebalance_date, 'portfolio_type': PortfoliosType(signal['portfolio_type']).name, 'ft_ticker': datums[fund_id]['ftTicker'], 'blooberg_ticker': datums[fund_id]['bloombergTicker'], 'fund_name': datums[fund_id]['chineseName'], 'weight': weight }) return result