Commit 0857edb9 authored by jichao's avatar jichao

使用异步框架

parent 19acecb9
......@@ -65,6 +65,7 @@ CREATE TABLE IF NOT EXISTS `robo_index_datas`
`rid_low` DOUBLE DEFAULT NULL COMMENT '最高价',
`rid_close` DOUBLE NOT NULL COMMENT '收盘价',
`rid_pe` DOUBLE DEFAULT NULL COMMENT '市盈率',
`rid_pb` DOUBLE DEFAULT NULL COMMENT '市净率',
`rid_volume` DOUBLE DEFAULT NULL COMMENT '成交量',
`rid_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`rid_update_time` DATETIME DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
......@@ -72,5 +73,3 @@ CREATE TABLE IF NOT EXISTS `robo_index_datas`
INDEX (`rid_date`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT '指标数据表';
from py_jftech import read, write, format_date, to_tuple, where
from py_jftech import read, write, format_date, to_tuple, where, mapper_columns
__COLUMNS__ = {
'rid_index_id': 'index_id',
......@@ -8,13 +8,14 @@ __COLUMNS__ = {
'rid_low': 'low',
'rid_close': 'close',
'rid_pe': 'pe',
'rid_pb': 'pb',
'rid_volume': 'volume',
}
@write
def insert(datas):
def batch_insert(datas):
values = ','.join([f'''({','.join([(f"'{x[j]}'" if j in x and x[j] is not None else 'null') for j in __COLUMNS__.keys()])})''' for x in datas])
return f'''insert into robo_index_datas({','.join(__COLUMNS__.keys())}) values {values}'''
......@@ -46,3 +47,11 @@ def get_last(index_id, max_date, count=1):
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas
{where(f"rid_date <= '{format_date(max_date)}'", rid_index_id=index_id)} order by rid_date desc limit {count}
'''
@read(one=True)
def get_one(index_id, date):
return f'''
select {','.join([f"{x[0]} as {x[1]}" for x in __COLUMNS__.items()])} from robo_index_datas
{where(rid_index_id=index_id, rid_date=date)}
'''
from py_jftech import read, write, where, parse_date, format_date, is_workday
from urllib.parse import quote
from basic.dao import robo_base_datum as rbd, robo_index_datas as rid
from api import DatumType
import requests
from datetime import datetime as dt
import logging
logger = logging.getLogger(__name__)
URL = {
'INDEX': 'http://jdcprod.thiztech.com/api/datas/index-value?page={}&size=200&sourceCode={}&sourceType=BLOOMBERG&startDate={}',
'ECO': 'http://jdcprod.thiztech.com/api/datas/eco-value?page={}&size=200&sourceCode={}&sourceType=BLOOMBERG&startDate={}'
}
TICKERS = {
'SPX Index': 'INDEX',
'USGG10YR Index': 'INDEX',
'USGG2YR Index': 'INDEX',
'MXWO Index': 'INDEX',
'MXWD Index': 'INDEX',
'CCMP Index': 'INDEX',
'TWSE Index': 'INDEX',
'CPI YOY Index': 'ECO',
'FDTR Index': 'ECO',
'CPURNSA Index': 'ECO',
}
def sync_index():
for ticker, type in TICKERS.items():
logger.info(f"start sync {ticker}")
datum = rbd.get_base_datums(type=DatumType.INDEX, ticker=ticker)[0]
url: str = URL[type]
page = 0
start = parse_date('2007-01-01')
while True:
req_url = url.format(page, quote(ticker), format_date(start))
response = requests.get(req_url).json()
if not response['success']:
raise Exception(f'''request indictor failed: {response['status']}''')
try:
save_close = [{
'rid_index_id': datum['id'],
'rid_date': dt.fromtimestamp(x['date'] / 1000).strftime('%Y-%m-%d'),
'rid_close': x['close'],
'rid_open': x['open'] if 'open' in x else None,
'rid_high': x['high'] if 'high' in x else None,
'rid_low': x['low'] if 'low' in x else None,
'rid_pe': x['peRatio'] if 'peRatio' in x else None,
'rid_pb': x['pbRatio'] if 'pbRatio' in x else None,
'rid_volume': x['volume'] if 'volume' in x else None,
} for x in response['body']['content'] if is_workday(dt.fromtimestamp(x['date'] / 1000)) and 'close' in x]
rid.batch_insert(save_close)
except Exception as e:
logger.exception(req_url)
raise e
if response['body']['last']:
break
else:
page += 1
if __name__ == '__main__':
sync_index()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment