import json from datetime import datetime as dt from typing import List, Dict from py_jftech import component, autowired, get_config, workday_range, next_workday, to_tuple from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder, RoboReportor, Datum, DatumType from rebalance.dao import robo_rebalance_signal as rrs @component class LevelRebalanceRuler(RebalanceRuler): ''' 定义: 1.定义所有调仓类型为非NORMAL类型的信号为清仓信号 2.定义所有调仓类型为NORMAL类型的信号为加仓信号 3.定义持久信号为上次选用调仓的信号时间到当前时间内,该信号都有效 4.定义临时信号为仅当天有效 规则: 1.所有清仓信号为持久信号,所有加仓信号为临时信号 2.对于持久信号规则如下: 2.1 上一次选用信号到当前时间内,是否有持久信号 2.2 如果有,则看级别是否高于上一次选用信号 2.3 如果高于,则输出该信号 3.如果没有持久信号,则从临时信号中根据级别排序找出第一个,作为输出信号 ''' @autowired def __init__(self, signals: List[RebalanceSignal] = None, hold: PortfoliosHolder = None): self._signals = signals self._hold = hold self._config = get_config(__name__) @property def disable_period(self): result = self._config['disable-period'] if isinstance(result, dict): return {PortfoliosType(x[0]): x[1] for x in result.items()} else: return {t: result for t in PortfoliosType} def without_disable_period(self, day, risk: PortfoliosRisk) -> bool: last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True) if SignalType(last_re['type']).p_type in self.disable_period: return len(workday_range(last_re['date'], day)) > self.disable_period[SignalType(last_re['type']).p_type] return False def take_next_signal(self, day, risk: PortfoliosRisk, only_today=True): last_re = rrs.get_last_one(max_date=day, risk=risk, effective=True) if not last_re: builder = [x for x in self._signals if x.signal_type is SignalType.INIT][0] return builder.get_signal(day, risk) risk_signals = [x for x in self._signals if x.signal_type.p_type is not PortfoliosType.NORMAL] buy_signals = [x for x in self._signals if x.signal_type.p_type is PortfoliosType.NORMAL] last_signal = rrs.get_last_one(max_date=day, risk=risk) start = next_workday(last_signal['date']) while start <= day: # 检查风控信号 signals = {x.signal_type: x.get_signal(start, risk) for x in risk_signals if x.signal_type.level <= SignalType(last_re['type']).level} signals = {x[0]: x[1] for x in signals.items() if x[1] is not None} # 上次实际调仓类型为危机信号,本次危机信号不调仓 if signals and SignalType(last_re['type']) in [SignalType.CRISIS_ONE, SignalType.CRISIS_TWO]: signals = {x[0]: x[1] for x in signals.items() if x[0] not in [SignalType.CRISIS_ONE, SignalType.CRISIS_TWO]} # 检查买入信号,只有当天需要检查 if not signals and start == day and self.without_disable_period(day, risk): signals = {x.signal_type: x.get_signal(start, risk) for x in buy_signals} signals = {x[0]: x[1] for x in signals.items() if x[1] is not None} if signals: if SignalType(last_signal['type']) is SignalType.NONE: rrs.delete_by_id(last_signal['id']) return signals[sorted(signals.keys(), key=lambda x: x.level)[0]] start = next_workday(start) if SignalType(last_signal['type']) is SignalType.NONE: rrs.update(last_signal['id'], {'date': day}) else: rrs.insert({ 'date': day, 'type': SignalType.NONE, 'risk': risk }) return None def get_signal_type(self, sign_id) -> SignalType | Dict[int, SignalType]: sign_id = to_tuple(sign_id) if len(sign_id) > 1: return {x['id']: SignalType(x['type']) for x in rrs.get_by_ids(sign_id)} else: signal = rrs.get_by_id(sign_id[0]) return SignalType(signal['type']) if signal else None def commit_signal(self, sign_id): rrs.update(sign_id, {'effective': True}) def clear_signal(self, day=None, risk: PortfoliosRisk = None): rrs.delete(min_date=day, risk=risk) @component(bean_name='signal-report') class SignalExportor(RoboReportor): @autowired def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None): self._hold = hold self._datum = datum @property def report_name(self) -> str: return '调仓信号' def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]: result = [] datums = {str(x['id']): x for x in self._datum.get_datums(type=DatumType.FUND)} for signal in rrs.get_list(max_date=max_date, min_date=min_date, effective=True): rebalance_date = self._hold.get_rebalance_date_by_signal(signal['id']) for fund_id, weight in json.loads(signal['portfolio']).items(): result.append({ 'risk': PortfoliosRisk(signal['risk']).name, 'type': SignalType(signal['type']).name, 'signal_date': signal['date'], 'rebalance_date': rebalance_date, 'portfolio_type': PortfoliosType(signal['portfolio_type']).name, 'ft_ticker': datums[fund_id]['ftTicker'], 'blooberg_ticker': datums[fund_id]['bloombergTicker'], 'fund_name': datums[fund_id]['chineseName'], 'weight': weight }) return result