Commit 65c7bb8d authored by jichao's avatar jichao

重构导出模块

添加邮件发送
parent cb6e2cc7
......@@ -557,6 +557,15 @@ class RebalanceRuler(ABC):
'''
pass
@abstractmethod
def get_signal_date(self, sign_id):
'''
获取指定id的信号日期
:param sign_id: 信号id, 可以多个,使用元祖包裹
:return: 信号日期
'''
pass
@abstractmethod
def clear_signal(self, day=None, risk: PortfoliosRisk = None):
'''
......@@ -625,9 +634,8 @@ class RoboExportor(ABC):
@abstractmethod
def export(self, max_date=dt.today(), min_date=None):
'''
导出指定日期的数据到excel
根据参数以及配置信息执行导出相关操作
:param max_date: 指定截止日期
:param min_date: 指定开始日期
:return: 导出文件路径
'''
pass
......@@ -55,7 +55,7 @@ class DefaultDatum(Datum):
datum_ids = tuple(set(datum_ids or []) | {x['id'] for x in datums})
result = rbd.get_base_datums(type=type, crncy=crncy, risk=risk, datum_ids=datum_ids)
result = [{**json.loads(x['datas']), 'id': x['id']} for x in result]
return [self.format_datum(x) for x in result if not exclude or x['bloombergTicker'] not in self.excludes]
return [self.format_datum(x) for x in result if not exclude or x['id'] in (datum_ids or []) or x['bloombergTicker'] not in self.excludes]
def get_high_risk_datums(self, risk: PortfoliosRisk):
risk3 = self.get_datums(type=DatumType.FUND, risk=3)
......
......@@ -227,13 +227,13 @@ reports: # 报告模块相关
name: '五年'
- years: 10
name: '十年'
exports:
backtest: # 回测导出曹策略
exist-build: on # 如果报告文件存在,是否重新构建文件
save-path: ${EXPORT_PATH:excels} # 导出报告文件存放路径,如果以./或者../开头,则会以执行python文件为根目录,如果以/开头,则为系统绝对路径,否则,以项目目录为根目录
file-name: ${EXPORT_FILENAME:real}
include-report: # 需要导出的报告类型列表,下面的顺序,也代表了excel中sheet的顺序
# - funds-report # 基金资料
# - navs-report # 净值报告
# - funds-report # 基金资料
# - navs-report # 净值报告
- hold-report # 持仓报告
- signal-report # 信号报告
- asset-pool-report # 基金池报告
......@@ -242,6 +242,27 @@ reports: # 报告模块相关
- indicators-report # 各种特殊指标报告
- fixed-range-report # 固定区间收益报告
- relative-range-report # 相对区间收益报告
real-daily:
file-name: ${EXPORT_FILENAME:svrobo3_portfolios}
include-report:
- daily-hold-report
- daily-signal-report
email:
receives:
- jichao@thizgroup.com
# copies:
# - Tony.Wu.Home@gmail.com
# - jinghan.yang@chifuinvestments.com
# - will.xu@thizgroup.com
# - brody_wu@chifufund.com
# - telan_qian@chifufund.com
# - tina.yang@thizgroup.com
subject:
default: "ROBO_TAIBEI-实盘版-每日投組推薦_{today}"
rebalance: "ROBO_TAIBEI-实盘版-每日投組推薦_{today}_今日有調倉信號!!!"
content:
default: "Dear All: 附件是今天生成的推薦組合,請驗收,謝謝! 注>:該郵件為自動發送,如有問題請聯繫矽谷團隊 telan_qian@chifufund.com"
rebalance: "Dear All: 附件是今天生成的推薦組合以及調倉信號,請驗收,謝謝! 注>:該郵件為自動發送,如有問題請聯繫矽谷團隊 telan_qian@chifufund.com"
robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:real} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:on} # 是否开启同步资料数据
......
......@@ -123,7 +123,7 @@ class MptReportor(RoboReportor):
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
results = []
datums = {x['id']: x for x in self._datum.get_datums(type=DatumType.FUND)}
datums = {x['id']: x for x in self._datum.get_datums(type=DatumType.FUND, exclude=False)}
for portfolio in rmp.get_list(max_date=max_date, min_date=min_date):
solve_type = SolveType(portfolio['solve'])
datas = {
......
......@@ -2,13 +2,14 @@ import json
import logging
from datetime import datetime as dt
from typing import List
from functools import reduce
import pandas as pd
from py_jftech import (
component, autowired, get_config, next_workday, prev_workday, transaction, workday_range, format_date
)
from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler, Navs, SignalType, RoboExecutor, PortfoliosType, RoboReportor
from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler, Navs, SignalType, RoboExecutor, PortfoliosType, RoboReportor, Datum, DatumType
from portfolios.dao import robo_hold_portfolios as rhp
from portfolios.utils import format_weight
......@@ -155,3 +156,40 @@ class HoldReportor(RoboReportor):
holds = holds[['risk', 'date', 'nav', 'signal_type']]
return holds.to_dict('records')
return []
@component(bean_name='daily-hold-report')
class DailyHoldReportor(RoboReportor):
@autowired
def __init__(self, rule: RebalanceRuler = None, datum: Datum = None):
self._rule = rule
self._datum = datum
@property
def report_name(self) -> str:
return '每日持仓信息'
def load_report(self, max_date=prev_workday(dt.today()), min_date=None) -> List[dict]:
holds = pd.DataFrame(rhp.get_list(max_date=max_date, min_date=min_date))
holds = holds[holds['date'].dt.date == max_date.date()]
if not holds.empty:
signal_types = self._rule.get_signal_type(tuple(set(holds['signal_id'])))
signal_dates = self._rule.get_signal_date(tuple(set(holds['signal_id'])))
datum_ids = reduce(lambda x, y: x | y, holds['portfolios'].apply(lambda x: set(json.loads(x)['weight'].keys())))
datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=datum_ids))
datums.set_index('id', inplace=True)
holds['rebalance_type'] = holds.apply(lambda row: signal_types[row['signal_id']].name, axis=1)
holds['rebalance_date'] = holds.apply(lambda row: signal_dates[row['signal_id']], axis=1)
holds['risk'] = holds.apply(lambda row: PortfoliosRisk(row['risk']).name, axis=1)
holds['portfolios'] = holds.apply(lambda row: [x for x in json.loads(row['portfolios'])['weight'].items()], axis=1)
holds = holds.explode('portfolios', ignore_index=True)
holds['weight'] = holds.apply(lambda row: row['portfolios'][1], axis=1)
holds['asset_ids'] = holds.apply(lambda row: datums.loc[int(row['portfolios'][0])]['ftTicker'], axis=1)
holds['name'] = holds.apply(lambda row: datums.loc[int(row['portfolios'][0])]['chineseName'], axis=1)
holds['lipper_id'] = holds.apply(lambda row: datums.loc[int(row['portfolios'][0])]['lipperKey'], axis=1)
holds = holds[['risk', 'date', 'asset_ids', 'weight', 'rebalance_type', 'rebalance_date', 'name', 'lipper_id']]
return holds.to_dict('records')
return []
......@@ -40,6 +40,11 @@ class PortfoliosTest(unittest.TestCase):
def test_clear(self, hold: PortfoliosHolder = None):
hold.clear()
@autowired(names={'reportor': 'daily-hold-report'})
def test_daily_hold_report(self, reportor: RoboReportor = None):
report = reportor.load_report()
self.logger.info(to_str(report))
if __name__ == '__main__':
unittest.main()
import json
from datetime import datetime as dt
from typing import List, Dict
from functools import reduce
from py_jftech import component, autowired, get_config, workday_range, next_workday, to_tuple
import pandas as pd
from py_jftech import component, autowired, get_config, workday_range, next_workday, to_tuple, prev_workday
from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder, RoboReportor, Datum, DatumType
from rebalance.dao import robo_rebalance_signal as rrs
......@@ -93,6 +95,14 @@ class LevelRebalanceRuler(RebalanceRuler):
signal = rrs.get_by_id(sign_id[0])
return SignalType(signal['type']) if signal else None
def get_signal_date(self, sign_id):
sign_id = to_tuple(sign_id)
if len(sign_id) > 1:
return {x['id']: x['date'] for x in rrs.get_by_ids(sign_id)}
else:
signal = rrs.get_by_id(sign_id[0])
return signal['date'] if signal else None
def commit_signal(self, sign_id):
rrs.update(sign_id, {'effective': True})
......@@ -101,7 +111,7 @@ class LevelRebalanceRuler(RebalanceRuler):
@component(bean_name='signal-report')
class SignalExportor(RoboReportor):
class SignalReportor(RoboReportor):
@autowired
def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None):
......@@ -114,7 +124,7 @@ class SignalExportor(RoboReportor):
def load_report(self, max_date=dt.today(), min_date=None) -> List[dict]:
result = []
datums = {str(x['id']): x for x in self._datum.get_datums(type=DatumType.FUND)}
datums = {str(x['id']): x for x in self._datum.get_datums(type=DatumType.FUND, exclude=False)}
for signal in rrs.get_list(max_date=max_date, min_date=min_date, effective=True):
rebalance_date = self._hold.get_rebalance_date_by_signal(signal['id'])
for fund_id, weight in json.loads(signal['portfolio']).items():
......@@ -130,3 +140,40 @@ class SignalExportor(RoboReportor):
'weight': weight
})
return result
@component(bean_name='daily-signal-report')
class DailySignalReportor(RoboReportor):
@autowired
def __init__(self, hold: PortfoliosHolder = None, datum: Datum = None):
self._hold = hold
self._datum = datum
@property
def report_name(self) -> str:
return '每日调仓信号'
def load_report(self, max_date=prev_workday(dt.today()), min_date=None) -> List[dict]:
signals = pd.DataFrame(rrs.get_list(max_date=max_date, min_date=min_date))
signals = signals[(signals['date'].dt.date == max_date.date()) & (signals['type'] != SignalType.NONE.value)]
if not signals.empty:
datum_ids = reduce(lambda x, y: x | y, signals['portfolio'].apply(lambda x: set(json.loads(x).keys())))
datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=datum_ids))
datums.set_index('id', inplace=True)
signals['risk'] = signals.apply(lambda row: PortfoliosRisk(row['risk']).name, axis=1)
signals['rebalance_type'] = signals.apply(lambda row: SignalType(row['type']).name, axis=1)
signals['portfolio_type'] = signals.apply(lambda row: PortfoliosType(row['portfolio_type']).name, axis=1)
signals['portfolio'] = signals.apply(lambda row: [x for x in json.loads(row['portfolio']).items()], axis=1)
signals = signals.explode('portfolio', ignore_index=True)
signals['weight'] = signals.apply(lambda row: row['portfolio'][1], axis=1)
signals['asset_ids'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['ftTicker'], axis=1)
signals['name'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['chineseName'], axis=1)
signals['lipper_id'] = signals.apply(lambda row: datums.loc[int(row['portfolio'][0])]['lipperKey'], axis=1)
signals = signals[['risk', 'date', 'rebalance_type', 'asset_ids', 'lipper_id', 'name', 'weight']]
return signals.to_dict('records')
return []
......@@ -54,6 +54,11 @@ class RebalanceTest(unittest.TestCase):
result = reportor.load_report()
logger.info(to_str(result, show_line=10))
@autowired(names={'reportor': 'daily-signal-report'})
def test_daily_signal_report(self, reportor: RoboReportor = None):
result = reportor.load_report(max_date=parse_date('2022-11-21'))
logger.info(to_str(result, show_line=10))
@autowired
def test_clear_signal(self, ruler: RebalanceRuler = None):
ruler.clear_signal()
......
import os
from datetime import datetime as dt
from typing import List
import pandas as pd
from py_jftech import component, autowired, get_config, get_instance_name, get_project_path, format_date
from api import RoboReportor, RoboExportor
def include_report():
return get_config(__name__)['include-report']
@component(bean_name='backtest-export')
class BacktestExportor(RoboExportor):
@autowired(includes={'reportors': include_report()})
def __init__(self, reportors: List[RoboReportor] = None):
reportors = {get_instance_name(x): x for x in reportors}
self._reportors: List[RoboReportor] = [reportors[x] for x in include_report()]
self._config = get_config(__name__)
@property
def save_path(self):
save_path: str = self._config['save-path']
if save_path.startswith('.'):
return os.path.abspath(os.path.join(os.path.dirname(__file__), save_path))
elif save_path.startswith('/'):
return os.path.abspath(save_path)
return os.path.abspath(os.path.join(get_project_path(), save_path))
@property
def exist_build(self):
return self._config['exist-build']
@property
def file_name(self):
return self._config['file-name'] if 'file-name' in self._config else 'backtest'
def export(self, max_date=dt.today(), min_date=None):
root = self.save_path
os.makedirs(root, exist_ok=True)
filename = f"{self.file_name}_{format_date(max_date)}.xlsx"
if min_date:
filename = f"{self.file_name}_{format_date(min_date)}_to_{format_date(max_date)}.xlsx"
file = os.path.join(root, filename)
if os.path.exists(file):
if not self.exist_build:
return file
os.remove(file)
with pd.ExcelWriter(file) as writer:
for reportor in self._reportors:
datas = pd.DataFrame(reportor.load_report(max_date=max_date, min_date=min_date))
if not datas.empty:
datas.to_excel(writer, sheet_name=reportor.report_name, index=False)
return file
......@@ -35,7 +35,7 @@ class BenchmarkReportor(RoboReportor):
def load_nav_rtn(self, risk, day):
last = rb.get_last_one(risk=risk, max_date=day, re=True)
start_date = last['date'] if last else next_workday(self._exec.start_date)
start_date = next_workday(last['date']) if last else self._exec.start_date
datums = {x['id']: x for x in self._datum.get_datums(type=DatumType.FUND)}
navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(datums.keys()), min_date=prev_workday(start_date - timedelta(10)), max_date=day))
......
import os
from datetime import datetime as dt
from typing import List
from abc import abstractmethod, ABCMeta
from tempfile import TemporaryDirectory
from shutil import copyfile
from copy import deepcopy
import pandas as pd
from py_jftech import component, autowired, get_config, get_instance_name, get_project_path, format_date, sendmail
from api import RoboReportor, RoboExportor
def include_report():
return get_config(__name__)['include-report']
class DefaultExportor(RoboExportor):
@autowired
def __init__(self, reportors: List[RoboReportor] = None):
self._reportors = {get_instance_name(x): x for x in reportors}
def export(self, max_date=dt.today(), min_date=None):
if not self.include_report:
return None
with TemporaryDirectory() as tmpdir:
filename = f"{self.file_name}_{format_date(max_date)}.xlsx"
if min_date:
filename = f"{self.file_name}_{format_date(min_date)}_to_{format_date(max_date)}.xlsx"
filepath = os.path.join(tmpdir, filename)
with pd.ExcelWriter(filepath) as writer:
for reportor_name in self.include_report:
reportor = self._reportors[reportor_name]
datas = pd.DataFrame(reportor.load_report(max_date=max_date, min_date=min_date))
if not datas.empty:
datas.to_excel(writer, sheet_name=reportor.report_name, index=False)
email = self.get_email(filepath)
if email:
receives = email['receives']
copies = email['copies'] if 'copies' in email else []
attach_paths = [filepath]
subject = email['subject'].format(today=format_date(dt.today()), max_date=max_date, min_date=min_date)
content = email['content'].format(today=format_date(dt.today()), max_date=max_date, min_date=min_date)
sendmail(receives=receives, copies=copies, attach_paths=attach_paths, subject=subject, content=content)
if self.save_path is not None:
os.makedirs(self.save_path, exist_ok=True)
save_file = os.path.join(self.save_path, filename)
copyfile(filepath, save_file)
def get_email(self, file):
return deepcopy(self.config['email']) if 'email' in self.config else None
@property
def save_path(self):
if 'save-path' not in self.config:
return None
save_path: str = self.config['save-path']
if save_path.startswith('.'):
return os.path.abspath(os.path.join(os.path.dirname(__file__), save_path))
elif save_path.startswith('/'):
return os.path.abspath(save_path)
return os.path.abspath(os.path.join(get_project_path(), save_path))
@property
def exist_build(self):
return self.config['exist-build'] if 'exist-build' in self.config else False
@property
def file_name(self):
return self.config['file-name'] if 'file-name' in self.config else 'export'
@property
def include_report(self):
return self.config['include-report'] if 'include-report' in self.config else []
@property
@abstractmethod
def config(self):
pass
@component(bean_name='backtest-export')
class BacktestExportor(DefaultExportor):
def __init__(self):
super(BacktestExportor, self).__init__()
self.__config = deepcopy(get_config(__name__))
@property
def config(self):
return self.__config['backtest']
@component(bean_name='real-daily-export')
class RealDailyExportor(DefaultExportor):
@autowired(names={'signal_reportor': 'daily-signal-report'})
def __init__(self, signal_reportor: RoboReportor = None):
super(RealDailyExportor, self).__init__()
self.__config = get_config(__name__)
self._signal_reportor = signal_reportor
def get_email(self, file):
result = super(RealDailyExportor, self).get_email(file)
if result is None:
return None
content = pd.read_excel(file, sheet_name=None)
if self._signal_reportor.report_name in content:
result['subject'] = str(result['subject']['rebalance'])
result['content'] = result['content']['rebalance']
else:
result['subject'] = result['subject']['default']
result['content'] = result['content']['rebalance']
return result
@property
def config(self):
return self.__config['real-daily']
......@@ -31,6 +31,7 @@ class FixedRangeReport(RoboReportor):
for range in self.range_dates:
start = filter_weekend(range['start'])
end = filter_weekend(range['end'])
if not datas[start:end].empty:
row_name = f"{format_date(start)}~{format_date(end)}"
result.loc[row_name] = datas[start:end].values[-1] / datas[start:end].values[0] - 1
result = round(result, 4) * 100
......
import unittest
import logging
import tempfile
from datetime import datetime as dt
from py_jftech import autowired, to_str, parse_date
from py_jftech import autowired, to_str, parse_date, prev_workday
from api import RoboReportor, RoboExportor
logger = logging.getLogger(__name__)
......@@ -29,10 +31,13 @@ class ReportTest(unittest.TestCase):
result = reportor.load_report(max_date=parse_date('2022-11-01'))
logger.info(to_str(result))
@autowired(names={'reportor': 'backtest-export'})
@autowired(names={'exportor': 'backtest-export'})
def test_backtest_export(self, exportor: RoboExportor = None):
path = exportor.export(max_date=parse_date('2022-11-01'))
logger.info(path)
exportor.export(max_date=parse_date('2022-11-01'))
@autowired(names={'exportor': 'real-daily-export'})
def test_daliy_export(self, exportor: RoboExportor = None):
exportor.export(max_date=prev_workday(dt.today()))
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment