Commit 1a14d64c authored by wenwen.tang's avatar wenwen.tang 😕

update

parent 06a587a5
......@@ -297,14 +297,6 @@ class PortfoliosBuilder(ABC):
'''
pass
@abstractmethod
def get_all_portfolios(self, risk: PortfoliosRisk = None):
"""
查询所有优选基金
@param risk:
"""
pass
class Solver(ABC):
'''
......
......@@ -82,6 +82,7 @@ portfolios: # 投组模块
min-interval-days: 10 # 两次实际调仓最小间隔期,单位交易日
dividend-rate: 0.09 #设定年化配息率
dividend-date: 15 #配息日,每月15号
dividend-adjust-day: [1,4,7,10] #每年的首个季度调整配息
warehouse-frequency: 1 #每隔1个月调一次仓
redeem-list: [ 'TEUSAAU LX Equity', 'LIGTRAA ID Equity', 'TEMFHAC LX Equity', 'LUSHUAA ID Equity' ] #从持仓中的低风险资产“直接”按序赎回
solver: # 解算器相关
......@@ -231,17 +232,17 @@ reports: # 报告模块相关
content: "Dear All: 附件是今天生成的监测数据,請驗收,謝謝! 注>:該郵件為自動發送,如有問題請聯繫矽谷團隊 telan_qian@chifufund.com"
robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:real} # 执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:on} # 是否开启同步资料数据
sync-data: ${SYNC_DATA:off} # 是否开启同步资料数据
backtest: # 回测执行器相关
start-date: 2022-09-30 # 回测起始日期
end-date: 2023-03-01 # 回测截止日期
end-date: 2023-07-03 # 回测截止日期
sealing-period: 10 #调仓封闭期
start-step: ${BACKTEST_START_STEP:3} # 回测从哪一步开始执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
end-step: ${BACKTEST_END_STEP:3} # 回测从哪一步执行完成后结束执行 1:计算资产池;2:计算最优投组:3:计算再平衡信号以及持仓投组
clean-up: true
real: # 实盘执行器
export: ${EXPORT_ENABLE:on} # 是否开启报告
start-date: 2023-01-01 # 实盘开始时间
start-date: 2022-09-01 # 实盘开始时间
include-date: []
......
......@@ -4,13 +4,12 @@ from datetime import datetime as dt, timedelta
from typing import List
import pandas as pd
from py_jftech import component, autowired, format_date, prev_workday, is_workday
from py_jftech import component, autowired, format_date, prev_workday, is_workday, get_config
from pymysql import IntegrityError, constants
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory, \
RoboReportor, DatumType
from portfolios.dao import robo_mpt_portfolios as rmp
from portfolios.dao.robo_mpt_portfolios import get_list
logger = logging.getLogger(__name__)
......@@ -27,7 +26,18 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
def get_portfolios(self, day, risk: PortfoliosRisk, type: PortfoliosType = PortfoliosType.NORMAL):
try:
portfolio = rmp.get_one(day, type, risk)
# 若记录为空则,将传入日期作为初始日期,进行build
portfolio = rmp.get_last_one(day, type, risk)
if portfolio:
frequency = get_config('portfolios')['holder']['warehouse-frequency']
date = pd.to_datetime(day.replace(day=1)) + pd.DateOffset(months=frequency)
date = date - timedelta(days=1)
# 指定周期末的工作日
date = date if is_workday(date) else prev_workday(date)
if date == day:
portfolio = None
elif portfolio['date'] != day:
return None
if not portfolio:
result = self.build_portfolio(day, type)
for build_risk, datas in result.items():
......@@ -87,9 +97,6 @@ class MptPortfoliosBuilder(PortfoliosBuilder):
def clear(self, day=None, risk: PortfoliosRisk = None):
rmp.delete(min_date=day, risk=risk)
def get_all_portfolios(self, risk: PortfoliosRisk = None):
return get_list(risk=risk)
@component(bean_name='poem')
class PoemPortfoliosBuilder(MptPortfoliosBuilder):
......@@ -146,45 +153,6 @@ class SignalReportor(RoboReportor):
return result
@component(bean_name='daily-hold-report')
class DailyHoldReportor(RoboReportor):
@autowired
def __init__(self, datum: Datum = None):
self._datum = datum
@property
def report_name(self) -> str:
return '每日持仓信息'
def load_report(self, max_date=prev_workday(dt.today()), min_date=None) -> List[dict]:
# 月初调仓,实际相当于调仓信号在上月月末
first_day = max_date.replace(day=1)
prev_month = first_day - timedelta(days=1)
prev_month.replace(day=prev_month.day)
prev_month = prev_month if is_workday(prev_month) else prev_workday(prev_month)
portfolio = rmp.get_one(prev_month, type=PortfoliosType.NORMAL, risk=PortfoliosRisk.FT3)
result = {}
if portfolio:
datum_ids = list(json.loads(portfolio['portfolio']).keys())
datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=datum_ids))
datums.set_index('id', inplace=True)
result['risk'] = [portfolio['risk'] for i in datum_ids]
result['rebalance_type'] = [portfolio['type'] for i in datum_ids]
result['weight'] = [format(i, '.0%') for i in json.loads(portfolio['portfolio']).values()]
result['asset_ids'] = [datums.loc[int(i)]['ftTicker'] for i in datum_ids]
result['name'] = [datums.loc[int(i)]['chineseName'] for i in datum_ids]
result['lipper_id'] = [datums.loc[int(i)]['lipperKey'] for i in datum_ids]
result['date'] = [max_date for i in datum_ids]
result['rebalance_date'] = [portfolio['date'] for i in datum_ids]
result = pd.DataFrame(result)
result = result[
['lipper_id', 'asset_ids', 'name', 'weight', 'risk', 'date', 'rebalance_type', 'rebalance_date']]
return result.to_dict('records')
return []
@component(bean_name='daily-signal-report')
......
......@@ -14,6 +14,7 @@ __COLUMNS__ = {
'rhp_nav': 'nav',
'rhp_fund_av': 'fund_av',
'rhp_fund_div': 'fund_div',
'rhp_div_forecast': 'div_forecast',
'rhp_asset_nav': 'asset_nav',
'rhp_port_div': 'port_div',
'v_nav_div_acc': 'acc_av',
......@@ -40,7 +41,7 @@ def get_one(day, risk: PortfoliosRisk):
@read(one=True)
def get_last_one(risk: PortfoliosRisk = None, max_date=None, rebalance: bool = None, signal_id=None):
sql = "rhp_date <= '{format_date(max_date)}'" if max_date else None
sql = f"rhp_date <= '{format_date(max_date)}'" if max_date else None
return f'''
select {','.join([f'{x[0]} as {x[1]}' for x in __COLUMNS__.items()])} from robo_hold_portfolios
{where(sql, rhp_risk=risk, rhp_rrs_id=signal_id, rhp_rebalance=rebalance)}
......
......@@ -51,3 +51,14 @@ def get_list(max_date=None, min_date=None, type: PortfoliosType = None, risk: Po
{where(*sqls, rmp_risk=risk, rmp_type=type)}
order by rmp_date
'''
@read(one=True)
def get_last_one(date=None, type: PortfoliosType = None, risk: PortfoliosRisk = None):
sqls = []
if date:
sqls.append(f"rmp_date <= '{format_date(date)}'")
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_mpt_portfolios
{where(*sqls, rmp_risk=risk, rmp_type=type)}
order by rmp_date desc limit 1
'''
\ No newline at end of file
import json
import logging
from datetime import datetime as dt, date
from functools import reduce
from typing import List
import pandas as pd
from py_jftech import (
component, autowired, get_config, next_workday, format_date, is_workday
component, autowired, get_config, next_workday, format_date, is_workday, prev_workday, workday_range
)
from api import PortfoliosHolder, PortfoliosRisk, Navs, RoboExecutor, PortfoliosType, PortfoliosBuilder, RoboReportor, \
......@@ -21,13 +22,13 @@ class DividendPortfoliosHolder(PortfoliosHolder):
@autowired(names={'executor': RoboExecutor.use_name()})
def __init__(self, navs: Navs = None, executor: RoboExecutor = None, builder: PortfoliosBuilder = None,
datum: Datum = None):
datum: Datum = None, mpt: PortfoliosBuilder = None):
self._navs = navs
self._executor = executor
self._builder = builder
self._config = get_config(__name__)
self._last_div = None
self._datum = datum
self._mpt = mpt
def get_portfolio_type(self, day, risk: PortfoliosRisk) -> PortfoliosType:
return PortfoliosType.NORMAL
......@@ -49,15 +50,19 @@ class DividendPortfoliosHolder(PortfoliosHolder):
def build_hold_portfolio(self, day, risk: PortfoliosRisk):
last_nav = rhp.get_last_one(max_date=day, risk=risk)
# 从基金优选池选取所有调仓日基金
portfolios = self._builder.get_all_portfolios(risk)
portfoliosMap = {p['date']: p['portfolio'] for p in portfolios}
start = last_nav['date'] if last_nav else list(portfoliosMap.keys())[0]
start = next_workday(last_nav['date']) if last_nav else self._executor.start_date
try:
while start <= day:
logger.info(f'start to get normal portfolio for date[{format_date(start)}]')
portfolios = self._mpt.get_portfolios(day=prev_workday(start), type=PortfoliosType.NORMAL, risk=risk)
logger.info(f"start to build hold portfolio[{risk.name}] for date[{format_date(start)}]")
if start in portfoliosMap.keys():
self.do_rebalance(start, risk, portfoliosMap[start], last_nav)
if portfolios:
last_re_date = self.get_last_rebalance_date(risk=risk, max_date=start)
# 两次实际调仓最小间隔期,单位交易日
if last_re_date and len(workday_range(last_re_date, start)) <= self.interval_days:
self.no_rebalance(start, risk, last_nav)
else:
self.do_rebalance(start, risk, portfolios, last_nav)
else:
self.no_rebalance(start, risk, last_nav)
start = next_workday(start)
......@@ -82,7 +87,7 @@ class DividendPortfoliosHolder(PortfoliosHolder):
if day.month in self._config.get('dividend-adjust-day'):
asset_nav = last_nav['asset_nav']
# 配息率
div_rate = self._last_div * 12 / asset_nav
div_rate = last_nav['div_forecast'] * 12 / asset_nav
# 年配息率减去配息率差值超过基准配息率上下10%触发配息率重置
if self.month_dividend > 0 and abs(
(self._config['dividend-rate'] - div_rate) / self._config['dividend-rate']) > \
......@@ -90,18 +95,18 @@ class DividendPortfoliosHolder(PortfoliosHolder):
# 以本月前一天的单位净值进行配息计算
dividend = last_nav['asset_nav'] * self.month_dividend
else:
dividend = self._last_div
dividend = last_nav['div_forecast']
fund_av = fund_av - dividend
dividend_acc = dividend + dividend_acc
self._last_div = dividend
div_forecast = dividend
else:
# 如果有未配息,则不再配息
if last_nav['dividend'] > 0:
dividend = last_nav['dividend']
else:
dividend = self._last_div
fund_av = fund_av - self._last_div
dividend_acc = self._last_div + dividend_acc
dividend = last_nav['div_forecast']
fund_av = fund_av - last_nav['div_forecast']
dividend_acc = last_nav['div_forecast'] + dividend_acc
asset_nav = fund_av + fund_dividend + dividend
nav = last_nav['nav'] * asset_nav / last_nav['asset_nav']
share = {x: fund_av * w / navs[x] for x, w in weight.items()}
......@@ -110,7 +115,7 @@ class DividendPortfoliosHolder(PortfoliosHolder):
fund_div_tuple = self.get_navs_and_div(fund_ids=tuple(weight), day=day)
navs = fund_div_tuple[0]
dividend = fund_av * self.month_dividend
self._last_div = dividend
div_forecast = dividend
fund_av = fund_av - dividend
dividend_acc = dividend + dividend_acc
nav = self.init_nav
......@@ -125,6 +130,7 @@ class DividendPortfoliosHolder(PortfoliosHolder):
'date': day,
'risk': risk,
'dividend': dividend,
'div_forecast': div_forecast if div_forecast else last_nav['div_forecast'] if last_nav else None,
'fund_div': fund_dividend,
'div_acc': dividend_acc,
'rebalance': True,
......@@ -163,6 +169,7 @@ class DividendPortfoliosHolder(PortfoliosHolder):
'date': day,
'risk': risk,
'dividend': dividend,
'div_forecast': last_nav['div_forecast'],
'fund_div': fund_dividend,
'div_acc': dividend_acc,
'signal_id': last_nav['signal_id'],
......@@ -213,7 +220,7 @@ class DividendPortfoliosHolder(PortfoliosHolder):
class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
def do_rebalance(self, day, risk: PortfoliosRisk, portfolio, last_nav):
weight = {int(x[0]): x[1] for x in json.loads(portfolio).items()}
weight = portfolio
dividend_acc = 0
fund_dividend = 0
if last_nav:
......@@ -229,13 +236,8 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
asset_nav = fund_av + fund_dividend
nav = last_nav['nav'] * asset_nav / last_nav['asset_nav']
share = {x: fund_av * w / navs[x] for x, w in weight.items()}
# 如果是第一个工作日则进行配息份额记录
if self.is_first_workday(day):
funds = self._datum.get_datums(type=DatumType.FUND, ticker=self._config['redeem-list'])
for fund in funds:
if fund['id'] in share.keys():
self._last_div = asset_nav * self.month_dividend / navs[fund['id']]
break
div_forecast = asset_nav * self.month_dividend
else:
fund_av = self.init_nav
nav = self.init_nav
......@@ -243,7 +245,7 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
fund_div_tuple = self.get_navs_and_div(fund_ids=tuple(weight), day=day)
navs = fund_div_tuple[0]
# 首次配息金额,做记录
self._last_div = 0
div_forecast = 0
funds = self._datum.get_datums(type=DatumType.FUND)
funds_subscription_rate = {fund['id']: fund.get('subscriptionRate', 0) for fund in funds}
share = {x: (1 - funds_subscription_rate[x]) * (fund_av * w) / navs[x] for x, w in weight.items()}
......@@ -255,6 +257,7 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
'risk': risk,
'dividend': 0,
'fund_div': fund_dividend,
'div_forecast': div_forecast if div_forecast else last_nav['div_forecast'] if last_nav else None,
'div_acc': dividend_acc,
'rebalance': True,
'portfolios': {
......@@ -281,15 +284,21 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
navs = fund_div_tuple[0]
fund_dividend = fund_div_tuple[1]
# 配息日当天取得调仓日计算的应调仓金额,做实际份额赎回,这里的金额(即月初计算的赎回金额)用于转换成“赎回目标的份额”
if self.is_dividend_date(day) and self._last_div > 0:
need_div = last_nav['div_forecast']
if self.is_dividend_date(day) and need_div > 0:
funds = self._datum.get_datums(type=DatumType.FUND, ticker=self._config['redeem-list'])
# 获取需要配息的金额
for fund in funds:
if fund['id'] in share.keys():
{}.update()
share[fund['id']] = share[fund['id']] - self._last_div
port_div = self._last_div * navs[fund['id']]
dividend_acc = dividend_acc + port_div
break
# 按配息金额依次扣除对应基金份额
if share[fund['id']] * navs[fund['id']] <= need_div:
share[fund['id']] = 0
need_div = need_div - share[fund['id']] * navs[fund['id']]
else:
share[fund['id']] = (share[fund['id']] * navs[fund['id']] - need_div) / navs[fund['id']]
break
port_div = last_nav['div_forecast']
dividend_acc = dividend_acc + port_div
fund_av = round(sum([navs[x] * y for x, y in share.items()]), 4)
weight = {x: round(y * navs[x] / fund_av, 2) for x, y in share.items()}
weight = format_weight(weight)
......@@ -297,18 +306,15 @@ class InvTrustPortfoliosHolder(DividendPortfoliosHolder):
map(lambda k: share[k] * fund_dividend[k], filter(lambda k: k in fund_dividend, share.keys())))
asset_nav = fund_av + fund_dividend
nav = last_nav['nav'] * asset_nav / last_nav['asset_nav']
# 如果是第一个工作日则进行配息份额记录
div_forecast = last_nav['div_forecast']
if self.is_first_workday(day):
funds = self._datum.get_datums(type=DatumType.FUND, ticker=self._config['redeem-list'])
for fund in funds:
if fund['id'] in share.keys():
self._last_div = asset_nav * self.month_dividend / navs[fund['id']]
break
div_forecast = asset_nav * self.month_dividend
rhp.insert({
'date': day,
'risk': risk,
'dividend': 0,
'fund_div': fund_dividend,
'div_forecast': div_forecast,
'div_acc': dividend_acc,
'signal_id': last_nav['signal_id'],
'rebalance': False,
......@@ -340,3 +346,39 @@ class DivHoldReportor(RoboReportor):
['date', 'signal_type', 'fund_av', 'fund_div', 'cash', 'real_av', 'port_div', 'acc_av', 'nav']]
return holds.to_dict('records')
return []
@component(bean_name='daily-hold-report')
class DailyHoldReportor(RoboReportor):
@autowired
def __init__(self, datum: Datum = None):
self._datum = datum
@property
def report_name(self) -> str:
return '每日持仓信息'
def load_report(self, max_date=prev_workday(dt.today()), min_date=None) -> List[dict]:
holds = pd.DataFrame(rhp.get_list(max_date=max_date, min_date=min_date))
holds = holds[holds['date'].dt.date == max_date.date()]
if not holds.empty:
portfolio = rhp.get_last_one(max_date=max_date, rebalance=True)
datum_ids = reduce(lambda x, y: x | y,
holds['portfolios'].apply(lambda x: set(json.loads(x)['weight'].keys())))
datums = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND, datum_ids=datum_ids))
datums.set_index('id', inplace=True)
holds['rebalance_type'] = holds.apply(lambda row: PortfoliosType.NORMAL.name, axis=1)
holds['rebalance_date'] = holds.apply(lambda row: prev_workday(portfolio['date']), axis=1)
holds['risk'] = holds.apply(lambda row: PortfoliosRisk(row['risk']).name, axis=1)
holds['portfolios'] = holds.apply(lambda row: [x for x in json.loads(row['portfolios'])['weight'].items()],
axis=1)
holds = holds.explode('portfolios', ignore_index=True)
holds['weight'] = holds.apply(lambda row: format(row['portfolios'][1], '.0%'), axis=1)
holds['asset_ids'] = holds.apply(lambda row: datums.loc[int(row['portfolios'][0])]['ftTicker'], axis=1)
holds['name'] = holds.apply(lambda row: datums.loc[int(row['portfolios'][0])]['chineseName'], axis=1)
holds['lipper_id'] = holds.apply(lambda row: datums.loc[int(row['portfolios'][0])]['lipperKey'], axis=1)
holds = holds[['lipper_id', 'asset_ids', 'name', 'weight', 'risk', 'date', 'rebalance_type', 'rebalance_date']]
return holds.to_dict('records')
return []
......@@ -20,3 +20,5 @@ requests==2.28.1
scipy==1.9.3
six==1.16.0
urllib3==1.26.12
fastapi==0.100.0
uvicorn==0.23.1
\ No newline at end of file
import logging
import sys
from concurrent.futures import wait
from datetime import datetime as dt, timedelta
from datetime import datetime as dt
from typing import List
import pandas as pd
......@@ -34,19 +34,17 @@ class BacktestExecutor(RoboExecutor):
self._config = get_config(__name__)['backtest']
@staticmethod
def get_first_business_day(start_date, end_date):
def get_last_business_day(start_date, end_date):
# 生成日期范围并转换为DataFrame
dates = pd.date_range(start_date, end_date, freq='MS')
dates = dates.insert(0, start_date)
df = pd.DataFrame({'dates': dates})
# 提取每个月的第一个工作日
df['first_business_day'] = df['dates'].apply(
lambda x: pd.date_range(start=x, end=x + pd.offsets.MonthEnd(0), freq='B')[0]
df['last_business_day'] = df['dates'].map(
lambda date: pd.date_range(start=date, periods=1, freq='BM')[-1]
)
# 每隔n个月提取第一个工作日
result = []
for i in range(0, len(df), get_config('portfolios')['holder']['warehouse-frequency']):
result.append(df.iloc[i]['first_business_day'])
result.append(df.iloc[i]['last_business_day'])
delta = workday_range(result[0], result[1])
period = get_config(__name__)['backtest']['sealing-period']
if len(delta) <= period:
......@@ -98,7 +96,7 @@ class BacktestExecutor(RoboExecutor):
if self.start_step.within(BacktestStep.ASSET_POOL) and self.end_step.without(BacktestStep.ASSET_POOL):
logger.info("start to build asset pool".center(50, '-'))
now = dt.now()
workdays = self.get_first_business_day(self.start_date, self.end_date)
workdays = self.get_last_business_day(self.start_date, self.end_date)
for date in workdays:
self._pool.get_pool(date)
logger.info(f"build asset pool success, use[{(dt.now() - now).seconds}s]")
......@@ -107,7 +105,7 @@ class BacktestExecutor(RoboExecutor):
logger.info("start to build normal portfolios".center(50, '-'))
now = dt.now()
wait([self.async_build_portfolios(day, risk) for risk in PortfoliosRisk for day in
self.get_first_business_day(self.start_date, self.end_date)])
self.get_last_business_day(self.start_date, self.end_date)])
logger.info(f"build normal portfolios success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.HOLD_PORTFOLIO) and self.end_step.without(BacktestStep.HOLD_PORTFOLIO):
logger.info("start to build hold portfolios".center(50, '-'))
......@@ -181,18 +179,8 @@ class RealExecutor(RoboExecutor):
for risk in PortfoliosRisk:
logger.info(f"start to build risk[{risk.name}] real for date[{format_date(date)}]".center(50, '-'))
now = dt.now()
first_day = date.replace(day=1)
prev_month = first_day - timedelta(days=1)
prev_month.replace(day=prev_month.day)
prev_month = prev_month if is_workday(prev_month) else prev_workday(prev_month)
self._pool.get_pool(prev_month)
self._builder.get_portfolios(prev_month, risk)
next_month = date.replace(day=28) + timedelta(days=4)
prev_month = next_month.replace(day=1) - timedelta(days=1)
prev_month = prev_month if is_workday(prev_month) else prev_workday(prev_month)
if date.day == prev_month.day:
self._pool.get_pool(date)
self._builder.get_portfolios(date, risk)
# 更新持仓
self._hold.build_hold_portfolio(date, risk)
logger.info(
f"build risk[{risk.name}] real for date[{format_date(date)}] success, use[{(dt.now() - now).seconds}s]")
if self.export:
......
import uvicorn
from fastapi import FastAPI
app = FastAPI()
REC_GID = 'E3886FBA-123B-7890-123E-123456BEEED'
@app.get("/recommend")
async def root():
rec_list = []
portfolios = {'recomm_guid': REC_GID}
data = {'recomm_guid': REC_GID}
data['data_date'] = '2019-09-20'
data['funds'] = [
{'weight': '5', 'fund_id': '0152'}
]
portfolios['data'] = data
rec_list.append(portfolios)
return rec_list
if __name__ == "__main__":
uvicorn.run("robo_controller:app", reload=True, port=8080)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment