# -*- coding: utf-8 -*- """ 月差数据库计算模块 基于 MySQL contract_day 表动态计算月差对,取代静态 Excel 读取。 逻辑: - 以最新日期确定固定合约对(近月=持仓top3 OI不含远月,远月=最远OI>1万合约) - 历史序列只取该固定合约对的价差,确保同一合约对连续走势 - 若标准月差对(OI>1万)无法形成,使用主次月差对(持仓前2名合约) - 每月差对独立评分(seasonal band_score),考虑生命周期窗口 - 品种月差分 = 所有有效月差对得分平均 """ from __future__ import annotations import sys import os import json from datetime import datetime from pathlib import Path from typing import Any import numpy as np import pandas as pd # 导入 MySQL 客户端 SKILL_DIR = Path(os.path.expanduser("~/.workbuddy/skills/contract_day/scripts")) sys.path.insert(0, str(SKILL_DIR)) from mysql_client import MysqlDLLClient # ---- 品种代码映射 ----------------------------------------------------------- # 基本面指标确认中的品种名 -> 品种代码 NAME_TO_PCODE: dict[str, str] = { "玉米": "c", "淀粉": "cs", "豆一": "a", "豆二": "b", "豆粕": "m", "豆油": "y", "棕榈油": "p", "鸡蛋": "jd", "生猪": "lh", "塑料": "l", "PVC": "v", "聚丙烯": "pp", "焦炭": "j", "焦煤": "jm", "铁矿": "i", "乙二醇": "eg", "苯乙烯": "eb", "液化气": "pg", "棉花": "cf", "白糖": "sr", "菜籽油": "oi", "菜籽粕": "rm", "苹果": "ap", "红枣": "cj", "花生": "pk", "PTA": "ta", "甲醇": "ma", "玻璃": "fg", "硅铁": "sf", "锰硅": "sm", "尿素": "ur", "纯碱": "sa", "短纤": "pf", "沪铜": "cu", "沪铝": "al", "沪锌": "zn", "沪铅": "pb", "沪镍": "ni", "沪锡": "sn", "沪金": "au", "沪银": "ag", "螺纹": "rb", "热卷": "hc", "不锈钢": "ss", "纸浆": "sp", "燃油": "fu", "沥青": "bu", "橡胶": "ru", "20号胶": "nr", "原油": "sc", "LU燃油": "lu", "IF": "if", "IC": "ic", "IM": "im", "IH": "ih", "二年债": "ts", "五年债": "tf", "十年债": "t", "工业硅": "si", "氧化铝": "ao", "碳酸锂": "lc", "三十年债": "tl", "BR橡胶": "br", "烧碱": "sh", "对二甲苯": "px", "欧线": "ec", "瓶片": "pr", "原木": "lg", "铂": "pt", "钯": "pd", "丙烯": "pl", "纯苯": "bz", "国际铜": "op", "动力煤": "zc", "多晶硅": "ps", "铝合金": "ad", } PCODE_TO_NAME: dict[str, str] = {v: k for k, v in NAME_TO_PCODE.items()} # ---- 数据库查询 ------------------------------------------------------------- def _fetch_one(db: MysqlDLLClient, p_code: str, start: str = "2018-01-01") -> pd.DataFrame: """查询单品种全合约历史(内部函数,需传入已打开的 db)。""" try: rows = db.query( 1, "SELECT times, contract, close, oi FROM contract_day " "WHERE p_code = ? AND times >= ? ORDER BY times, contract", [p_code, start], ) except Exception: return pd.DataFrame(columns=["times", "contract", "close", "oi"]) if not rows: return pd.DataFrame(columns=["times", "contract", "close", "oi"]) df = pd.DataFrame(rows) df["times"] = pd.to_datetime(df["times"]) df["close"] = pd.to_numeric(df["close"], errors="coerce") df["oi"] = pd.to_numeric(df["oi"], errors="coerce") return df def get_contract_lifecycles(df: pd.DataFrame) -> dict[str, tuple[pd.Timestamp, pd.Timestamp]]: """获取每个合约的生命周期:(首次交易日, 最后交易日)。""" life = {} for contract, grp in df.groupby("contract"): life[contract] = (grp["times"].min(), grp["times"].max()) return life # ---- 月差对计算 -------------------------------------------------------------- def compute_spread_pairs_for_date(date_df: pd.DataFrame) -> list[dict[str, Any]]: """ 计算某一天的所有有效月差对。 """ active = date_df[date_df["oi"] > 10000].copy() if len(active) < 2: return [] active = active.sort_values("contract") far_row = active.iloc[-1] # 远月:排序最靠后 near_candidates = active[active["contract"] != far_row["contract"]].copy() if near_candidates.empty: return [] near_top3 = near_candidates.nlargest(min(3, len(near_candidates)), "oi") pairs = [] for _, near_row in near_top3.iterrows(): pairs.append({ "near": near_row["contract"], "far": far_row["contract"], "near_close": float(near_row["close"]), "far_close": float(far_row["close"]), "spread": float(near_row["close"]) - float(far_row["close"]), }) return pairs def compute_fallback_pairs_for_date(date_df: pd.DataFrame) -> list[dict[str, Any]]: """ 计算某一天的月差对(低门槛版本)。 当标准月差对(OI>1万)无法形成时,取持仓前两名的合约按先后顺序形成近月-远月月差对。 """ active = date_df[date_df["oi"] > 0].copy() if len(active) < 2: return [] # 按持仓排序,取前两名 top2 = active.nlargest(2, "oi") if len(top2) < 2: return [] # 按合约代码排序(时间顺序),前者为近月,后者为远月 top2_sorted = top2.sort_values("contract") near_row = top2_sorted.iloc[0] far_row = top2_sorted.iloc[1] return [{ "near": near_row["contract"], "far": far_row["contract"], "near_close": float(near_row["close"]), "far_close": float(far_row["close"]), "spread": float(near_row["close"]) - float(far_row["close"]), }] # ---- 辅助函数:提取合约月份 ----------------------------------------------- def _extract_contract_month(contract: str) -> int | None: """从合约代码提取月份。支持4位(如 RB2610→10)和3位(如 MA609→9)格式。""" m = _re.match(r"\D+(\d{2})(\d{2})$", contract) if m: return int(m.group(2)) m = _re.match(r"\D+(\d)(\d{2})$", contract) if m: return int(m.group(2)) return None def _extract_month_pair(near: str, far: str) -> tuple[int, int] | None: """从近月/远月合约代码提取月份组合,如 (EG2609, EG2701) → (9, 1)。""" nm = _extract_contract_month(near) fm = _extract_contract_month(far) if nm is not None and fm is not None: return (nm, fm) return None def _extract_year_prefix(contract: str) -> int | None: """从合约代码提取年份前缀(数字部分去掉末2位月份)。 RB2610 → 26, MA609 → 6, SC2608 → 26。""" digits = "".join(c for c in contract if c.isdigit()) if len(digits) < 3: return None try: return int(digits[:-2]) except (ValueError, IndexError): return None def _contract_year(contract: str) -> int | None: """从合约代码推断完整年份。 上期所/大商所/中金所 4位数字: RB2610 → 2000+26=2026 郑商所 3位数字: MA609 → 2020+6=2026(当前十年2020s) """ digits = "".join(c for c in contract if c.isdigit()) if len(digits) < 3: return None try: prefix = int(digits[:-2]) if len(digits) == 3: # 郑商所3位数字,前缀1位 return 2020 + prefix else: # 4位数字,前缀2位 return 2000 + prefix except (ValueError, IndexError): return None def _year_gap(near: str, far: str) -> int | None: """计算近月/远月合约的年份间隔,如 (SC2608, SC2609) → 0, (EG2609, EG2701) → 1。""" ny = _extract_year_prefix(near) fy = _extract_year_prefix(far) if ny is not None and fy is not None: return fy - ny return None # ---- 历史月差序列构建 -------------------------------------------------------- def build_historical_spreads( df: pd.DataFrame, latest_pairs: list[dict[str, Any]] | None = None, ) -> list[dict[str, Any]]: """ 构建该品种所有有效月差对的历史时间序列(含往年同期月差对)。 对每个最新月差对,提取月份组合(如9-1),然后扫描所有合约找出 历史上同一月份组合的合约对,各自独立计算价差序列。 返回每项含 hist_series 列表,每个元素含 pair_label / series / year。 """ if not latest_pairs: return [] all_contracts = df["contract"].unique().tolist() start_year = 2021 result = [] for i, pair in enumerate(latest_pairs): near_contract = pair["near"] far_contract = pair["far"] type_label = f"N{i+1}" pair_label = f"{near_contract} - {far_contract}" # 提取月份组合 month_pair = _extract_month_pair(near_contract, far_contract) if month_pair is None: continue near_month, far_month = month_pair # 提取年份间隔(关键过滤:只保留与最新对同年隔的历史对) ref_gap = _year_gap(near_contract, far_contract) # 找出所有与该月份组合匹配的历史合约对 hist_series_list = [] for c1 in all_contracts: m1 = _extract_contract_month(c1) if m1 != near_month: continue for c2 in all_contracts: m2 = _extract_contract_month(c2) if m2 != far_month: continue # c1 应早于 c2(近月合约代码 ≤ 远月合约代码) if c1 >= c2: continue # 年份间隔过滤:只保留与最新月差对同年隔的合约对 # 如最新对 SC2608-SC2609(0年隔),则排除 SC2108-SC2209(1年隔)等 if ref_gap is not None: gap = _year_gap(c1, c2) if gap is not None and gap != ref_gap: continue # 取两个合约的价差序列 near_data = df[df["contract"] == c1][["times", "close"]].rename(columns={"close": "near_close"}) far_data = df[df["contract"] == c2][["times", "close"]].rename(columns={"close": "far_close"}) merged = near_data.merge(far_data, on="times", how="inner") if merged.empty: continue merged = merged.sort_values("times") spread_series = pd.Series( (merged["near_close"] - merged["far_close"]).values, index=merged["times"], name="spread", ) spread_series = spread_series[~spread_series.index.duplicated(keep="last")] if spread_series.dropna().empty: continue # 只保留 start_year 之后的数据 spread_series = spread_series[spread_series.index >= f"{start_year}-01-01"] if spread_series.dropna().empty: continue # 只保留远月合约到期前约12个月开始的数据 # 远月合约 c2 交割年月 = (Y, far_month),保留 date(Y-1, far_month, 1) 之后的数据 _far_year = _contract_year(c2) if _far_year is not None: try: _far_start = pd.Timestamp(year=_far_year - 1, month=far_month, day=1) spread_series = spread_series[spread_series.index >= _far_start] except (ValueError, TypeError): pass if spread_series.dropna().empty: continue # 推断年份:从近月合约代码提取完整年份 year_val = _contract_year(c1) if year_val is None: # 降级方案:使用序列中间日期的年份 mid_idx = len(spread_series) // 2 year_val = int(spread_series.index[mid_idx].year) hist_series_list.append({ "pair_label": f"{c1} - {c2}", "series": spread_series, "year": year_val, }) # 按年份排序 hist_series_list.sort(key=lambda x: x["year"]) if not hist_series_list: continue result.append({ "type_label": type_label, "pair_label": pair_label, "hist_series": hist_series_list, }) return result # ---- 使用主脚本的 band_score 进行评分 ------------------------------------- # band_score 函数由 generate_fundamental_dashboard.py 提供,这里导入复用 # 若独立运行,使用内置简化版 try: from generate_fundamental_dashboard import band_score as _band_score _USE_EXTERNAL_BAND = True except ImportError: _USE_EXTERNAL_BAND = False def score_spread_pair( value: float, history: pd.Series, current_date: pd.Timestamp, inverse: bool = False, ) -> dict[str, Any] | None: """使用标准 band_score 对月差对评分。""" import math history = history.dropna() if history.empty: return None day = int(current_date.dayofyear) if _USE_EXTERNAL_BAND: result = _band_score(value, history[history.index < current_date], day, inverse) else: # 内置简化版 result = _builtin_band_score(value, history, day, inverse) return result def _builtin_band_score(value: float, history: pd.Series, day: int, inverse: bool) -> dict | None: """内置简化版 band_score(独立运行时使用)。使用分位数方法,与主模块一致。""" import math history = history.dropna().sort_index() if len(history) < 10: return None # day-of-year 窗口采样 days = history.index.dayofyear.to_numpy() day_diff = np.abs(days - day) day_diff = np.minimum(day_diff, 366 - day_diff) sample = history[day_diff <= 15] if len(sample) < 8: sample = history[day_diff <= 30] if len(sample) < 4: return None sample = sample.sort_values() trim_n = int(len(sample) * 0.1) trimmed_s = sample.iloc[trim_n : len(sample) - trim_n] if trim_n > 0 else sample mid = float(trimmed_s.quantile(0.50)) tail = max(0.02, min(0.45, 0.10)) q_low = float(trimmed_s.quantile(tail)) q_high = float(trimmed_s.quantile(1 - tail)) width_scale = max(0.25, min(4, 3.0 / 3)) low = mid - (mid - q_low) * width_scale high = mid + (q_high - mid) * width_scale if high == low: raw_score = 100.0 if value >= high else -100.0 elif value >= high: raw_score = 100.0 elif value <= low: raw_score = -100.0 else: raw_score = 200.0 * (value - low) / (high - low) - 100.0 score = -raw_score if inverse else raw_score below = float((sample < value).sum()) equal = float((sample == value).sum()) raw_pct = 100.0 * (below + 0.5 * equal) / len(sample) return { "score": round(max(-100.0, min(100.0, score)), 1), "raw_percentile": round(raw_pct, 1), "band_low": round(low, 4), "band_high": round(high, 4), "band_mean": round(mid, 4), } # ---- 月差图表时间序列(用于 seasonal_percentile 兼容) --------------------- def build_spread_seasonal_data( combined_series: pd.Series, ) -> dict[str, Any] | None: """将合并月差序列转为与 seasonal_percentile() 兼容的格式。""" s = combined_series.dropna().sort_index() if s.empty: return None latest_date = s.index.max() latest_value = float(s.loc[latest_date]) yearly: dict[str, list[tuple[str, float]]] = {} clipped = s[s.index.year >= 2021] for dt, val in clipped.items(): yearly.setdefault(str(int(dt.year)), []).append((dt.strftime("%m-%d"), float(val))) return { "latest_date": latest_date.strftime("%Y-%m-%d"), "latest_value": latest_value, "years": yearly, } def build_spread_seasonal_data_multi( hist_series_list: list[dict[str, Any]], ) -> dict[str, Any] | None: """ 将多月差历史序列转为前端绘图格式。 years 的键用年份字符串(如 "2026"),同时提供 year_labels 映射 年份→合约对标签(如 "2026" → "EG2609-EG2701")。 """ if not hist_series_list: return None yearly: dict[str, list[tuple[str, float]]] = {} year_labels: dict[str, str] = {} latest_date = None latest_value = None for hs in hist_series_list: s = hs["series"].dropna().sort_index() if s.empty: continue year_str = str(hs["year"]) # 图例只保留年份:EG2609-EG2701 → 26-27,MA609-MA701 → 6-7 pair_label = hs["pair_label"].replace(" ", "") parts = pair_label.split("-") short_parts = ["".join(c for c in p if c.isdigit())[:-2] for p in parts] short_label = "-".join(short_parts) year_labels[year_str] = short_label for dt, val in s.items(): yearly.setdefault(year_str, []).append((dt.strftime("%m-%d"), float(val))) # 记录最新值(取最末序列的末尾值) if latest_date is None or s.index.max() > latest_date: latest_date = s.index.max() latest_value = float(s.loc[latest_date]) if not yearly: return None return { "latest_date": latest_date.strftime("%Y-%m-%d") if latest_date else "", "latest_value": latest_value, "years": yearly, "year_labels": year_labels, } # ---- 从 pair_label 提取横坐标范围 -------------------------------------------- import re as _re def _extract_x_range(pair_label: str) -> tuple[str, str] | None: """ 从月差对标签提取横坐标范围,如 "RB2610 - RB2705" → ("05-15", "10-15")。 规则:x_start = 远月合约月份15日,x_end = 近月合约月份15日。 支持4位数字(大商所/上期所/中金所)和3位数字(郑商所)格式。 """ # 4位数字格式(同品种代码): e.g., RB2610 - RB2705 m = _re.match(r"(\D+)(\d{2})(\d{2})\s*-\s*\1(\d{2})(\d{2})", pair_label) if m: near_month = int(m.group(3)) far_month = int(m.group(5)) return (f"{far_month:02d}-15", f"{near_month:02d}-15") # 3位数字格式(郑商所,同品种代码): e.g., MA609 - MA701 m = _re.match(r"(\D+)(\d)(\d{2})\s*-\s*\1(\d)(\d{2})", pair_label) if m: near_month = int(m.group(3)) far_month = int(m.group(5)) return (f"{far_month:02d}-15", f"{near_month:02d}-15") # 4位数字格式(品种代码不同): e.g., IF2612 - IH2703 m = _re.match(r"(\D+)(\d{2})(\d{2})\s*-\s*(\D+)(\d{2})(\d{2})", pair_label) if m: near_month = int(m.group(3)) far_month = int(m.group(6)) return (f"{far_month:02d}-15", f"{near_month:02d}-15") # 3位数字格式(品种代码不同) m = _re.match(r"(\D+)(\d)(\d{2})\s*-\s*(\D+)(\d)(\d{2})", pair_label) if m: near_month = int(m.group(3)) far_month = int(m.group(6)) return (f"{far_month:02d}-15", f"{near_month:02d}-15") return None # ---- 主入口函数(批量模式)-------------------------------------------------- def read_spread_from_db_batch( variety_names: list[str], start_date: str = "2018-01-01", ) -> dict[str, dict[str, Any]]: """ 批量从 MySQL 读取所有品种月差数据并评分(复用同一 DB 连接)。 返回 {品种名: {score, pair_scores, series, latest_date, latest_pairs}} """ result: dict[str, dict[str, Any]] = {} with MysqlDLLClient() as db: # 先查出全局最新日期 try: max_date_rows = db.query( 1, "SELECT MAX(times) as max_dt FROM contract_day WHERE times >= ?", ["2026-01-01"], ) global_latest = pd.Timestamp(max_date_rows[0]["max_dt"]) if max_date_rows else None except Exception: global_latest = None for name in variety_names: p_code = NAME_TO_PCODE.get(name) if not p_code: continue try: raw = _fetch_one(db, p_code, start_date) except Exception: continue if raw.empty: continue # 确定最新日期 latest_date_actual = global_latest or raw["times"].max() latest_df = raw[raw["times"] == latest_date_actual] if latest_df.empty: for d in sorted(raw["times"].unique(), reverse=True): if not raw[raw["times"] == d].empty: latest_date_actual = d latest_df = raw[raw["times"] == d] break if latest_df.empty: continue latest_pairs = compute_spread_pairs_for_date(latest_df) # 标准月差对(OI>1万)无法形成时,使用主次月差对(持仓前2名) if not latest_pairs: latest_pairs = compute_fallback_pairs_for_date(latest_df) hist_spreads = build_historical_spreads(raw, latest_pairs) # 对每月差对评分(最多3对) pair_scores = [] valid_scores = [] for hs in hist_spreads[:3]: # 从 hist_series 中找最新合约对(最后一个)的最新值 latest_hist = None for h in reversed(hs["hist_series"]): if latest_date_actual in h["series"].index: latest_hist = h break if latest_hist is None: continue latest_val = latest_hist["series"].loc[latest_date_actual] if pd.isna(latest_val): continue # 评分:合并所有历史序列作为评分参考 all_series = pd.concat([h["series"] for h in hs["hist_series"]]).sort_index() all_series = all_series[~all_series.index.duplicated(keep="last")] sr = score_spread_pair( value=latest_val, history=all_series, current_date=latest_date_actual, inverse=False, ) if sr is None: continue # 统一四舍五入 sr_rounded = {k: (round(v, 1) if isinstance(v, float) else v) for k, v in sr.items()} pair_entry = { "pair_label": hs["pair_label"], "type_label": hs["type_label"], "latest_value": round(float(latest_val), 4), **sr_rounded, } # 每个月差对独立生成季节性图数据(多历史序列) pair_seasonal = build_spread_seasonal_data_multi(hs["hist_series"]) if pair_seasonal: pair_entry["years"] = pair_seasonal["years"] if pair_seasonal.get("year_labels"): pair_entry["year_labels"] = pair_seasonal["year_labels"] # 从 pair_label 提取合约月份,计算横坐标范围 x_range = _extract_x_range(hs["pair_label"]) if x_range: pair_entry["x_start"] = x_range[0] pair_entry["x_end"] = x_range[1] pair_scores.append(pair_entry) if sr_rounded.get("score") is not None: valid_scores.append(sr_rounded["score"]) total_score = round(float(np.mean(valid_scores)), 1) if valid_scores else None # 合并月差序列(取中位数) combined_series = _combine_spread_series(hist_spreads) if not combined_series.empty: combined_series = combined_series[combined_series.index >= "2021-01-01"] result[name] = { "score": total_score, "pair_scores": pair_scores, "series": combined_series, "latest_date": latest_date_actual.strftime("%Y-%m-%d") if latest_date_actual else "", "latest_pairs": [ {"near": p["near"], "far": p["far"], "spread": round(p["spread"], 4)} for p in latest_pairs ], } return result def _combine_spread_series(hist_spreads: list[dict]) -> pd.Series: """合并多月差序列(取中位数)。适配 hist_series 新格式。""" if not hist_spreads: return pd.Series(dtype=float) dfs = [] for hs in hist_spreads: # 新格式:hist_series 列表 for h in hs.get("hist_series", []): s = h["series"].rename(hs["type_label"]) dfs.append(s) if not dfs: return pd.Series(dtype=float) combined = pd.concat(dfs, axis=1) combined["median"] = combined.median(axis=1, skipna=True) return combined["median"] # ---- 测试入口 ---------------------------------------------------------------- if __name__ == "__main__": import sys sys.stdout.reconfigure(encoding="utf-8") result = read_spread_from_db_batch(["螺纹", "沪铜", "沪金"]) for name, data in result.items(): print(f"\n{'='*60}") print(f"品种: {name} 总得分: {data['score']}") print(f"最新日期: {data['latest_date']}") print(f"月差对:") for ps in data["pair_scores"]: print(f" {ps['pair_label']}: 值={ps['latest_value']}, 分={ps['score']}, " f"区间=[{ps['band_low']}, {ps['band_high']}]")