Commit fc57ab42 authored by jichao's avatar jichao

添加同步数据功能

parent 0857edb9
......@@ -9,6 +9,7 @@ from py_jftech import get_config
class DatumType(Enum):
FUND = 'FUND'
INDEX = 'INDEX'
ECO = 'ECO'
@unique
......@@ -76,11 +77,28 @@ SignalType.DRIFT_BUY.p_type = PortfoliosType.NORMAL
SignalType.INIT.p_type = PortfoliosType.RIGHT_SIDE
class DataSync(ABC):
'''
数据同步服务,需要同步数据的服务,可以实现该接口
'''
@abstractmethod
def do_sync(self, max_date=dt.today()):
'''
开始同步数据,到指定日期,如果没给则同步到当前日期
'''
pass
class Datum(ABC):
'''
基础资料服务,基金资料数据,各种指数,指标资料数据
'''
@abstractmethod
def get_datums(self, type: DatumType = None, crncy=None, risk=None, datum_ids=None, ticker=None):
pass
@abstractmethod
def get_fund_datums(self, crncy=None, risk=None, fund_ids=None):
'''
......@@ -161,6 +179,14 @@ class Navs(ABC):
'''
pass
@abstractmethod
def get_eco_values(self, datum_ids=None, min_date=None, max_date=None, ticker=None):
pass
@abstractmethod
def get_last_eco_values(self, max_date, datum_id=None, ticker=None, count=1):
pass
class AssetOptimize(ABC):
'''
......
......@@ -7,7 +7,7 @@ from dateutil.relativedelta import relativedelta
from empyrical import sortino_ratio
from py_jftech import filter_weekend, dict_remove, get_config, component, autowired, get_quarter_start, next_workday, is_workday
from api import AssetOptimize, Navs, Datum, AssetPoolType
from api import AssetOptimize, Navs, Datum, AssetPoolType, DatumType
from asset_pool.dao import robo_assets_pool as rop
......@@ -109,7 +109,7 @@ class FundSortinoAssetOptimize(SortinoAssetOptimize):
return len([x for x in self.nav_min_dates.items() if start_date <= x[1] <= end_date]) > 0
def get_groups(self):
funds = pd.DataFrame(self._datum.get_fund_datums())
funds = pd.DataFrame(self._datum.get_datums(type=DatumType.FUND))
min_dates = self._navs.get_nav_start_date()
result = []
for (category, asset_type), fund_group in funds.groupby(by=['category', 'assetType']):
......
......@@ -7,7 +7,7 @@ from dateutil.relativedelta import relativedelta
from py_jftech import component, autowired, get_config, format_date, transaction, asynchronized
from scipy.stats import norm
from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType, RoboExecutor
from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType, RoboExecutor, DatumType
from asset_pool.dao import asset_risk_dates as ard, asset_ewma_value as aev, robo_assets_pool as rap
logger = logging.getLogger(__name__)
......@@ -34,7 +34,7 @@ class CvarEwmaAssetRisk(AssetRisk):
def get_risk_pool(self, day):
asset_pool = rap.get_one(day, AssetPoolType.RISK)
if not asset_pool:
result = {x['id']: self.async_is_risk(x['id'], day) for x in self._datum.get_fund_datums(risk=(3, 4, 5))}
result = {x['id']: self.async_is_risk(x['id'], day) for x in self._datum.get_datums(type=DatumType.FUND, risk=(3, 4, 5))}
risk_ids = [x[0] for x in result.items() if x[1].result()]
rap.insert(day, AssetPoolType.RISK, risk_ids)
asset_pool = rap.get_one(day, AssetPoolType.RISK)
......
......@@ -73,3 +73,17 @@ CREATE TABLE IF NOT EXISTS `robo_index_datas`
INDEX (`rid_date`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT '指标数据表';
-- Economic indicator values synced from the JDC data service (see EcoSync).
-- One row per (indicator, observation date); release date may lag the
-- observation date, hence the separate nullable column.
CREATE TABLE IF NOT EXISTS `robo_eco_datas`
(
    `red_eco_id`       BIGINT UNSIGNED NOT NULL COMMENT '指标id',
    `red_date`         DATETIME        NOT NULL COMMENT '指标数据日期',
    `red_indicator`    DOUBLE          NOT NULL COMMENT '指标值',
    `red_release_date` DATETIME DEFAULT NULL COMMENT '公告日期',
    `red_create_time`  DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `red_update_time`  DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`red_eco_id`, `red_date`),
    INDEX (`red_date`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4 COMMENT '经济指标数据表';
from py_jftech import read, write, where, mapper_columns, format_date, to_tuple
__COLUMNS__ = {
'red_eco_id': 'eco_id',
'red_date': 'date',
'red_indicator': 'indicator',
'red_release_date': 'release_date',
}
@write
def batch_insert(datas):
    '''Build a multi-row INSERT for robo_eco_datas from mapped indicator dicts.'''
    rows = [mapper_columns(datas=x, columns=__COLUMNS__, ignore_none=False) for x in datas]

    def render(row):
        # One "(v1,v2,...)" tuple; absent or None values become SQL null.
        cells = [(f"'{row[col]}'" if row.get(col) is not None else 'null') for col in __COLUMNS__]
        return f"({','.join(cells)})"

    values = ','.join(render(row) for row in rows)
    return f"insert into robo_eco_datas({','.join(__COLUMNS__)}) values {values}"
@read
def get_list(eco_ids=None, min_date=None, max_date=None):
    '''Select economic indicator rows, optionally filtered by ids and date range.

    :param eco_ids: single id or iterable of ids; None selects all indicators
    :param min_date: inclusive lower bound on red_date
    :param max_date: inclusive upper bound on red_date
    '''
    sqls = []
    if min_date:
        sqls.append(f"red_date >= '{format_date(min_date)}'")
    if max_date:
        sqls.append(f"red_date <= '{format_date(max_date)}'")
    # BUGFIX: the original referenced undefined `index_ids` (copy-paste from the
    # index DAO) — it must filter on this function's own `eco_ids` parameter.
    return f'''
      select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_eco_datas
      {where(*sqls, red_eco_id=to_tuple(eco_ids))} order by red_eco_id, red_date
    '''
@read(one=True)
def get_last_one(eco_id, max_date=None):
    '''Return the newest indicator row for eco_id, at or before max_date if given.'''
    date_filter = f"red_date <= '{format_date(max_date)}'" if max_date else None
    select_cols = ','.join(f'{col} as {alias}' for col, alias in __COLUMNS__.items())
    return f'''
      select {select_cols} from robo_eco_datas
      {where(date_filter, red_eco_id=eco_id)} order by red_date desc limit 1
    '''
@read
def get_last(eco_id, max_date=None, count=1):
    '''Return the newest `count` indicator rows for eco_id, newest first.'''
    date_filter = f"red_date <= '{format_date(max_date)}'" if max_date else None
    select_cols = ','.join(f'{col} as {alias}' for col, alias in __COLUMNS__.items())
    return f'''
      select {select_cols} from robo_eco_datas
      {where(date_filter, red_eco_id=eco_id)} order by red_date desc limit {count}
    '''
@read(one=True)
def get_one(eco_id, date):
    '''Return the single indicator row matching eco_id and the exact date.'''
    select_cols = ','.join(f'{col} as {alias}' for col, alias in __COLUMNS__.items())
    return f'''
      select {select_cols} from robo_eco_datas
      {where(red_eco_id=eco_id, red_date=date)}
    '''
from py_jftech import read, where, format_date, to_tuple
from py_jftech import read, where, format_date, to_tuple, mapper_columns, write
__COLUMNS__ = {
'rfn_fund_id': 'fund_id',
......@@ -6,6 +6,23 @@ __COLUMNS__ = {
'rfn_nav_cal': 'nav_cal',
}
__INSERT_COLUMNS__ = {
**__COLUMNS__,
'rfn_av': 'av',
'rfn_div': 'div',
'rfn_split': 'split',
'rfn_accrue_split': 'accrue_split',
'rfn_av_p': 'av_p',
'rfn_div_p': 'div_p',
}
@write
def batch_insert(datas):
    '''Build a multi-row INSERT for robo_fund_navs using the extended NAV columns.'''
    rows = [mapper_columns(x, __INSERT_COLUMNS__, ignore_none=False) for x in datas]

    def render(row):
        # One "(v1,v2,...)" tuple; absent or None values become SQL null.
        cells = [(f"'{row[col]}'" if row.get(col) is not None else 'null') for col in __INSERT_COLUMNS__]
        return f"({','.join(cells)})"

    values = ','.join(render(row) for row in rows)
    return f"insert into robo_fund_navs({','.join(__INSERT_COLUMNS__)}) values {values}"
@read
def get_navs(fund_id=None, min_date=None, max_date=None):
......@@ -26,3 +43,12 @@ def get_min_dates(fund_ids=None):
select rfn_fund_id as fund_id, min(rfn_date) as min_date from robo_fund_navs {where(rfn_fund_id=to_tuple(fund_ids))}
group by rfn_fund_id
'''
@read(one=True)
def get_last_one(fund_id, max_date=None):
    '''Return the newest NAV row for fund_id, at or before max_date if given.'''
    date_filter = f"rfn_date <= '{format_date(max_date)}'" if max_date else None
    select_cols = ','.join(f'{col} as {alias}' for col, alias in __COLUMNS__.items())
    return f'''
      select {select_cols} from robo_fund_navs
      {where(date_filter, rfn_fund_id=fund_id)} order by rfn_date desc limit 1
    '''
......@@ -16,6 +16,7 @@ __COLUMNS__ = {
@write
def batch_insert(datas):
    '''Build a multi-row INSERT for robo_index_datas from mapped quote dicts.'''
    rows = [mapper_columns(x, __COLUMNS__, ignore_none=False) for x in datas]

    def render(row):
        # One "(v1,v2,...)" tuple; absent or None values become SQL null.
        cells = [(f"'{row[col]}'" if row.get(col) is not None else 'null') for col in __COLUMNS__]
        return f"({','.join(cells)})"

    values = ','.join(render(row) for row in rows)
    return f"insert into robo_index_datas({','.join(__COLUMNS__)}) values {values}"
......@@ -34,18 +35,20 @@ def get_list(index_ids=None, min_date=None, max_date=None):
@read(one=True)
def get_last_one(index_id, max_date=None):
    '''Return the newest index quote row for index_id, at or before max_date if given.

    :param max_date: optional inclusive upper bound; None means "latest overall"
    '''
    # NOTE: the merge left two `def` lines and both old/new where-clauses here;
    # this keeps the new form where max_date is optional.
    sql = f"rid_date <= '{format_date(max_date)}'" if max_date else None
    return f'''
      select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas
      {where(sql, rid_index_id=index_id)} order by rid_date desc limit 1
    '''
@read
def get_last(index_id, max_date=None, count=1):
    '''Return the newest `count` index quote rows for index_id, newest first.

    :param max_date: optional inclusive upper bound; None means "latest overall"
    :param count: LIMIT applied to the descending-date query
    '''
    # NOTE: the merge left two `def` lines and both old/new where-clauses here;
    # this keeps the new form where max_date is optional.
    sql = f"rid_date <= '{format_date(max_date)}'" if max_date else None
    return f'''
      select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas
      {where(sql, rid_index_id=index_id)} order by rid_date desc limit {count}
    '''
......
......@@ -16,6 +16,19 @@ class DefaultDatum(Datum):
def excludes(self):
return self._config['excludes'] if 'excludes' in self._config else []
def format_datum(self, datum):
    '''Normalize one raw datum dict; FUND datums get their inceptDate parsed.'''
    if DatumType(datum['type']) is not DatumType.FUND:
        return datum
    normalized = dict(datum)
    normalized['inceptDate'] = parse_date(datum['inceptDate'])
    return normalized
def get_datums(self, type: DatumType = None, crncy=None, risk=None, datum_ids=None, ticker=None):
    '''Load base datums, attach their db id, drop excluded fund tickers, and format.'''
    rows = rbd.get_base_datums(type=type, crncy=crncy, risk=risk, datum_ids=datum_ids, ticker=ticker)
    datums = [dict(json.loads(row['datas']), id=row['id']) for row in rows]
    kept = []
    for item in datums:
        # Only FUND datums are subject to the configured bloombergTicker exclude list.
        if DatumType(item['type']) is DatumType.FUND and item['bloombergTicker'] in self.excludes:
            continue
        kept.append(self.format_datum(item))
    return kept
def get_fund_datums(self, crncy=None, risk=None, fund_ids=None):
result = rbd.get_base_datums(type=DatumType.FUND, crncy=crncy, risk=risk, datum_ids=fund_ids)
result = [{**json.loads(x['datas']), 'id': x['id']} for x in result]
......@@ -25,14 +38,15 @@ class DefaultDatum(Datum):
result = rbd.get_base_datums(type=DatumType.INDEX, ticker=ticker, datum_ids=index_ids)
return [{**json.loads(x['datas']), 'id': x['id']} for x in result]
def get_high_risk_datums(self, risk: PortfoliosRisk):
    '''Return the fund datums eligible for the given portfolio risk level.

    FT3: all risk-3 funds. FT6/FT9: risk-3 funds restricted to
    STOCK/BALANCED/COMMODITY asset types, plus risk-4 (FT6) or risk-4/5 (FT9).
    Returns None for an unknown risk value.
    '''
    # NOTE: the merge left both the old get_fund_datums() calls and the new
    # get_datums(type=FUND) calls in the body; this keeps only the new form.
    risk3 = self.get_datums(type=DatumType.FUND, risk=3)
    if risk is PortfoliosRisk.FT3:
        return risk3
    risk3 = [x for x in risk3 if x['assetType'] in ['STOCK', 'BALANCED', 'COMMODITY']]
    if risk is PortfoliosRisk.FT6:
        return risk3 + self.get_datums(type=DatumType.FUND, risk=4)
    if risk is PortfoliosRisk.FT9:
        return risk3 + self.get_datums(type=DatumType.FUND, risk=(4, 5))
    return None
import pandas as pd
from py_jftech import get_config, component, autowired, to_tuple
from api import Navs, Datum
from api import Navs, Datum, DatumType
from basic.dao import robo_exrate as re, robo_fund_navs as rfn, robo_index_datas as rid
......@@ -23,7 +23,7 @@ class DefaultNavs(Navs):
max_date=navs.index.max()))
exrate = exrate[['date', 'close']]
exrate.set_index('date', inplace=True)
for fund in self._datum.get_fund_datums(crncy=exrate_config['from']):
for fund in self._datum.get_datums(type=DatumType.FUND, crncy=exrate_config['from']):
if fund['id'] in navs.columns:
navs[fund['id']] = round(navs[fund['id']] * exrate['close'], 4)
navs = navs.reset_index().melt(id_vars='nav_date', value_name='nav_cal')
......@@ -33,13 +33,13 @@ class DefaultNavs(Navs):
navs = navs.to_dict('records')
return navs
def get_nav_start_date(self, fund_ids=None):
    '''Map each fund id to the date of its earliest stored NAV.

    :param fund_ids: optional id or tuple of ids to restrict the lookup
    '''
    # NOTE: the merge left two consecutive `def` lines here (old signature with
    # spaced default, new without); only the new signature is kept.
    return {x['fund_id']: x['min_date'] for x in rfn.get_min_dates(fund_ids=fund_ids)}
def get_index_close(self, datum_ids=None, min_date=None, max_date=None, ticker=None):
    '''Return index close prices, filtered by datum ids and/or a bloomberg ticker.

    When a ticker is given, its resolved datum ids are merged with any explicit
    datum_ids before querying.
    '''
    # NOTE: the merge left both the old get_index_datums() call and the new
    # get_datums(type=INDEX) call; only the new one is kept.
    datum_ids = to_tuple(datum_ids)
    if ticker:
        datums = self._datum.get_datums(type=DatumType.INDEX, ticker=ticker)
        datum_ids = tuple(set(list(datum_ids or []) + [x['id'] for x in datums]))
    results = rid.get_list(index_ids=datum_ids, min_date=min_date, max_date=max_date)
    return [{'index_id': x['index_id'], 'date': x['date'], 'close': x['close']} for x in results]
......@@ -47,7 +47,7 @@ class DefaultNavs(Navs):
def get_last_index_close(self, max_date, datum_id=None, ticker=None, count=1):
if not datum_id:
assert ticker, "get last index close, datum_id and ticker give at least one"
datum = self._datum.get_index_datums(ticker=ticker)
datum = self._datum.get_datums(type=DatumType.INDEX, ticker=ticker)
datum_id = datum[0]['id'] if datum else None
assert datum_id, "get last index close, datum id is not found"
assert max_date, "get last index close, start_date is not found"
......@@ -66,3 +66,8 @@ class DefaultNavs(Navs):
'close': x['close']
} for x in last] if last else None
def get_eco_values(self, datum_ids=None, min_date=None, max_date=None, ticker=None):
    # TODO(review): not implemented — presumably should query robo_eco_datas
    # (red.get_list) mirroring get_index_close; confirm before wiring up.
    pass
def get_last_eco_values(self, max_date, datum_id=None, ticker=None, count=1):
    # TODO(review): not implemented — presumably should mirror
    # get_last_index_close using red.get_last; confirm before wiring up.
    pass
from py_jftech import read, write, where, parse_date, format_date, is_workday
import logging
from datetime import datetime as dt
from urllib.parse import quote
from basic.dao import robo_base_datum as rbd, robo_index_datas as rid
from api import DatumType
from abc import ABC, abstractmethod
from typing import List
import requests
from datetime import datetime as dt
import logging
from py_jftech import parse_date, format_date, is_workday, component, autowired, get_config, filter_weekend, next_workday
from api import DatumType, DataSync, Datum
from basic.dao import robo_base_datum as rbd, robo_index_datas as rid, robo_eco_datas as red, robo_fund_navs as rfn
logger = logging.getLogger(__name__)
URL = {
'INDEX': 'http://jdcprod.thiztech.com/api/datas/index-value?page={}&size=200&sourceCode={}&sourceType=BLOOMBERG&startDate={}',
'ECO': 'http://jdcprod.thiztech.com/api/datas/eco-value?page={}&size=200&sourceCode={}&sourceType=BLOOMBERG&startDate={}'
}
TICKERS = {
'SPX Index': 'INDEX',
'USGG10YR Index': 'INDEX',
'USGG2YR Index': 'INDEX',
'MXWO Index': 'INDEX',
'MXWD Index': 'INDEX',
'CCMP Index': 'INDEX',
'TWSE Index': 'INDEX',
'CPI YOY Index': 'ECO',
'FDTR Index': 'ECO',
'CPURNSA Index': 'ECO',
}
def sync_index():
for ticker, type in TICKERS.items():
logger.info(f"start sync {ticker}")
datum = rbd.get_base_datums(type=DatumType.INDEX, ticker=ticker)[0]
url: str = URL[type]
class JDCDataSync(DataSync, ABC):
@autowired
def __init__(self, datum: Datum = None):
self._datum = datum
self._config = get_config(__name__)
@property
def start_date(self):
return filter_weekend(self._config['start-date'])
@abstractmethod
def datum_start_date(self, datum_id):
pass
@abstractmethod
def build_urls(self, datum, start_date, page=0) -> str:
pass
@property
@abstractmethod
def datum_type(self) -> DatumType:
pass
@abstractmethod
def store_date(self, datumid, datas: List[dict]):
pass
def do_sync(self, max_date=dt.today()):
logger.info(f'start sync datas for type[{self.datum_type}]')
for datum in self._datum.get_datums(type=self.datum_type):
logger.debug(f'start sync ticker[{datum["bloombergTicker"]}]')
page = 0
start = parse_date('2007-01-01')
start_date = self.datum_start_date(datum['id'])
while True:
req_url = url.format(page, quote(ticker), format_date(start))
response = requests.get(req_url).json()
url = self.build_urls(datum=datum, page=page, start_date=start_date)
if url is None:
break
response = requests.get(url).json()
if not response['success']:
raise Exception(f'''request indictor failed: {response['status']}''')
try:
save_close = [{
'rid_index_id': datum['id'],
'rid_date': dt.fromtimestamp(x['date'] / 1000).strftime('%Y-%m-%d'),
'rid_close': x['close'],
'rid_open': x['open'] if 'open' in x else None,
'rid_high': x['high'] if 'high' in x else None,
'rid_low': x['low'] if 'low' in x else None,
'rid_pe': x['peRatio'] if 'peRatio' in x else None,
'rid_pb': x['pbRatio'] if 'pbRatio' in x else None,
'rid_volume': x['volume'] if 'volume' in x else None,
} for x in response['body']['content'] if is_workday(dt.fromtimestamp(x['date'] / 1000)) and 'close' in x]
rid.batch_insert(save_close)
self.store_date(datum['id'], response['body']['content'])
except Exception as e:
logger.exception(req_url)
logger.exception(f'url[{url}] store data failed')
raise e
if response['body']['last']:
break
......@@ -61,5 +65,99 @@ def sync_index():
page += 1
if __name__ == '__main__':
sync_index()
@component(bean_name='index-sync')
class IndexSync(JDCDataSync):
    '''Synchronizes Bloomberg index quotations from JDC into robo_index_datas.'''

    @property
    def datum_type(self) -> DatumType:
        return DatumType.INDEX

    def datum_start_date(self, datum_id):
        # Resume from the workday after the newest stored quote,
        # or from the configured start date when nothing is stored yet.
        last = rid.get_last_one(index_id=datum_id)
        if last:
            return next_workday(last['date'])
        return self.start_date

    def build_urls(self, datum, start_date, page=0) -> str:
        source = quote(datum["bloombergTicker"])
        return ('http://jdcprod.thiztech.com/api/datas/index-value'
                f'?page={page}&size=200&sourceCode={source}'
                f'&sourceType=BLOOMBERG&startDate={format_date(start_date)}')

    def store_date(self, datumid, datas: List[dict]):
        # Persist workday records that carry a close price; optional fields
        # fall back to null via dict.get.
        rows = []
        for item in datas:
            day = dt.fromtimestamp(item['date'] / 1000)
            if not is_workday(day) or 'close' not in item:
                continue
            rows.append({
                'index_id': datumid,
                'date': day.strftime('%Y-%m-%d'),
                'close': item['close'],
                'open': item.get('open'),
                'high': item.get('high'),
                'low': item.get('low'),
                'pe': item.get('peRatio'),
                'pb': item.get('pbRatio'),
                'volume': item.get('volume'),
            })
        rid.batch_insert(rows)
@component(bean_name='eco-sync')
class EcoSync(JDCDataSync):
    '''Synchronizes economic indicator (ECO) values from JDC into robo_eco_datas.'''

    @property
    def datum_type(self) -> DatumType:
        return DatumType.ECO

    def datum_start_date(self, datum_id):
        # Resume from the workday after the newest stored value,
        # or from the configured start date when nothing is stored yet.
        last = red.get_last_one(eco_id=datum_id)
        return next_workday(last['date']) if last else self.start_date

    def build_urls(self, datum, start_date, page=0) -> str:
        return f'http://jdcprod.thiztech.com/api/datas/eco-value?page={page}&size=200&sourceCode={quote(datum["bloombergTicker"])}&sourceType=BLOOMBERG&startDate={format_date(start_date)}'

    def store_date(self, datumid, datas: List[dict]):
        # BUGFIX: also require 'close' (the indicator value) — the original only
        # checked 'releaseDate' and would raise KeyError on a record without a
        # value; IndexSync applies the same 'close' guard.
        save_datas = [{
            'eco_id': datumid,
            'date': dt.fromtimestamp(x['date'] / 1000),
            'indicator': x['close'],
            'release_date': dt.fromtimestamp(x['releaseDate'] / 1000),
        } for x in datas if 'releaseDate' in x and 'close' in x]
        # Skip the insert entirely when every record was filtered out:
        # batch_insert([]) would render an INSERT with an empty VALUES list.
        if save_datas:
            red.batch_insert(save_datas)
@component(bean_name='navs-sync')
class FundNavSync(JDCDataSync):
    '''Synchronizes fund NAV series from JDC into robo_fund_navs.

    Fund datums are matched to JDC subjects by ftTicker at construction time;
    funds without a subject key are skipped by build_urls.
    '''

    def __init__(self):
        super(FundNavSync, self).__init__()
        # fund datum id -> JDC subject key, resolved once up front.
        self._subject_keys = self.find_jdc_subject_key()

    @property
    def datum_type(self) -> DatumType:
        return DatumType.FUND

    def datum_start_date(self, datum_id):
        # Resume from the workday after the newest stored NAV,
        # or from the configured start date when nothing is stored yet.
        last = rfn.get_last_one(fund_id=datum_id)
        return next_workday(last['nav_date']) if last else self.start_date

    def build_urls(self, datum, start_date, page=0) -> str:
        # None tells the base sync loop there is nothing to fetch for this fund.
        if datum['id'] not in self._subject_keys:
            return None
        key = self._subject_keys[datum['id']]
        return f'http://jdcprod.thiztech.com/api/datas/asset-value?subjectKeys={key}&page={page}&size=200&sourceType=TW&startDate={format_date(start_date)}'

    def find_jdc_subject_key(self):
        # BUGFIX: dropped the duplicate leading `funds = self._datum.get_datums(...)`
        # line (a leftover from the merge) that triggered a redundant fetch.
        funds = {x['ftTicker']: x for x in self._datum.get_datums(type=DatumType.FUND)}
        response = requests.get('http://jdcprod.thiztech.com/api/subject?busiField=TW&sourceType=TW&subjectType=FUND')
        response = response.json()
        if not response['success']:
            # BUGFIX: `CollectError` is undefined in this module (would NameError
            # at raise time); raise Exception, matching the base class's handling.
            raise Exception(f'''find fund subject failed: {response['status']}''')
        return {funds[x['fundId']]['id']: x['key'] for x in response['body']['content'] if x['fundId'] in funds}

    def store_date(self, datumid, datas: List[dict]):
        # NOTE(review): originValue/postValue/calibrateValue are accessed
        # unguarded — assumes JDC always supplies them; confirm upstream contract.
        save_navs = [{
            'fund_id': datumid,
            'nav_date': dt.fromtimestamp(x['date'] / 1000),
            'av': x['originValue'],
            'div': x['dividend'] if 'dividend' in x else 0,
            'split': x['split'] if 'split' in x else 1,
            'accrue_split': x['totalSplit'] if 'totalSplit' in x else 1,
            'av_p': x['postValue'],
            'div_p': x['postDividend'] if 'postDividend' in x else 0,
            'nav_cal': x['calibrateValue']
        } for x in datas if is_workday(dt.fromtimestamp(x['date'] / 1000))]
        rfn.batch_insert(save_navs)
......@@ -3,7 +3,7 @@ import unittest
from py_jftech import autowired, parse_date
from api import Navs, Datum, PortfoliosRisk
from api import Navs, Datum, PortfoliosRisk, DataSync
class BasicTest(unittest.TestCase):
......@@ -19,6 +19,18 @@ class BasicTest(unittest.TestCase):
datums = datum.get_high_risk_datums(PortfoliosRisk.FT9)
self.logger.info(datums)
@autowired(names={'sync': 'index-sync'})
def test_index_sync(self, sync: DataSync = None):
    # Smoke test: runs the full index sync (performs real HTTP/db side effects).
    sync.do_sync()
@autowired(names={'sync': 'eco-sync'})
def test_eco_sync(self, sync: DataSync = None):
    # Smoke test: runs the full economic-indicator sync (real side effects).
    sync.do_sync()
@autowired(names={'sync': 'navs-sync'})
def test_navs_sync(self, sync: DataSync = None):
    # Smoke test: runs the full fund-NAV sync (real side effects).
    sync.do_sync()
if __name__ == '__main__':
unittest.main()
......@@ -21,6 +21,11 @@ py-jftech:
backupCount: 30
encoding: utf8
when: D
loggers:
basic.sync:
level: DEBUG
handlers: [console]
propagate: no
root:
level: ${LOG_LEVEL:INFO}
handlers: ${LOG_HANDLERS:[ console ]}
......@@ -40,6 +45,8 @@ py-jftech:
mulit-process:
max-workers: ${MAX_PROCESS:8}
basic: # 基础信息模块
sync:
start-date: 2007-01-01 # 同步数据开始日期
datum: # 资料模块
excludes: # 排除的资料彭博ticker
- 'FKUQX US Equity'
......@@ -150,6 +157,7 @@ rebalance: # 再平衡模块
threshold: [ 0.5, 0.8 ] # [ 低买入阀值,高买入阀值 ]
robo-executor: # 执行器相关
use: ${ROBO_EXECUTOR:backtest} #执行哪个执行器,优先取系统环境变量ROBO_EXECUTOR的值,默认backtest
sync-data: ${SYNC_DATA:on}
backtest: # 回测执行器相关
start-date: 2008-01-02 # 回测起始日期
end-date: 2022-11-01 # 回测截止日期
......
......@@ -8,7 +8,7 @@ from numpy import NAN
from py_jftech import component, autowired, get_config, is_workday
from pyomo.environ import *
from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum
from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum, DatumType
from portfolios.utils import format_weight
logger = getLogger(__name__)
......@@ -210,7 +210,7 @@ class DefaultSolver(Solver):
def reset_navs(self, day):
asset_ids = self._assets.get_pool(day)
asset_risk = self.get_config('navs.risk')
datum = self._datum.get_fund_datums(fund_ids=asset_ids, risk=asset_risk)
datum = self._datum.get_datums(type=DatumType.FUND, fund_ids=asset_ids, risk=asset_risk)
exclude = self.get_config('navs.exclude-asset-type') or []
asset_ids = list(set(asset_ids) & set([x['id'] for x in datum if x['assetType'] not in exclude]))
......
......@@ -4,6 +4,7 @@ import time
from datetime import datetime as dt
from enum import Enum, unique
from concurrent.futures import wait
from typing import List
import pandas as pd
from py_jftech import (
......@@ -12,8 +13,8 @@ from py_jftech import (
)
from api import (
RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder,
PortfoliosRisk, PortfoliosHolder, PortfoliosType, RebalanceRuler
RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder, DatumType,
PortfoliosRisk, PortfoliosHolder, PortfoliosType, RebalanceRuler, DataSync
)
logger = logging.getLogger(__name__)
......@@ -34,7 +35,7 @@ class BacktestStep(Enum):
class BacktestExector(RoboExecutor):
@autowired
def __init__(self, risk: AssetRisk = None, datum: Datum = None, pool: AssetPool = None,
def __init__(self, risk: AssetRisk = None, datum: Datum = None, pool: AssetPool = None, syncs: List[DataSync] = None,
builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, rule: RebalanceRuler = None):
self._risk = risk
self._datum = datum
......@@ -42,6 +43,7 @@ class BacktestExector(RoboExecutor):
self._builder = builder
self._hold = hold
self._rule = rule
self._syncs = syncs
self._config = get_config(__name__)['backtest']
@property
......@@ -52,10 +54,15 @@ class BacktestExector(RoboExecutor):
def start_step(self) -> BacktestStep:
return BacktestStep(self._config['start-step'])
@property
def is_sync_data(self):
return get_config(__name__)['sync-data']
@property
def end_date(self):
return pd.to_datetime(self._config['end-date'])
@property
def is_clean_up(self):
return self._config['clean-up'] if 'clean-up' in self._config else True
......@@ -75,12 +82,15 @@ class BacktestExector(RoboExecutor):
self._rule.clear_signal()
def start_exec(self):
if self.is_clean_up():
if self.is_sync_data:
for sync in self._syncs:
sync.do_sync()
if self.is_clean_up:
self.clear_datas()
if self.start_step.within(BacktestStep.EWMA_VALUE):
logger.info("start to build fund ewma value.".center(50, '-'))
now = dt.now()
wait([self.async_build_risk_date(x['id']) for x in self._datum.get_fund_datums(risk=(3, 4, 5))])
wait([self.async_build_risk_date(x['id']) for x in self._datum.get_datums(type=DatumType.FUND, risk=(3, 4, 5))])
logger.info(f"build fund ewma value success, use[{(dt.now() - now).seconds}s]")
if self.start_step.within(BacktestStep.ASSET_POOL):
logger.info("start to build asset pool".center(50, '-'))
......@@ -120,15 +130,20 @@ class BacktestExector(RoboExecutor):
class RealExecutor(RoboExecutor):
@autowired
def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None):
def __init__(self, builder: PortfoliosBuilder = None, hold: PortfoliosHolder = None, syncs: List[DataSync] = None,):
self._builder = builder
self._hold = hold
self._syncs = syncs
self._config = get_config(__name__)['real']
@property
def start_date(self):
return pd.to_datetime(filter_weekend(self._config['start-date']))
@property
def is_sync_data(self):
return get_config(__name__)['sync-data']
@property
def curt_date(self):
if len(sys.argv) > 1:
......@@ -139,6 +154,9 @@ class RealExecutor(RoboExecutor):
return dt.combine(dt.today().date(), dt.min.time())
def start_exec(self):
if self.is_sync_data:
for sync in self._syncs:
sync.do_sync()
date = self.curt_date
for risk in PortfoliosRisk:
logger.info(f"start to build risk[{risk.name}] real for date[{format_date(date)}]".center(50, '-'))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment