Commit 61860c1c authored by jichao's avatar jichao

导出模块完毕

parent 3f327ee0
......@@ -5,3 +5,4 @@
*.log
/venv/
/logs
/excels
\ No newline at end of file
from abc import ABC, abstractmethod
from datetime import datetime as dt
from enum import Enum, unique
from typing import List
from typing import List, Dict
from py_jftech import get_config
......@@ -435,16 +435,24 @@ class PortfoliosHolder(ABC):
pass
@abstractmethod
def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None, signal_id=None):
def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None):
'''
获取最后一次实际调仓的时间
:param risk: 持仓风险等级类型,必须
:param max_date: 指定日期之前的最后一次,可选
:param signal_id: 指定信号的最后一次调仓,可选
:return: 最后一次实际调仓的日期
'''
pass
@abstractmethod
def get_rebalance_date_by_signal(self, signal_id):
'''
获取指定调仓信号触发的实际调仓日期
:param signal_id: 指定的调仓信号
:return: 实际调仓日期
'''
pass
@property
@abstractmethod
def interval_days(self):
......@@ -523,10 +531,10 @@ class RebalanceRuler(ABC):
pass
@abstractmethod
def get_signal_type(self, sign_id) -> SignalType:
def get_signal_type(self, sign_id) -> SignalType | Dict[int, SignalType]:
'''
获取指定id的信号类型
:param sign_id: 信号id
:param sign_id: 信号id, 可以多个,使用元祖包裹
:return: 信号类型
'''
pass
......@@ -566,8 +574,11 @@ class RoboExecutor(ABC):
return get_config('robo-executor')['use']
class RoboReport(ABC):
class RoboReportor(ABC):
'''
投组报告器
'''
@property
@abstractmethod
def report_name(self) -> str:
'''
......@@ -577,10 +588,27 @@ class RoboReport(ABC):
pass
@abstractmethod
def load_report(self, day=dt.today()) -> List[dict]:
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
'''
获取指定日期的报告
:param day: 指定日期
:param max_date: 指定截止日期
:param min_date: 指定开始日期
:return: 报告数据
'''
pass
class RoboExportor(ABC):
'''
投组导出器
'''
@abstractmethod
def export(self, max_date=dt.today(), min_date=None):
'''
导出指定日期的数据到excel
:param max_date: 指定截止日期
:param min_date: 指定开始日期
:return: 导出文件路径
'''
pass
......@@ -155,6 +155,57 @@ rebalance: # 再平衡模块
cvar-min-volume: 30 # 计算cvar至少需要多少交易日数据
high-low-buy: # 高低买入相关
threshold: [ 0.5, 0.8 ] # [ 低买入阀值,高买入阀值 ]
reports:
benchmark:
init-amount: 100
stock-rate:
RR3: 0.3
RR4: 0.5
RR5: 0.7
fixed-range:
range-dates:
- start: 2008-01-01
end: 2008-10-27
- start: 2011-05-02
end: 2011-10-04
- start: 2013-05-08
end: 2013-06-24
- start: 2014-09-03
end: 2014-12-16
- start: 2015-04-28
end: 2016-01-21
- start: 2018-01-26
end: 2018-10-29
- start: 2020-01-20
end: 2020-03-23
relative-range:
range-dates:
- months: 1
name: '一个月'
- months: 3
name: '三个月'
- months: 6
name: '六个月'
- years: 1
name: '一年'
- years: 2
name: '两年'
- years: 3
name: '三年'
- years: 5
name: '五年'
- years: 10
name: '十年'
backtest:
exist-build: on
save-path: ${EXPORT_PATH:excels}
include-report:
- hold-report
- signal-report
- benckmark-report
- indicators-report
- fixed-range-report
- relative-range-report
robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:backtest} #执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:on}
......
......@@ -13,13 +13,26 @@ __COLUMNS__ = {
}
@read
def get_list(risk: PortfoliosRisk = None, min_date=None, max_date=None, rebalance: bool = None):
sqls = []
if min_date:
sqls.append(f"rhp_date >= '{format_date(min_date)}'")
if max_date:
sqls.append(f"rhp_date <= '{format_date(max_date)}'")
return f'''
select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios
{where(*sqls, rhp_risk=risk, rhp_rebalance=rebalance)} order by rhp_risk, rhp_date
'''
@read(one=True)
def get_one(day, risk: PortfoliosRisk):
return f'''select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios {where(rhp_date=day, rhp_risk=risk)}'''
@read(one=True)
def get_last_one(risk: PortfoliosRisk, max_date=None, rebalance: bool = None, signal_id=None):
def get_last_one(risk: PortfoliosRisk = None, max_date=None, rebalance: bool = None, signal_id=None):
sql = "rhp_date <= '{format_date(max_date)}'" if max_date else None
return f'''
select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios
......
import json
import logging
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import (
component, autowired, get_config, next_workday, prev_workday, transaction, workday_range, format_date
)
from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler, Navs, SignalType, RoboExecutor, PortfoliosType
from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler, Navs, SignalType, RoboExecutor, PortfoliosType, RoboReportor
from portfolios.dao import robo_hold_portfolios as rhp
from portfolios.utils import format_weight
......@@ -30,9 +32,13 @@ class NextReblanceHolder(PortfoliosHolder):
return signal_type.p_type if signal_type else PortfoliosType.NORMAL
return PortfoliosType.NORMAL
def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None, signal_id=None):
def get_last_rebalance_date(self, risk: PortfoliosRisk, max_date=None):
assert risk, f"get last rebalance date, risk can not be none"
last = rhp.get_last_one(max_date=max_date, risk=risk, signal_id=signal_id, rebalance=True)
last = rhp.get_last_one(max_date=max_date, risk=risk, rebalance=True)
return last['date'] if last else None
def get_rebalance_date_by_signal(self, signal_id):
last = rhp.get_last_one(signal_id=signal_id, rebalance=True)
return last['date'] if last else None
def get_portfolios_weight(self, day, risk: PortfoliosRisk):
......@@ -127,3 +133,25 @@ class NextReblanceHolder(PortfoliosHolder):
@property
def init_nav(self):
return self._config['init-nav']
@component(bean_name='hold-report')
class HoldReportor(RoboReportor):
@autowired
def __init__(self, rule: RebalanceRuler = None):
self._rule = rule
@property
def report_name(self) -> str:
return '投组净值'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
holds = pd.DataFrame(rhp.get_list(max_date=max_date, min_date=min_date))
if not holds.empty:
signal_types = self._rule.get_signal_type(tuple(set(holds['signal_id'])))
holds['signal_type'] = holds.apply(lambda row: signal_types[row['signal_id']].name, axis=1)
holds['risk'] = holds.apply(lambda row: PortfoliosRisk(row['risk']).name, axis=1)
holds = holds[['risk', 'date', 'nav', 'signal_type']]
return holds.to_dict('records')
return []
import logging
import unittest
from py_jftech import autowired, parse_date
from py_jftech import autowired, parse_date, to_str
from api import PortfoliosBuilder, PortfoliosType, PortfoliosRisk, PortfoliosHolder, SolveType
from portfolios.dao import robo_mpt_portfolios as rmp
from pymysql import IntegrityError, constants
from api import PortfoliosBuilder, PortfoliosType, PortfoliosRisk, PortfoliosHolder, RoboReportor
class PortfoliosTest(unittest.TestCase):
......@@ -34,6 +32,11 @@ class PortfoliosTest(unittest.TestCase):
hold.build_hold_portfolio(parse_date('2016-01-01'), PortfoliosRisk.FT9)
pass
@autowired(names={'reportor': 'hold-report'})
def test_hold_report(self, reportor: RoboReportor = None):
report = reportor.load_report()
self.logger.info(to_str(report))
if __name__ == '__main__':
unittest.main()
from py_jftech import read, write, where, format_date, mapper_columns
from py_jftech import read, write, where, format_date, mapper_columns, to_tuple
from api import SignalType, PortfoliosRisk
......@@ -13,10 +13,27 @@ __COLUMNS__ = {
}
@read
def get_list(min_date=None, max_date=None, risk: PortfoliosRisk = None, type: SignalType = None, effective: bool = None):
sqls = []
if min_date:
sqls.append(f"rrs_date >= '{format_date(min_date)}'")
if max_date:
sqls.append(f"rrs_date <= '{format_date(max_date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal
{where(*sqls, rrs_risk=risk, rrs_type=type, rrs_effective=effective)} order by rrs_risk, rrs_date
'''
@read
def get_by_ids(ids):
return f'''select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=to_tuple(ids))}'''
@read(one=True)
def get_by_id(id):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=id)}'''
return f'''select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_rebalance_signal {where(rrs_id=id)}'''
@read(one=True)
......@@ -47,6 +64,7 @@ def get_count(risk: PortfoliosRisk = None, day=None, effective=None):
@read(one=True)
def exec():
return f"select count(*) as `count` from robo_rebalance_signal {where(rrs_risk=risk, rrs_date=day, rrs_effective=effective)}"
result = exec()
return result['count']
......
from typing import List
from datetime import datetime as dt
from typing import List, Dict
import json
from py_jftech import component, autowired, get_config, workday_range, next_workday
import pandas as pd
import numpy as np
from py_jftech import component, autowired, get_config, workday_range, next_workday, to_tuple
from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder
from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder, RoboReportor, Datum, DatumType
from rebalance.dao import robo_rebalance_signal as rrs
......@@ -64,8 +68,12 @@ class LevelRebalanceRuler(RebalanceRuler):
return signal
return None
def get_signal_type(self, sign_id) -> SignalType:
signal = rrs.get_by_id(sign_id)
def get_signal_type(self, sign_id) -> SignalType | Dict[int, SignalType]:
sign_id = to_tuple(sign_id)
if len(sign_id) > 1:
return {x['id']: SignalType(x['type']) for x in rrs.get_by_ids(sign_id)}
else:
signal = rrs.get_by_id(sign_id[0])
return SignalType(signal['type']) if signal else None
def commit_signal(self, sign_id):
......@@ -73,3 +81,35 @@ class LevelRebalanceRuler(RebalanceRuler):
def clear_signal(self, day=None, risk: PortfoliosRisk = None):
rrs.delete(min_date=day, risk=risk)
@component(bean_name='signal-report')
class SignalExportor(RoboReportor):
@autowired
def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None):
self._hold = hold
self._datum = datum
@property
def report_name(self) -> str:
return '调仓信号'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
result = []
datums = {str(x['id']): x for x in self._datum.get_datums(type=DatumType.FUND)}
for signal in rrs.get_list(max_date=max_date, min_date=min_date, effective=True):
rebalance_date = self._hold.get_rebalance_date_by_signal(signal['id'])
for fund_id, weight in json.loads(signal['portfolio']).items():
result.append({
'risk': PortfoliosRisk(signal['risk']).name,
'type': SignalType(signal['type']).name,
'signal_date': signal['date'],
'rebalance_date': rebalance_date,
'portfolio_type': PortfoliosType(signal['portfolio_type']).name,
'ft_ticker': datums[fund_id]['ftTicker'],
'blooberg_ticker': datums[fund_id]['bloombergTicker'],
'fund_name': datums[fund_id]['chineseName'],
'weight': weight
})
return result
import logging
import unittest
from py_jftech import autowired, parse_date
from py_jftech import autowired, parse_date, to_str
from api import RebalanceSignal, PortfoliosRisk, RebalanceRuler
from api import RebalanceSignal, PortfoliosRisk, RebalanceRuler, RoboReportor
class RebalanceTest(unittest.TestCase):
......@@ -33,6 +33,11 @@ class RebalanceTest(unittest.TestCase):
def test_rebalance_builder(self, builder: RebalanceRuler = None):
builder.take_next_signal(parse_date('2022-09-01'), PortfoliosRisk.FT3)
@autowired(names={'reportor': 'signal-report'})
def test_signal_report(self, reportor: RoboReportor = None):
result = reportor.load_report()
self.logger.info(to_str(result, show_line=10))
if __name__ == '__main__':
unittest.main()
import os
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired, get_config, get_instance_name, get_project_path, format_date
from api import RoboReportor, RoboExportor
def include_report():
return get_config(__name__)['include-report']
@component(bean_name='backtest-export')
class BacktestExportor(RoboExportor):
@autowired(includes={'reportors': include_report()})
def __init__(self, reportors: List[RoboReportor] = None):
reportors = {get_instance_name(x): x for x in reportors}
self._reportors: List[RoboReportor] = [reportors[x] for x in include_report()]
self._config = get_config(__name__)
@property
def save_path(self):
save_path: str = self._config['save-path']
if save_path.startswith('.'):
return os.path.abspath(os.path.join(os.path.dirname(__file__), save_path))
elif save_path.startswith('/'):
return os.path.abspath(save_path)
return os.path.abspath(os.path.join(get_project_path(), save_path))
@property
def exist_build(self):
return self._config['exist-build']
def export(self, max_date=dt.today(), min_date=None):
root = self.save_path
os.makedirs(root, exist_ok=True)
filename = f"backtest_{format_date(max_date)}.xlsx"
if min_date:
filename = f"backtest_{format_date(min_date)}_to_{format_date(max_date)}.xlsx"
file = os.path.join(root, filename)
if os.path.exists(file):
if not self.exist_build:
return file
os.remove(file)
with pd.ExcelWriter(file) as writer:
for reportor in self._reportors:
datas = pd.DataFrame(reportor.load_report(max_date=max_date, min_date=min_date))
if not datas.empty:
datas.to_excel(writer, sheet_name=reportor.report_name, index=False)
return file
from datetime import datetime as dt, timedelta
from typing import List
import pandas as pd
from py_jftech import component, autowired, prev_workday, filter_weekend, next_workday, get_config
from api import RoboReportor, PortfoliosRisk, RoboExecutor, Navs, Datum, DatumType
from reports.dao import robo_benckmark as rb
@component(bean_name='benckmark-report')
class BenchmarkReportor(RoboReportor):
@autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, executor: RoboExecutor = None, navs: Navs = None, datum: Datum = None):
self._exec = executor
self._navs = navs
self._datum = datum
self._config = get_config(__name__)
@property
def report_name(self) -> str:
return 'benchmark'
@property
def risks(self):
return self._config['stock-rate'].keys()
@property
def init_amount(self):
return self._config['init-amount']
def stock_rate(self, risk):
return self._config['stock-rate'][risk]
def load_nav_rtn(self, risk, day):
last = rb.get_last_one(risk=risk, max_date=day, re=True)
start_date = last['date'] if last else next_workday(self._exec.start_date)
datums = {x['id']: x for x in self._datum.get_datums(type=DatumType.FUND)}
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(datums.keys()), min_date=prev_workday(start_date - timedelta(10)), max_date=day))
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
navs.fillna(method='ffill', inplace=True)
nav_index = navs.shape[1]
for i in range(nav_index):
navs[f'rtn_{navs.columns[i]}'] = navs[navs.columns[i]] / navs[navs.columns[i]].shift() - 1
navs = navs[navs.index >= start_date]
return navs, nav_index
def find_datum_asset(self):
return {x['id']: x['assetType'] for x in self._datum.get_datums(type=DatumType.FUND)}
def build_benchmark(self, risk, day=dt.today()):
nav_rtn, nav_index = self.load_nav_rtn(risk=risk, day=day)
asset_types = {x['id']: x['assetType'] for x in self._datum.get_datums(type=DatumType.FUND)}
last = rb.get_last_one(risk=risk, max_date=day, re=True)
init_amount = last['nav'] if last else self.init_amount
stock_rate = self.stock_rate(risk)
other_rate = 1 - stock_rate
five_rtn = 0
last_day = None
fund_ids = None
for index, row in nav_rtn.iterrows():
if last_day is None or fund_ids is None:
fund_ids = list(row.iloc[:nav_index].dropna().index)
stock_count = len([x for x in fund_ids if asset_types[x] == 'STOCK'])
stock_average = init_amount * stock_rate / stock_count
other_average = init_amount * other_rate / (len(fund_ids) - stock_count)
nav_rtn.loc[index, f'{risk}_result'] = init_amount
nav_rtn.loc[index, f'{risk}_re'] = 1
for fund_id in fund_ids:
if fund_id and asset_types[fund_id] == 'STOCK':
nav_rtn.loc[index, f'stock_{fund_id}'] = stock_average
else:
nav_rtn.loc[index, f'other_{fund_id}'] = other_average
else:
for fund_id in fund_ids:
if fund_id and asset_types[fund_id] == 'STOCK':
nav_rtn.loc[index, f'stock_{fund_id}'] = nav_rtn.loc[last_day, f'stock_{fund_id}'] * (
1 + nav_rtn.loc[index, f'rtn_{fund_id}'])
else:
nav_rtn.loc[index, f'other_{fund_id}'] = nav_rtn.loc[last_day, f'other_{fund_id}'] * (
1 + nav_rtn.loc[index, f'rtn_{fund_id}'])
nav_rtn.loc[index, f'{risk}_result'] = nav_rtn.loc[index][-len(fund_ids):].sum()
nav_rtn.loc[index, f'{risk}_re'] = 0
if five_rtn == 5:
five_rtn = 0
fund_ids = list(row.iloc[:nav_index].dropna().index)
stock_count = len([x for x in fund_ids if asset_types[x] == 'STOCK'])
stock_average = nav_rtn.loc[index, f'{risk}_result'] * stock_rate / stock_count
other_average = nav_rtn.loc[index, f'{risk}_result'] * other_rate / (len(fund_ids) - stock_count)
nav_rtn.loc[index, f'{risk}_re'] = 1
for fund_id in fund_ids:
if fund_id and asset_types[fund_id] == 'STOCK':
nav_rtn.loc[index, f'stock_{fund_id}'] = stock_average
else:
nav_rtn.loc[index, f'other_{fund_id}'] = other_average
five_rtn += 1
last_day = index
result = nav_rtn.reindex(columns=[f'{risk}_result', f'{risk}_re'])
result.reset_index(inplace=True)
result['risk'] = risk
result.rename(columns={f'{risk}_result': 'nav', f'{risk}_re': 're', 'nav_date': 'date'}, inplace=True)
result['nav'] = round(result['nav'], 4)
return result.to_dict('records')
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
for risk in self.risks:
last = rb.get_last_one(max_date=max_date, risk=risk)
if not last or last['date'] < filter_weekend(max_date):
benchmarks = pd.DataFrame(self.build_benchmark(risk=risk, day=max_date))
if last:
benchmarks = benchmarks[benchmarks.date > last['date']]
if not benchmarks.empty:
rb.batch_insert(benchmarks.to_dict('records'))
result = pd.DataFrame(rb.get_list(max_date=max_date, min_date=min_date))
result = result.pivot_table(index='date', columns='risk', values='nav')
result.reset_index(inplace=True)
return result.to_dict('records')
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired
from empyrical import annual_return, annual_volatility, max_drawdown, sharpe_ratio
from api import RoboReportor, Navs
@component(bean_name='combo-report')
class ComboDatasReport(RoboReportor):
@autowired(names={'hold_reportor': 'hold-report', 'benchmark': 'benckmark-report'})
def __init__(self, hold_reportor: RoboReportor = None, benchmark: RoboReportor = None, navs: Navs = None):
self._hold_reportor = hold_reportor
self._benchmark = benchmark
self._navs = navs
@property
def report_name(self) -> str:
return '混合数据'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
holds = pd.DataFrame(self._hold_reportor.load_report(max_date=max_date, min_date=min_date))
if not holds.empty:
holds['risk'] = holds.apply(lambda row: row.risk, axis=1)
datas = holds.pivot_table(index='date', columns='risk', values='nav')
benchmark = pd.DataFrame(self._benchmark.load_report(max_date=max_date, min_date=min_date))
datas = datas.join(benchmark.set_index('date'))
spx = pd.DataFrame(self._navs.get_index_close(ticker='SPX Index', min_date=min_date, max_date=max_date))
spx = spx.pivot_table(index='date', columns='index_id', values='close')
spx.columns = ['SPX']
datas = datas.join(spx)
datas.fillna(method='ffill', inplace=True)
datas.reset_index(inplace=True)
return datas.to_dict('records')
return []
CREATE TABLE IF NOT EXISTS robo_benchmark
(
rb_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rb_date DATETIME NOT NULL COMMENT '日期',
rb_risk VARCHAR(255) NOT NULL COMMENT '风险等级',
rb_nav DOUBLE(16, 4) NOT NULL COMMENT '资产值',
rb_re TINYINT NOT NULL DEFAULT 0 COMMENT '是否再分配',
rb_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rb_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rb_id),
UNIQUE INDEX (rb_date, rb_risk),
INDEX (rb_risk)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT 'BENCHMARK数据表';
\ No newline at end of file
from py_jftech import read, write, where, mapper_columns, format_date
__COLUMNS__ = {
'rb_id': 'id',
'rb_date': 'date',
'rb_risk': 'risk',
'rb_nav': 'nav',
'rb_re': 're',
}
@write
def batch_insert(datas):
datas = [mapper_columns(x, __COLUMNS__) for x in datas]
values = ','.join([f'''({','.join([(f"'{x[j]}'" if j in x and x[j] is not None else 'null') for j in __COLUMNS__.keys() if j != 'rb_id'])})''' for x in datas])
return f'''insert into robo_benchmark({','.join([x for x in __COLUMNS__.keys() if x != 'rb_id'])}) values {values}'''
@read(one=True)
def get_last_one(max_date=None, risk=None, re: bool = None):
sql = f"rb_date <= '{format_date(max_date)}'" if max_date else None
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_benchmark
{where(sql, rb_risk=risk, rb_re=re)} order by rb_date desc limit 1
'''
@read
def get_list(max_date=None, min_date=None, risk=None, re: bool = None):
sqls = []
if max_date:
sqls.append(f"rb_date <= '{format_date(max_date)}'")
if min_date:
sqls.append(f"rb_date >= '{format_date(min_date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_benchmark
{where(*sqls, rb_risk=risk, rb_re=re)} order by rb_risk, rb_date
'''
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired, get_config, format_date
from api import RoboReportor
@component(bean_name='fixed-range-report')
class FixedRangeReport(RoboReportor):
@autowired(names={'combo': 'combo-report'})
def __init__(self, combo: RoboReportor = None):
self._combo = combo
self._config = get_config(__name__)
@property
def report_name(self) -> str:
return '固定区间收益率'
@property
def range_dates(self):
return self._config['range-dates']
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datas = pd.DataFrame(self._combo.load_report(max_date=max_date, min_date=min_date))
datas.set_index('date', inplace=True)
if not datas.empty:
result = pd.DataFrame(columns=datas.columns)
for range in self.range_dates:
row_name = f"{format_date(range['start'])}~{format_date(range['end'])}"
result.loc[row_name] = datas[range['start']:range['end']].values[-1] / datas[range['start']:range['end']].values[0] - 1
result = round(result, 4) * 100
result.reset_index(inplace=True)
result.rename(columns={'index': 'range-date'}, inplace=True)
return result.to_dict('records')
return []
from datetime import datetime as dt
from typing import List
import pandas as pd
from empyrical import annual_return, annual_volatility, max_drawdown, sharpe_ratio
from py_jftech import component, autowired
from api import RoboReportor
@component(bean_name='indicators-report')
class IndicatorsReportor(RoboReportor):
@autowired(names={'combo': 'combo-report'})
def __init__(self, combo: RoboReportor = None):
self._combo = combo
@property
def report_name(self) -> str:
return '指标'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datas = pd.DataFrame(self._combo.load_report(max_date=max_date, min_date=min_date))
datas.set_index('date', inplace=True)
if not datas.empty:
returns = round(datas.pct_change(), 5)
indicators = {
'annual_return': list(annual_return(returns, period='daily', annualization=None) * 100),
'annual_volatility': annual_volatility(returns, period='daily', annualization=None) * 100,
'max_drawdown': max_drawdown(returns, out=None) * 100,
'sharp': sharpe_ratio(returns, risk_free=0, period='daily', annualization=None),
}
indicators['calmar'] = abs(indicators['annual_return'] / indicators['max_drawdown'])
result = pd.DataFrame(indicators.values(), index=indicators.keys(), columns=list(returns.columns)).round(2)
result.reset_index(inplace=True)
result.rename(columns={'index': 'indicators'}, inplace=True)
return result.to_dict('records')
return []
from datetime import datetime as dt
from typing import List
import pandas as pd
from dateutil.relativedelta import relativedelta
from py_jftech import component, autowired, get_config, format_date
from api import RoboReportor
@component(bean_name='relative-range-report')
class RelativeRangeReport(RoboReportor):
@autowired(names={'combo': 'combo-report'})
def __init__(self, combo: RoboReportor = None):
self._combo = combo
self._config = get_config(__name__)
@property
def report_name(self) -> str:
return '相对区间收益率'
@property
def range_dates(self):
return self._config['range-dates']
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datas = pd.DataFrame(self._combo.load_report(max_date=max_date, min_date=min_date))
datas.set_index('date', inplace=True)
if not datas.empty:
result = pd.DataFrame(columns=datas.columns)
for range in self.range_dates:
kwargs = range.copy()
del kwargs['name']
start = max_date - relativedelta(**kwargs)
row_name = f"{range['name']}({format_date(start)}~{format_date(max_date)})"
result.loc[row_name] = datas[start:max_date].values[-1] / datas[start:max_date].values[0] - 1
result = round(result, 4) * 100
result.reset_index(inplace=True)
result.rename(columns={'index': 'range-date'}, inplace=True)
return result.to_dict('records')
return []
import unittest
import logging
from py_jftech import autowired, to_str, parse_date
from api import RoboReportor, RoboExportor
logger = logging.getLogger(__name__)
class ReportTest(unittest.TestCase):
@autowired(names={'reportor': 'benckmark-report'})
def test_benchmark_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'indicators-report'})
def test_indicator_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'fixed-range-report'})
def test_fixed_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'relative-range-report'})
def test_relative_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'backtest-export'})
def test_backtest_export(self, exportor: RoboExportor = None):
path = exportor.export(max_date=parse_date('2022-11-01'))
logger.info(path)
if __name__ == '__main__':
unittest.main()
certifi==2022.9.24
charset-normalizer==2.1.1
empyrical==0.5.5
et-xmlfile==1.1.0
idna==3.4
lxml==4.9.0
numpy==1.23.4
openpyxl==3.0.10
pandas==1.5.1
pandas-datareader==0.10.0
ply==3.11
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment