"""HTTP controller and job scheduler for the robo-advisor service.

Exposes the ``/franklin/recommend`` endpoint, which serves the latest
effective rebalance signal as a recommendation payload, and registers
scheduled jobs that run the live portfolio build (``main.start``) and
e-mail the resulting recommendation.
"""
import datetime as dt
import json
import logging
import os
from statistics import pstdev
from tempfile import TemporaryDirectory

import pandas as pd
import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger
from dateutil.relativedelta import relativedelta
from empyrical import sharpe_ratio, annual_volatility, annual_return
from fastapi import FastAPI, Request
from py_jftech import prev_workday, filter_weekend, autowired, next_workday, sendmail, format_date, get_config
from starlette.responses import JSONResponse

import main
from api import DatumType, PortfoliosRisk, Datum, RoboReportor

app = FastAPI()
# Create the AsyncIOScheduler instance.
scheduler = AsyncIOScheduler()

REC_GID = get_config('web.guid')
# Module-level caches: fund datums, and the metrics last computed by load_report().
fund_infos, cp, roi, risk = None, None, None, None

# Sentinel meaning "max_date was not supplied"; lets load_report() resolve the
# date at call time while still passing an explicit None through unchanged.
_MAX_DATE_UNSET = object()


async def send_email():
    """Dump the current recommendation to a JSON file and e-mail it.

    The file lives in a temporary directory, so ``sendmail`` is called while
    the directory still exists. Recipients, subject and content are read from
    the ``reports.exports.real-daily.email`` configuration entry.
    """
    with TemporaryDirectory() as tmpdir:
        filepath = os.path.join(tmpdir, "portfolio.json")
        recommends = await recommend()
        with open(filepath, "w", encoding='utf-8') as file:
            json.dump(recommends, file, indent=4, ensure_ascii=False)

        email = get_config('reports.exports.real-daily.email')
        receives = email['receives']
        # 'copies' is optional and may be configured as null.
        copies = email.get('copies')
        if copies is None:
            copies = []
        subject = email['subject']['rebalance'].format(today=format_date(dt.date.today()))
        content = email['content']['rebalance']
        sendmail(receives=receives, copies=copies, subject=subject, content=content,
                 attach_paths=[filepath])


def get_today_rec():
    """Return the NORMAL / FT3 portfolio stored for the previous workday."""
    # Imported locally, as in the original (presumably to avoid an import cycle).
    from portfolios.dao import robo_mpt_portfolios as rmp
    from api import PortfoliosType, PortfoliosRisk

    day = prev_workday(filter_weekend(dt.date.today()))
    return rmp.get_one(day, PortfoliosType.NORMAL, PortfoliosRisk.FT3)


def get_last_signal():
    """Return the latest effective FT3 rebalance signal up to the previous workday."""
    # Imported locally, as in the original (presumably to avoid an import cycle).
    from rebalance.dao import robo_rebalance_signal as rrs

    day = prev_workday(filter_weekend(dt.date.today()))
    return rrs.get_last_one(max_date=day, risk=PortfoliosRisk.FT3, effective=True)


@autowired
def get_fund_infos(datum: Datum = None):
    """Load all FUND datums into the module-level ``fund_infos`` cache.

    ``datum`` is presumably injected by ``@autowired``; callers invoke this
    function without arguments.
    """
    global fund_infos
    fund_infos = datum.get_datums(DatumType.FUND)


@autowired(names={'combo': 'hold-report'})
def load_report(max_date=_MAX_DATE_UNSET, min_date=None, combo: RoboReportor = None):
    """Compute annualised return, volatility and their ratio from the hold report.

    Reads the report rows (which must contain ``date`` and ``acc_av``), derives
    the period-over-period returns of ``acc_av`` and stores the results in the
    module-level ``cp``, ``roi`` and ``risk`` globals.

    Args:
        max_date: Upper bound passed to ``combo.load_report``. When omitted it
            is resolved to the previous workday *at call time* (the original
            default was evaluated once at import and went stale in a
            long-running server). An explicit ``None`` is passed through.
        min_date: Lower bound passed to ``combo.load_report``.
        combo: Report provider; presumably injected by ``@autowired``.

    Returns:
        Tuple ``(cp, roi, risk)``. ``cp`` is ``None`` when ``risk`` is zero.
    """
    global cp, roi, risk
    if max_date is _MAX_DATE_UNSET:
        max_date = prev_workday(dt.date.today())

    datas = pd.DataFrame(combo.load_report(max_date=max_date, min_date=min_date))
    datas.set_index('date', inplace=True)
    returns = datas['acc_av'].pct_change().round(5)

    roi = round(annual_return(returns), 1)
    risk = round(annual_volatility(returns), 1)
    # risk is rounded to one decimal and can therefore be exactly 0; dividing
    # would raise or yield inf/NaN, which cannot be serialised as JSON.
    cp = roi / risk if risk else None
    return cp, roi, risk


@app.get("/franklin/recommend")
async def recommend():
    """Build the recommendation payload from the latest effective signal.

    Returns:
        A one-element list holding ``{'recomm_guid': ..., 'data': {...}}`` when
        a signal exists, otherwise a dict with a ``msg`` entry telling the
        caller that today's portfolio has not been produced yet.
    """
    sig = get_last_signal()
    if not sig:
        return {'msg': '当日投组未产生,待10:30后获取'}

    if not fund_infos:
        get_fund_infos()
    id_ticker_map = {str(info['id']): info for info in fund_infos}
    funds = json.loads(sig['portfolio'])

    # Refreshes the module-level cp / roi / risk over the trailing year.
    load_report(min_date=prev_workday(dt.date.today()) - relativedelta(years=1))

    data = {
        'recomm_guid': REC_GID,
        'data_date': sig['create_time'].strftime('%Y-%m-%d'),
        'funds': [
            {'weight': round(weight * 100), 'fund_id': id_ticker_map[key]['ftTicker']}
            for key, weight in funds.items()
        ],
        'creat_date': sig['create_time'].strftime('%Y-%m-%d %H:%M:%S'),
        'risk': risk,
        'rr': round(sum(id_ticker_map[key]['risk'] * weight for key, weight in funds.items()), 2),
        'cp': cp,
        'roi': roi,
    }
    note = {'recomman_reason': "recommend reason is ..."}
    data['note'] = json.dumps(note)

    # Stock-to-bond ratio. round() rather than int(): float noise such as
    # 0.29 * 100 == 28.999999999999996 must not truncate a whole percent away,
    # and this matches the rounding applied to the per-fund weights above.
    stock_weight = round(sum(
        weight * 100
        for key, weight in funds.items()
        if id_ticker_map[key]['category'] == 'STOCK'
    ))
    data["p_note"] = f"{stock_weight}:{100 - stock_weight}"

    return [{'recomm_guid': REC_GID, 'data': data}]


# Catch-all handler for otherwise unhandled exceptions.
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Log an unhandled error with its traceback and answer with a JSON 500."""
    logging.error("请求 %s 发生未知错误: %s", request.url, exc, exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={"errorCode": "500", "errorMsg": str(exc)},
    )


# Application startup hook.
@app.on_event("startup")
async def startup_event():
    """Register the scheduled jobs and start the scheduler.

    If the service starts after 10:20 local time, ``main.start`` is also
    scheduled to run once immediately so that a restart can still produce the
    current day's portfolio. The e-mail is only sent by its cron job, not on
    restart.
    """
    current_time = dt.datetime.now()
    target_time = dt.time(10, 20)
    if current_time.time() > target_time:
        scheduler.add_job(main.start, trigger=DateTrigger(run_date=current_time))

    # Recurring jobs (day_of_week '0-4'): live run at 10:20, e-mail at 10:25.
    scheduler.add_job(main.start, 'cron', day_of_week='0-4', hour=10, minute=20)
    scheduler.add_job(send_email, 'cron', day_of_week='0-4', hour=10, minute=25)
    scheduler.start()


if __name__ == "__main__":
    uvicorn.run("robo_controller:app", host="0.0.0.0", port=get_config('web.port'))