from __future__ import annotations

import html
import json
import math
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

try:
    from spread_from_db import (
        NAME_TO_PCODE as SPREAD_NAME_TO_PCODE,
        build_spread_seasonal_data,
        read_spread_from_db_batch,
    )

    _HAS_SPREAD_DB = True
except ImportError:
    # The calendar-spread dimension is optional: without the helper module it
    # is reported as missing instead of aborting the whole panel build.
    _HAS_SPREAD_DB = False

BASE_DIR = Path(__file__).resolve().parent
CONFIRM_FILE = BASE_DIR / "基本面指标确认.xlsx"
WIND_FILE = BASE_DIR / "wind数据汇总.xlsx"
STEEL_FILE = BASE_DIR / "钢联数据汇总.xlsx"
SPREAD_FILE = BASE_DIR / "月差拼接数据.xlsx"
OUTPUT_FILE = BASE_DIR / "基本面评分面板.html"

DIMENSIONS = ["利润", "产量", "库存", "需求", "月差"]
# Dimensions whose score is negated (a high reading maps to a low score).
INVERSE_DIMS = {"库存"}
DIM_COLORS = {
    "利润": "#2563eb",
    "产量": "#dc2626",
    "库存": "#1d4ed8",
    "需求": "#0f9f6e",
    "月差": "#9333ea",
}

DEFAULT_BAND_WINDOW = 15
DEFAULT_BAND_K = 3.0
DEFAULT_BAND_TRIM = 0.10
HISTORY_START_DEFAULT = "2026-01-01"

BOARD_MAP = {
    "黑色": ["焦煤", "焦炭", "铁矿", "螺纹", "热卷", "硅铁", "锰硅", "不锈钢"],
    "建材": ["玻璃", "纯碱", "PVC", "原木"],
    "有色": ["沪铜", "沪铝", "沪锌", "沪铅", "沪镍", "沪锡", "氧化铝", "铝合金", "工业硅", "多晶硅", "碳酸锂", "国际铜"],
    "贵金属": ["沪金", "沪银", "铂", "钯"],
    "能源": ["原油", "液化气", "燃油", "LU燃油", "沥青", "动力煤", "欧线"],
    "煤化工": ["尿素", "甲醇"],
    "油化工": ["塑料", "聚丙烯", "丙烯", "纯苯", "苯乙烯", "对二甲苯", "PTA", "乙二醇", "短纤", "瓶片", "烧碱", "丁二烯"],
    "农产品": ["玉米", "淀粉", "豆一", "豆二", "豆粕", "豆油", "菜籽油", "菜籽粕", "棕榈油", "鸡蛋", "生猪", "棉花", "白糖", "苹果", "红枣", "花生", "棉纱"],
    "软商品": ["纸浆", "双胶纸", "橡胶", "20号胶", "BR橡胶"],
    "金融": ["IF", "IC", "IM", "IH", "二年债", "五年债", "十年债", "三十年债"],
}

# Full-width punctuation -> ASCII.  Written with escapes so the mapping
# survives any Unicode normalisation applied to this source file.
_FULLWIDTH_PUNCT = str.maketrans(
    {"\uff08": "(", "\uff09": ")", "\uff1b": ";", "\uff0c": ","}
)

# The four dimensions evaluated from workbook expressions (the spread
# dimension is sourced separately).
_FUNDAMENTAL_DIMS = ("利润", "产量", "库存", "需求")
_SPREAD_DIM = "月差"
_SPREAD_EXPR = "近月(top3 OI) - 远月(最远>1万手)"


@dataclass
class Metric:
    """One named indicator column read from a source workbook."""

    name: str
    source: str
    unit: str
    freq: str
    series: pd.Series


def clean_name(value: Any) -> str:
    """Return *value* as a metric name: ASCII punctuation, no whitespace.

    Missing cells (NaN/None) yield an empty string.
    """
    if pd.isna(value):
        return ""
    text = str(value).strip().translate(_FULLWIDTH_PUNCT)
    return re.sub(r"\s+", "", text)


def read_metrics(
    path: Path,
    sheet: str,
    name_row: int,
    unit_row: int,
    freq_row: int,
    data_row: int,
    source: str,
) -> list[Metric]:
    """Read every named column of *sheet* into a :class:`Metric`.

    Column 0 holds the dates.  ``name_row``, ``unit_row`` and ``freq_row`` are
    the 0-based rows of the header cells and ``data_row`` is the first data
    row.  Columns without a name or without any usable (date, value) pair are
    skipped.
    """
    raw = pd.read_excel(path, sheet_name=sheet, header=None)
    dates = pd.to_datetime(raw.iloc[data_row:, 0], errors="coerce")

    metrics: list[Metric] = []
    for col in range(1, raw.shape[1]):
        name = clean_name(raw.iat[name_row, col])
        if not name:
            continue
        values = pd.to_numeric(raw.iloc[data_row:, col], errors="coerce")
        frame = pd.DataFrame({"date": dates, "value": values}).dropna(subset=["date", "value"])
        if frame.empty:
            continue
        # On duplicate dates the first occurrence wins.
        series = frame.drop_duplicates("date").set_index("date")["value"].sort_index()
        if series.empty:
            continue
        unit_cell = raw.iat[unit_row, col]
        freq_cell = raw.iat[freq_row, col]
        metrics.append(
            Metric(
                name=name,
                source=source,
                unit=str(unit_cell) if pd.notna(unit_cell) else "",
                freq=str(freq_cell) if pd.notna(freq_cell) else "",
                series=series,
            )
        )
    return metrics


def build_metric_index(metrics: list[Metric]) -> tuple[dict[str, Metric], list[str]]:
    """Index metrics by name and return the names sorted longest-first.

    When several metrics share a name, the one with the most recent last
    observation wins, ties broken by the number of non-null points.  The
    longest-first ordering lets callers substitute names in an expression
    without a short name clobbering part of a longer one.
    """
    grouped: dict[str, list[Metric]] = {}
    for metric in metrics:
        grouped.setdefault(metric.name, []).append(metric)

    selected: dict[str, Metric] = {
        name: max(items, key=lambda m: (m.series.index.max(), m.series.notna().sum()))
        for name, items in grouped.items()
    }
    return selected, sorted(selected.keys(), key=len, reverse=True)


def normalize_expr(expr: Any) -> str:
    """Normalise a workbook expression into an arithmetic string.

    Punctuation is converted to ASCII, whitespace removed, ``Sum(`` reduced to
    ``(`` and ``;`` turned into ``+`` so ``Sum(a;b)`` becomes ``(a+b)``.
    Missing cells yield an empty string.
    """
    if pd.isna(expr):
        return ""
    text = str(expr).strip().translate(_FULLWIDTH_PUNCT)
    text = re.sub(r"\s+", "", text)
    text = re.sub(r"(?i)\bSum\s*\(", "(", text)
    return text.replace(";", "+")


def parse_expression(
    expr: Any, metric_index: dict[str, Metric], metric_names: list[str]
) -> tuple[str, dict[str, Metric], list[str]]:
    """Replace metric names in *expr* with ``m<idx>`` tokens.

    Returns ``(token_expr, used, missing)`` where ``used`` maps each token to
    its metric and ``missing`` lists leftover fragments that are neither a
    token nor arithmetic syntax (i.e. names that were not recognised).
    """
    text = normalize_expr(expr)
    if not text:
        return "", {}, []

    used: dict[str, Metric] = {}
    token_expr = text
    # metric_names is expected longest-first (see build_metric_index).
    for idx, name in enumerate(metric_names):
        if name and name in token_expr:
            token = f"m{idx}"
            token_expr = token_expr.replace(name, token)
            used[token] = metric_index[name]

    leftovers = re.sub(r"m\d+", " ", token_expr)
    leftovers = re.sub(r"[0-9eE\.\+\-\*/\(\)\s,]+", " ", leftovers)
    missing = [part.strip() for part in re.split(r"\s+", leftovers) if part.strip()]
    return token_expr, used, missing


def eval_expression(
    expr: Any, metric_index: dict[str, Metric], metric_names: list[str]
) -> tuple[pd.Series | None, list[str], list[Metric]]:
    """Evaluate *expr* over metric series.

    Returns ``(series, missing, used_metrics)``.  ``series`` is ``None`` when
    the expression is empty, references no known metric, fails to evaluate,
    evaluates to a scalar, or has no numeric points.
    """
    token_expr, used, missing = parse_expression(expr, metric_index, metric_names)
    if not token_expr or not used:
        return None, missing, []

    used_metrics = list(used.values())
    local_env = {token: metric.series for token, metric in used.items()}
    try:
        # SECURITY NOTE(review): eval() runs text taken from the confirmation
        # workbook.  Emptying __builtins__ is not a sandbox; only use
        # workbooks from a trusted source.
        value = eval(token_expr, {"__builtins__": {}}, local_env)
    except Exception:
        # Deliberately best-effort: one malformed expression must not abort
        # the panel, so it is reported back as missing instead.
        return None, missing or [token_expr], used_metrics

    if isinstance(value, (int, float, np.number)):
        return None, missing, used_metrics
    series = pd.to_numeric(value, errors="coerce").dropna().sort_index()
    return (series if not series.empty else None), missing, used_metrics


def circular_day_diff(day_values: np.ndarray, target_day: int) -> np.ndarray:
    """Return day-of-year distances to *target_day*, wrapping at 366 days."""
    diff = np.abs(day_values - target_day)
    return np.minimum(diff, 366 - diff)


def trimmed_values(values: pd.Series, trim: float = DEFAULT_BAND_TRIM) -> pd.Series:
    """Return the sorted numeric values with a *trim* fraction cut per tail.

    Nothing is trimmed when *trim* is not positive or when fewer than four
    values would remain.
    """
    values = pd.to_numeric(values, errors="coerce").dropna().sort_values()
    if values.empty or trim <= 0:
        return values
    cut = int(len(values) * trim)
    if len(values) - cut * 2 < 4:
        return values
    return values.iloc[cut : len(values) - cut]


def seasonal_band_sample(
    history: pd.Series, day: int, window: int = DEFAULT_BAND_WINDOW
) -> tuple[pd.Series, int]:
    """Select history points whose day-of-year lies near *day*.

    The window widens from *window* days to 30 days and finally to the whole
    history (reported as 366) until at least eight points are available.
    Returns ``(sample, used_window)``.
    """
    history = pd.to_numeric(history, errors="coerce").dropna().sort_index()
    if history.empty:
        return history, window

    distance = circular_day_diff(history.index.dayofyear.to_numpy(), day)
    sample = history[distance <= window]
    used_window = window
    if len(sample) < 8:
        sample = history[distance <= 30]
        used_window = 30
    if len(sample) < 8:
        sample = history
        used_window = 366
    return sample, used_window


def band_score(
    latest_value: float, history: pd.Series, day: int, inverse: bool = False
) -> dict[str, Any] | None:
    """Score *latest_value* against the seasonal band of *history*.

    The band spans the ``DEFAULT_BAND_TRIM`` and ``1 - DEFAULT_BAND_TRIM``
    quantiles of the seasonal sample, scaled around the median by
    ``DEFAULT_BAND_K / 3``.  The score runs linearly from -100 at the lower
    edge to +100 at the upper edge, is clamped outside the band and negated
    when *inverse* is true.

    Returns ``None`` when fewer than four sample points exist, when the band
    edges are not finite, or when *latest_value* itself is not finite.
    """
    if not math.isfinite(latest_value):
        # NaN compares false everywhere and would otherwise be clamped to +100.
        return None

    sample, used_window = seasonal_band_sample(history, day)
    sample = sample.sort_values()
    if len(sample) < 4:
        return None

    mid = float(sample.quantile(0.5))
    q_low = float(sample.quantile(DEFAULT_BAND_TRIM))
    q_high = float(sample.quantile(1 - DEFAULT_BAND_TRIM))
    width_scale = DEFAULT_BAND_K / 3
    low = mid - (mid - q_low) * width_scale
    high = mid + (q_high - mid) * width_scale
    if not math.isfinite(low) or not math.isfinite(high):
        return None

    if high == low:
        # Degenerate band: there is nothing to interpolate over.
        raw_score = 100.0 if latest_value >= high else -100.0
    elif latest_value >= high:
        raw_score = 100.0
    elif latest_value <= low:
        raw_score = -100.0
    else:
        raw_score = 200.0 * (latest_value - low) / (high - low) - 100.0

    score = -raw_score if inverse else raw_score
    return {
        "score": max(-100.0, min(100.0, score)),
        "band_low": low,
        "band_high": high,
        "band_mean": mid,
        "band_window": used_window,
        "band_k": DEFAULT_BAND_K,
        "band_trim": DEFAULT_BAND_TRIM,
    }


def seasonal_percentile(series: pd.Series) -> dict[str, Any] | None:
    """Describe the latest observation of *series* against its seasonal history.

    The comparison sample is chosen by :func:`seasonal_band_sample`, so the
    day-of-year window wraps around the year boundary in the same way as the
    band score.  Returns ``None`` when the series is empty or has no
    observation before the latest one; otherwise a dict with the latest
    date/value, its mid-rank percentile within the sample, the sample size
    and window, and the per-year ``(MM-DD, value)`` points from 2021 onwards.
    """
    series = pd.to_numeric(series, errors="coerce").dropna().sort_index()
    if series.empty:
        return None

    latest_date = series.index[-1]
    # Positional access stays scalar even if the index has duplicate stamps.
    latest_value = float(series.iloc[-1])
    history = series[series.index < latest_date]
    if history.empty:
        return None

    sample, window = seasonal_band_sample(history, int(latest_date.dayofyear))
    less = float((sample < latest_value).sum())
    equal = float((sample == latest_value).sum())
    raw_pct = 100.0 * (less + 0.5 * equal) / len(sample)

    yearly: dict[str, list[tuple[str, float]]] = {}
    for stamp, val in series[series.index.year >= 2021].items():
        yearly.setdefault(str(int(stamp.year)), []).append((stamp.strftime("%m-%d"), float(val)))

    return {
        "latest_date": latest_date.strftime("%Y-%m-%d"),
        "latest_value": latest_value,
        "raw_percentile": round(raw_pct, 1),
        "sample_size": int(len(sample)),
        "window": window,
        "years": yearly,
        "chart_transform": "linear_z_score_in_browser",
    }


def commodity_boards(name: str) -> list[str]:
    """Return every board in ``BOARD_MAP`` listing *name*, or ``["其他"]``."""
    boards = [board for board, names in BOARD_MAP.items() if name in names]
    return boards or ["其他"]


def score_series_at(series: pd.Series, date: pd.Timestamp, inverse: bool = False) -> float | None:
    """Score the most recent observation of *series* on or before *date*.

    Uses :func:`band_score` against the strictly earlier history and falls
    back to a mid-rank percentile mapped onto [-100, 100] when no band can be
    built.  Returns ``None`` when there is no observation yet or no earlier
    history to compare with.
    """
    series = pd.to_numeric(series, errors="coerce").dropna().sort_index()
    available = series[series.index <= date]
    if available.empty:
        return None

    value_date = available.index[-1]
    # Positional access stays scalar even if the index has duplicate stamps.
    value = float(available.iloc[-1])
    history = series[series.index < value_date]
    if history.empty:
        return None

    day = int(value_date.dayofyear)
    band = band_score(value, history, day, inverse)
    if band:
        return float(band["score"])

    sample, _ = seasonal_band_sample(history, day)
    if sample.empty:
        return None
    less = float((sample < value).sum())
    equal = float((sample == value).sum())
    mapped = 100.0 * (less + 0.5 * equal) / len(sample) * 2.0 - 100.0
    return -mapped if inverse else mapped


def build_score_history(dim_series: dict[str, pd.Series]) -> list[dict[str, Any]]:
    """Build the daily history of composite scores.

    For each calendar day from ``HISTORY_START_DEFAULT`` (or from the first
    observation when all data predates it) to the latest observation, every
    dimension is scored with :func:`score_series_at` and averaged three ways:
    all dimensions, without demand, and without demand and spread.  Days on
    which no dimension can be scored are omitted.
    """
    available = [
        s.dropna().sort_index()
        for s in dim_series.values()
        if s is not None and not s.dropna().empty
    ]
    if not available:
        return []

    latest = max(s.index.max() for s in available)
    start = pd.Timestamp(HISTORY_START_DEFAULT)
    if latest < start:
        start = min(s.index.min() for s in available)

    def _mean_or_none(values: list[float]) -> float | None:
        return round(float(np.mean(values)), 1) if values else None

    history: list[dict[str, Any]] = []
    for date in pd.date_range(start=start, end=latest, freq="D"):
        scores: dict[str, float] = {}
        for dim, series in dim_series.items():
            if series is None:
                continue
            score = score_series_at(series, date, dim in INVERSE_DIMS)
            if score is not None and math.isfinite(score):
                scores[dim] = score
        if not scores:
            continue

        scored = [d for d in DIMENSIONS if d in scores]
        history.append(
            {
                "date": date.strftime("%Y-%m-%d"),
                "total": _mean_or_none([scores[d] for d in scored]),
                "totalNoDemand": _mean_or_none([scores[d] for d in scored if d != "需求"]),
                "totalNoDemandNoSpread": _mean_or_none(
                    [scores[d] for d in scored if d not in ("需求", "月差")]
                ),
            }
        )
    return history


def _metric_descriptors(metrics: list[Metric]) -> list[dict[str, str]]:
    """Return the display metadata of each metric used by an expression."""
    return [{"name": m.name, "source": m.source, "unit": m.unit, "freq": m.freq} for m in metrics]


def _score_latest(stat: dict[str, Any], series: pd.Series, inverse: bool) -> float:
    """Score the latest point described by *stat*, store it in *stat*, return it.

    Prefers the seasonal band (whose fields are merged into *stat*); falls
    back to the raw percentile mapped onto [-100, 100], treating a missing
    percentile as neutral (50).  The returned score is unrounded; the stored
    ``stat["score"]`` is rounded to one decimal.
    """
    latest_dt = pd.Timestamp(stat["latest_date"])
    history = series[series.index < latest_dt].dropna()
    band = band_score(stat["latest_value"], history, int(latest_dt.dayofyear), inverse)
    if band:
        score = band["score"]
        stat.update(band)
    else:
        raw = stat.get("raw_percentile")
        if raw is None:
            raw = 50.0
        mapped = raw * 2.0 - 100.0
        score = -mapped if inverse else mapped
    stat["score"] = round(score, 1)
    return float(score)


def make_records() -> dict[str, Any]:
    """Assemble the full panel payload from the source workbooks.

    Reads the Wind and Steel-union metric workbooks and the confirmation
    workbook, scores each commodity on the four fundamental dimensions plus
    the calendar spread (when ``spread_from_db`` is importable), ranks the
    commodities by mean score and returns a JSON-serialisable dict.
    """
    metrics: list[Metric] = []
    metrics.extend(read_metrics(WIND_FILE, "wind", 1, 3, 2, 8, "Wind"))
    metrics.extend(read_metrics(STEEL_FILE, "钢联", 1, 2, 5, 10, "钢联"))
    metric_index, metric_names = build_metric_index(metrics)
    confirm = pd.read_excel(CONFIRM_FILE).fillna("")

    # Load the spread data of every commodity from MySQL in one batch.
    if _HAS_SPREAD_DB:
        all_variety_names = [str(n).strip() for n in confirm["品种"] if str(n).strip()]
        spread_db = read_spread_from_db_batch(all_variety_names)
    else:
        spread_db = {}

    commodities: list[dict[str, Any]] = []
    for _, row in confirm.iterrows():
        name = str(row["品种"]).strip()
        if not name:
            continue

        dims: dict[str, Any] = {}
        dim_series: dict[str, pd.Series] = {}
        valid_scores: list[float] = []

        for dim in _FUNDAMENTAL_DIMS:
            expr_text = str(row.get(dim, "")).strip()
            series, missing, used_metrics = eval_expression(row.get(dim, ""), metric_index, metric_names)
            stat = None
            if series is not None:
                dim_series[dim] = series
                stat = seasonal_percentile(series)
            if stat:
                inverse = dim in INVERSE_DIMS
                score = _score_latest(stat, series, inverse)
                stat["direction"] = "低库存更优,按正常区间反向计分" if inverse else "高于正常区间更优"
                stat["expr"] = expr_text
                stat["metrics"] = _metric_descriptors(used_metrics)
                valid_scores.append(score)
            dims[dim] = stat or {
                "score": None,
                "raw_percentile": None,
                "direction": "缺数据",
                "expr": expr_text,
                "missing": missing,
                "metrics": _metric_descriptors(used_metrics),
            }

        # Spread dimension: computed dynamically from MySQL contract_day.
        spread_info = spread_db.get(name, {})
        spread_series_db = spread_info.get("series")
        stat = None
        if spread_series_db is not None and not spread_series_db.empty:
            dim_series[_SPREAD_DIM] = spread_series_db
            # Convert into the same shape seasonal_percentile() produces.
            stat = build_spread_seasonal_data(spread_series_db)
        if stat:
            score = _score_latest(stat, spread_series_db, _SPREAD_DIM in INVERSE_DIMS)
            stat["direction"] = "高于正常区间更优(近月升水=强势)"
            stat["expr"] = _SPREAD_EXPR
            # Per-pair spread details.
            pair_scores = spread_info.get("pair_scores", [])
            stat["pair_scores"] = pair_scores
            pair_desc = " / ".join(
                f"{ps['pair_label']}:{ps.get('score', '--')}" for ps in pair_scores
            )
            stat["metrics"] = [
                {
                    "name": f"月差对({len(pair_scores)}对)",
                    "source": "MySQL contract_day",
                    "unit": "",
                    "freq": "日",
                    "detail": pair_desc,
                }
            ]
            valid_scores.append(score)
        dims[_SPREAD_DIM] = stat or {
            "score": None,
            "raw_percentile": None,
            "direction": "缺数据" if not _HAS_SPREAD_DB else "月差数据未找到",
            "expr": _SPREAD_EXPR,
            "missing": [] if _HAS_SPREAD_DB else ["spread_from_db 模块不可用"],
            "metrics": [],
        }

        boards = commodity_boards(name)
        commodities.append(
            {
                "rank": None,
                "name": name,
                "board": " / ".join(boards),
                "boards": boards,
                "total": round(float(np.mean(valid_scores)), 1) if valid_scores else None,
                "valid_count": len(valid_scores),
                "dims": dims,
                "scoreHistory": build_score_history(dim_series),
            }
        )

    # Scored commodities first (best to worst); unscored ones keep rank None.
    ranked = sorted(
        (c for c in commodities if c["total"] is not None),
        key=lambda c: c["total"],
        reverse=True,
    )
    for position, item in enumerate(ranked, 1):
        item["rank"] = position
    commodities = ranked + [c for c in commodities if c["total"] is None]

    completion_total = len(commodities)
    # A commodity is complete when every dimension carries chart data.
    completion_count = sum(
        all(c["dims"].get(dim, {}).get("years") for dim in DIMENSIONS) for c in commodities
    )
    return {
        "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "source_files": {
            "wind": WIND_FILE.name,
            "steel": STEEL_FILE.name,
            "confirm": CONFIRM_FILE.name,
            "spread_source": "MySQL contract_day (spread_from_db.py)",
        },
        "dimensions": DIMENSIONS,
        "boards": sorted({board for c in commodities for board in c.get("boards", [c["board"]])}),
        "completion": {
            "count": completion_count,
            "total": completion_total,
            "ratio": round(100.0 * completion_count / completion_total, 1) if completion_total else 0,
        },
        "commodities": commodities,
    }