Commit 39077d30 authored by wenwen.tang's avatar wenwen.tang 😕

报表相关

parent 5722cfad
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime as dt from datetime import datetime as dt
from enum import Enum, unique from enum import Enum, unique
from typing import List
from py_jftech import get_config from py_jftech import get_config
...@@ -473,3 +474,43 @@ class RoboExecutor(ABC): ...@@ -473,3 +474,43 @@ class RoboExecutor(ABC):
@staticmethod @staticmethod
def use_name(): def use_name():
return get_config('robo-executor')['use'] return get_config('robo-executor')['use']
class RoboReportor(ABC):
'''
投组报告器
'''
@property
@abstractmethod
def report_name(self) -> str:
'''
返回报告名称
:return: 报告名称
'''
pass
@abstractmethod
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
'''
获取指定日期的报告
:param max_date: 指定截止日期
:param min_date: 指定开始日期
:return: 报告数据
'''
pass
class RoboExportor(ABC):
'''
投组导出器
'''
@abstractmethod
def export(self, max_date=dt.today(), min_date=None):
'''
根据参数以及配置信息执行导出相关操作
:param max_date: 指定截止日期
:param min_date: 指定开始日期
'''
pass
...@@ -38,10 +38,10 @@ py-jftech: ...@@ -38,10 +38,10 @@ py-jftech:
injectable: injectable:
types: types:
api.PortfoliosBuilder: portfolios.builder.PoemPortfoliosBuilder api.PortfoliosBuilder: portfolios.builder.PoemPortfoliosBuilder
email: # email:
server: smtphz.qiye.163.com # server: smtphz.qiye.163.com
user: jft-ra@thizgroup.com # user: jft-ra@thizgroup.com
password: 5dbb#30ec6d3 # password: 5dbb#30ec6d3
mulit-process: mulit-process:
max-workers: ${MAX_PROCESS:4} max-workers: ${MAX_PROCESS:4}
basic: # 基础信息模块 basic: # 基础信息模块
...@@ -102,6 +102,66 @@ portfolios: # 投组模块 ...@@ -102,6 +102,66 @@ portfolios: # 投组模块
high-weight: [ 1 ] # 最高权重比例,可给一个值,也可以给多个值,当多个值时,第一个表示只有一个资产时权重,第二个表示只有两个资产时权重,以此类推,最后一个表示其他资产个数时的权重 high-weight: [ 1 ] # 最高权重比例,可给一个值,也可以给多个值,当多个值时,第一个表示只有一个资产时权重,第二个表示只有两个资产时权重,以此类推,最后一个表示其他资产个数时的权重
poem: # poem相关 poem: # poem相关
cvar-scale-factor: 0.1 # 计算时用到的系数 cvar-scale-factor: 0.1 # 计算时用到的系数
reports: # 报告模块相关
navs:
type: FUND
tickers:
- TEMTECI LX Equity
- TEPLX US Equity
- FRDPX US Equity
- FKRCX US Equity
- FTNRACU LX Equity
benchmark: # benchmark报告
init-amount: 100 # 初始金额
stock-rate: # stock型基金比例
RR3: 0.3
RR4: 0.5
RR5: 0.7
fixed-range: # 固定区间收益率
range-dates: # 固定起始截止日期
- start: 2008-01-01
end: 2008-10-27
- start: 2011-05-02
end: 2011-10-04
- start: 2013-05-08
end: 2013-06-24
- start: 2014-09-03
end: 2014-12-16
- start: 2015-04-28
end: 2016-01-21
- start: 2018-01-26
end: 2018-10-29
- start: 2020-01-20
end: 2020-03-23
relative-range: # 相对区间收益率
range-dates: # 相对时间周期
- months: 1
name: '一个月'
- months: 3
name: '三个月'
- months: 6
name: '六个月'
- years: 1
name: '一年'
- years: 2
name: '两年'
- years: 3
name: '三年'
- years: 5
name: '五年'
- years: 10
name: '十年'
backtest: # 回测导出曹策略
exist-build: on # 如果报告文件存在,是否重新构建文件
save-path: ${EXPORT_PATH:excels} # 导出报告文件存放路径,如果以./或者../开头,则会以执行python文件为根目录,如果以/开头,则为系统绝对路径,否则,以项目目录为根目录
file-name: ${EXPORT_FILENAME:real}
include-report: # 需要导出的报告类型列表,下面的顺序,也代表了excel中sheet的顺序
- hold-report # 持仓报告
- signal-report # 信号报告
- benckmark-report # benckmark报告
- indicators-report # 各种特殊指标报告
- fixed-range-report # 固定区间收益报告
- relative-range-report # 相对区间收益报告
robo-executor: # 执行器相关 robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:backtest} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest use: ${ROBO_EXECUTOR:backtest} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据 sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据
......
import json import json
import logging import logging
from datetime import datetime as dt
from typing import List
from py_jftech import component, autowired, format_date from py_jftech import component, autowired, format_date
from pymysql import IntegrityError, constants from pymysql import IntegrityError, constants
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory, \
RoboReportor, DatumType
from portfolios.dao import robo_mpt_portfolios as rmp from portfolios.dao import robo_mpt_portfolios as rmp
from portfolios.dao.robo_mpt_portfolios import get_list from portfolios.dao.robo_mpt_portfolios import get_list
...@@ -109,3 +112,30 @@ class PoemPortfoliosBuilder(MptPortfoliosBuilder): ...@@ -109,3 +112,30 @@ class PoemPortfoliosBuilder(MptPortfoliosBuilder):
} }
return result return result
@component(bean_name='signal-report')
class SignalReportor(RoboReportor):
@autowired
def __init__(self, datum: Datum = None):
self._datum = datum
@property
def report_name(self) -> str:
return '调仓信号'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
result = []
datums = {str(x['id']): x for x in self._datum.get_datums(type=DatumType.FUND, exclude=False)}
for signal in rmp.get_list(max_date=max_date, min_date=min_date):
for fund_id, weight in json.loads(signal['portfolio']).items():
result.append({
'risk': PortfoliosRisk(signal['risk']).name,
'rebalance_date': signal['date'],
'portfolio_type': PortfoliosType(signal['portfolio_type']).name,
'ft_ticker': datums[fund_id]['ftTicker'],
'blooberg_ticker': datums[fund_id]['bloombergTicker'],
'fund_name': datums[fund_id]['chineseName'],
'weight': weight
})
return result
from datetime import datetime as dt, timedelta
from typing import List
import pandas as pd
from py_jftech import component, autowired, prev_workday, filter_weekend, next_workday, get_config
from api import RoboReportor, PortfoliosRisk, RoboExecutor, Navs, Datum, DatumType
from reports.dao import robo_benckmark as rb
@component(bean_name='benckmark-report')
class BenchmarkReportor(RoboReportor):
@autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, executor: RoboExecutor = None, navs: Navs = None, datum: Datum = None):
self._exec = executor
self._navs = navs
self._datum = datum
self._config = get_config(__name__)
@property
def report_name(self) -> str:
return 'benchmark'
@property
def risks(self):
return self._config['stock-rate'].keys()
@property
def init_amount(self):
return self._config['init-amount']
def stock_rate(self, risk):
return self._config['stock-rate'][risk]
def load_nav_rtn(self, risk, day):
last = rb.get_last_one(risk=risk, max_date=day, re=True)
start_date = next_workday(last['date']) if last else self._exec.start_date
datums = {x['id']: x for x in self._datum.get_datums(type=DatumType.FUND)}
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(datums.keys()), min_date=prev_workday(start_date - timedelta(10)), max_date=day))
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
navs.fillna(method='ffill', inplace=True)
nav_index = navs.shape[1]
for i in range(nav_index):
navs[f'rtn_{navs.columns[i]}'] = navs[navs.columns[i]] / navs[navs.columns[i]].shift() - 1
navs = navs[navs.index >= start_date]
return navs, nav_index
def find_datum_asset(self):
return {x['id']: x['assetType'] for x in self._datum.get_datums(type=DatumType.FUND)}
def build_benchmark(self, risk, day=dt.today()):
nav_rtn, nav_index = self.load_nav_rtn(risk=risk, day=day)
asset_types = {x['id']: x['assetType'] for x in self._datum.get_datums(type=DatumType.FUND)}
last = rb.get_last_one(risk=risk, max_date=day, re=True)
init_amount = last['nav'] if last else self.init_amount
stock_rate = self.stock_rate(risk)
other_rate = 1 - stock_rate
five_rtn = 0
last_day = None
fund_ids = None
for index, row in nav_rtn.iterrows():
if last_day is None or fund_ids is None:
fund_ids = list(row.iloc[:nav_index].dropna().index)
stock_count = len([x for x in fund_ids if asset_types[x] == 'STOCK'])
stock_average = init_amount * stock_rate / stock_count
other_average = init_amount * other_rate / (len(fund_ids) - stock_count)
nav_rtn.loc[index, f'{risk}_result'] = init_amount
nav_rtn.loc[index, f'{risk}_re'] = 1
for fund_id in fund_ids:
if fund_id and asset_types[fund_id] == 'STOCK':
nav_rtn.loc[index, f'stock_{fund_id}'] = stock_average
else:
nav_rtn.loc[index, f'other_{fund_id}'] = other_average
else:
for fund_id in fund_ids:
if fund_id and asset_types[fund_id] == 'STOCK':
nav_rtn.loc[index, f'stock_{fund_id}'] = nav_rtn.loc[last_day, f'stock_{fund_id}'] * (
1 + nav_rtn.loc[index, f'rtn_{fund_id}'])
else:
nav_rtn.loc[index, f'other_{fund_id}'] = nav_rtn.loc[last_day, f'other_{fund_id}'] * (
1 + nav_rtn.loc[index, f'rtn_{fund_id}'])
nav_rtn.loc[index, f'{risk}_result'] = nav_rtn.loc[index][-len(fund_ids):].sum()
nav_rtn.loc[index, f'{risk}_re'] = 0
if five_rtn == 5:
five_rtn = 0
fund_ids = list(row.iloc[:nav_index].dropna().index)
stock_count = len([x for x in fund_ids if asset_types[x] == 'STOCK'])
stock_average = nav_rtn.loc[index, f'{risk}_result'] * stock_rate / stock_count
other_average = nav_rtn.loc[index, f'{risk}_result'] * other_rate / (len(fund_ids) - stock_count)
nav_rtn.loc[index, f'{risk}_re'] = 1
for fund_id in fund_ids:
if fund_id and asset_types[fund_id] == 'STOCK':
nav_rtn.loc[index, f'stock_{fund_id}'] = stock_average
else:
nav_rtn.loc[index, f'other_{fund_id}'] = other_average
five_rtn += 1
last_day = index
result = nav_rtn.reindex(columns=[f'{risk}_result', f'{risk}_re'])
result.reset_index(inplace=True)
result['risk'] = risk
result.rename(columns={f'{risk}_result': 'nav', f'{risk}_re': 're', 'nav_date': 'date'}, inplace=True)
result['nav'] = round(result['nav'], 4)
return result.to_dict('records')
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
for risk in self.risks:
last = rb.get_last_one(max_date=max_date, risk=risk)
if not last or last['date'] < filter_weekend(max_date):
benchmarks = pd.DataFrame(self.build_benchmark(risk=risk, day=max_date))
if last:
benchmarks = benchmarks[benchmarks.date > last['date']]
if not benchmarks.empty:
rb.batch_insert(benchmarks.to_dict('records'))
result = pd.DataFrame(rb.get_list(max_date=max_date, min_date=min_date))
result = result.pivot_table(index='date', columns='risk', values='nav')
result.reset_index(inplace=True)
return result.to_dict('records')
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired
from empyrical import annual_return, annual_volatility, max_drawdown, sharpe_ratio
from api import RoboReportor, Navs
@component(bean_name='combo-report')
class ComboDatasReport(RoboReportor):
@autowired(names={'hold_reportor': 'hold-report', 'benchmark': 'benckmark-report'})
def __init__(self, hold_reportor: RoboReportor = None, benchmark: RoboReportor = None, navs: Navs = None):
self._hold_reportor = hold_reportor
self._benchmark = benchmark
self._navs = navs
@property
def report_name(self) -> str:
return '混合数据'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
holds = pd.DataFrame(self._hold_reportor.load_report(max_date=max_date, min_date=min_date))
if not holds.empty:
holds['risk'] = holds.apply(lambda row: row.risk, axis=1)
datas = holds.pivot_table(index='date', columns='risk', values='nav')
benchmark = pd.DataFrame(self._benchmark.load_report(max_date=max_date, min_date=min_date))
datas = datas.join(benchmark.set_index('date'))
spx = pd.DataFrame(self._navs.get_index_close(ticker='SPX Index', min_date=min_date, max_date=max_date))
spx = spx.pivot_table(index='date', columns='index_id', values='close')
spx.columns = ['SPX']
datas = datas.join(spx)
datas.fillna(method='ffill', inplace=True)
datas.reset_index(inplace=True)
return datas.to_dict('records')
return []
CREATE TABLE IF NOT EXISTS robo_benchmark
(
rb_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rb_date DATETIME NOT NULL COMMENT '日期',
rb_risk VARCHAR(255) NOT NULL COMMENT '风险等级',
rb_nav DOUBLE(16, 4) NOT NULL COMMENT '资产值',
rb_re TINYINT NOT NULL DEFAULT 0 COMMENT '是否再分配',
rb_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rb_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rb_id),
UNIQUE INDEX (rb_date, rb_risk),
INDEX (rb_risk)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT 'BENCHMARK数据表';
\ No newline at end of file
from py_jftech import read, write, where, mapper_columns, format_date
__COLUMNS__ = {
'rb_id': 'id',
'rb_date': 'date',
'rb_risk': 'risk',
'rb_nav': 'nav',
'rb_re': 're',
}
@write
def batch_insert(datas):
datas = [mapper_columns(x, __COLUMNS__) for x in datas]
values = ','.join([f'''({','.join([(f"'{x[j]}'" if j in x and x[j] is not None else 'null') for j in __COLUMNS__.keys() if j != 'rb_id'])})''' for x in datas])
return f'''insert into robo_benchmark({','.join([x for x in __COLUMNS__.keys() if x != 'rb_id'])}) values {values}'''
@read(one=True)
def get_last_one(max_date=None, risk=None, re: bool = None):
sql = f"rb_date <= '{format_date(max_date)}'" if max_date else None
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_benchmark
{where(sql, rb_risk=risk, rb_re=re)} order by rb_date desc limit 1
'''
@read
def get_list(max_date=None, min_date=None, risk=None, re: bool = None):
sqls = []
if max_date:
sqls.append(f"rb_date <= '{format_date(max_date)}'")
if min_date:
sqls.append(f"rb_date >= '{format_date(min_date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_benchmark
{where(*sqls, rb_risk=risk, rb_re=re)} order by rb_risk, rb_date
'''
import os
from datetime import datetime as dt
from typing import List
from abc import abstractmethod, ABCMeta
from tempfile import TemporaryDirectory
from shutil import copyfile
from copy import deepcopy
import pandas as pd
from py_jftech import component, autowired, get_config, get_instance_name, get_project_path, format_date, sendmail
from api import RoboReportor, RoboExportor
def include_report():
return get_config(__name__)['include-report']
class DefaultExportor(RoboExportor):
@autowired
def __init__(self, reportors: List[RoboReportor] = None):
self._reportors = {get_instance_name(x): x for x in reportors}
def export(self, max_date=dt.today(), min_date=None):
if not self.include_report:
return None
with TemporaryDirectory() as tmpdir:
filename = f"{self.file_name}_{format_date(max_date)}.xlsx"
if min_date:
filename = f"{self.file_name}_{format_date(min_date)}_to_{format_date(max_date)}.xlsx"
filepath = os.path.join(tmpdir, filename)
with pd.ExcelWriter(filepath) as writer:
for reportor_name in self.include_report:
reportor = self._reportors[reportor_name]
datas = pd.DataFrame(reportor.load_report(max_date=max_date, min_date=min_date))
if not datas.empty:
datas.to_excel(writer, sheet_name=reportor.report_name, index=False)
email = self.get_email(filepath)
if email:
receives = email['receives']
copies = email['copies'] if 'copies' in email else []
attach_paths = [filepath]
subject = email['subject'].format(today=format_date(dt.today()), max_date=max_date, min_date=min_date)
content = email['content'].format(today=format_date(dt.today()), max_date=max_date, min_date=min_date)
sendmail(receives=receives, copies=copies, attach_paths=attach_paths, subject=subject, content=content)
if self.save_path is not None:
os.makedirs(self.save_path, exist_ok=True)
save_file = os.path.join(self.save_path, filename)
copyfile(filepath, save_file)
def get_email(self, file):
return deepcopy(self.config['email']) if 'email' in self.config else None
@property
def save_path(self):
if 'save-path' not in self.config:
return None
save_path: str = self.config['save-path']
if save_path.startswith('.'):
return os.path.abspath(os.path.join(os.path.dirname(__file__), save_path))
elif save_path.startswith('/'):
return os.path.abspath(save_path)
return os.path.abspath(os.path.join(get_project_path(), save_path))
@property
def exist_build(self):
return self.config['exist-build'] if 'exist-build' in self.config else False
@property
def file_name(self):
return self.config['file-name'] if 'file-name' in self.config else 'export'
@property
def include_report(self):
return self.config['include-report'] if 'include-report' in self.config else []
@property
@abstractmethod
def config(self):
pass
@component(bean_name='backtest-export')
class BacktestExportor(DefaultExportor):
def __init__(self):
super(BacktestExportor, self).__init__()
self.__config = deepcopy(get_config(__name__))
@property
def config(self):
return self.__config['backtest']
@component(bean_name='real-daily-export')
class RealDailyExportor(DefaultExportor):
@autowired(names={'signal_reportor': 'daily-signal-report'})
def __init__(self, signal_reportor: RoboReportor = None):
super(RealDailyExportor, self).__init__()
self.__config = get_config(__name__)
self._signal_reportor = signal_reportor
def get_email(self, file):
result = super(RealDailyExportor, self).get_email(file)
if result is None:
return None
content = pd.read_excel(file, sheet_name=None)
if self._signal_reportor.report_name in content:
result['subject'] = str(result['subject']['rebalance'])
result['content'] = result['content']['rebalance']
else:
result['subject'] = result['subject']['default']
result['content'] = result['content']['rebalance']
return result
@property
def config(self):
return self.__config['real-daily']
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired, get_config, format_date, filter_weekend
from api import RoboReportor
@component(bean_name='fixed-range-report')
class FixedRangeReport(RoboReportor):
@autowired(names={'combo': 'combo-report'})
def __init__(self, combo: RoboReportor = None):
self._combo = combo
self._config = get_config(__name__)
@property
def report_name(self) -> str:
return '固定区间收益率'
@property
def range_dates(self):
return self._config['range-dates']
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datas = pd.DataFrame(self._combo.load_report(max_date=max_date, min_date=min_date))
datas.set_index('date', inplace=True)
if not datas.empty:
result = pd.DataFrame(columns=datas.columns)
for range in self.range_dates:
start = filter_weekend(range['start'])
end = filter_weekend(range['end'])
if not datas[start:end].empty:
row_name = f"{format_date(start)}~{format_date(end)}"
result.loc[row_name] = datas[start:end].values[-1] / datas[start:end].values[0] - 1
result = round(result, 4) * 100
result.reset_index(inplace=True)
result.rename(columns={'index': 'range-date'}, inplace=True)
return result.to_dict('records')
return []
from datetime import datetime as dt
from typing import List
import pandas as pd
from empyrical import annual_return, annual_volatility, max_drawdown, sharpe_ratio
from py_jftech import component, autowired
from api import RoboReportor
@component(bean_name='indicators-report')
class IndicatorsReportor(RoboReportor):
@autowired(names={'combo': 'combo-report'})
def __init__(self, combo: RoboReportor = None):
self._combo = combo
@property
def report_name(self) -> str:
return '指标'
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datas = pd.DataFrame(self._combo.load_report(max_date=max_date, min_date=min_date))
datas.set_index('date', inplace=True)
if not datas.empty:
returns = round(datas.pct_change(), 5)
indicators = {
'annual_return': list(annual_return(returns, period='daily', annualization=None) * 100),
'annual_volatility': annual_volatility(returns, period='daily', annualization=None) * 100,
'max_drawdown': max_drawdown(returns, out=None) * 100,
'sharp': sharpe_ratio(returns, risk_free=0, period='daily', annualization=None),
}
indicators['calmar'] = abs(indicators['annual_return'] / indicators['max_drawdown'])
result = pd.DataFrame(indicators.values(), index=indicators.keys(), columns=list(returns.columns)).round(2)
result.reset_index(inplace=True)
result.rename(columns={'index': 'indicators'}, inplace=True)
return result.to_dict('records')
return []
from datetime import datetime as dt
from typing import List
import pandas as pd
from dateutil.relativedelta import relativedelta
from py_jftech import component, autowired, get_config, format_date, filter_weekend
from api import RoboReportor
@component(bean_name='relative-range-report')
class RelativeRangeReport(RoboReportor):
@autowired(names={'combo': 'combo-report'})
def __init__(self, combo: RoboReportor = None):
self._combo = combo
self._config = get_config(__name__)
@property
def report_name(self) -> str:
return '相对区间收益率'
@property
def range_dates(self):
return self._config['range-dates']
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
datas = pd.DataFrame(self._combo.load_report(max_date=max_date, min_date=min_date))
datas.set_index('date', inplace=True)
if not datas.empty:
result = pd.DataFrame(columns=datas.columns)
for range in self.range_dates:
kwargs = range.copy()
del kwargs['name']
start = filter_weekend(max_date - relativedelta(**kwargs))
end = filter_weekend(max_date)
row_name = f"{range['name']}({format_date(start)}~{format_date(end)})"
result.loc[row_name] = datas[start:end].values[-1] / datas[start:end].values[0] - 1
result = round(result, 4) * 100
result.reset_index(inplace=True)
result.rename(columns={'index': 'range-date'}, inplace=True)
return result.to_dict('records')
return []
import unittest
import logging
import tempfile
from datetime import datetime as dt
from py_jftech import autowired, to_str, parse_date, prev_workday
from api import RoboReportor, RoboExportor
logger = logging.getLogger(__name__)
class ReportTest(unittest.TestCase):
@autowired(names={'reportor': 'benckmark-report'})
def test_benchmark_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'indicators-report'})
def test_indicator_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'fixed-range-report'})
def test_fixed_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'relative-range-report'})
def test_relative_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'exportor': 'backtest-export'})
def test_backtest_export(self, exportor: RoboExportor = None):
exportor.export(max_date=parse_date('2022-11-01'))
@autowired(names={'exportor': 'real-daily-export'})
def test_daliy_export(self, exportor: RoboExportor = None):
exportor.export(max_date=prev_workday(dt.today()))
if __name__ == '__main__':
unittest.main()
import logging import logging
import os
import sys import sys
from concurrent.futures import wait from concurrent.futures import wait
from datetime import datetime as dt from datetime import datetime as dt
...@@ -13,7 +14,7 @@ from py_jftech import ( ...@@ -13,7 +14,7 @@ from py_jftech import (
from api import ( from api import (
RoboExecutor, Datum, AssetPool, PortfoliosBuilder, RoboExecutor, Datum, AssetPool, PortfoliosBuilder,
PortfoliosRisk, PortfoliosHolder, DataSync PortfoliosRisk, PortfoliosHolder, DataSync, RoboExportor
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -37,13 +38,14 @@ class BacktestExecutor(RoboExecutor): ...@@ -37,13 +38,14 @@ class BacktestExecutor(RoboExecutor):
@autowired(names={'optimize': 'dividend'}) @autowired(names={'optimize': 'dividend'})
def __init__(self, datum: Datum = None, pool: AssetPool = None, def __init__(self, datum: Datum = None, pool: AssetPool = None,
syncs: List[DataSync] = None, syncs: List[DataSync] = None, export: RoboExportor = None,
builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None): builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None):
self._datum = datum self._datum = datum
self._pool = pool self._pool = pool
self._builder = builder self._builder = builder
self._hold = hold self._hold = hold
self._syncs = syncs self._syncs = syncs
self._export = export
self._config = get_config(__name__)['backtest'] self._config = get_config(__name__)['backtest']
@staticmethod @staticmethod
...@@ -111,7 +113,7 @@ class BacktestExecutor(RoboExecutor): ...@@ -111,7 +113,7 @@ class BacktestExecutor(RoboExecutor):
BacktestStep.NORMAL_PORTFOLIO): BacktestStep.NORMAL_PORTFOLIO):
logger.info("start to build normal portfolios".center(50, '-')) logger.info("start to build normal portfolios".center(50, '-'))
now = dt.now() now = dt.now()
wait([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in wait([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in
self.get_first_business_day(self.start_date, self.end_date)]) self.get_first_business_day(self.start_date, self.end_date)])
logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]") logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO): if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO):
...@@ -119,6 +121,10 @@ class BacktestExecutor(RoboExecutor): ...@@ -119,6 +121,10 @@ class BacktestExecutor(RoboExecutor):
now = dt.now() now = dt.now()
wait([self.async_build_hold(x) for x in PortfoliosRisk]) wait([self.async_build_hold(x) for x in PortfoliosRisk])
logger.info(f"build hold portfolios success, use[{(dt.now() - now).seconds}s]") logger.info(f"build hold portfolios success, use[{(dt.now() - now).seconds}s]")
# logger.info("start to export report".center(50, '-'))
# now = dt.now()
# file = self._export.export(max_date=self.end_date, min_date=self.start_date)
# logger.info(f"report file[{os.path.basename(file)}] exported successfully. use[{(dt.now() - now).seconds}s].")
@asynchronized(isolate=True) @asynchronized(isolate=True)
def async_build_risk_date(self, asset_id): def async_build_risk_date(self, asset_id):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment