# jibenmian-dashboard/spread_from_db.py (655 lines, 25 KiB, Python)
# NOTE: the source viewer reported that this file contains Unicode characters
# that might be confused with other characters; confirm they are intentional.
# -*- coding: utf-8 -*-
"""
月差数据库计算模块
基于 MySQL contract_day 表动态计算月差对,取代静态 Excel 读取。
逻辑:
- 以最新日期确定固定合约对(近月=持仓top3 OI不含远月,远月=最远OI>1万合约)
- 历史序列只取该固定合约对的价差,确保同一合约对连续走势
- 若标准月差对(OI>1万)无法形成,使用主次月差对(持仓前2名合约)
- 每月差对独立评分(seasonal band_score),考虑生命周期窗口
- 品种月差分 = 所有有效月差对得分平均
"""
from __future__ import annotations
import sys
import os
import json
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# 导入 MySQL 客户端
SKILL_DIR = Path(os.path.expanduser("~/.workbuddy/skills/contract_day/scripts"))
sys.path.insert(0, str(SKILL_DIR))
from mysql_client import MysqlDLLClient
# ---- 品种代码映射 -----------------------------------------------------------
# Variety display name (as used in the fundamental-indicator sheet) -> exchange
# product code.
# NOTE(review): the platinum ("pt") and palladium ("pd") entries previously both
# used the empty string as their key, so "pt" was silently overwritten by "pd"
# and PCODE_TO_NAME had no "pt" entry. "铂金"/"钯金" are a best guess — confirm
# they match the names used by the indicator sheet.
NAME_TO_PCODE: dict[str, str] = {
    "玉米": "c", "淀粉": "cs", "豆一": "a", "豆二": "b", "豆粕": "m",
    "豆油": "y", "棕榈油": "p", "鸡蛋": "jd", "生猪": "lh", "塑料": "l",
    "PVC": "v", "聚丙烯": "pp", "焦炭": "j", "焦煤": "jm", "铁矿": "i",
    "乙二醇": "eg", "苯乙烯": "eb", "液化气": "pg", "棉花": "cf",
    "白糖": "sr", "菜籽油": "oi", "菜籽粕": "rm", "苹果": "ap",
    "红枣": "cj", "花生": "pk", "PTA": "ta", "甲醇": "ma", "玻璃": "fg",
    "硅铁": "sf", "锰硅": "sm", "尿素": "ur", "纯碱": "sa", "短纤": "pf",
    "沪铜": "cu", "沪铝": "al", "沪锌": "zn", "沪铅": "pb", "沪镍": "ni",
    "沪锡": "sn", "沪金": "au", "沪银": "ag", "螺纹": "rb", "热卷": "hc",
    "不锈钢": "ss", "纸浆": "sp", "燃油": "fu", "沥青": "bu", "橡胶": "ru",
    "20号胶": "nr", "原油": "sc", "LU燃油": "lu", "IF": "if", "IC": "ic",
    "IM": "im", "IH": "ih", "二年债": "ts", "五年债": "tf", "十年债": "t",
    "工业硅": "si", "氧化铝": "ao", "碳酸锂": "lc", "三十年债": "tl",
    "BR橡胶": "br", "烧碱": "sh", "对二甲苯": "px", "欧线": "ec",
    "瓶片": "pr", "原木": "lg", "铂金": "pt", "钯金": "pd", "丙烯": "pl",
    "纯苯": "bz", "国际铜": "op", "动力煤": "zc",
    "多晶硅": "ps", "铝合金": "ad",
}
# Reverse lookup (product code -> display name). This is only lossless while
# both keys and values of NAME_TO_PCODE are unique.
PCODE_TO_NAME: dict[str, str] = {v: k for k, v in NAME_TO_PCODE.items()}
# ---- 数据库查询 -------------------------------------------------------------
def _fetch_one(db: MysqlDLLClient, p_code: str, start: str = "2018-01-01") -> pd.DataFrame:
    """
    Load every daily row of one product code on or after ``start``.

    Internal helper: the caller owns ``db`` and must already have it open.
    ``db.query`` is called with connection id 1 and a parameterised
    ``contract_day`` query.

    Returns:
        A DataFrame built from the returned rows, with ``times`` parsed to
        datetimes and ``close`` / ``oi`` coerced to numbers (unparsable values
        become NaN). Presumably the rows are dicts keyed by column name —
        verify against MysqlDLLClient. When the query raises or returns no
        rows, an empty DataFrame with columns times/contract/close/oi is
        returned instead.
    """
    empty_columns = ["times", "contract", "close", "oi"]
    sql = (
        "SELECT times, contract, close, oi FROM contract_day "
        "WHERE p_code = ? AND times >= ? ORDER BY times, contract"
    )
    try:
        rows = db.query(1, sql, [p_code, start])
    except Exception:
        # Best effort: one failing product must not abort a batch run.
        return pd.DataFrame(columns=empty_columns)
    if not rows:
        return pd.DataFrame(columns=empty_columns)

    frame = pd.DataFrame(rows)
    frame["times"] = pd.to_datetime(frame["times"])
    for numeric_col in ("close", "oi"):
        frame[numeric_col] = pd.to_numeric(frame[numeric_col], errors="coerce")
    return frame
def get_contract_lifecycles(df: pd.DataFrame) -> dict[str, tuple[pd.Timestamp, pd.Timestamp]]:
    """
    Map each contract to its trading lifecycle.

    Returns:
        ``{contract: (first_times, last_times)}``, i.e. the minimum and maximum
        ``times`` value observed for that contract, keyed in groupby order.
    """
    return {
        code: (rows["times"].min(), rows["times"].max())
        for code, rows in df.groupby("contract")
    }
# ---- 月差对计算 --------------------------------------------------------------
def compute_spread_pairs_for_date(date_df: pd.DataFrame) -> list[dict[str, Any]]:
    """
    Compute the standard calendar-spread pairs for a single trading day.

    Only contracts with open interest above 10,000 are considered. The far leg
    is the contract whose code sorts last among them; the near legs are the
    (up to) three remaining contracts with the largest OI, in descending-OI
    order.

    Returns:
        A list of dicts with ``near``, ``far``, ``near_close``, ``far_close``
        and ``spread`` (near close minus far close). Empty when fewer than two
        liquid contracts exist.
    """
    liquid = date_df[date_df["oi"] > 10000]
    if len(liquid) < 2:
        return []
    liquid = liquid.sort_values("contract")

    far = liquid.iloc[-1]
    others = liquid[liquid["contract"] != far["contract"]]
    if others.empty:
        return []

    far_close = float(far["close"])
    leaders = others.nlargest(min(3, len(others)), "oi")
    return [
        {
            "near": row["contract"],
            "far": far["contract"],
            "near_close": float(row["close"]),
            "far_close": far_close,
            "spread": float(row["close"]) - far_close,
        }
        for _, row in leaders.iterrows()
    ]
def compute_fallback_pairs_for_date(date_df: pd.DataFrame) -> list[dict[str, Any]]:
    """
    Compute a single low-threshold spread pair for one trading day.

    Used when no standard pair (OI above 10,000) can be formed. The two
    contracts with the largest positive open interest are taken and ordered
    by contract code; the earlier code is the near leg, the later the far leg.

    Returns:
        A one-element list with ``near``, ``far``, ``near_close``,
        ``far_close`` and ``spread`` (near close minus far close), or an empty
        list when fewer than two contracts have positive OI.
    """
    open_contracts = date_df[date_df["oi"] > 0]
    if len(open_contracts) < 2:
        return []

    top_two = open_contracts.nlargest(2, "oi")
    if len(top_two) < 2:
        return []

    by_code = top_two.sort_values("contract")
    near, far = by_code.iloc[0], by_code.iloc[1]
    near_close = float(near["close"])
    far_close = float(far["close"])
    return [{
        "near": near["contract"],
        "far": far["contract"],
        "near_close": near_close,
        "far_close": far_close,
        "spread": near_close - far_close,
    }]
# ---- 辅助函数:提取合约月份 -----------------------------------------------
def _extract_contract_month(contract: str) -> int | None:
    """
    Return the delivery month encoded in a contract code, or None.

    The code must be a non-digit product prefix followed by exactly four
    digits (YYMM, e.g. RB2610 -> 10) or exactly three digits (YMM, CZCE
    style, e.g. MA609 -> 9). The month is the last two digits.
    """
    # One pattern covers both the 4-digit and 3-digit layouts.
    found = _re.match(r"\D+\d{1,2}(\d{2})$", contract)
    return int(found.group(1)) if found else None
def _extract_month_pair(near: str, far: str) -> tuple[int, int] | None:
    """
    Return the (near month, far month) combination of a spread pair.

    Example: ("EG2609", "EG2701") -> (9, 1). Returns None if either code's
    month cannot be parsed by ``_extract_contract_month``.
    """
    months = (_extract_contract_month(near), _extract_contract_month(far))
    return None if None in months else months
def _extract_year_prefix(contract: str) -> int | None:
    """
    Return the year digits of a contract code: all digits except the last two.

    Examples: RB2610 -> 26, MA609 -> 6, SC2608 -> 26. Returns None when the
    code has fewer than three digits or the digits cannot be parsed.
    """
    digits = "".join(filter(str.isdigit, contract))
    if len(digits) >= 3:
        try:
            return int(digits[:-2])
        except (ValueError, IndexError):
            pass
    return None
def _contract_year(contract: str, *, ref_year: int | None = None) -> int | None:
    """
    Infer the full delivery year of a futures contract code.

    - 4-digit codes (SHFE/DCE/CFFEX style): RB2610 -> 2000 + 26 = 2026.
    - 3-digit codes (CZCE style) carry only the last digit of the year, so the
      decade is resolved against ``ref_year`` (default: the current calendar
      year). The result is the latest year <= ref_year + 1 whose last digit
      matches. With ref_year=2026: MA609 -> 2026, MA701 -> 2027 and
      MA909 -> 2019. The old fixed "2020 + digit" rule gave 2029 for the last
      case and is wrong for every contract from 2030 onward.

    Args:
        contract: Contract code such as "RB2610" or "MA609".
        ref_year: Year used to pick the decade for 3-digit codes.

    Returns:
        The inferred year, or None when the code has fewer than three digits
        or its digits cannot be parsed.
    """
    digits = "".join(c for c in contract if c.isdigit())
    if len(digits) < 3:
        return None
    try:
        prefix = int(digits[:-2])
    except (ValueError, IndexError):
        # str.isdigit() also accepts characters int() rejects (e.g. "²").
        return None
    if len(digits) == 3:
        if ref_year is None:
            ref_year = datetime.now().year
        # Assumes listed contracts expire no more than about a year ahead, so
        # the window (ref_year - 9, ref_year + 1] covers live and historical
        # contracts.
        upper = ref_year + 1
        return upper - (upper - prefix) % 10
    return 2000 + prefix
def _year_gap(near: str, far: str) -> int | None:
    """
    Return the year-prefix difference between the far and near contracts.

    Examples: (SC2608, SC2609) -> 0, (EG2609, EG2701) -> 1. Returns None if
    either year prefix cannot be extracted.
    """
    near_year = _extract_year_prefix(near)
    far_year = _extract_year_prefix(far)
    if near_year is None or far_year is None:
        return None
    return far_year - near_year
# ---- 历史月差序列构建 --------------------------------------------------------
def _pair_spread_series(
    near_px: pd.DataFrame,
    far_px: pd.DataFrame,
    far_contract: str,
    far_month: int,
    start_year: int,
) -> pd.Series | None:
    """Return the windowed near-minus-far close spread for one contract pair, or None if empty."""
    merged = near_px.rename(columns={"close": "near_close"}).merge(
        far_px.rename(columns={"close": "far_close"}), on="times", how="inner"
    )
    if merged.empty:
        return None
    merged = merged.sort_values("times")
    spread = pd.Series(
        (merged["near_close"] - merged["far_close"]).values,
        index=merged["times"],
        name="spread",
    )
    spread = spread[~spread.index.duplicated(keep="last")]
    if spread.dropna().empty:
        return None
    # Only keep data from start_year onward.
    spread = spread[spread.index >= f"{start_year}-01-01"]
    if spread.dropna().empty:
        return None
    # Keep roughly the 12 months before the far contract's delivery month:
    # far contract delivers in (far_year, far_month), so keep data from
    # (far_year - 1, far_month, 1) onward.
    far_year = _contract_year(far_contract)
    if far_year is not None:
        try:
            far_start = pd.Timestamp(year=far_year - 1, month=far_month, day=1)
            spread = spread[spread.index >= far_start]
        except (ValueError, TypeError):
            pass
    if spread.dropna().empty:
        return None
    return spread


def build_historical_spreads(
    df: pd.DataFrame,
    latest_pairs: list[dict[str, Any]] | None = None,
) -> list[dict[str, Any]]:
    """
    Build historical spread series for each latest pair, including the
    same-month pairs from earlier years.

    For each pair in ``latest_pairs`` the (near month, far month) combination
    is extracted (e.g. 9-1 for EG2609-EG2701). A contract pair in ``df`` is
    used when it has that month combination, a near code that sorts before
    the far code and (when computable) the same year gap as the latest pair.
    Each such pair gets its own spread series (near close minus far close).

    Args:
        df: Daily rows with at least ``times``, ``contract`` and ``close``.
        latest_pairs: Dicts with ``near`` / ``far`` contract codes.

    Returns:
        One dict per latest pair that produced at least one series, with:
        - ``type_label``: "N<k>", where k is the 1-based position in
          ``latest_pairs``.
        - ``pair_label``: "NEAR - FAR".
        - ``hist_series``: a list of ``{"pair_label", "series", "year"}``
          dicts sorted by ``year``.
    """
    if not latest_pairs:
        return []

    start_year = 2021
    all_contracts = df["contract"].unique().tolist()
    # Loop-invariant work hoisted out of the O(contracts^2) pair scan: parse
    # each contract's month once and slice each contract's prices once,
    # instead of re-filtering the whole frame for every candidate pair.
    month_of = {c: _extract_contract_month(c) for c in all_contracts}
    prices_of = {c: grp[["times", "close"]] for c, grp in df.groupby("contract")}

    result = []
    for i, pair in enumerate(latest_pairs):
        near_contract = pair["near"]
        far_contract = pair["far"]
        month_pair = _extract_month_pair(near_contract, far_contract)
        if month_pair is None:
            continue
        near_month, far_month = month_pair
        # Only keep historical pairs with the same year gap as the latest
        # pair, e.g. SC2608-SC2609 (gap 0) excludes SC2108-SC2209 (gap 1).
        ref_gap = _year_gap(near_contract, far_contract)

        hist_series_list = []
        for c1 in all_contracts:
            if month_of[c1] != near_month:
                continue
            for c2 in all_contracts:
                # The near code must sort before the far code.
                if month_of[c2] != far_month or c1 >= c2:
                    continue
                if ref_gap is not None:
                    gap = _year_gap(c1, c2)
                    if gap is not None and gap != ref_gap:
                        continue
                near_px = prices_of.get(c1)
                far_px = prices_of.get(c2)
                if near_px is None or far_px is None:
                    continue
                series = _pair_spread_series(near_px, far_px, c2, far_month, start_year)
                if series is None:
                    continue
                # Label with the near contract's year; fall back to the year
                # of the series' middle observation.
                year_val = _contract_year(c1)
                if year_val is None:
                    year_val = int(series.index[len(series) // 2].year)
                hist_series_list.append({
                    "pair_label": f"{c1} - {c2}",
                    "series": series,
                    "year": year_val,
                })

        if not hist_series_list:
            continue
        hist_series_list.sort(key=lambda x: x["year"])
        result.append({
            "type_label": f"N{i+1}",
            "pair_label": f"{near_contract} - {far_contract}",
            "hist_series": hist_series_list,
        })
    return result
# ---- 使用主脚本的 band_score 进行评分 -------------------------------------
# band_score is provided by generate_fundamental_dashboard.py and reused here.
# When that module cannot be imported (e.g. standalone runs), the builtin
# simplified scorer is used instead.
try:
    from generate_fundamental_dashboard import band_score as _band_score
except ImportError:
    _USE_EXTERNAL_BAND = False
else:
    _USE_EXTERNAL_BAND = True
def score_spread_pair(
    value: float,
    history: pd.Series,
    current_date: pd.Timestamp,
    inverse: bool = False,
) -> dict[str, Any] | None:
    """
    Score a spread value against its seasonal history with band_score.

    Args:
        value: Spread value to score.
        history: Date-indexed spread history (NaNs are dropped).
        current_date: Scoring date. Anything ``pd.Timestamp`` accepts is
            allowed, e.g. a ``numpy.datetime64``.
        inverse: Flip the sign of the score.

    Returns:
        The scorer's result dict, or None when there is no non-NaN history or
        the scorer cannot produce a score.
    """
    current_date = pd.Timestamp(current_date)
    history = history.dropna()
    if history.empty:
        return None
    # Both scorers see only observations strictly before current_date, so the
    # scored value is never part of its own reference distribution.
    # Previously only the external scorer got this filtered history; the
    # builtin fallback got the full series.
    past = history[history.index < current_date]
    day = int(current_date.dayofyear)
    if _USE_EXTERNAL_BAND:
        return _band_score(value, past, day, inverse)
    return _builtin_band_score(value, past, day, inverse)
def _builtin_band_score(
    value: float,
    history: pd.Series,
    day: int,
    inverse: bool,
    *,
    tail: float = 0.10,
    width_scale: float = 1.0,
) -> dict | None:
    """
    Simplified quantile-band scorer used when the external band_score is unavailable.

    Steps:
    1. Sample ``history`` within 15 days of day-of-year ``day``, widening to
       30 days when fewer than 8 points qualify.
    2. Trim 10% from each end of the sorted sample.
    3. Build a band around the trimmed median from the ``tail`` / ``1 - tail``
       quantiles, scaled by ``width_scale``.
    4. Map ``value`` linearly onto [-100, 100] across the band.

    Args:
        value: Value to score.
        history: Date-indexed history; NaNs are dropped.
        day: Target day of year (1-366).
        inverse: Negate the score.
        tail: Quantile for the band edges, clamped to [0.02, 0.45].
        width_scale: Band width multiplier, clamped to [0.25, 4]. The defaults
            reproduce the previously hard-coded values.

    Returns:
        A dict with the following keys, or None when ``history`` has fewer
        than 10 points or the seasonal sample has fewer than 4:
        - ``score``: clamped to [-100, 100], rounded to 1 decimal.
        - ``raw_percentile``: rank of ``value`` in the untrimmed sample,
          with ties counted half.
        - ``band_low``, ``band_high``, ``band_mean``: rounded to 4 decimals.
    """
    history = history.dropna().sort_index()
    if len(history) < 10:
        return None

    # Circular day-of-year distance, so late December and early January are
    # treated as neighbours.
    days = history.index.dayofyear.to_numpy()
    day_diff = np.abs(days - day)
    day_diff = np.minimum(day_diff, 366 - day_diff)
    sample = history[day_diff <= 15]
    if len(sample) < 8:
        sample = history[day_diff <= 30]
    if len(sample) < 4:
        return None

    sample = sample.sort_values()
    trim_n = int(len(sample) * 0.1)
    trimmed_s = sample.iloc[trim_n : len(sample) - trim_n] if trim_n > 0 else sample

    tail = max(0.02, min(0.45, tail))
    width_scale = max(0.25, min(4.0, width_scale))
    mid = float(trimmed_s.quantile(0.50))
    q_low = float(trimmed_s.quantile(tail))
    q_high = float(trimmed_s.quantile(1 - tail))
    low = mid - (mid - q_low) * width_scale
    high = mid + (q_high - mid) * width_scale

    if high == low:
        raw_score = 100.0 if value >= high else -100.0
    elif value >= high:
        raw_score = 100.0
    elif value <= low:
        raw_score = -100.0
    else:
        raw_score = 200.0 * (value - low) / (high - low) - 100.0
    score = -raw_score if inverse else raw_score

    # Percentile rank in the untrimmed seasonal sample; ties count half.
    below = float((sample < value).sum())
    equal = float((sample == value).sum())
    raw_pct = 100.0 * (below + 0.5 * equal) / len(sample)
    return {
        "score": round(max(-100.0, min(100.0, score)), 1),
        "raw_percentile": round(raw_pct, 1),
        "band_low": round(low, 4),
        "band_high": round(high, 4),
        "band_mean": round(mid, 4),
    }
# ---- 月差图表时间序列(用于 seasonal_percentile 兼容) ---------------------
def build_spread_seasonal_data(
    combined_series: pd.Series,
) -> dict[str, Any] | None:
    """
    Convert a combined spread series into the seasonal_percentile() layout.

    Returns:
        ``{"latest_date": "YYYY-MM-DD", "latest_value": float, "years": {...}}``
        where ``years`` maps a year string to ``(MM-DD, value)`` tuples for
        observations from 2021 onward. None when the series has no non-NaN
        values.
    """
    series = combined_series.dropna().sort_index()
    if series.empty:
        return None

    last_day = series.index.max()
    years: dict[str, list[tuple[str, float]]] = {}
    recent = series[series.index.year >= 2021]
    for stamp, val in recent.items():
        bucket = years.setdefault(str(int(stamp.year)), [])
        bucket.append((stamp.strftime("%m-%d"), float(val)))

    return {
        "latest_date": last_day.strftime("%Y-%m-%d"),
        "latest_value": float(series.loc[last_day]),
        "years": years,
    }
def build_spread_seasonal_data_multi(
    hist_series_list: list[dict[str, Any]],
) -> dict[str, Any] | None:
    """
    Convert several historical spread series into the front-end chart format.

    Args:
        hist_series_list: Dicts with ``pair_label`` (e.g. "EG2609 - EG2701"),
            ``series`` (date-indexed values) and ``year``.

    Returns:
        A dict with:
        - ``years``: maps ``str(year)`` to ``(MM-DD, value)`` tuples.
        - ``year_labels``: maps the same keys to a short legend label built
          from each leg's year digits, e.g. "26-27" for EG2609 - EG2701 or
          "6-7" for MA609 - MA701.
        - ``latest_date`` / ``latest_value``: the newest observation across
          all series.
        None when the list is empty or every series is all-NaN.
    """
    if not hist_series_list:
        return None

    yearly: dict[str, list[tuple[str, float]]] = {}
    year_labels: dict[str, str] = {}
    latest_date = None
    latest_value = None

    for entry in hist_series_list:
        series = entry["series"].dropna().sort_index()
        if series.empty:
            continue
        year_key = str(entry["year"])

        # Legend keeps only the year digits of each leg (drop the 2 month digits).
        legs = entry["pair_label"].replace(" ", "").split("-")
        year_labels[year_key] = "-".join(
            "".join(ch for ch in leg if ch.isdigit())[:-2] for leg in legs
        )

        bucket = yearly.setdefault(year_key, [])
        bucket.extend((stamp.strftime("%m-%d"), float(val)) for stamp, val in series.items())

        series_end = series.index.max()
        if latest_date is None or series_end > latest_date:
            latest_date = series_end
            latest_value = float(series.loc[latest_date])

    if not yearly:
        return None
    return {
        "latest_date": latest_date.strftime("%Y-%m-%d") if latest_date else "",
        "latest_value": latest_value,
        "years": yearly,
        "year_labels": year_labels,
    }
# ---- 从 pair_label 提取横坐标范围 --------------------------------------------
import re as _re
def _extract_x_range(pair_label: str) -> tuple[str, str] | None:
    """
    Derive the seasonal chart's x-axis window from a spread-pair label.

    The window starts on the 15th of the far leg's month and ends on the 15th
    of the near leg's month, e.g. "RB2610 - RB2705" -> ("05-15", "10-15").
    Both legs must use the same digit layout: four digits (SHFE/DCE/CFFEX,
    e.g. RB2610) or three digits (CZCE, e.g. MA609). The legs may have
    different product prefixes (e.g. "IF2612 - IH2703"). Returns None when
    the label does not parse.
    """
    # The near leg's digit count decides which layout can match, so at most
    # one pattern succeeds. The patterns allow different prefixes, which also
    # covers the same-prefix case.
    layouts = (
        r"\D+\d{2}(\d{2})\s*-\s*\D+\d{2}(\d{2})",
        r"\D+\d(\d{2})\s*-\s*\D+\d(\d{2})",
    )
    for pattern in layouts:
        match = _re.match(pattern, pair_label)
        if match is not None:
            near_month, far_month = (int(g) for g in match.groups())
            return (f"{far_month:02d}-15", f"{near_month:02d}-15")
    return None
# ---- 主入口函数(批量模式)--------------------------------------------------
def _resolve_latest_date(
    raw: pd.DataFrame,
    global_latest: pd.Timestamp | None,
) -> pd.Timestamp | None:
    """Pick the scoring date for one variety as a pandas Timestamp (None if unavailable)."""
    # Prefer the database-wide latest date when this variety traded on it.
    if global_latest is not None and not pd.isna(global_latest):
        if (raw["times"] == global_latest).any():
            return pd.Timestamp(global_latest)
    # Otherwise use the variety's own most recent date. Returning a
    # pd.Timestamp (not the numpy.datetime64 yielded by iterating
    # Series.unique()) keeps the later .strftime / .dayofyear calls working.
    own_latest = raw["times"].max()
    return None if pd.isna(own_latest) else pd.Timestamp(own_latest)


def _build_pair_entry(
    hs: dict[str, Any],
    latest_date: pd.Timestamp,
) -> dict[str, Any] | None:
    """Score one historical spread group on latest_date and build its dashboard entry (None if unscorable)."""
    # Newest contract pair (hist_series is sorted by year) that has a value
    # on latest_date.
    latest_hist = next(
        (h for h in reversed(hs["hist_series"]) if latest_date in h["series"].index),
        None,
    )
    if latest_hist is None:
        return None
    latest_val = latest_hist["series"].loc[latest_date]
    if pd.isna(latest_val):
        return None

    # Reference distribution: every same-month pair across all years.
    all_series = pd.concat([h["series"] for h in hs["hist_series"]]).sort_index()
    all_series = all_series[~all_series.index.duplicated(keep="last")]
    sr = score_spread_pair(
        value=latest_val,
        history=all_series,
        current_date=latest_date,
        inverse=False,
    )
    if sr is None:
        return None

    # Round every float field to one decimal for display.
    sr_rounded = {k: (round(v, 1) if isinstance(v, float) else v) for k, v in sr.items()}
    entry = {
        "pair_label": hs["pair_label"],
        "type_label": hs["type_label"],
        "latest_value": round(float(latest_val), 4),
        **sr_rounded,
    }
    # Per-pair seasonal chart data (one line per historical contract pair).
    pair_seasonal = build_spread_seasonal_data_multi(hs["hist_series"])
    if pair_seasonal:
        entry["years"] = pair_seasonal["years"]
        if pair_seasonal.get("year_labels"):
            entry["year_labels"] = pair_seasonal["year_labels"]
    # Chart x-axis window derived from the contract months in the label.
    x_range = _extract_x_range(hs["pair_label"])
    if x_range:
        entry["x_start"] = x_range[0]
        entry["x_end"] = x_range[1]
    return entry


def read_spread_from_db_batch(
    variety_names: list[str],
    start_date: str = "2018-01-01",
) -> dict[str, dict[str, Any]]:
    """
    Read and score calendar spreads for many varieties over one DB connection.

    Args:
        variety_names: Variety display names. Names missing from
            ``NAME_TO_PCODE`` are skipped.
        start_date: Earliest ``times`` value fetched for each variety.

    Returns:
        ``{variety_name: {...}}`` with these keys:
        - ``score``: mean of the per-pair scores (at most 3 pairs), or None
          when no pair could be scored.
        - ``pair_scores``: one entry per scored pair.
        - ``series``: the median-combined spread series from 2021 onward.
        - ``latest_date``: "YYYY-MM-DD".
        - ``latest_pairs``: the spread pairs formed on the latest date.
        Varieties with no data are omitted.
    """
    result: dict[str, dict[str, Any]] = {}
    with MysqlDLLClient() as db:
        # Database-wide latest trading date. The lower bound only narrows the
        # scan. If nothing is newer, MAX() yields NULL (-> NaT) and each
        # variety falls back to its own latest date in _resolve_latest_date.
        try:
            max_date_rows = db.query(
                1,
                "SELECT MAX(times) as max_dt FROM contract_day WHERE times >= ?",
                ["2026-01-01"],
            )
            global_latest = pd.Timestamp(max_date_rows[0]["max_dt"]) if max_date_rows else None
        except Exception:
            global_latest = None

        for name in variety_names:
            p_code = NAME_TO_PCODE.get(name)
            if not p_code:
                continue
            try:
                raw = _fetch_one(db, p_code, start_date)
            except Exception:
                continue
            if raw.empty:
                continue

            latest_date_actual = _resolve_latest_date(raw, global_latest)
            if latest_date_actual is None:
                continue
            latest_df = raw[raw["times"] == latest_date_actual]
            if latest_df.empty:
                continue

            # Standard pairs (OI > 10,000); otherwise fall back to the two
            # largest-OI contracts.
            latest_pairs = (
                compute_spread_pairs_for_date(latest_df)
                or compute_fallback_pairs_for_date(latest_df)
            )
            hist_spreads = build_historical_spreads(raw, latest_pairs)

            # Score each spread pair (at most 3).
            pair_scores = []
            valid_scores = []
            for hs in hist_spreads[:3]:
                entry = _build_pair_entry(hs, latest_date_actual)
                if entry is None:
                    continue
                pair_scores.append(entry)
                if entry.get("score") is not None:
                    valid_scores.append(entry["score"])
            total_score = round(float(np.mean(valid_scores)), 1) if valid_scores else None

            # Median-combined spread series, clipped to 2021 onward.
            combined_series = _combine_spread_series(hist_spreads)
            if not combined_series.empty:
                combined_series = combined_series[combined_series.index >= "2021-01-01"]

            result[name] = {
                "score": total_score,
                "pair_scores": pair_scores,
                "series": combined_series,
                "latest_date": latest_date_actual.strftime("%Y-%m-%d"),
                "latest_pairs": [
                    {"near": p["near"], "far": p["far"], "spread": round(p["spread"], 4)}
                    for p in latest_pairs
                ],
            }
    return result
def _combine_spread_series(hist_spreads: list[dict]) -> pd.Series:
    """
    Combine every historical spread series into one per-date median series.

    Each entry's ``hist_series`` items contribute their ``series`` (renamed to
    the entry's ``type_label``). Columns are aligned on date, and the row-wise
    median skips NaNs.

    Returns:
        A Series named "median", or an empty float Series when there is
        nothing to combine.
    """
    columns = [
        item["series"].rename(group["type_label"])
        for group in (hist_spreads or [])
        for item in group.get("hist_series", [])
    ]
    if not columns:
        return pd.Series(dtype=float)
    return pd.concat(columns, axis=1).median(axis=1, skipna=True).rename("median")
# ---- 测试入口 ----------------------------------------------------------------
if __name__ == "__main__":
    # Manual smoke test against the live database: score three varieties and
    # print each spread pair's latest value, score and band.
    sys.stdout.reconfigure(encoding="utf-8")
    demo_result = read_spread_from_db_batch(["螺纹", "沪铜", "沪金"])
    rule = "=" * 60
    for variety, payload in demo_result.items():
        print(f"\n{rule}")
        print(f"品种: {variety} 总得分: {payload['score']}")
        print(f"最新日期: {payload['latest_date']}")
        print("月差对:")
        for entry in payload["pair_scores"]:
            print(
                f" {entry['pair_label']}: 值={entry['latest_value']}, 分={entry['score']}, "
                f"区间=[{entry['band_low']}, {entry['band_high']}]"
            )