Commit 41b8f631 authored by jichao's avatar jichao

依赖注入实现中

parent 36651735
from abc import ABCMeta, abstractmethod
from enum import Enum, unique
@unique
class DatumType(Enum):
FUND = 'FUND'
class BusinessException(Exception):
def __init__(self, msg):
self.__msg = msg
def __str__(self):
return self.__msg
class Datum(metaclass=ABCMeta):
'''
基础资料服务,基金资料数据,各种指数,指标资料数据
'''
@abstractmethod
def get_fund_datums(self, crncy=None, risk=None, fund_ids=None):
'''
获取基金资料数据
:param crncy: 货币类型
:param risk: 风险等级,e.g: 1-5
:param fund_ids: 基金ID列表
:return:基金资料信息
'''
pass
class Navs(metaclass=ABCMeta):
'''
基础数据相关服务,基金净值,各种指标 高开低收
'''
@abstractmethod
def get_fund_navs(self, fund_ids=None, min_date=None, max_date=None):
'''
获取基金净值信息
:param fund_ids: 基金id,可以多个id,使用tuple包裹
:param min_date: 起始时间
:param max_date: 截止时间
:return: 基金净值信息
'''
pass
class FundOptimize(metaclass=ABCMeta):
'''
基金优选相关服务
'''
@abstractmethod
def find_optimize(self, fund_ids, day):
'''
从多个基金id中,选出指定日期最优的基金id
:param fund_ids: 待选基金id列表
:param day: 指定日期
:return: 最优的基金id
'''
pass
@abstractmethod
def get_optimize_pool(self, day):
'''
根据基金优选获取指定日期的基金池
:param day: 指定日期
:return: 基金id列表
'''
pass
......@@ -54,17 +54,3 @@ CREATE TABLE IF NOT EXISTS `robo_exrate`
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '基金数据表';
CREATE TABLE IF NOT EXISTS robo_optimize_pool
(
rop_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rop_date DATETIME NOT NULL COMMENT '数据日期',
rop_fund_ids JSON DEFAULT NULL COMMENT '基金ID',
rop_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rop_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rop_id),
INDEX (rop_date)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '优选基金池';
import json
from framework import read, parse_date, where
from api import DatumType
@read
def get_list(*args, **kwargs):
return f'''select rbd_id as id, rbd_datas as datas from robo_base_datum {where(*args, **kwargs)}'''
@read(one=True)
def get_one(*args, **kwargs):
return f'''select rbd_id as id, rbd_datas as datas from robo_base_datum {where(*args, **kwargs)}'''
def get_base_datums(type: DatumType = None, crncy=None, risk=None, fund_ids=None):
fund_ids = tuple(fund_ids) if fund_ids else None
return get_list(rbd_id=fund_ids, v_rbd_type=type, v_rbd_crncy=crncy, v_rbd_risk=risk)
......@@ -2,7 +2,12 @@ from framework import read, where, format_date
@read
def _select(*args, **kwargs):
def get_list(*args, **kwargs):
return f'''select re_id as id, re_ticker as ticker, re_date as date, re_close as close from robo_exrate {where(*args, **kwargs)}'''
@read(one=True)
def get_one(*args, **kwargs):
return f'''select re_id as id, re_ticker as ticker, re_date as date, re_close as close from robo_exrate {where(*args, **kwargs)}'''
......@@ -12,15 +17,8 @@ def get_exrates(ticker=None, min_date=None, max_date=None):
sqls.append(f"re_date >= '{format_date(min_date)}'")
if max_date:
sqls.append(f"re_date <= '{format_date(max_date)}'")
return _select(*sqls, re_ticker=ticker)
return get_list(*sqls, re_ticker=ticker)
@read(one=True)
def get_exrate(ticker, date):
return f'''select re_id as id, re_ticker as ticker, re_date as date, re_close as close from robo_exrate {where(re_ticker=ticker, re_date=date)}'''
if __name__ == '__main__':
from framework import parse_date
print(get_exrate(date=parse_date('2022-11-01'), ticker='EURUSD BGN Curncy'))
return get_one(re_ticker=ticker, re_date=date)
......@@ -2,7 +2,12 @@ from framework import read, where, format_date
@read
def _select(*args, **kwargs):
def get_list(*args, **kwargs):
return f'''select rfn_fund_id as fund_id, rfn_date as nav_date, rfn_nav_cal as nav_cal from robo_fund_navs {where(*args, **kwargs)}'''
@read(one=True)
def get_one(*args, **kwargs):
return f'''select rfn_fund_id as fund_id, rfn_date as nav_date, rfn_nav_cal as nav_cal from robo_fund_navs {where(*args, **kwargs)}'''
......@@ -12,10 +17,5 @@ def get_navs(fund_id=None, min_date=None, max_date=None):
sqls.append(f"rfn_date >= '{format_date(min_date)}'")
if max_date:
sqls.append(f"rfn_date <= '{format_date(max_date)}'")
return _select(*sqls, rfn_fund_id=fund_id)
if __name__ == '__main__':
from framework import parse_date
navs = get_navs(fund_id=1, min_date=parse_date('2022-11-01'))
print(navs)
fund_id = tuple(fund_id) if fund_id else None
return get_list(*sqls, rfn_fund_id=fund_id)
import json
from api import DatumType, Datum
from basic.dao import robo_base_datum as rbd
from framework import component, parse_date
@component
class DefaultDatum(Datum):
def get_fund_datums(self, crncy=None, risk=None, fund_ids=None):
result = rbd.get_base_datums(type=DatumType.FUND, crncy=crncy, risk=risk, fund_ids=fund_ids)
result = [{**json.loads(x['datas']), 'id': x['id']} for x in result]
return [{**x, 'inceptDate': parse_date(x['inceptDate'])} for x in result]
import pandas as pd
from api import Navs, Datum
from basic.dao import robo_exrate as re, robo_fund_navs as rfn
from framework import get_config, component, autowired
@component
class DefaultNavs(Navs):
@autowired
def __init__(self, datum: Datum = None):
self._config = get_config(__name__)
self._datum = datum
def get_fund_navs(self, fund_ids=None, min_date=None, max_date=None):
navs = rfn.get_navs(fund_id=fund_ids, min_date=min_date, max_date=max_date)
if 'exrate' in self._config:
navs = pd.DataFrame(navs)
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
for exrate_config in self._config['exrate']:
exrate = pd.DataFrame(re.get_exrates(ticker=exrate_config['ticker'], min_date=navs.index.min(), max_date=navs.index.max()))
exrate = exrate[['date', 'close']]
exrate.set_index('date', inplace=True)
for fund in self._datum.get_fund_datums(crncy=exrate_config['from']):
if fund['id'] in navs.columns:
navs[fund['id']] = round(navs[fund['id']] * exrate['close'], 4)
navs = navs.reset_index().melt(id_vars='nav_date', value_name='nav_cal')
navs.dropna(inplace=True)
navs = navs[['fund_id', 'nav_date', 'nav_cal']]
navs.sort_values('fund_id', inplace=True)
navs = navs.to_dict('records')
return navs
......@@ -46,7 +46,7 @@ framework:
root:
level: INFO
handlers: [console]
datas:
basic:
navs:
exrate:
- from: EUR
......
from datas.datum.enums import *
from .api import (
get_fund_datums,
)
del api, robo_base_datum, enums
import datas.datum.robo_base_datum as _rbd
from datas.datum.enums import DatumType
def get_fund_datums(crncy=None, risk=None, fund_ids=None):
return _rbd.get_base_datums(type=DatumType.FUND, crncy=crncy, risk=risk)
if __name__ == '__main__':
print(get_fund_datums(risk=1))
from enum import Enum, unique
@unique
class DatumType(Enum):
FUND = 'FUND'
from framework import read, write, config, format_date, parse_date, where
import json
from datetime import datetime
from datas.datum.enums import DatumType
@read
def _select(*args, **kwargs):
return f'''select rbd_id as id, rbd_datas as datas from robo_base_datum {where(*args, **kwargs)}'''
def get_base_datums(type: DatumType = None, crncy=None, risk=None):
result = _select(v_rbd_type=type, v_rbd_crncy=crncy, v_rbd_risk=risk)
result = [{**json.loads(x['datas']), **{'id': x['id']}} for x in result]
return [{**x, **{'inceptDate': parse_date(x['inceptDate'])}} for x in result]
if __name__ == '__main__':
from enums import Enum
print(isinstance(DatumType.FUND, Enum))
from .api import (
get_navs
)
del api
import pandas as _pd
from datas.navs import robo_exrate as _re, robo_fund_navs as _navs
from datas import datum as _datum, DatumType
from framework import config, to_bool
from datetime import timedelta
navs_config = config['datas']['navs'] if 'datas' in config and 'navs' in config['datas'] else {}
def get_navs(fund_id=None, min_date=None, max_date=None):
navs = _navs.get_navs(fund_id=fund_id, min_date=min_date, max_date=max_date)
if 'exrate' in navs_config:
navs = _pd.DataFrame(navs)
navs = navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
for exrate_config in navs_config['exrate']:
exrate = _pd.DataFrame(_re.get_exrates(ticker=exrate_config['ticker'], min_date=navs.index.min(), max_date=navs.index.max()))
exrate = exrate[['date', 'close']]
exrate.set_index('date', inplace=True)
for fund in _datum.get_fund_datums(crncy=exrate_config['from']):
navs[fund['id']] = round(navs[fund['id']] * exrate['close'], 4)
navs = navs.reset_index().melt(id_vars='nav_date', value_name='nav_cal')
navs.dropna(inplace=True)
navs = navs[['fund_id', 'nav_date', 'nav_cal']]
navs.sort_values('fund_id', inplace=True)
navs = navs.to_dict('records')
return navs
if __name__ == '__main__':
from framework import parse_date
print(get_navs(min_date=parse_date('2022-11-01')))
from .date_utils import *
from .base import *
from .datebase import read, write, transaction, where
from .database import read, write, transaction, where
from .__env_config import config, get_config
from .__logger import build_logger, logger
from .injectable import component, autowired, get_instance, init_injectable as _init_injectable
_init_injectable()
del injectable, __logger, __env_config, datebase, base, date_utils, _init_injectable
del injectable, __logger, __env_config, database, base, date_utils, _init_injectable
......@@ -2,14 +2,11 @@ import functools
import pymysql
import threading
from pymysql.cursors import DictCursor
from .__env_config import config as global_config
from .__env_config import get_config
from .date_utils import format_date, datetime
from enum import Enum
_CONFIG = global_config['framework']['database'] if 'framework' in global_config and 'database' in global_config['framework'] else None
class DatabaseError(Exception):
def __init__(self, msg):
self.__msg = msg
......@@ -20,16 +17,16 @@ class DatabaseError(Exception):
class Database:
def __init__(self, config):
self._config = config or _CONFIG
self._config = config or get_config(__name__)
if self._config is None:
raise DatabaseError("database config is not found.")
def __enter__(self):
port = 3306
if 'port' in self.config.keys():
port = self.config['port']
self.__connect = pymysql.connect(host=self.config['host'], user=self.config['user'], port=port,
password=str(self.config['password']), database=self.config['dbname'])
if 'port' in self._config:
port = self._config['port']
self.__connect = pymysql.connect(host=self._config['host'], user=self._config['user'], port=port,
password=str(self._config['password']), database=self._config['dbname'])
self.__cursor = self.connect.cursor(DictCursor)
return self
......@@ -136,7 +133,7 @@ def where(*args, **kwargs) -> str:
elif v is not None:
result.append(f"{k} = {v}")
if args:
result.extend(args)
result.extend([x for x in args if x])
return f"where {' and '.join(result)}" if result else ''
CREATE TABLE IF NOT EXISTS robo_optimize_pool
(
rop_id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
rop_date DATETIME NOT NULL COMMENT '数据日期',
rop_fund_ids JSON DEFAULT NULL COMMENT '基金ID',
rop_create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
rop_update_time DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (rop_id),
UNIQUE INDEX (rop_date)
) ENGINE = InnoDB
AUTO_INCREMENT = 0
DEFAULT CHARSET = utf8mb4 COMMENT '优选基金池';
\ No newline at end of file
import json
from framework import read, write, where, format_date
__COLUMNS__ = {
'rop_id': 'id',
'rop_date': 'date',
'rop_fund_ids': 'fund_ids',
}
@read
def get_list(*args, **kwargs):
return f'''select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from robo_optimize_pool {where(*args, **kwargs)}'''
@read(one=True)
def get_one(*args, **kwargs):
return f'''select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from robo_optimize_pool {where(*args, **kwargs)}'''
@read(one=True)
def get_last_one(day=None):
sql = f"rop_date <= '{format_date(day)}'" if day else None
return f'''select {','.join([f"`{x[0]}` as `{x[1]}`" for x in __COLUMNS__.items()])} from robo_optimize_pool {where(sql)} order by rop_date desc limit 1'''
@write
def insert(day, pool):
return f'''
insert into robo_optimize_pool(rop_date, rop_fund_ids)
values ('{format_date(day)}', '{json.dumps(pool) if isinstance(pool, dict) else pool}')
'''
import json
import pandas as pd
from framework import filter_weekend, config, dict_remove, get_config
from datas import navs
from dateutil.relativedelta import relativedelta
from empyrical import sortino_ratio
from framework import filter_weekend, dict_remove, get_config, component, autowired
from api import FundOptimize, Navs, BusinessException, Datum
from fund_pool.dao import robo_optimize_pool as rop
@component
class SortinoFundOptimize(FundOptimize):
_navs: Navs = None
_datum: Datum = None
@autowired
def __init__(self, navs: Navs = None, datum: Datum = None):
self._navs = navs
self._datum = datum
optimize_config = get_config(__name__)
self._config = [{
**x,
'name': [f"sortino_{y[1]}_{y[0]}" for y in x.items() if y[0] != 'weight'][0]
} for x in optimize_config['sortino_weight']] if 'sortino_weight' in optimize_config else []
def find_optimize(self, fund_ids, day):
if not self._config:
raise BusinessException(f"find optimize, but not found sortino config.")
start = filter_weekend(sorted([day - relativedelta(days=1, **dict_remove(x, ('weight', 'name'))) for x in self._config])[0])
fund_navs = pd.DataFrame(self._navs.get_fund_navs(fund_ids=tuple(fund_ids), min_date=start, max_date=day))
fund_navs.sort_values('nav_date', inplace=True)
fund_navs = fund_navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
fund_navs.fillna(method='ffill', inplace=True)
day_income = round(fund_navs.pct_change(), 4)
sortino = pd.DataFrame()
for item in self._config:
delta_kwargs = item.copy()
del delta_kwargs['weight'], delta_kwargs['name']
ratio = dict(sortino_ratio(day_income.truncate(before=(day - relativedelta(**delta_kwargs)))))
sortino = pd.concat([sortino, pd.DataFrame([ratio], index=[item['name']])])
sortino = sortino.T
sortino['score'] = sortino.apply(lambda r: sum([x['weight'] * r[x['name']] for x in self._config]), axis=1)
sortino.sort_values('score', ascending=False, inplace=True)
return day_income.columns[sortino.index[0]]
def get_optimize_pool(self, day):
last_one = rop.get_last_one(day)
if not last_one:
funds = pd.DataFrame(self._datum.get_fund_datums())
pool = []
for (category, asset_type), fund_group in funds.groupby(by=['category', 'assetType']):
if len(fund_group) > 1:
pool.append(self.find_optimize(tuple(fund_group['id']), day))
else:
pool.append(fund_group.iloc[0].id)
rop.insert(day, sorted(pool))
last_one = rop.get_last_one(day)
return json.loads(last_one['fund_ids'])
optimize_config = get_config(__name__)
sortino_config = [{**x, 'name': [f"sortino_{y[1]}_{y[0]}" for y in x.items() if y[0] != 'weight'][0]} for x in optimize_config['sortino_weight']] \
if 'sortino_weight' in optimize_config else []
def find_optimize(fund_ids, day):
if not sortino_config:
raise NameError(f"find optimize, but not found sortino config.")
start = filter_weekend(sorted([day - relativedelta(days=1, **dict_remove(x, ('weight', 'name'))) for x in sortino_config])[0])
fund_navs = pd.DataFrame(navs.get_navs(fund_id=tuple(fund_ids), min_date=start, max_date=day))
fund_navs.sort_values('nav_date', inplace=True)
fund_navs = fund_navs.pivot_table(index='nav_date', columns='fund_id', values='nav_cal')
fund_navs.fillna(method='ffill', inplace=True)
day_income = round(fund_navs.pct_change(), 4)
sortino = pd.DataFrame()
for item in sortino_config:
delta_kwargs = item.copy()
del delta_kwargs['weight'], delta_kwargs['name']
ratio = dict(sortino_ratio(day_income.truncate(before=(day - relativedelta(**delta_kwargs)))))
sortino = pd.concat([sortino, pd.DataFrame([ratio], index=[item['name']])])
sortino = sortino.T
sortino['score'] = sortino.apply(lambda r: sum([x['weight'] * r[x['name']] for x in sortino_config]), axis=1)
sortino.sort_values('score', ascending=False, inplace=True)
return day_income.columns[sortino.index[0]]
def get_optimize_fund_pool(day):
pass
if __name__ == '__main__':
from datetime import date
get_optimize_fund_pool(date.today())
from framework import logger
from framework import autowired, parse_date, logger
from api import FundOptimize
@autowired
def start(optimize: FundOptimize = None):
pool = optimize.get_optimize_pool(parse_date('2022-11-07'))
logger.info(pool)
if __name__ == '__main__':
logger.info(dir())
start()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment