from abc import ABC, abstractmethod from datetime import datetime as dt from enum import Enum, unique from typing import List, Dict from py_jftech import get_config @unique class DatumType(Enum): FUND = 'FUND' INDEX = 'INDEX' ECO = 'ECO' @unique class AssetRiskDateType(Enum): START_DATE = 1 STOP_DATE = 2 @unique class AssetPoolType(Enum): OPTIMIZE = 1 RISK = 2 @unique class PortfoliosRisk(Enum): FT3 = 3 FT6 = 6 FT9 = 9 @unique class PortfoliosType(Enum): CRISIS_1 = 'crisis_1' CRISIS_2 = 'crisis_2' RIGHT_SIDE = 'right_side' NORMAL = 'normal' CUSTOM = 'custom' @unique class SolveType(Enum): INFEASIBLE = 0 MPT = 1 POEM = 2 @unique class SignalType(Enum): INIT = 0 CRISIS_EXP = 1 CRISIS_ONE = 2 CRISIS_TWO = 3 MARKET_RIGHT = 4 HIGH_BUY = 5 LOW_BUY = 6 DRIFT_BUY = 7 # 信号处理优先级 SignalType.CRISIS_ONE.level = 1 SignalType.CRISIS_TWO.level = 2 SignalType.MARKET_RIGHT.level = 3 SignalType.HIGH_BUY.level = 4 SignalType.LOW_BUY.level = 5 SignalType.DRIFT_BUY.level = 5 SignalType.INIT.level = 6 # 对应需要再平衡的投组类型 SignalType.CRISIS_ONE.p_type = PortfoliosType.CRISIS_1 SignalType.CRISIS_TWO.p_type = PortfoliosType.CRISIS_2 SignalType.MARKET_RIGHT.p_type = PortfoliosType.RIGHT_SIDE SignalType.HIGH_BUY.p_type = PortfoliosType.NORMAL SignalType.LOW_BUY.p_type = PortfoliosType.NORMAL SignalType.DRIFT_BUY.p_type = PortfoliosType.NORMAL SignalType.INIT.p_type = PortfoliosType.RIGHT_SIDE class DataSync(ABC): ''' 数据同步服务,需要同步数据的服务,可以实现该接口 ''' @abstractmethod def do_sync(self, max_date=dt.today()): ''' 开始同步数据,到指定日期,如果没给则同步到当前日期 ''' pass class Datum(ABC): ''' 基础资料服务,基金资料数据,各种指数,指标资料数据 ''' @abstractmethod def get_datums(self, type: DatumType = None, crncy=None, risk=None, datum_ids=None, ticker=None): ''' 获取资料信息,当id和ticker都有时,取二者并集 :param type: 资料类型 :param crncy: 货币类型,仅对基金资料有效 :param risk: 风险等级,仅对基金资料有效 :param datum_ids: 资料ID列表 :param ticker: 资料ticker列表 :return: 资料信息数据 ''' pass @abstractmethod def get_high_risk_datums(self, risk: PortfoliosRisk): ''' 根据指定的投组风险等级,获取高风险资产资料数据 :param risk: 投组风险等级 :return: 高风险资料信息 ''' pass class Navs(ABC): ''' 基础数据相关服务,基金净值,各种指标 高开低收 ''' @abstractmethod def get_fund_navs(self, fund_ids=None, min_date=None, max_date=None): ''' 获取基金净值信息 :param fund_ids: 基金id,可以多个id,使用tuple包裹 :param min_date: 起始时间 :param max_date: 截止时间 :return: 基金净值信息 ''' pass @abstractmethod def get_nav_start_date(self, fund_ids=None): ''' 获取指定id资产的净值开始时间 :param fund_ids: 指定id资产,如果为None,则返回全部资产的开始时间 :return: 资产的开始时间字典 ''' pass @abstractmethod def get_index_close(self, datum_ids=None, min_date=None, max_date=None, ticker=None): ''' 获取指标收盘价 :param datum_ids: 指标资料id :param min_date: 起始时间 :param max_date: 截止时间 :param ticker: 指标资料ticker :return: 指标收盘价信息 ''' pass @abstractmethod def get_last_index_close(self, max_date, datum_id=None, ticker=None, count=1): ''' 获取指定资料或ticker,指定日期之前最后count个收盘价,当指定datum_id后,ticker参数无效 :param max_date: 指定日期 :param datum_id: 指标id,只能指定一个 :param ticker: 指标ticker,只能指定一个,当指标id有值后,该参数无效 :param count: 指定要返回数据的个数 :return: 如果存在,则返回指定日期最后count个收盘价,否则返回None ''' pass @abstractmethod def get_eco_values(self, datum_ids=None, min_date=None, max_date=None, ticker=None, by_release_date=False): ''' 获取经济指标数据,若同时给出ID,和ticker,则取二者并集 :param datum_ids: 经济指标id :param min_date: 起始日期 :param max_date: 截止日期 :param ticker: 经济指标ticker :param by_release_date: 如果为True,则使用公告日期查询,否则使用抓取日期 :return: 经济指标的值,包括查询日期,指标和公告日期 ''' pass @abstractmethod def get_last_eco_values(self, max_date, datum_id=None, ticker=None, count=1, by_release_date=False): ''' 获取指定资料或ticker,指定日期之前最后count个指标数据,当指定datum_id后,ticker参数无效 :param max_date: 指定日期 :param datum_id: 指标id,只能指定一个 :param ticker: 指标ticker,只能指定一个,当指标id有值后,该参数无效 :param count: 指定要返回数据的个数 :param by_release_date: 如果为True,则使用公告日期查询,否则使用抓取日期 :return: 如果存在,则返回指定日期最后count个指标项(查询日期,指标,公告日期),否则返回None ''' pass class AssetOptimize(ABC): ''' 优选相关服务ABC ''' @abstractmethod def find_optimize(self, ids, day): ''' 从多id中,选出指定日期最优的id :param ids: 待选id列表 :param day: 指定日期 :return: 最优的id ''' pass @abstractmethod def get_optimize_pool(self, day): ''' 根据优选规则获取指定日期的优选池 :param day: 指定日期 :return: 优选id列表 ''' pass class AssetRisk(ABC): ''' ewma相关服务 ''' @abstractmethod def get_risk_pool(self, day): ''' 获取指定日期的风控池 :param day: 指定的日期 :return: 风控id列表 ''' pass @abstractmethod def is_risk(self, id, day) -> bool: ''' 判断指定的id,在指定的日期,是处于风控状态 :param id: 指定的资产id :param day: 指定的日期 :return: 如果处于风控状态则返回True,否则返回False ''' pass @abstractmethod def build_risk_date(self, asset_id, day=dt.today()): ''' 构建指定资产的所有风险时间点 :param asset_id: 指定的资产id :param day: 构建的截止日期 ''' pass @abstractmethod def clear(self, day=None): ''' 清除指定日期之后的资产风控ewma数据,如果没有给日期,则全部清空 :param day: 指定清除的开始日期,可选 ''' pass class AssetPool(ABC): ''' 资产池相关服务 ''' @abstractmethod def get_pool(self, day): ''' 返回指定日期的可用资产池 :param day: 指定日期 :return: 资产id列表 ''' pass @abstractmethod def clear(self, day=None): ''' 清除指定日期之后的资产池数据,如果没有给日期,则全部清空 :param day: 指定清除的开始日期,可选 ''' pass class PortfoliosBuilder(ABC): ''' 投组组合构建器 ''' @abstractmethod def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL): ''' 获取指定日期,指定风险等级,指定类型的投资组合 :param type: 投组的类型 :param day: 指定日期 :param risk: 风险等级 :return: 资产组合字典{id: weight} ''' pass @abstractmethod def build_portfolio(self, day, type: PortfoliosType): ''' 构建指定日期,指定类型的投资组合 :param day: 指定日期 :param type: 指定类型 :return 投资组合数据{risk: {...}},计算明细数据 {...} ''' pass @abstractmethod def clear(self, day=None, risk: PortfoliosRisk = None): ''' 清除指定风险等级,指定日期之后的最优投组 :param day: 指定清除的开始日期,可选,如果没给,则清除全部日期 :param risk: 指定风险等级,如果没给,则清除全部风险等级 ''' pass class Solver(ABC): ''' 解算器 ''' @abstractmethod def solve_max_rtn(self): ''' :return: max_rtn, max_var, minCVaR_whenMaxR ''' pass @abstractmethod def solve_min_rtn(self): ''' :return: min_rtn, min_var, maxCVaR_whenMinR ''' pass @abstractmethod def solve_mpt(self, min_rtn, max_rtn): ''' 常规mpt计算 :param min_rtn: 最小回报率 :param max_rtn: 最大回报率 :return: 投组,cvar ''' pass @abstractmethod def solve_poem(self, min_rtn, max_rtn, base_cvar, max_cvar): ''' poem方式的mpt计算 :param min_rtn: 最小回报率 :param max_rtn: 最大回报率 :param base_cvar: 基础cvar :param max_cvar: 最大cvar :return: 投组,cvar ''' pass @abstractmethod def reset_navs(self, day): ''' 根据指定的日期,重置当前解算器,其他计算,全部依赖这里重置后的基金净值数据 :param day: 指定的日期 :return: 根据指定日期获取的,基金净值数据 ''' pass @property @abstractmethod def navs(self): ''' :return: 当前解算器使用的基金净值 ''' pass class SolverFactory(ABC): ''' 解算器工厂 ''' @abstractmethod def create_solver(self, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL) -> Solver: ''' 根据指定的投组风险等级,以及投组类型,创建解算器 :param risk: 投组风险等级 :param type: 投组类型 :return: 解算器 ''' pass class PortfoliosHolder(ABC): ''' 投资组合持仓器 ''' @abstractmethod def get_portfolio_type(self, day, risk: PortfoliosRisk) -> PortfoliosType: ''' 获取指定日期指定风险等级持仓投组的类型 :param day: 指定日期 :param risk: 指定风险等级 :return: 持仓投组类型 ''' pass @abstractmethod def get_portfolios_weight(self, day, risk: PortfoliosRisk): ''' 获取指定日期指定风险等级的持仓投组比重 :param day: 指定日期 :param risk: 指定风险等级 :return: 持仓投组占比 ''' pass @abstractmethod def has_hold(self, risk: PortfoliosRisk) -> bool: ''' 是否存在指定分线等级的投组持仓 :param risk: 指定风险等级 :return: 如果已经存在持仓,则返回True, 否则返回False ''' pass @abstractmethod def build_hold_portfolio(self, day, risk: PortfoliosRisk): ''' 构建指定日期,指定风险等级的持仓投组,以day为截止日期,会持续补满 :param day: 指定日期 :param risk: 指定风险等级 :return: ''' pass @abstractmethod def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None): ''' 获取最后一次实际调仓的时间 :param risk: 持仓风险等级类型,必须 :param max_date: 指定日期之前的最后一次,可选 :return: 最后一次实际调仓的日期 ''' pass @abstractmethod def get_rebalance_date_by_signal(self, signal_id): ''' 获取指定调仓信号触发的实际调仓日期 :param signal_id: 指定的调仓信号 :return: 实际调仓日期 ''' pass @property @abstractmethod def interval_days(self): ''' 返回实际交易的最小间隔交易日数 :return: 实际交易的最小间隔交易日数 ''' pass @abstractmethod def clear(self, day=None, risk: PortfoliosRisk = None): ''' 清除指定风险等级,指定日期之后的持仓投组 :param day: 指定清除的开始日期,可选,如果没给,则清除全部日期 :param risk: 指定风险等级,如果没给,则清除全部风险等级 ''' pass class DriftSolver(ABC): ''' 漂移解算器 ''' @abstractmethod def get_drift(self, day, risk: PortfoliosRisk): ''' 获取指定日期,指定风险等级的漂移计算结果 :param day: 指定日期 :param risk: 指定风险等级 :return: 漂移计算结果 ''' pass class RebalanceSignal(ABC): ''' 控制信号,发起是否调仓服务 ''' @abstractmethod def get_signal(self, day, risk: PortfoliosRisk): pass @property @abstractmethod def signal_type(self) -> SignalType: ''' 返回信号类型 :return: 信号类型 ''' pass class RebalanceRuler(ABC): ''' 再平衡信号分配器,根据既定的规则,再众多信号中,选出进行再平衡的信号 ''' @abstractmethod def take_next_signal(self, day, risk: PortfoliosRisk): ''' 取出指定日期,指定风险等级的再平衡信号数据,注意取出消费后,无法退回,非幂等函数 :param day: 指定日期 :param risk: 指定风险等级 :return: 如果存在,则返回取出的再平衡信号信息,否则返回None ''' pass @abstractmethod def commit_signal(self, sign_id): ''' 提交信号ID为已消费状态 :param sign_id: 信号ID ''' pass @abstractmethod def get_signal_type(self, sign_id) -> SignalType | Dict[int, SignalType]: ''' 获取指定id的信号类型 :param sign_id: 信号id, 可以多个,使用元祖包裹 :return: 信号类型 ''' pass @abstractmethod def clear_signal(self, day=None, risk: PortfoliosRisk = None): ''' 清除指定风险等级,指定日期之后的调仓信号 :param day: 指定清除的开始日期,可选,如果没给,则清除全部日期 :param risk: 指定风险等级,如果没给,则清除全部风险等级 ''' pass class RoboExecutor(ABC): ''' ROBO执行器,整合以上逻辑,进行实盘或回测 ''' @abstractmethod def start_exec(self): ''' 开始执行测试逻辑 ''' pass @property @abstractmethod def start_date(self): ''' :return: 执行器开始日期 ''' pass @staticmethod def use_name(): return get_config('robo-executor')['use'] class RoboReportor(ABC): ''' 投组报告器 ''' @property @abstractmethod def report_name(self) -> str: ''' 返回报告名称 :return: 报告名称 ''' pass @abstractmethod def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]: ''' 获取指定日期的报告 :param max_date: 指定截止日期 :param min_date: 指定开始日期 :return: 报告数据 ''' pass class RoboExportor(ABC): ''' 投组导出器 ''' @abstractmethod def export(self, max_date=dt.today(), min_date=None): ''' 导出指定日期的数据到excel :param max_date: 指定截止日期 :param min_date: 指定开始日期 :return: 导出文件路径 ''' pass