import logging import sys from abc import ABC, abstractmethod from datetime import datetime as dt from enum import Enum, unique from typing import List from py_jftech import get_config, parse_date logger = logging.getLogger(__name__) @unique class BacktestStep(Enum): ASSET_POOL = 1 NORMAL_PORTFOLIO = 2 HOLD_PORTFOLIO = 3 def within(self, step: Enum): return self.value <= step.value def without(self, step: Enum): return self.value >= step.value @unique class DatumType(Enum): FUND = 'FUND' INDEX = 'INDEX' ECO = 'ECO' @unique class AssetPoolType(Enum): OPTIMIZE = 1 @unique class PortfoliosRisk(Enum): FT3 = 3 @unique class PortfoliosType(Enum): NORMAL = 'normal' CUSTOM = 'custom' @unique class SolveType(Enum): INFEASIBLE = 0 MPT = 1 POEM = 2 RISK_PARITY = 3 @unique class LoggerType(Enum): SIGNAL = 'signal' @unique class SignalType(Enum): NORMAL = 1 SignalType.NORMAL.p_type = PortfoliosType.NORMAL class DataSync(ABC): ''' 数据同步服务,需要同步数据的服务,可以实现该接口 ''' @abstractmethod def do_sync(self, max_date=dt.today()): ''' 开始同步数据,到指定日期,如果没给则同步到当前日期 ''' pass class Cleanable(ABC): ''' 可清除服务 ''' @property @abstractmethod def clean_name(self): ''' 清除数据的名称 ''' pass @property @abstractmethod def clean_step(self): ''' 清除数据所属的步骤 ''' pass @abstractmethod def clean_up(self, min_date=None, risk: PortfoliosRisk = None): ''' 清理指定的数据 :param min_date: 指定的起始时间 :param risk: 指定的风险等级 ''' pass class Datum(ABC): ''' 基础资料服务,基金资料数据,各种指数,指标资料数据 ''' @abstractmethod def get_datums(self, type: DatumType = None, crncy=None, risk=None, datum_ids=None, ticker=None, exclude=True): ''' 获取资料信息,当id和ticker都有时,取二者并集 :param type: 资料类型 :param crncy: 货币类型,仅对基金资料有效 :param risk: 风险等级,仅对基金资料有效 :param datum_ids: 资料ID列表 :param ticker: 资料ticker列表 :param exclude: 是否排除过滤的资料,默认为True :return: 资料信息数据 ''' pass @abstractmethod def get_high_risk_datums(self, risk: PortfoliosRisk): ''' 根据指定的投组风险等级,获取高风险资产资料数据 :param risk: 投组风险等级 :return: 高风险资料信息 ''' pass @abstractmethod def update_change(self, date): ''' 在指定对日期,执行资料变更,如果有变更则返回True, 否则返回False,注意,改方法非幂等,变更后无法复原 :param date: 执行变更的日期 :return: 如果有变更则返回True,否则返回False ''' pass class Navs(ABC): ''' 基础数据相关服务,基金净值,各种指标 高开低收 ''' @abstractmethod def get_fund_navs(self, fund_ids=None, min_date=None, max_date=None): ''' 获取基金净值信息 :param fund_ids: 基金id,可以多个id,使用tuple包裹 :param min_date: 起始时间 :param max_date: 截止时间 :return: 基金净值信息 ''' pass @abstractmethod def get_nav_start_date(self, fund_ids=None): ''' 获取指定id资产的净值开始时间 :param fund_ids: 指定id资产,如果为None,则返回全部资产的开始时间 :return: 资产的开始时间字典 ''' pass @abstractmethod def get_index_close(self, datum_ids=None, min_date=None, max_date=None, ticker=None): ''' 获取指标收盘价 :param datum_ids: 指标资料id :param min_date: 起始时间 :param max_date: 截止时间 :param ticker: 指标资料ticker :return: 指标收盘价信息 ''' pass @abstractmethod def get_last_index_close(self, max_date, datum_id=None, ticker=None, count=1): ''' 获取指定资料或ticker,指定日期之前最后count个收盘价,当指定datum_id后,ticker参数无效 :param max_date: 指定日期 :param datum_id: 指标id,只能指定一个 :param ticker: 指标ticker,只能指定一个,当指标id有值后,该参数无效 :param count: 指定要返回数据的个数 :return: 如果存在,则返回指定日期最后count个收盘价,否则返回None ''' pass @abstractmethod def get_eco_values(self, datum_ids=None, min_date=None, max_date=None, ticker=None, by_release_date=False): ''' 获取经济指标数据,若同时给出ID,和ticker,则取二者并集 :param datum_ids: 经济指标id :param min_date: 起始日期 :param max_date: 截止日期 :param ticker: 经济指标ticker :param by_release_date: 如果为True,则使用公告日期查询,否则使用抓取日期 :return: 经济指标的值,包括查询日期,指标和公告日期 ''' pass @abstractmethod def get_last_eco_values(self, max_date, datum_id=None, ticker=None, count=1, by_release_date=False): ''' 获取指定资料或ticker,指定日期之前最后count个指标数据,当指定datum_id后,ticker参数无效 :param max_date: 指定日期 :param datum_id: 指标id,只能指定一个 :param ticker: 指标ticker,只能指定一个,当指标id有值后,该参数无效 :param count: 指定要返回数据的个数 :param by_release_date: 如果为True,则使用公告日期查询,否则使用抓取日期 :return: 如果存在,则返回指定日期最后count个指标项(查询日期,指标,公告日期),否则返回None ''' pass class AssetOptimize(ABC): ''' 优选相关服务ABC ''' @abstractmethod def find_optimize(self, fund_ids, day): ''' 从多id中,选出指定日期最优的id :param fund_ids: 待选id列表 :param day: 指定日期 :return: 最优的id ''' pass @abstractmethod def get_optimize_pool(self, day): ''' 根据优选规则获取指定日期的优选池 :param day: 指定日期 :return: 优选id列表 ''' pass class AssetPool(ABC): ''' 资产池相关服务 ''' @abstractmethod def get_pool(self, day): ''' 返回指定日期的可用资产池 :param day: 指定日期 :return: 资产id列表 ''' pass @abstractmethod def clear(self, day=None): ''' 清除指定日期之后的资产池数据,如果没有给日期,则全部清空 :param day: 指定清除的开始日期,可选 ''' pass class PortfoliosBuilder(ABC): ''' 投组组合构建器 ''' @abstractmethod def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL): ''' 获取指定日期,指定风险等级,指定类型的投资组合 :param type: 投组的类型 :param day: 指定日期 :param risk: 风险等级 :return: 资产组合字典{id: weight} ''' pass @abstractmethod def build_portfolio(self, day, type: PortfoliosType): ''' 构建指定日期,指定类型的投资组合 :param day: 指定日期 :param type: 指定类型 :return 投资组合数据{risk: {...}},计算明细数据 {...} ''' pass @abstractmethod def clear(self, day=None, risk: PortfoliosRisk = None): ''' 清除指定风险等级,指定日期之后的最优投组 :param day: 指定清除的开始日期,可选,如果没给,则清除全部日期 :param risk: 指定风险等级,如果没给,则清除全部风险等级 ''' pass class PortfoliosChecker(ABC): ''' 投组组合检测器 ''' @abstractmethod def check(self, day=None, portfolios=None): """ 检测避免出现最优投组同时出现全部是ft或美盛基金的情况,增加一步替换动作。 @param day: @param portfolios: @return: """ pass class Solver(ABC): ''' 解算器 ''' @abstractmethod def solve_max_rtn(self): ''' :return: max_rtn, max_var, minCVaR_whenMaxR ''' pass @abstractmethod def solve_min_rtn(self): ''' :return: min_rtn, min_var, maxCVaR_whenMinR ''' pass @abstractmethod def solve_mpt(self, min_rtn, max_rtn): ''' 常规mpt计算 :param min_rtn: 最小回报率 :param max_rtn: 最大回报率 :return: 投组,cvar ''' pass @abstractmethod def solve_poem(self, min_rtn, max_rtn, base_cvar, max_cvar): ''' poem方式的mpt计算 :param min_rtn: 最小回报率 :param max_rtn: 最大回报率 :param base_cvar: 基础cvar :param max_cvar: 最大cvar :return: 投组,cvar ''' pass @abstractmethod def solve_risk_parity(self): ''' risk_parity计算 :return: 投组 ''' pass @abstractmethod def reset_navs(self, day): ''' 根据指定的日期,重置当前解算器,其他计算,全部依赖这里重置后的基金净值数据 :param day: 指定的日期 :return: 根据指定日期获取的,基金净值数据 ''' pass @abstractmethod def set_navs(self, navs): ''' 根据指定的navs,重置当前解算器 :param navs: 指定的navs ''' pass @abstractmethod def set_category(self, category): ''' 根据指定的category,重置当前解算器 :param category: 指定的category ''' pass @property @abstractmethod def category(self): pass @property @abstractmethod def navs(self): ''' :return: 当前解算器使用的基金净值 ''' pass @property @abstractmethod def transfer_type(self): """ 得出调仓类型 @return: """ pass class SolverFactory(ABC): ''' 解算器工厂 ''' @abstractmethod def create_solver(self, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL) -> Solver: ''' 根据指定的投组风险等级,以及投组类型,创建解算器 :param risk: 投组风险等级 :param type: 投组类型 :return: 解算器 ''' pass class PortfoliosHolder(ABC): ''' 投资组合持仓器 ''' @abstractmethod def get_portfolio_type(self, day, risk: PortfoliosRisk) -> PortfoliosType: ''' 获取指定日期指定风险等级持仓投组的类型 :param day: 指定日期 :param risk: 指定风险等级 :return: 持仓投组类型 ''' pass @abstractmethod def get_portfolios_weight(self, day, risk: PortfoliosRisk): ''' 获取指定日期指定风险等级的持仓投组比重 :param day: 指定日期 :param risk: 指定风险等级 :return: 持仓投组占比 ''' pass @abstractmethod def has_hold(self, risk: PortfoliosRisk) -> bool: ''' 是否存在指定分线等级的投组持仓 :param risk: 指定风险等级 :return: 如果已经存在持仓,则返回True, 否则返回False ''' pass @abstractmethod def build_hold_portfolio(self, day, risk: PortfoliosRisk, force_mpt=False): ''' 构建指定日期,指定风险等级的持仓投组,以day为截止日期,会持续补满 :param day: 指定日期 :param risk: 指定风险等级 :param force_mpt: 如果为True,则强制计算当天mpt,否则不强制计算 :return: ''' pass @abstractmethod def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None): ''' 获取最后一次实际调仓的时间 :param risk: 持仓风险等级类型,必须 :param max_date: 指定日期之前的最后一次,可选 :return: 最后一次实际调仓的日期 ''' pass @abstractmethod def get_rebalance_date_by_signal(self, signal_id): ''' 获取指定调仓信号触发的实际调仓日期 :param signal_id: 指定的调仓信号 :return: 实际调仓日期 ''' pass @property @abstractmethod def interval_days(self): ''' 返回实际交易的最小间隔交易日数 :return: 实际交易的最小间隔交易日数 ''' pass @abstractmethod def is_dividend_date(self, day): """ 是否为配息日 :param day: 日期 :return: 是否为配息日 """ pass @abstractmethod def clear(self, day=None, risk: PortfoliosRisk = None): ''' 清除指定风险等级,指定日期之后的持仓投组 :param day: 指定清除的开始日期,可选,如果没给,则清除全部日期 :param risk: 指定风险等级,如果没给,则清除全部风险等级 ''' pass @property @abstractmethod def month_dividend(self): """ 获取当月配息 """ pass class RoboExecutor(ABC): ''' ROBO执行器,整合以上逻辑,进行实盘或回测 ''' @abstractmethod def start_exec(self): ''' 开始执行测试逻辑 ''' pass @property @abstractmethod def start_date(self): ''' :return: 执行器开始日期 ''' pass @staticmethod def use_name(): return get_config('robo-executor')['use'] @property def curt_date(self): ''' :return: 当前运行的日期 ''' if len(sys.argv) > 1: try: return parse_date(sys.argv[1]) except Exception as e: logger.warning(f'get curt date from argv failure.', e) return dt.combine(dt.today().date(), dt.min.time()) class RoboReportor(ABC): ''' 投组报告器 ''' @property @abstractmethod def report_name(self) -> str: ''' 返回报告名称 :return: 报告名称 ''' pass @abstractmethod def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]: ''' 获取指定日期的报告 :param max_date: 指定截止日期 :param min_date: 指定开始日期 :return: 报告数据 ''' pass class RoboExportor(ABC): ''' 投组导出器 ''' @abstractmethod def export(self, max_date=dt.today(), min_date=None): ''' 根据参数以及配置信息执行导出相关操作 :param max_date: 指定截止日期 :param min_date: 指定开始日期 ''' pass class DataLogger(ABC): @abstractmethod def save_record(self, date: dt, risk: PortfoliosRisk, type: LoggerType, datas: dict, exist_merge=True): ''' 保存数据日志记录 :param date: 要保存的数据记录日期 :param risk: 要保存的数据记录风险等级 :param type: 要保存的数据记录类型 :param datas: 要保存的数据记录 :param exist_merge: 如果要保存的记录存在,需要处理的方式,与之前的数据做合并处理,则为Ture, 否则会直接覆盖 ''' pass @abstractmethod def load_records(self, max_date=None, min_date=None, risk: PortfoliosRisk = None, type: LoggerType = None): ''' 获取数据日志记录 :param max_date: 截止日期 :param min_date: 起始日期 :param risk: 风险等级 :param type: 日志类型 :return: 日志数据列表 ''' pass class RebalanceSignal(ABC): ''' 控制信号,发起是否调仓服务 ''' @abstractmethod def get_signal(self, day, risk: PortfoliosRisk): ''' 根据日期和风险等级,返回当天的调仓信号,如果没有则返回None :param day: 指定的日期,净值日 :param risk: 指定的风险等级 :return: 如果有信号,则返回信号数据,否则返回None ''' pass @property @abstractmethod def signal_type(self) -> SignalType: ''' 返回信号类型 :return: 信号类型 ''' pass @abstractmethod def get_last_signal(self, day, risk: PortfoliosRisk): ''' 根据日期和风险等级,返回最近的调仓信号,如果没有则返回None :param day: 指定的日期,净值日 :param risk: 指定的风险等级 :return: 如果有信号,则返回信号数据,否则返回None ''' @abstractmethod def clear(self, min_date=None, risk: PortfoliosRisk = None): ''' 清理指定的数据 :param min_date: 指定的起始时间 :param risk: 指定的风险等级 ''' pass