Commit 898bf81b authored by jichao's avatar jichao

抽离framework

parent caca123c
......@@ -2,7 +2,7 @@ from abc import ABC, abstractmethod
from datetime import datetime as dt
from enum import Enum, unique
from framework import get_config
from py_jftech import get_config
@unique
......
......@@ -3,13 +3,12 @@ from abc import ABC, abstractmethod
from datetime import datetime as dt, timedelta
import pandas as pd
import numpy as np
from dateutil.relativedelta import relativedelta
from empyrical import sortino_ratio
from py_jftech import filter_weekend, dict_remove, get_config, component, autowired, get_quarter_start, next_workday, is_workday
from api import AssetOptimize, Navs, Datum, AssetPoolType
from asset_pool.dao import robo_assets_pool as rop
from framework import filter_weekend, dict_remove, get_config, component, autowired, get_quarter_start, next_workday, is_workday
class SortinoAssetOptimize(AssetOptimize, ABC):
......
from datetime import datetime as dt
from py_jftech import component, autowired
from api import AssetPool, AssetOptimize, AssetRisk
from framework import component, autowired
from asset_pool.dao import robo_assets_pool as rap
......
import json
import logging
from datetime import datetime as dt
import pandas as pd
from dateutil.relativedelta import relativedelta
from py_jftech import component, autowired, get_config, format_date, block_execute, transaction
from scipy.stats import norm
from api import AssetRisk, Navs, AssetRiskDateType as DateType, Datum, AssetPoolType, RoboExecutor
from asset_pool.dao import asset_risk_dates as ard, asset_ewma_value as aev, robo_assets_pool as rap
from framework import component, autowired, get_config, format_date, block_execute, get_logger, transaction
logger = get_logger(__name__)
logger = logging.getLogger(__name__)
@component
......
from framework import read, write, where, format_date
from py_jftech import read, write, where, format_date
__COLUMNS__ = {
'aev_id': 'id',
......
from py_jftech import read, write, where, format_date
from api import AssetRiskDateType as DateType
from framework import read, write, where, format_date
__COLUMNS__ = {
'ard_id': 'id',
......
import json
from py_jftech import read, write, where, format_date
from api import AssetPoolType
from framework import read, write, where, format_date
__COLUMNS__ = {
'rap_id': 'id',
......
from py_jftech import read, where, to_tuple
from api import DatumType
from framework import read, where, to_tuple
@read
......
from framework import read, where, format_date
from py_jftech import read, where, format_date
__COLUMNS__ = {
're_id': 'id',
......
from framework import read, where, format_date, to_tuple
from py_jftech import read, where, format_date, to_tuple
__COLUMNS__ = {
'rfn_fund_id': 'fund_id',
......
from framework import read, write, format_date, to_tuple, where
import requests
from datetime import datetime
import pandas as pd
from py_jftech import read, write, format_date, to_tuple, where
__COLUMNS__ = {
'rid_index_id': 'index_id',
......
import json
from py_jftech import component, parse_date, get_config
from api import DatumType, Datum, PortfoliosRisk
from basic.dao import robo_base_datum as rbd
from framework import component, parse_date, get_config
@component
......
import pandas as pd
from py_jftech import get_config, component, autowired, to_tuple
from api import Navs, Datum
from basic.dao import robo_exrate as re, robo_fund_navs as rfn, robo_index_datas as rid
from framework import get_config, component, autowired, to_tuple
@component
......
import logging
import unittest
from py_jftech import autowired, parse_date
from api import Navs, Datum, PortfoliosRisk
from framework import autowired, parse_date, get_logger
class BasicTest(unittest.TestCase):
logger = get_logger(__name__)
logger = logging.getLogger(__name__)
@autowired
def test_index_close(self, navs: Navs = None):
......
......@@ -16,7 +16,6 @@ framework:
max-workers: 8
logger:
version: 1
use: ${LOG_NAME}
formatters:
brief:
format: "%(asctime)s - %(levelname)s - %(message)s"
......@@ -32,19 +31,14 @@ framework:
class: logging.handlers.TimedRotatingFileHandler
level: INFO
formatter: brief
filename: logs/info.log
filename: ${LOG_FILE:logs/info.log}
interval: 1
backupCount: 30
encoding: utf8
when: D
loggers:
prod:
handlers: [ console, file ]
level: INFO
propagate: no
root:
level: INFO
handlers: [ console ]
level: ${LOG_LEVEL:INFO}
handlers: ${LOG_HANDLERS:[ console ]}
basic: # 基础信息模块
datum: # 资料模块
excludes: # 排除的资料彭博ticker
......
from .date_utils import *
from .base import *
from .database import read, write, transaction, where, mapper_columns
from .env_config import config, get_config
from .logs import build_logger, get_logger
from .injectable import component, autowired, get_instance, init_injectable as _init_injectable
from .mulit_process import process_pool, create_process_pool, block_execute
_init_injectable()
del injectable, logs, env_config, database, base, date_utils, _init_injectable, mulit_process
import os
from functools import reduce
__all__ = [
'get_project_path',
'deep_dict_update',
'dict_remove',
'equals_ignore_case',
'to_bool',
'to_tuple',
]
def get_project_path():
    """Walk upward from the current directory looking for a project anchor
    (.idea, .git, config.yml or requirements.txt); return the first
    directory containing one, or None when no anchor exists anywhere."""
    for anchor in ['.idea', '.git', 'config.yml', 'requirements.txt']:
        current = os.path.realpath(os.curdir)
        while anchor not in os.listdir(current):
            parent = os.path.dirname(current)
            if parent == current:
                # reached the filesystem root without finding this anchor
                current = None
                break
            current = parent
        if current is not None:
            return current
    return None
def deep_dict_update(d1, d2):
    """Recursively merge d2 into d1 in place.

    Nested dicts are merged key by key; any other value in d2 overwrites
    the corresponding value in d1. Keys only present in d1 are kept.
    """
    for key, value in d2.items():
        if key in d1 and isinstance(d1[key], dict) and isinstance(value, dict):
            deep_dict_update(d1[key], value)
        else:
            d1[key] = value
def dict_remove(d: dict, k) -> dict:
    """Return a shallow copy of d with key(s) k removed.

    k may be a single key or a list/tuple/set of keys. Raises KeyError if
    any key is absent, matching dict deletion semantics.
    """
    # BUG FIX: the original used tuple(k), which explodes a string key into
    # its characters and raises TypeError for non-iterable scalar keys.
    keys = k if isinstance(k, (list, tuple, set)) else (k,)
    result = d.copy()
    for key in keys:
        del result[key]
    return result
def equals_ignore_case(a, b) -> bool:
    """Case-insensitive equality via str(); any falsy operand (None, '', 0)
    yields False — note '' == '' is therefore False by design here."""
    if not (a and b):
        return False
    return str(a).upper() == str(b).upper()
# canonical truthy / falsy spellings accepted from config (case-insensitive)
_TRUE_STR = ['true', 't', 'yes', 'y', 'on']
_FALSE_STR = ['false', 'f', 'no', 'n', 'off']
def to_bool(v) -> bool:
    """Coerce v to bool.

    Strings are matched case-insensitively against the canonical spellings
    above; unrecognized strings and non-strings fall back to bool(v).
    """
    # IDIOM FIX: the original built a list of comparisons and folded it with
    # reduce(or); a direct membership test is simpler and short-circuits.
    if isinstance(v, str):
        lowered = v.lower()
        if lowered in _TRUE_STR:
            return True
        if lowered in _FALSE_STR:
            return False
    return bool(v)
def to_tuple(v) -> tuple:
    """Coerce v to a tuple.

    Tuples pass through, lists are converted, truthy scalars are wrapped in
    a 1-tuple, and falsy scalars (None, 0, '') yield None.
    """
    if isinstance(v, (tuple, list)):
        return tuple(v)
    return (v,) if v else None
import functools
import json
import threading
from enum import Enum
import pymysql
from pymysql.cursors import DictCursor
from framework.date_utils import format_date, datetime
from framework.env_config import get_config
class DatabaseError(Exception):
    """Raised for database configuration or connection problems."""

    def __init__(self, msg):
        self.__message = msg

    def __str__(self):
        return self.__message
class Database:
    """Context-manager wrapper around a single pymysql connection/cursor.

    Expects config keys host, user, password, dbname and optional port
    (default 3306); falls back to the framework config section for this
    module when no config is given. Rows come back as dicts (DictCursor).
    """
    def __init__(self, config):
        # fall back to the module-level database config when none is passed
        self._config = config or get_config(__name__)
        if self._config is None:
            raise DatabaseError("database config is not found.")
    def __enter__(self):
        port = 3306
        if 'port' in self._config:
            port = self._config['port']
        # password is coerced to str because YAML may parse it as a number
        self.__connect = pymysql.connect(host=self._config['host'], user=self._config['user'], port=port,
                                         password=str(self._config['password']), database=self._config['dbname'])
        self.__cursor = self.connect.cursor(DictCursor)
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # close cursor before connection; exceptions propagate to the caller
        self.cursor.close()
        self.connect.close()
    @property
    def connect(self):
        # live pymysql connection; valid only inside the with-block
        return self.__connect
    @property
    def cursor(self):
        # DictCursor opened in __enter__
        return self.__cursor
__local__ = threading.local()
def read(func=None, config=None, one=False):
    """Decorator: the wrapped function returns a SQL string; calling the
    decorated function executes it and returns the fetched rows.

    one=True returns the first row (or None); otherwise the list of dict
    rows. When called inside a @transaction, the thread-local connection
    is reused; otherwise a fresh connection is opened per call.
    """
    if func is None:
        # decorator used with arguments, e.g. @read(one=True)
        return functools.partial(read, config=config, one=one)
    def execute(db, sql):
        db.cursor.execute(sql)
        result = db.cursor.fetchall()
        if one:
            return result[0] if result else None
        else:
            return result
    @functools.wraps(func)
    def wraps(*args, **kwargs):
        sql = func(*args, **kwargs)
        if hasattr(__local__, 'db'):
            # inside a transaction: reuse the enclosing connection
            return execute(__local__.db, sql)
        else:
            with Database(config) as db:
                return execute(db, sql)
    return wraps
def write(func=None, config=None):
    """Decorator: the wrapped function returns SQL (a string or a list of
    strings) which is executed as a write.

    Insert statements yield the generated auto-increment id; other
    statements yield the cursor's execute result. Inside a @transaction
    the enclosing connection is reused; otherwise a fresh connection is
    opened, committed on success and rolled back on error.
    """
    if func is None:
        # decorator used with arguments, e.g. @write(config=...)
        return functools.partial(write, config=config)

    def get_result(db, sql, res):
        # callers of inserts usually want the new primary key
        if sql.find('insert into') >= 0:
            return db.connect.insert_id()
        else:
            return res

    def execute(db, sqls):
        if isinstance(sqls, list):
            # BUG FIX: the original returned inside the first loop iteration
            # and iterated over the scalar result of get_result (TypeError);
            # collect one result per statement instead.
            return [get_result(db, sql, db.cursor.execute(sql)) for sql in sqls]
        else:
            return get_result(db, sqls, db.cursor.execute(sqls))

    @functools.wraps(func)
    def wraps(*args, **kwargs):
        sqls = func(*args, **kwargs)
        if hasattr(__local__, 'db'):
            # inside a transaction: commit/rollback handled there
            return execute(__local__.db, sqls)
        else:
            with Database(config) as db:
                try:
                    result = execute(db, sqls)
                    db.connect.commit()
                    return result
                except Exception as e:
                    db.connect.rollback()
                    raise e
    return wraps
def transaction(func=None, config=None):
    """Decorator: run the wrapped function inside one DB transaction.

    Opens a single connection, publishes it on the thread-local so nested
    @read/@write calls reuse it, commits on success and rolls back on any
    exception. Nested @transaction calls simply join the outer one.
    """
    if func is None:
        # decorator used with arguments, e.g. @transaction(config=...)
        return functools.partial(transaction, config=config)
    @functools.wraps(func)
    def wraps(*args, **kwargs):
        if hasattr(__local__, 'db'):
            # already inside a transaction: just run within it
            return func(*args, **kwargs)
        with Database(config) as db:
            __local__.db = db
            try:
                result = func(*args, **kwargs)
                db.connect.commit()
                return result
            except Exception as e:
                db.connect.rollback()
                raise e
            finally:
                # always detach so later calls open fresh connections
                del __local__.db
    return wraps
def where(*args, **kwargs) -> str:
    """Build a SQL WHERE clause from keyword filters plus raw fragments.

    kwargs become `key = value` terms: bools map to 1/0, datetimes are
    formatted, Enums use .value, tuples/lists become IN clauses (empty ones
    and None values are skipped). Non-empty positional args are appended
    verbatim. Returns '' when no term applies.

    NOTE(review): values are interpolated directly into the SQL text — this
    is only safe for trusted input; parameterized queries would be safer.
    """
    terms = []
    for key, value in kwargs.items():
        if isinstance(value, str):
            terms.append(f"{key} = '{value}'")
        elif isinstance(value, bool):
            terms.append(f"{key} = {1 if value else 0}")
        elif isinstance(value, datetime):
            terms.append(f"{key} = '{format_date(value)}'")
        elif isinstance(value, Enum):
            terms.append(f"{key} = '{value.value}'")
        elif isinstance(value, (tuple, list)):
            if not value:
                continue
            unwrapped = tuple(x.value if isinstance(x, Enum) else x for x in value)
            if len(unwrapped) > 1:
                terms.append(f"{key} in {unwrapped}")
            else:
                terms.append(f"{key} = '{unwrapped[0]}'")
        elif value is not None:
            terms.append(f"{key} = '{value}'")
    terms.extend(fragment for fragment in args if fragment)
    return f"where {' and '.join(terms)}" if terms else ''
def mapper_columns(datas: dict, columns: dict) -> dict:
    """Rename keys of datas via columns ({db_column: data_key}) and
    normalize values for SQL use.

    Keys absent from datas or holding None are dropped; datetimes are
    formatted as date strings, Enums replaced by .value, dicts serialized
    to JSON, and bools mapped to 1/0. Other values pass through.
    """
    mapped = {column: datas[key] for column, key in columns.items()
              if key in datas and datas[key] is not None}
    normalized = {}
    for column, value in mapped.items():
        if isinstance(value, datetime):
            normalized[column] = format_date(value)
        elif isinstance(value, Enum):
            normalized[column] = value.value
        elif isinstance(value, dict):
            normalized[column] = json.dumps(value)
        elif isinstance(value, bool):
            normalized[column] = 1 if value else 0
        else:
            normalized[column] = value
    return normalized
import calendar
from datetime import timedelta, datetime, date
import pandas as pd
def filter_weekend(day):
    """Roll `day` backwards to the most recent weekday and return it as a
    pandas Timestamp (weekdays pass through unchanged)."""
    while day.weekday() in (5, 6):  # Saturday or Sunday
        day = day - timedelta(1)
    return pd.to_datetime(day)
def next_workday(day):
    """Return the first weekday strictly after `day`."""
    candidate = day + timedelta(1)
    while candidate.weekday() in (5, 6):  # skip Saturday/Sunday
        candidate = candidate + timedelta(1)
    return candidate
def prev_workday(day):
    """Return the last weekday strictly before `day`."""
    candidate = day - timedelta(1)
    while candidate.weekday() in (5, 6):  # skip Saturday/Sunday
        candidate = candidate - timedelta(1)
    return candidate
def is_workday(day):
    """True when `day` falls on Monday through Friday."""
    return day.weekday() < 5
def workday_range(start, end):
    """List every weekday from start to end (inclusive), normalized to
    midnight datetimes."""
    return [datetime.combine(d.date(), datetime.min.time())
            for d in pd.date_range(start, end)
            if d.weekday() < 5]
def format_date(date, has_time=False):
    """Format a date/datetime as 'YYYY-MM-DD', or with ' HH:MM:SS' appended
    when has_time is True."""
    pattern = '%Y-%m-%d %H:%M:%S' if has_time else '%Y-%m-%d'
    return date.strftime(pattern)
def parse_date(date, has_time=False):
    """Parse 'YYYY-MM-DD' (or 'YYYY-MM-DD HH:MM:SS' when has_time is True)
    into a datetime; raises ValueError on mismatch."""
    pattern = '%Y-%m-%d %H:%M:%S' if has_time else '%Y-%m-%d'
    return datetime.strptime(date, pattern)
def get_quarter_start(today=None):
    """Return midnight on the first day of the quarter containing `today`.

    BUG FIX: the original default `today=datetime.today()` was evaluated
    once at import time and went stale in long-running processes; the
    default is now resolved at call time.
    """
    if today is None:
        today = datetime.today()
    # (month - 1) % 3 is the offset into the quarter (0, 1 or 2 months)
    result = date(today.year, today.month - (today.month - 1) % 3, 1)
    return datetime.combine(result, datetime.min.time())
if __name__ == '__main__':
    # manual smoke test: print the weekdays between yesterday and today
    print(workday_range(datetime.today() - timedelta(1), datetime.today()))
import os
import re
from functools import partial
import yaml
from framework.base import *
has_regex_module = False
ENV_VAR_MATCHER = re.compile(
r"""
\$\{ # match characters `${` literally
([^}:\s]+) # 1st group: matches any character except `}` or `:`
:? # matches the literal `:` character zero or one times
([^}]+)? # 2nd group: matches any character except `}`
\} # match character `}` literally
""", re.VERBOSE
)
IMPLICIT_ENV_VAR_MATCHER = re.compile(
r"""
.* # matches any number of any characters
\$\{.*\} # matches any number of any characters
# between `${` and `}` literally
.* # matches any number of any characters
""", re.VERBOSE
)
RECURSIVE_ENV_VAR_MATCHER = re.compile(
r"""
\$\{ # match characters `${` literally
([^}]+)? # matches any character except `}`
\} # match character `}` literally
([^$}]+)? # matches any character except `}` or `$`
\} # match character `}` literally
""",
re.VERBOSE,
)
def _replace_env_var(match):
    """re.sub callback: expand one ${VAR:default} match.

    Reads VAR from os.environ; when unset, uses the default part (possibly
    empty), recursively expanding any ${...} references the default itself
    contains.
    """
    env_var, default = match.groups()
    value = os.environ.get(env_var, None)
    if value is None:
        # variable unset: expand the default instead
        if default is None:
            # the regex engine returns None instead of '' when the
            # default capture group was never entered
            default = ''
        value = default
    while IMPLICIT_ENV_VAR_MATCHER.match(value): # pragma: no cover
        value = ENV_VAR_MATCHER.sub(_replace_env_var, value)
    return value
def env_var_constructor(loader, node, raw=False):
    """YAML constructor resolving ${VAR:default} placeholders in scalars.

    raw=True returns the expanded string verbatim; otherwise the result is
    re-parsed with yaml.safe_load so e.g. "8" becomes the int 8.
    """
    raw_value = loader.construct_scalar(node)
    # detect and error on recursive environment variables
    if not has_regex_module and RECURSIVE_ENV_VAR_MATCHER.match(raw_value):
        # pragma: no cover
        raise Exception("Nested environment variable lookup requires the `regex` module")
    value = ENV_VAR_MATCHER.sub(_replace_env_var, raw_value)
    if value == raw_value:
        return value  # nothing substituted: avoid re-parsing recursion
    return value if raw else yaml.safe_load(value)
def build_config(config_name=None):
    """Load <project root>/config.yml (or config_name) into a dict.

    Parsed with yaml.safe_load, so the env-var constructors registered
    below expand ${VAR:default} placeholders during loading.
    """
    CONFIG_NAME = config_name or 'config.yml'
    path = f'{get_project_path()}{os.path.sep}{CONFIG_NAME}'
    with open(path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)
yaml.add_constructor('!env_var', env_var_constructor, yaml.SafeLoader)
yaml.add_constructor(
'!raw_env_var',
partial(env_var_constructor, raw=True),
yaml.SafeLoader
)
yaml.add_implicit_resolver(
'!env_var', IMPLICIT_ENV_VAR_MATCHER, Loader=yaml.SafeLoader
)
config = build_config()
def get_config(module: str = None, file: str = None):
    """Return the config subtree for a dotted module path; the full config
    when module is None.

    Underscores in path parts map to hyphens in the YAML keys.
    NOTE(review): a missing intermediate key yields {}, and all deeper
    lookups then also yield {} — absent sections read as empty dicts.
    """
    if module == '__main__':
        # scripts pass file=... so lookups use that instead of '__main__'
        module = file
    result = config
    if module:
        for name in [x.replace('_', '-') for x in module.split('.')]:
            result = result[name] if name in result else {}
    return result
import os
from functools import partial, wraps
from importlib import import_module
from inspect import signature, Parameter
from typing import get_origin, get_args
from framework.base import get_project_path
from framework.env_config import get_config
config = get_config(__name__)
types_config = config['types'] if 'types' in config and config['types'] else {}
names_config = config['names'] if 'names' in config and config['names'] else {}
__COMPONENT_CLASS = []
__NAME_COMPONENT = {}
__COMPONENT_INSTANCE = {}
class_name = lambda cls: f'{cls.__module__}.{cls.__name__}'
class InjectableError(Exception):
    """Raised when component registration or dependency lookup fails."""

    def __init__(self, msg):
        self.__message = msg

    def __str__(self):
        return self.__message
def component(cls=None, bean_name=None):
    """Class decorator registering cls as an injectable component.

    With bean_name, the class is also registered under that name. On a name
    clash the injectable `names` config decides which class wins; without a
    config entry the clash is an error.
    """
    if cls is None:
        # decorator used with arguments, e.g. @component(bean_name='mpt')
        return partial(component, bean_name=bean_name)
    __COMPONENT_CLASS.append(cls)
    if bean_name:
        if bean_name in __NAME_COMPONENT:
            if bean_name not in names_config or names_config[bean_name] is None:
                raise InjectableError(f"bean name[{bean_name}] is already defined.")
            if class_name(cls) != names_config[bean_name]:
                # config selects the other class: drop this registration
                __COMPONENT_CLASS.remove(cls)
                return cls
            # config selects this class: evict the previous holder
            __COMPONENT_CLASS.remove(__NAME_COMPONENT[bean_name])
        __NAME_COMPONENT[bean_name] = cls
    return cls
def autowired(func=None, names=None):
    """Decorator: inject registered component instances for annotated
    parameters the caller did not supply.

    names: optional {param_name: bean_name} map forcing a named component.
    List annotations (x: List[Base]) receive every matching component;
    ambiguous single annotations are resolved via the injectable `types`
    config, defaulting to the first registered match.
    """
    if func is None:
        # decorator used with arguments, e.g. @autowired(names={...})
        return partial(autowired, names=names)

    @wraps(func)
    def wrap(*args, **kwargs):
        if func.__name__ == '__init__':
            # pre-register the instance under construction so circular
            # dependencies resolve to it instead of building a duplicate
            self_type = type(args[0])
            if self_type in __COMPONENT_CLASS and self_type not in __COMPONENT_INSTANCE:
                __COMPONENT_INSTANCE[self_type] = args[0]
        for p_name, p_type in signature(func).parameters.items():
            # NOTE(review): the original also tested `p_type == Parameter.empty`,
            # comparing a Parameter object to the empty sentinel — always
            # False, so the dead test was dropped without behavior change.
            if p_name == 'self' or p_name in kwargs:
                continue
            if names is not None and p_name in names:
                if names[p_name] in __NAME_COMPONENT:
                    kwargs[p_name] = get_instance(__NAME_COMPONENT[names[p_name]])
            elif get_origin(p_type.annotation) is list:
                inject_types = get_args(p_type.annotation)
                if len(inject_types) > 0:
                    instances = [get_instance(x) for x in __COMPONENT_CLASS if issubclass(x, inject_types)]
                    if len(instances) > 0:
                        kwargs[p_name] = instances
            else:
                components = [x for x in __COMPONENT_CLASS if issubclass(x, p_type.annotation)]
                if len(components) == 1:
                    kwargs[p_name] = get_instance(components[0])
                elif len(components) > 1:
                    cls = components[0]
                    if class_name(p_type.annotation) in types_config:
                        target_name = types_config[class_name(p_type.annotation)]
                        find_cls = [x for x in components if class_name(x) == target_name]
                        if find_cls:
                            cls = find_cls[0]
                    kwargs[p_name] = get_instance(cls)
        # BUG FIX: propagate the wrapped function's return value — the
        # original discarded it, which only worked for __init__.
        return func(*args, **kwargs)

    return wrap
def get_instance(t):
    """Return the singleton instance for component class t, or None when t
    was never registered via @component."""
    if t not in __COMPONENT_CLASS:
        return None
    if t not in __COMPONENT_INSTANCE:
        # lazy construction; an @autowired __init__ may pre-register the
        # instance before this lookup happens
        __COMPONENT_INSTANCE[t] = t()
    return __COMPONENT_INSTANCE[t]
def init_injectable(root=None):
    """Recursively import every .py module under `root` (default: project
    root) so that @component registrations execute.

    BUG FIX: the original default `root=get_project_path()` was evaluated
    once at import time; it is now resolved at call time, so a changed
    working directory is picked up correctly.
    """
    if root is None:
        root = get_project_path()
    for f in os.listdir(root):
        path = os.path.join(root, f)
        if os.path.isdir(path) and os.path.exists(os.path.join(path, '__init__.py')):
            # recurse only into real packages
            init_injectable(root=path)
        if f.endswith('.py') and f != '__init__.py':
            # import path relative to the project root, dots for separators
            py = os.path.relpath(path, get_project_path())[:-3]
            import_module('.'.join(py.split(os.path.sep)))
import logging
import os
from logging import config as cf
from framework.base import get_project_path
from framework.env_config import get_config
def build_logger(config):
    """Apply a logging dictConfig.

    A `file` handler's filename is resolved relative to the project root,
    made absolute, and its parent directory created if missing.
    """
    if 'handlers' in config and 'file' in config['handlers']:
        file = config['handlers']['file']
        path = os.path.join(get_project_path(), file["filename"])
        os.makedirs(os.path.split(path)[0], exist_ok=True)
        file["filename"] = os.path.abspath(path)
    cf.dictConfig(config)
config = get_config("framework.logger")
if config:
build_logger(config)
def get_logger(name=None):
    """Return a logger: the configured `use` name from the framework.logger
    section when set, otherwise the caller-supplied name."""
    return logging.getLogger(config['use'] if 'use' in config and config['use'] is not None else name)
from concurrent.futures import ProcessPoolExecutor, as_completed, wait
from framework.env_config import get_config
config = get_config(__name__)
process_pool = ProcessPoolExecutor(max_workers=config['max-workers'] or 2)
def create_process_pool(max_workers=None):
    """Create a dedicated ProcessPoolExecutor, falling back to the
    configured max-workers when no (truthy) size is given."""
    workers = max_workers or config['max-workers']
    return ProcessPoolExecutor(max_workers=workers)
def block_execute(func, params: dict, isolate=False, result=True) -> dict:
    """Run func once per entry of params ({key: args-tuple}) on a process
    pool and block until all tasks finish.

    isolate=True uses a dedicated short-lived pool instead of the shared
    module-level one. Returns {key: result} when result=True, otherwise
    the (done, not_done) sets from concurrent.futures.wait.
    """
    # DRY FIX: the original duplicated the submit/collect logic in both
    # branches; it is factored into a single helper here.
    def _run(executor):
        futures = {executor.submit(func, *args): key for key, args in params.items()}
        return {futures[f]: f.result() for f in as_completed(futures)} if result else wait(futures.keys())

    if isolate:
        with create_process_pool() as pool:
            return _run(pool)
    return _run(process_pool)
from framework import autowired
from py_jftech import autowired
from api import RoboExecutor
......
import json
import logging
from py_jftech import component, autowired, format_date
from api import PortfoliosBuilder, PortfoliosRisk, AssetPool, Navs, PortfoliosType, Datum, SolveType, SolverFactory
from framework import component, autowired, format_date, get_logger
from portfolios.dao import robo_mpt_portfolios as rmp
logger = get_logger(__name__)
logger = logging.getLogger(__name__)
@component(bean_name='mpt')
......
from framework import read, where, write, format_date, mapper_columns
from py_jftech import read, where, write, format_date, mapper_columns
from api import PortfoliosRisk
__COLUMNS__ = {
......
from datetime import datetime
from enum import Enum
from py_jftech import read, write, where, format_date, mapper_columns
from api import PortfoliosRisk, PortfoliosType
from framework import read, write, where, format_date, mapper_columns
__COLUMNS__ = {
'rmp_id': 'id',
......
import json
import logging
import pandas as pd
from py_jftech import (
component, autowired, get_config, next_workday, prev_workday, transaction, workday_range, format_date
)
from api import PortfoliosHolder, PortfoliosRisk, RebalanceRuler, Navs, SignalType, RoboExecutor, PortfoliosType
from framework import (
component, autowired, get_config, next_workday, filter_weekend,
prev_workday, transaction, workday_range, format_date, get_logger
)
from portfolios.dao import robo_hold_portfolios as rhp
from portfolios.utils import format_weight
logger = get_logger(__name__)
logger = logging.getLogger(__name__)
@component(bean_name='next-re')
......
import os
import sys
from logging import DEBUG
from logging import DEBUG, getLogger
import pandas as pd
from dateutil.relativedelta import relativedelta
from numpy import NAN
from py_jftech import component, autowired, get_config
from pyomo.environ import *
from api import SolverFactory as Factory, PortfoliosRisk, PortfoliosType, AssetPool, Navs, Solver, Datum
from framework import component, autowired, get_config, get_logger
from portfolios.utils import format_weight
logger = get_logger(__name__)
logger = getLogger(__name__)
def create_solver():
......
import logging
import unittest
from framework import autowired, parse_date, get_logger
from py_jftech import autowired, parse_date
from api import PortfoliosBuilder, PortfoliosType, PortfoliosRisk, PortfoliosHolder
class PortfoliosTest(unittest.TestCase):
logger = get_logger(__name__)
logger = logging.getLogger(__name__)
@autowired(names={'builder': 'poem'})
def test_poem_build_portfolio(self, builder: PortfoliosBuilder = None):
......
from abc import ABC, abstractmethod
from py_jftech import autowired
from api import RebalanceSignal, PortfoliosBuilder, PortfoliosRisk
from framework import autowired
from rebalance.dao import robo_rebalance_signal as rrs
......
from framework import read, write, where, format_date, mapper_columns
from py_jftech import read, write, where, format_date, mapper_columns
from api import SignalType, PortfoliosRisk
import json
from datetime import datetime
from enum import Enum
__COLUMNS__ = {
'rrs_id': 'id',
......
from framework import read, write, where, format_date, mapper_columns
from py_jftech import read, write, where, format_date, mapper_columns
from api import PortfoliosRisk
__COLUMNS__ = {
......
from py_jftech import component, autowired, get_config, workday_range, next_workday
from api import DriftSolver, PortfoliosRisk, PortfoliosBuilder, Datum, RoboExecutor
from framework import component, autowired, get_config, workday_range, filter_weekend, next_workday
from rebalance.dao import robo_rebalance_signal as rrs, robo_weight_drift as rwd
......
from framework import component, autowired, get_config, workday_range, next_workday
from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder
from typing import List
from py_jftech import component, autowired, get_config, workday_range, next_workday
from api import RebalanceRuler, PortfoliosRisk, RebalanceSignal, SignalType, PortfoliosType, PortfoliosHolder
from rebalance.dao import robo_rebalance_signal as rrs
......
......@@ -2,9 +2,9 @@ from abc import ABC
import pandas as pd
from dateutil.relativedelta import relativedelta
from py_jftech import get_config, autowired, component
from api import PortfoliosRisk, SignalType, Navs
from framework import get_config, autowired, component
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs
......
from py_jftech import component, autowired
from api import PortfoliosRisk, SignalType, Datum, PortfoliosHolder, DriftSolver
from framework import component, autowired, get_config
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs
......
import pandas as pd
from py_jftech import component, autowired, get_config
from api import PortfoliosBuilder, SignalType, PortfoliosRisk, Datum, DriftSolver
from framework import component, autowired, get_config, filter_weekend, next_workday, is_workday
from api import SignalType, PortfoliosRisk, DriftSolver
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_weight_drift as rwd, robo_rebalance_signal as rrs
from rebalance.dao import robo_rebalance_signal as rrs
@component(bean_name='high-buy')
......
from py_jftech import component, autowired
from api import PortfoliosRisk, SignalType, RoboExecutor
from framework import component, filter_weekend, get_config, autowired
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs
......
import pandas as pd
from py_jftech import component, autowired, get_config
from scipy.stats import norm
from api import SignalType, PortfoliosRisk, Navs
from framework import component, autowired, get_config
from rebalance.base_signal import BaseRebalanceSignal
from rebalance.dao import robo_rebalance_signal as rrs
......
import logging
import unittest
from py_jftech import autowired, parse_date
from api import RebalanceSignal, PortfoliosRisk, RebalanceRuler
from framework import autowired, parse_date, get_logger
class RebalanceTest(unittest.TestCase):
logger = get_logger(__name__)
logger = logging.getLogger(__name__)
@autowired(names={'builder': 'crisis_one'})
def test_crisis_one(self, builder: RebalanceSignal = None):
......@@ -31,6 +33,9 @@ class RebalanceTest(unittest.TestCase):
def test_rebalance_builder(self, builder: RebalanceRuler = None):
builder.take_next_signal(parse_date('2022-09-01'), PortfoliosRisk.FT3)
def test_logger(self):
self.logger.info('123123')
if __name__ == '__main__':
unittest.main()
......@@ -7,6 +7,7 @@ numpy==1.23.4
pandas==1.5.1
pandas-datareader==0.10.0
ply==3.11
PyJFTech==1.0.0
PyMySQL==1.0.2
Pyomo==6.4.3
python-dateutil==2.8.2
......
import logging
import sys
import time
from datetime import datetime as dt
from enum import Enum, unique
import pandas as pd
from api import RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder, PortfoliosRisk, PortfoliosHolder, PortfoliosType
from framework import (
component, autowired, block_execute, get_config, get_logger, filter_weekend,
from py_jftech import (
component, autowired, block_execute, get_config, filter_weekend,
workday_range, format_date, prev_workday, parse_date
)
logger = get_logger(__name__)
from api import RoboExecutor, AssetRisk, Datum, AssetPool, PortfoliosBuilder, PortfoliosRisk, PortfoliosHolder, PortfoliosType
logger = logging.getLogger(__name__)
@unique
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment