feat: 月差模块独立+阴影带分位数法+季节性历史折线
- 新增 spread_from_db.py: 从MySQL动态计算月差,含标准月差对/主次月差对/历史同期序列 - 阴影带计算改为分位数+K缩放法: mid=Q50, low=Q(trim), high=Q(1-trim), widthScale=K/3 上下沿各自取自数据分布,天然跟随季节性方向 - 月差图支持历史同期折线: 同一月份组合的多条历史合约对序列独立绘制 - 月差图滚动平均平滑(smooth_window=5): 消除价差固有日度抖动,不影响评分 - OI筛选仅用于确定面板展示的月差对,历史序列不过滤持仓 - 前端支持K值/trim实时调节,scoreFromCurrentBand动态计算得分
This commit is contained in:
@@ -0,0 +1,619 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
月差数据库计算模块
|
||||
基于 MySQL contract_day 表动态计算月差对,取代静态 Excel 读取。
|
||||
|
||||
逻辑:
|
||||
- 以最新日期确定固定合约对(近月=持仓top3 OI不含远月,远月=最远OI>1万合约)
|
||||
- 历史序列只取该固定合约对的价差,确保同一合约对连续走势
|
||||
- 若标准月差对(OI>1万)无法形成,使用主次月差对(持仓前2名合约)
|
||||
- 每月差对独立评分(seasonal band_score),考虑生命周期窗口
|
||||
- 品种月差分 = 所有有效月差对得分平均
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
# 导入 MySQL 客户端
|
||||
SKILL_DIR = Path(os.path.expanduser("~/.workbuddy/skills/contract_day/scripts"))
|
||||
sys.path.insert(0, str(SKILL_DIR))
|
||||
from mysql_client import MysqlDLLClient
|
||||
|
||||
# ---- 品种代码映射 -----------------------------------------------------------
|
||||
|
||||
# 基本面指标确认中的品种名 -> 品种代码
|
||||
NAME_TO_PCODE: dict[str, str] = {
|
||||
"玉米": "c", "淀粉": "cs", "豆一": "a", "豆二": "b", "豆粕": "m",
|
||||
"豆油": "y", "棕榈油": "p", "鸡蛋": "jd", "生猪": "lh", "塑料": "l",
|
||||
"PVC": "v", "聚丙烯": "pp", "焦炭": "j", "焦煤": "jm", "铁矿": "i",
|
||||
"乙二醇": "eg", "苯乙烯": "eb", "液化气": "pg", "棉花": "cf",
|
||||
"白糖": "sr", "菜籽油": "oi", "菜籽粕": "rm", "苹果": "ap",
|
||||
"红枣": "cj", "花生": "pk", "PTA": "ta", "甲醇": "ma", "玻璃": "fg",
|
||||
"硅铁": "sf", "锰硅": "sm", "尿素": "ur", "纯碱": "sa", "短纤": "pf",
|
||||
"沪铜": "cu", "沪铝": "al", "沪锌": "zn", "沪铅": "pb", "沪镍": "ni",
|
||||
"沪锡": "sn", "沪金": "au", "沪银": "ag", "螺纹": "rb", "热卷": "hc",
|
||||
"不锈钢": "ss", "纸浆": "sp", "燃油": "fu", "沥青": "bu", "橡胶": "ru",
|
||||
"20号胶": "nr", "原油": "sc", "LU燃油": "lu", "IF": "if", "IC": "ic",
|
||||
"IM": "im", "IH": "ih", "二年债": "ts", "五年债": "tf", "十年债": "t",
|
||||
"工业硅": "si", "氧化铝": "ao", "碳酸锂": "lc", "三十年债": "tl",
|
||||
"BR橡胶": "br", "烧碱": "sh", "对二甲苯": "px", "欧线": "ec",
|
||||
"瓶片": "pr", "原木": "lg", "铂": "pt", "钯": "pd", "丙烯": "pl",
|
||||
"纯苯": "bz", "国际铜": "op", "动力煤": "zc",
|
||||
"多晶硅": "ps", "铝合金": "ad",
|
||||
}
|
||||
PCODE_TO_NAME: dict[str, str] = {v: k for k, v in NAME_TO_PCODE.items()}
|
||||
|
||||
|
||||
# ---- 数据库查询 -------------------------------------------------------------
|
||||
|
||||
def _fetch_one(db: MysqlDLLClient, p_code: str, start: str = "2018-01-01") -> pd.DataFrame:
|
||||
"""查询单品种全合约历史(内部函数,需传入已打开的 db)。"""
|
||||
try:
|
||||
rows = db.query(
|
||||
1,
|
||||
"SELECT times, contract, close, oi FROM contract_day "
|
||||
"WHERE p_code = ? AND times >= ? ORDER BY times, contract",
|
||||
[p_code, start],
|
||||
)
|
||||
except Exception:
|
||||
return pd.DataFrame(columns=["times", "contract", "close", "oi"])
|
||||
if not rows:
|
||||
return pd.DataFrame(columns=["times", "contract", "close", "oi"])
|
||||
df = pd.DataFrame(rows)
|
||||
df["times"] = pd.to_datetime(df["times"])
|
||||
df["close"] = pd.to_numeric(df["close"], errors="coerce")
|
||||
df["oi"] = pd.to_numeric(df["oi"], errors="coerce")
|
||||
return df
|
||||
|
||||
|
||||
def get_contract_lifecycles(df: pd.DataFrame) -> dict[str, tuple[pd.Timestamp, pd.Timestamp]]:
|
||||
"""获取每个合约的生命周期:(首次交易日, 最后交易日)。"""
|
||||
life = {}
|
||||
for contract, grp in df.groupby("contract"):
|
||||
life[contract] = (grp["times"].min(), grp["times"].max())
|
||||
return life
|
||||
|
||||
|
||||
# ---- 月差对计算 --------------------------------------------------------------
|
||||
|
||||
def compute_spread_pairs_for_date(date_df: pd.DataFrame) -> list[dict[str, Any]]:
|
||||
"""
|
||||
计算某一天的所有有效月差对。
|
||||
"""
|
||||
active = date_df[date_df["oi"] > 10000].copy()
|
||||
if len(active) < 2:
|
||||
return []
|
||||
|
||||
active = active.sort_values("contract")
|
||||
far_row = active.iloc[-1] # 远月:排序最靠后
|
||||
|
||||
near_candidates = active[active["contract"] != far_row["contract"]].copy()
|
||||
if near_candidates.empty:
|
||||
return []
|
||||
near_top3 = near_candidates.nlargest(min(3, len(near_candidates)), "oi")
|
||||
|
||||
pairs = []
|
||||
for _, near_row in near_top3.iterrows():
|
||||
pairs.append({
|
||||
"near": near_row["contract"],
|
||||
"far": far_row["contract"],
|
||||
"near_close": float(near_row["close"]),
|
||||
"far_close": float(far_row["close"]),
|
||||
"spread": float(near_row["close"]) - float(far_row["close"]),
|
||||
})
|
||||
return pairs
|
||||
|
||||
|
||||
def compute_fallback_pairs_for_date(date_df: pd.DataFrame) -> list[dict[str, Any]]:
|
||||
"""
|
||||
计算某一天的月差对(低门槛版本)。
|
||||
当标准月差对(OI>1万)无法形成时,取持仓前两名的合约按先后顺序形成近月-远月月差对。
|
||||
"""
|
||||
active = date_df[date_df["oi"] > 0].copy()
|
||||
if len(active) < 2:
|
||||
return []
|
||||
# 按持仓排序,取前两名
|
||||
top2 = active.nlargest(2, "oi")
|
||||
if len(top2) < 2:
|
||||
return []
|
||||
# 按合约代码排序(时间顺序),前者为近月,后者为远月
|
||||
top2_sorted = top2.sort_values("contract")
|
||||
near_row = top2_sorted.iloc[0]
|
||||
far_row = top2_sorted.iloc[1]
|
||||
return [{
|
||||
"near": near_row["contract"],
|
||||
"far": far_row["contract"],
|
||||
"near_close": float(near_row["close"]),
|
||||
"far_close": float(far_row["close"]),
|
||||
"spread": float(near_row["close"]) - float(far_row["close"]),
|
||||
}]
|
||||
|
||||
|
||||
# ---- 辅助函数:提取合约月份 -----------------------------------------------
|
||||
|
||||
def _extract_contract_month(contract: str) -> int | None:
|
||||
"""从合约代码提取月份。支持4位(如 RB2610→10)和3位(如 MA609→9)格式。"""
|
||||
m = _re.match(r"\D+(\d{2})(\d{2})$", contract)
|
||||
if m:
|
||||
return int(m.group(2))
|
||||
m = _re.match(r"\D+(\d)(\d{2})$", contract)
|
||||
if m:
|
||||
return int(m.group(2))
|
||||
return None
|
||||
|
||||
|
||||
def _extract_month_pair(near: str, far: str) -> tuple[int, int] | None:
|
||||
"""从近月/远月合约代码提取月份组合,如 (EG2609, EG2701) → (9, 1)。"""
|
||||
nm = _extract_contract_month(near)
|
||||
fm = _extract_contract_month(far)
|
||||
if nm is not None and fm is not None:
|
||||
return (nm, fm)
|
||||
return None
|
||||
|
||||
|
||||
def _extract_year_prefix(contract: str) -> int | None:
|
||||
"""从合约代码提取年份前缀(数字部分去掉末2位月份)。
|
||||
RB2610 → 26, MA609 → 6, SC2608 → 26。"""
|
||||
digits = "".join(c for c in contract if c.isdigit())
|
||||
if len(digits) < 3:
|
||||
return None
|
||||
try:
|
||||
return int(digits[:-2])
|
||||
except (ValueError, IndexError):
|
||||
return None
|
||||
|
||||
|
||||
def _year_gap(near: str, far: str) -> int | None:
|
||||
"""计算近月/远月合约的年份间隔,如 (SC2608, SC2609) → 0, (EG2609, EG2701) → 1。"""
|
||||
ny = _extract_year_prefix(near)
|
||||
fy = _extract_year_prefix(far)
|
||||
if ny is not None and fy is not None:
|
||||
return fy - ny
|
||||
return None
|
||||
|
||||
|
||||
# ---- 历史月差序列构建 --------------------------------------------------------
|
||||
|
||||
def build_historical_spreads(
|
||||
df: pd.DataFrame,
|
||||
latest_pairs: list[dict[str, Any]] | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""
|
||||
构建该品种所有有效月差对的历史时间序列(含往年同期月差对)。
|
||||
对每个最新月差对,提取月份组合(如9-1),然后扫描所有合约找出
|
||||
历史上同一月份组合的合约对,各自独立计算价差序列。
|
||||
返回每项含 hist_series 列表,每个元素含 pair_label / series / year。
|
||||
"""
|
||||
if not latest_pairs:
|
||||
return []
|
||||
|
||||
all_contracts = df["contract"].unique().tolist()
|
||||
start_year = 2021
|
||||
|
||||
result = []
|
||||
for i, pair in enumerate(latest_pairs):
|
||||
near_contract = pair["near"]
|
||||
far_contract = pair["far"]
|
||||
type_label = f"N{i+1}"
|
||||
pair_label = f"{near_contract} - {far_contract}"
|
||||
|
||||
# 提取月份组合
|
||||
month_pair = _extract_month_pair(near_contract, far_contract)
|
||||
if month_pair is None:
|
||||
continue
|
||||
|
||||
near_month, far_month = month_pair
|
||||
|
||||
# 提取年份间隔(关键过滤:只保留与最新对同年隔的历史对)
|
||||
ref_gap = _year_gap(near_contract, far_contract)
|
||||
|
||||
# 找出所有与该月份组合匹配的历史合约对
|
||||
hist_series_list = []
|
||||
for c1 in all_contracts:
|
||||
m1 = _extract_contract_month(c1)
|
||||
if m1 != near_month:
|
||||
continue
|
||||
for c2 in all_contracts:
|
||||
m2 = _extract_contract_month(c2)
|
||||
if m2 != far_month:
|
||||
continue
|
||||
# c1 应早于 c2(近月合约代码 ≤ 远月合约代码)
|
||||
if c1 >= c2:
|
||||
continue
|
||||
# 年份间隔过滤:只保留与最新月差对同年隔的合约对
|
||||
# 如最新对 SC2608-SC2609(0年隔),则排除 SC2108-SC2209(1年隔)等
|
||||
if ref_gap is not None:
|
||||
gap = _year_gap(c1, c2)
|
||||
if gap is not None and gap != ref_gap:
|
||||
continue
|
||||
# 取两个合约的价差序列
|
||||
near_data = df[df["contract"] == c1][["times", "close"]].rename(columns={"close": "near_close"})
|
||||
far_data = df[df["contract"] == c2][["times", "close"]].rename(columns={"close": "far_close"})
|
||||
merged = near_data.merge(far_data, on="times", how="inner")
|
||||
if merged.empty:
|
||||
continue
|
||||
merged = merged.sort_values("times")
|
||||
spread_series = pd.Series(
|
||||
(merged["near_close"] - merged["far_close"]).values,
|
||||
index=merged["times"],
|
||||
name="spread",
|
||||
)
|
||||
spread_series = spread_series[~spread_series.index.duplicated(keep="last")]
|
||||
if spread_series.dropna().empty:
|
||||
continue
|
||||
# 只保留 start_year 之后的数据
|
||||
spread_series = spread_series[spread_series.index >= f"{start_year}-01-01"]
|
||||
if spread_series.dropna().empty:
|
||||
continue
|
||||
# 推断年份:取序列中间日期的年份
|
||||
mid_idx = len(spread_series) // 2
|
||||
year_val = int(spread_series.index[mid_idx].year)
|
||||
hist_series_list.append({
|
||||
"pair_label": f"{c1} - {c2}",
|
||||
"series": spread_series,
|
||||
"year": year_val,
|
||||
})
|
||||
|
||||
# 按年份排序
|
||||
hist_series_list.sort(key=lambda x: x["year"])
|
||||
|
||||
if not hist_series_list:
|
||||
continue
|
||||
|
||||
result.append({
|
||||
"type_label": type_label,
|
||||
"pair_label": pair_label,
|
||||
"hist_series": hist_series_list,
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---- 使用主脚本的 band_score 进行评分 -------------------------------------
|
||||
|
||||
# band_score 函数由 generate_fundamental_dashboard.py 提供,这里导入复用
|
||||
# 若独立运行,使用内置简化版
|
||||
|
||||
try:
|
||||
from generate_fundamental_dashboard import band_score as _band_score
|
||||
_USE_EXTERNAL_BAND = True
|
||||
except ImportError:
|
||||
_USE_EXTERNAL_BAND = False
|
||||
|
||||
|
||||
def score_spread_pair(
|
||||
value: float,
|
||||
history: pd.Series,
|
||||
current_date: pd.Timestamp,
|
||||
inverse: bool = False,
|
||||
) -> dict[str, Any] | None:
|
||||
"""使用标准 band_score 对月差对评分。"""
|
||||
import math
|
||||
history = history.dropna()
|
||||
if history.empty:
|
||||
return None
|
||||
day = int(current_date.dayofyear)
|
||||
if _USE_EXTERNAL_BAND:
|
||||
result = _band_score(value, history[history.index < current_date], day, inverse)
|
||||
else:
|
||||
# 内置简化版
|
||||
result = _builtin_band_score(value, history, day, inverse)
|
||||
return result
|
||||
|
||||
|
||||
def _builtin_band_score(value: float, history: pd.Series, day: int, inverse: bool) -> dict | None:
|
||||
"""内置简化版 band_score(独立运行时使用)。使用分位数方法,与主模块一致。"""
|
||||
import math
|
||||
history = history.dropna().sort_index()
|
||||
if len(history) < 10:
|
||||
return None
|
||||
# day-of-year 窗口采样
|
||||
days = history.index.dayofyear.to_numpy()
|
||||
day_diff = np.abs(days - day)
|
||||
day_diff = np.minimum(day_diff, 366 - day_diff)
|
||||
sample = history[day_diff <= 15]
|
||||
if len(sample) < 8:
|
||||
sample = history[day_diff <= 30]
|
||||
if len(sample) < 4:
|
||||
return None
|
||||
sample = sample.sort_values()
|
||||
trim_n = int(len(sample) * 0.1)
|
||||
trimmed_s = sample.iloc[trim_n : len(sample) - trim_n] if trim_n > 0 else sample
|
||||
mid = float(trimmed_s.quantile(0.50))
|
||||
tail = max(0.02, min(0.45, 0.10))
|
||||
q_low = float(trimmed_s.quantile(tail))
|
||||
q_high = float(trimmed_s.quantile(1 - tail))
|
||||
width_scale = max(0.25, min(4, 3.0 / 3))
|
||||
low = mid - (mid - q_low) * width_scale
|
||||
high = mid + (q_high - mid) * width_scale
|
||||
if high == low:
|
||||
score = 50.0
|
||||
elif inverse:
|
||||
score = 100.0 if value <= low else (0.0 if value >= high else 100.0 - (value - low) / (high - low) * 100)
|
||||
else:
|
||||
score = 100.0 if value >= high else (0.0 if value <= low else (value - low) / (high - low) * 100)
|
||||
below = float((sample < value).sum())
|
||||
equal = float((sample == value).sum())
|
||||
raw_pct = 100.0 * (below + 0.5 * equal) / len(sample)
|
||||
return {
|
||||
"score": round(max(0.0, min(100.0, score)), 1),
|
||||
"raw_percentile": round(raw_pct, 1),
|
||||
"band_low": round(low, 4),
|
||||
"band_high": round(high, 4),
|
||||
"band_mean": round(mid, 4),
|
||||
}
|
||||
|
||||
|
||||
# ---- 月差图表时间序列(用于 seasonal_percentile 兼容) ---------------------
|
||||
|
||||
def build_spread_seasonal_data(
|
||||
combined_series: pd.Series,
|
||||
) -> dict[str, Any] | None:
|
||||
"""将合并月差序列转为与 seasonal_percentile() 兼容的格式。"""
|
||||
s = combined_series.dropna().sort_index()
|
||||
if s.empty:
|
||||
return None
|
||||
latest_date = s.index.max()
|
||||
latest_value = float(s.loc[latest_date])
|
||||
yearly: dict[str, list[tuple[str, float]]] = {}
|
||||
clipped = s[s.index.year >= 2021]
|
||||
for dt, val in clipped.items():
|
||||
yearly.setdefault(str(int(dt.year)), []).append((dt.strftime("%m-%d"), float(val)))
|
||||
return {
|
||||
"latest_date": latest_date.strftime("%Y-%m-%d"),
|
||||
"latest_value": latest_value,
|
||||
"years": yearly,
|
||||
}
|
||||
|
||||
|
||||
def build_spread_seasonal_data_multi(
|
||||
hist_series_list: list[dict[str, Any]],
|
||||
) -> dict[str, Any] | None:
|
||||
"""
|
||||
将多月差历史序列转为前端绘图格式。
|
||||
years 的键用年份字符串(如 "2026"),同时提供 year_labels 映射
|
||||
年份→合约对标签(如 "2026" → "EG2609-EG2701")。
|
||||
"""
|
||||
if not hist_series_list:
|
||||
return None
|
||||
|
||||
yearly: dict[str, list[tuple[str, float]]] = {}
|
||||
year_labels: dict[str, str] = {}
|
||||
latest_date = None
|
||||
latest_value = None
|
||||
|
||||
for hs in hist_series_list:
|
||||
s = hs["series"].dropna().sort_index()
|
||||
if s.empty:
|
||||
continue
|
||||
year_str = str(hs["year"])
|
||||
# 图例只保留年份:EG2609-EG2701 → 26-27,MA609-MA701 → 6-7
|
||||
pair_label = hs["pair_label"].replace(" ", "")
|
||||
parts = pair_label.split("-")
|
||||
short_parts = ["".join(c for c in p if c.isdigit())[:-2] for p in parts]
|
||||
short_label = "-".join(short_parts)
|
||||
year_labels[year_str] = short_label
|
||||
for dt, val in s.items():
|
||||
yearly.setdefault(year_str, []).append((dt.strftime("%m-%d"), float(val)))
|
||||
# 记录最新值(取最末序列的末尾值)
|
||||
if latest_date is None or s.index.max() > latest_date:
|
||||
latest_date = s.index.max()
|
||||
latest_value = float(s.loc[latest_date])
|
||||
|
||||
if not yearly:
|
||||
return None
|
||||
|
||||
return {
|
||||
"latest_date": latest_date.strftime("%Y-%m-%d") if latest_date else "",
|
||||
"latest_value": latest_value,
|
||||
"years": yearly,
|
||||
"year_labels": year_labels,
|
||||
}
|
||||
|
||||
|
||||
# ---- 从 pair_label 提取横坐标范围 --------------------------------------------
|
||||
|
||||
import re as _re
|
||||
|
||||
def _extract_x_range(pair_label: str) -> tuple[str, str] | None:
|
||||
"""
|
||||
从月差对标签提取横坐标范围,如 "RB2610 - RB2705" → ("05-15", "10-15")。
|
||||
规则:x_start = 远月合约月份15日,x_end = 近月合约月份15日。
|
||||
支持4位数字(大商所/上期所/中金所)和3位数字(郑商所)格式。
|
||||
"""
|
||||
# 4位数字格式(同品种代码): e.g., RB2610 - RB2705
|
||||
m = _re.match(r"(\D+)(\d{2})(\d{2})\s*-\s*\1(\d{2})(\d{2})", pair_label)
|
||||
if m:
|
||||
near_month = int(m.group(3))
|
||||
far_month = int(m.group(5))
|
||||
return (f"{far_month:02d}-15", f"{near_month:02d}-15")
|
||||
# 3位数字格式(郑商所,同品种代码): e.g., MA609 - MA701
|
||||
m = _re.match(r"(\D+)(\d)(\d{2})\s*-\s*\1(\d)(\d{2})", pair_label)
|
||||
if m:
|
||||
near_month = int(m.group(3))
|
||||
far_month = int(m.group(5))
|
||||
return (f"{far_month:02d}-15", f"{near_month:02d}-15")
|
||||
# 4位数字格式(品种代码不同): e.g., IF2612 - IH2703
|
||||
m = _re.match(r"(\D+)(\d{2})(\d{2})\s*-\s*(\D+)(\d{2})(\d{2})", pair_label)
|
||||
if m:
|
||||
near_month = int(m.group(3))
|
||||
far_month = int(m.group(6))
|
||||
return (f"{far_month:02d}-15", f"{near_month:02d}-15")
|
||||
# 3位数字格式(品种代码不同)
|
||||
m = _re.match(r"(\D+)(\d)(\d{2})\s*-\s*(\D+)(\d)(\d{2})", pair_label)
|
||||
if m:
|
||||
near_month = int(m.group(3))
|
||||
far_month = int(m.group(6))
|
||||
return (f"{far_month:02d}-15", f"{near_month:02d}-15")
|
||||
return None
|
||||
|
||||
|
||||
# ---- 主入口函数(批量模式)--------------------------------------------------
|
||||
|
||||
def read_spread_from_db_batch(
|
||||
variety_names: list[str],
|
||||
start_date: str = "2018-01-01",
|
||||
) -> dict[str, dict[str, Any]]:
|
||||
"""
|
||||
批量从 MySQL 读取所有品种月差数据并评分(复用同一 DB 连接)。
|
||||
|
||||
返回 {品种名: {score, pair_scores, series, latest_date, latest_pairs}}
|
||||
"""
|
||||
result: dict[str, dict[str, Any]] = {}
|
||||
|
||||
with MysqlDLLClient() as db:
|
||||
# 先查出全局最新日期
|
||||
try:
|
||||
max_date_rows = db.query(
|
||||
1,
|
||||
"SELECT MAX(times) as max_dt FROM contract_day WHERE times >= ?",
|
||||
["2026-01-01"],
|
||||
)
|
||||
global_latest = pd.Timestamp(max_date_rows[0]["max_dt"]) if max_date_rows else None
|
||||
except Exception:
|
||||
global_latest = None
|
||||
|
||||
for name in variety_names:
|
||||
p_code = NAME_TO_PCODE.get(name)
|
||||
if not p_code:
|
||||
continue
|
||||
|
||||
try:
|
||||
raw = _fetch_one(db, p_code, start_date)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if raw.empty:
|
||||
continue
|
||||
|
||||
# 确定最新日期
|
||||
latest_date_actual = global_latest or raw["times"].max()
|
||||
latest_df = raw[raw["times"] == latest_date_actual]
|
||||
if latest_df.empty:
|
||||
for d in sorted(raw["times"].unique(), reverse=True):
|
||||
if not raw[raw["times"] == d].empty:
|
||||
latest_date_actual = d
|
||||
latest_df = raw[raw["times"] == d]
|
||||
break
|
||||
|
||||
if latest_df.empty:
|
||||
continue
|
||||
|
||||
latest_pairs = compute_spread_pairs_for_date(latest_df)
|
||||
# 标准月差对(OI>1万)无法形成时,使用主次月差对(持仓前2名)
|
||||
if not latest_pairs:
|
||||
latest_pairs = compute_fallback_pairs_for_date(latest_df)
|
||||
hist_spreads = build_historical_spreads(raw, latest_pairs)
|
||||
|
||||
# 对每月差对评分(最多3对)
|
||||
pair_scores = []
|
||||
valid_scores = []
|
||||
for hs in hist_spreads[:3]:
|
||||
# 从 hist_series 中找最新合约对(最后一个)的最新值
|
||||
latest_hist = None
|
||||
for h in reversed(hs["hist_series"]):
|
||||
if latest_date_actual in h["series"].index:
|
||||
latest_hist = h
|
||||
break
|
||||
if latest_hist is None:
|
||||
continue
|
||||
latest_val = latest_hist["series"].loc[latest_date_actual]
|
||||
if pd.isna(latest_val):
|
||||
continue
|
||||
|
||||
# 评分:合并所有历史序列作为评分参考
|
||||
all_series = pd.concat([h["series"] for h in hs["hist_series"]]).sort_index()
|
||||
all_series = all_series[~all_series.index.duplicated(keep="last")]
|
||||
|
||||
sr = score_spread_pair(
|
||||
value=latest_val,
|
||||
history=all_series,
|
||||
current_date=latest_date_actual,
|
||||
inverse=False,
|
||||
)
|
||||
if sr is None:
|
||||
continue
|
||||
# 统一四舍五入
|
||||
sr_rounded = {k: (round(v, 1) if isinstance(v, float) else v) for k, v in sr.items()}
|
||||
pair_entry = {
|
||||
"pair_label": hs["pair_label"],
|
||||
"type_label": hs["type_label"],
|
||||
"latest_value": round(float(latest_val), 4),
|
||||
**sr_rounded,
|
||||
}
|
||||
# 每个月差对独立生成季节性图数据(多历史序列)
|
||||
pair_seasonal = build_spread_seasonal_data_multi(hs["hist_series"])
|
||||
if pair_seasonal:
|
||||
pair_entry["years"] = pair_seasonal["years"]
|
||||
if pair_seasonal.get("year_labels"):
|
||||
pair_entry["year_labels"] = pair_seasonal["year_labels"]
|
||||
# 从 pair_label 提取合约月份,计算横坐标范围
|
||||
x_range = _extract_x_range(hs["pair_label"])
|
||||
if x_range:
|
||||
pair_entry["x_start"] = x_range[0]
|
||||
pair_entry["x_end"] = x_range[1]
|
||||
pair_scores.append(pair_entry)
|
||||
if sr_rounded.get("score") is not None:
|
||||
valid_scores.append(sr_rounded["score"])
|
||||
|
||||
total_score = round(float(np.mean(valid_scores)), 1) if valid_scores else None
|
||||
|
||||
# 合并月差序列(取中位数)
|
||||
combined_series = _combine_spread_series(hist_spreads)
|
||||
if not combined_series.empty:
|
||||
combined_series = combined_series[combined_series.index >= "2021-01-01"]
|
||||
|
||||
result[name] = {
|
||||
"score": total_score,
|
||||
"pair_scores": pair_scores,
|
||||
"series": combined_series,
|
||||
"latest_date": latest_date_actual.strftime("%Y-%m-%d") if latest_date_actual else "",
|
||||
"latest_pairs": [
|
||||
{"near": p["near"], "far": p["far"], "spread": round(p["spread"], 4)}
|
||||
for p in latest_pairs
|
||||
],
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _combine_spread_series(hist_spreads: list[dict]) -> pd.Series:
|
||||
"""合并多月差序列(取中位数)。适配 hist_series 新格式。"""
|
||||
if not hist_spreads:
|
||||
return pd.Series(dtype=float)
|
||||
dfs = []
|
||||
for hs in hist_spreads:
|
||||
# 新格式:hist_series 列表
|
||||
for h in hs.get("hist_series", []):
|
||||
s = h["series"].rename(hs["type_label"])
|
||||
dfs.append(s)
|
||||
if not dfs:
|
||||
return pd.Series(dtype=float)
|
||||
combined = pd.concat(dfs, axis=1)
|
||||
combined["median"] = combined.median(axis=1, skipna=True)
|
||||
return combined["median"]
|
||||
|
||||
|
||||
# ---- 测试入口 ----------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
sys.stdout.reconfigure(encoding="utf-8")
|
||||
result = read_spread_from_db_batch(["螺纹", "沪铜", "沪金"])
|
||||
for name, data in result.items():
|
||||
print(f"\n{'='*60}")
|
||||
print(f"品种: {name} 总得分: {data['score']}")
|
||||
print(f"最新日期: {data['latest_date']}")
|
||||
print(f"月差对:")
|
||||
for ps in data["pair_scores"]:
|
||||
print(f" {ps['pair_label']}: 值={ps['latest_value']}, 分={ps['score']}, "
|
||||
f"区间=[{ps['band_low']}, {ps['band_high']}]")
|
||||
Reference in New Issue
Block a user