import os from datetime import datetime, timedelta from pathlib import Path import pandas as pd import pytz import requests from openpyxl.reader.excel import load_workbook from py_jftech import sendmail def is_dst(): """ 判断当前时区是否实行夏令时 @return: """ tz = pytz.timezone('America/New_York') now = datetime.now(tz) return now.dst() != timedelta(0) def usa_close_day(): """ 美股收盘时间,收盘后,日期+1天 @return: """ tz = pytz.timezone('America/New_York') now = datetime.now(tz) if is_dst(): # 夏令时 if now.hour > 16: now = now + timedelta(1) else: # 冬令时 if now.hour > 17: now = now + timedelta(1) return now.strftime("%Y%m%d") def get_quarter_end_date(date=None): """ @return: 当前日期所在季度的最后一天日期 """ # 获取当前日期 if date is None: date = datetime.now() # 计算当前季度 current_quarter = (date.month - 1) // 3 + 1 # 计算季度的最后一个月份 if current_quarter == 1: quarter_end_month = 3 elif current_quarter == 2: quarter_end_month = 6 elif current_quarter == 3: quarter_end_month = 9 else: quarter_end_month = 12 # 计算季度末的日期 quarter_end_date = datetime(date.year, quarter_end_month, 1) + timedelta(days=31) - timedelta( days=datetime(date.year, quarter_end_month, 1).day) return quarter_end_date def list_files_sorted_by_name(directory, max_day=None): """ 文件排序 @param directory: 所在目录 @param max_day: 期望最大日期 @return: 返回日期小于max_day的所有文件 """ files = [] for root, dirs, filenames in os.walk(directory): for filename in filenames: files.append(os.path.join(root, filename)) files.sort() # 默认是按照字典序排序 if max_day: files = [f for f in files[:-1] if str(f)[-13:-5] >= max_day.strftime("%Y%m%d")] return files def fetch_sp500(): temp_file = Path(__file__).parent / 'resources/sp-500.xlsx' response = requests.get("https://www.spglobal.com/spdji/en/documents/additional-material/sp-500-eps-est.xlsx") # 确保请求成功 if response.status_code == 200: # 保存临时文件 with open(temp_file, 'wb') as f: f.write(response.content) else: print(f"Failed to retrieve file: {response.status_code}") def save_sp500(): fetch_sp500() files = list_files_sorted_by_name(Path(__file__).parent / 'resources')[-2:] compare_day = None for file in files: # 使用openpyxl加载Excel文件 wb = load_workbook(filename=file, data_only=True) ws = wb['ESTIMATES&PEs'] # 读取特定单元格的值 report_day = ws['A2'].value if compare_day is None: compare_day = report_day else: if compare_day != report_day: new_path = Path(__file__).parent / f'resources/sp-500-eps-est_USA{usa_close_day()}.xlsx' wb.save(new_path) send_sp500('download sp500.', [new_path]) # 关闭工作簿 wb.close() def sync_sp500(day): file = Path(__file__).parent / 'resources/sp-500-eps-est_USA20241014.xlsx' if day: files = list_files_sorted_by_name(Path(__file__).parent / 'resources', day) if files: file = files[0] else: return [] wb = load_workbook(filename=file, data_only=True) ws = wb['ESTIMATES&PEs'] estimates = "ESTIMATES" estimates_row = 0 actuals = "ACTUALS" actuals_row = 0 datas = [] # 遍历A列 for row in range(100, 300): cell_value = ws[f'A{row}'].value if cell_value and estimates == str(cell_value): estimates_row = row if cell_value and actuals == str(cell_value): actuals_row = row break report_day = ws['A2'].value for i in range(estimates_row + 1, actuals_row): if ws[f'A{i}'].value is None: break date_value = datetime.strptime(str(ws[f'A{i}'].value).split(' ')[0].strip(), '%m/%d/%Y') if type( ws[f'A{i}'].value) == str else ws[f'A{i}'].value if date_value < report_day: # 日期只要是季度首日也设置red_date = red_release_date data = {'date': date_value, 'eps': ws[f'C{i}'].value} data["releaseDate"] = data['date'] + timedelta(days=1) data["date"] = data['releaseDate'] datas.append(data) elif date_value == get_quarter_end_date(report_day): # 如果发布日是季度末,则red_date = red_release_date if report_day == get_quarter_end_date(report_day): data = {'date': date_value, 'eps': ws[f'C{i}'].value} data["releaseDate"] = data['date'] + timedelta(days=1) data["date"] = data['releaseDate'] else: data = {'date': report_day, 'eps': ws[f'C{i}'].value, 'releaseDate': datetime.strptime(str(file)[-13:-5], "%Y%m%d")} datas.append(data) for i in range(actuals_row + 1, ws.max_row): if ws[f'A{i}'].value is None: break data = {'date': datetime.strptime(str(ws[f'A{i}'].value).strip(), '%m/%d/%Y') if type( ws[f'A{i}'].value) == str else ws[f'A{i}'].value, 'eps': ws[f'C{i}'].value} data["releaseDate"] = data['date'] + timedelta(days=1) data["date"] = data['releaseDate'] datas.append(data) wb.close() datas = pd.DataFrame(datas[::-1]) datas['close'] = datas['eps'].rolling(window=4).sum().round(2) datas.dropna(inplace=True) return datas.to_dict(orient="records")[-1::] if day else datas.to_dict(orient="records") def send_sp500(content, attach_paths): receives = ['Tony.Wu.Home@gmail.com'] subject = 'sp500 eps download' sendmail(receives=receives, copies=[], attach_paths=attach_paths, subject=subject, content=content) if __name__ == '__main__': # print(list_files_sorted_by_name(Path(__file__).parent / 'resources')) # save_sp500() sync_sp500(day=None)