from __future__ import annotations import html import json import math import re from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any import numpy as np import pandas as pd BASE_DIR = Path(__file__).resolve().parent CONFIRM_FILE = BASE_DIR / "基本面指标确认.xlsx" WIND_FILE = BASE_DIR / "wind数据汇总.xlsx" STEEL_FILE = BASE_DIR / "钢联数据汇总.xlsx" OUTPUT_FILE = BASE_DIR / "基本面评分面板.html" DIMENSIONS = ["利润", "产量", "库存", "需求"] INVERSE_DIMS = {"库存"} DIM_COLORS = { "利润": "#2563eb", "产量": "#dc2626", "库存": "#1d4ed8", "需求": "#0f9f6e", "月差": "#64748b", } DEFAULT_BAND_WINDOW = 15 DEFAULT_BAND_K = 3.0 DEFAULT_BAND_TRIM = 0.10 HISTORY_START_DEFAULT = "2026-01-01" BOARD_MAP = { "黑色": ["焦煤", "焦炭", "铁矿", "螺纹", "热卷", "硅铁", "锰硅", "不锈钢"], "建材": ["玻璃", "纯碱", "PVC", "原木"], "有色": ["沪铜", "沪铝", "沪锌", "沪铅", "沪镍", "沪锡", "氧化铝", "铝合金", "工业硅", "多晶硅", "碳酸锂", "国际铜"], "贵金属": ["沪金", "沪银", "铂", "钯"], "能源": ["原油", "液化气", "燃油", "LU燃油", "沥青", "动力煤", "欧线"], "煤化工": ["尿素", "甲醇"], "油化工": ["塑料", "聚丙烯", "丙烯", "纯苯", "苯乙烯", "对二甲苯", "PTA", "乙二醇", "短纤", "瓶片", "烧碱", "丁二烯"], "农产品": ["玉米", "淀粉", "豆一", "豆二", "豆粕", "豆油", "菜籽油", "菜籽粕", "棕榈油", "鸡蛋", "生猪", "棉花", "白糖", "苹果", "红枣", "花生", "棉纱"], "软商品": ["纸浆", "双胶纸", "橡胶", "20号胶", "BR橡胶"], "金融": ["IF", "IC", "IM", "IH", "二年债", "五年债", "十年债", "三十年债"], } @dataclass class Metric: name: str source: str unit: str freq: str series: pd.Series def clean_name(value: Any) -> str: if pd.isna(value): return "" text = str(value).strip() text = text.replace("(", "(").replace(")", ")") text = text.replace(";", ";").replace(",", ",") return re.sub(r"\s+", "", text) def read_metrics(path: Path, sheet: str, name_row: int, unit_row: int, freq_row: int, data_row: int, source: str) -> list[Metric]: raw = pd.read_excel(path, sheet_name=sheet, header=None) dates = pd.to_datetime(raw.iloc[data_row:, 0], errors="coerce") metrics: list[Metric] = [] for col in range(1, raw.shape[1]): name = clean_name(raw.iat[name_row, col]) if not name: continue values = pd.to_numeric(raw.iloc[data_row:, col], errors="coerce") frame = pd.DataFrame({"date": dates, "value": values}).dropna(subset=["date", "value"]) if frame.empty: continue series = frame.drop_duplicates("date").set_index("date")["value"].sort_index() if series.empty: continue metrics.append( Metric( name=name, source=source, unit=str(raw.iat[unit_row, col]) if pd.notna(raw.iat[unit_row, col]) else "", freq=str(raw.iat[freq_row, col]) if pd.notna(raw.iat[freq_row, col]) else "", series=series, ) ) return metrics def build_metric_index(metrics: list[Metric]) -> tuple[dict[str, Metric], list[str]]: grouped: dict[str, list[Metric]] = {} for metric in metrics: grouped.setdefault(metric.name, []).append(metric) selected: dict[str, Metric] = {} for name, items in grouped.items(): selected[name] = max(items, key=lambda m: (m.series.index.max(), m.series.notna().sum())) return selected, sorted(selected.keys(), key=len, reverse=True) def normalize_expr(expr: Any) -> str: if pd.isna(expr): return "" text = str(expr).strip() text = text.replace("(", "(").replace(")", ")") text = text.replace(";", ";").replace(",", ",") text = re.sub(r"\s+", "", text) text = re.sub(r"(?i)\bSum\s*\(", "(", text) text = text.replace(";", "+") return text def parse_expression(expr: Any, metric_index: dict[str, Metric], metric_names: list[str]) -> tuple[str, dict[str, Metric], list[str]]: text = normalize_expr(expr) if not text: return "", {}, [] used: dict[str, Metric] = {} token_expr = text for idx, name in enumerate(metric_names): if name and name in token_expr: token = f"m{idx}" token_expr = token_expr.replace(name, token) used[token] = metric_index[name] leftovers = re.sub(r"m\d+", " ", token_expr) leftovers = re.sub(r"[0-9eE\.\+\-\*/\(\)\s,]+", " ", leftovers) missing = [x.strip() for x in re.split(r"\s+", leftovers) if x.strip()] return token_expr, used, missing def eval_expression(expr: Any, metric_index: dict[str, Metric], metric_names: list[str]) -> tuple[pd.Series | None, list[str], list[Metric]]: token_expr, used, missing = parse_expression(expr, metric_index, metric_names) if not token_expr or not used: return None, missing, [] local_env = {token: metric.series for token, metric in used.items()} try: value = eval(token_expr, {"__builtins__": {}}, local_env) except Exception: return None, missing or [token_expr], list(used.values()) if isinstance(value, (int, float, np.number)): return None, missing, list(used.values()) series = pd.to_numeric(value, errors="coerce").dropna().sort_index() return (series if not series.empty else None), missing, list(used.values()) def circular_day_diff(day_values: np.ndarray, target_day: int) -> np.ndarray: diff = np.abs(day_values - target_day) return np.minimum(diff, 366 - diff) def trimmed_values(values: pd.Series, trim: float = DEFAULT_BAND_TRIM) -> pd.Series: values = pd.to_numeric(values, errors="coerce").dropna().sort_values() if values.empty or trim <= 0: return values cut = int(len(values) * trim) if len(values) - cut * 2 < 4: return values return values.iloc[cut : len(values) - cut] def seasonal_band_sample(history: pd.Series, day: int, window: int = DEFAULT_BAND_WINDOW) -> tuple[pd.Series, int]: history = pd.to_numeric(history, errors="coerce").dropna().sort_index() if history.empty: return history, window days = history.index.dayofyear.to_numpy() sample = history[circular_day_diff(days, day) <= window] used_window = window if len(sample) < 8: sample = history[circular_day_diff(days, day) <= 30] used_window = 30 if len(sample) < 8: sample = history used_window = 366 return sample, used_window def band_score(latest_value: float, history: pd.Series, day: int, inverse: bool = False) -> dict[str, Any] | None: sample, used_window = seasonal_band_sample(history, day) sample = trimmed_values(sample) if len(sample) < 4: return None mean = float(sample.mean()) std = float(sample.std(ddof=0)) if not math.isfinite(std): std = 0.0 low = mean - DEFAULT_BAND_K * std high = mean + DEFAULT_BAND_K * std if not math.isfinite(low) or not math.isfinite(high): return None if high == low: raw_score = 100.0 if latest_value >= high else 0.0 elif latest_value >= high: raw_score = 100.0 elif latest_value <= low: raw_score = 0.0 else: raw_score = 100.0 * (latest_value - low) / (high - low) score = 100.0 - raw_score if inverse else raw_score return { "score": max(0.0, min(100.0, score)), "band_low": low, "band_high": high, "band_mean": mean, "band_window": used_window, "band_k": DEFAULT_BAND_K, "band_trim": DEFAULT_BAND_TRIM, } def seasonal_percentile(series: pd.Series) -> dict[str, Any] | None: series = pd.to_numeric(series, errors="coerce").dropna().sort_index() if series.empty: return None latest_date = series.index.max() latest_value = float(series.loc[latest_date]) history = series[series.index < latest_date].dropna() if history.empty: return None day = int(latest_date.dayofyear) day_diff = np.abs(history.index.dayofyear - day) sample = history[day_diff <= 15] window = 15 if len(sample) < 8: sample = history[day_diff <= 30] window = 30 if len(sample) < 8: sample = history window = 366 less = float((sample < latest_value).sum()) equal = float((sample == latest_value).sum()) raw_pct = 100.0 * (less + 0.5 * equal) / len(sample) latest_year = int(latest_date.year) yearly: dict[str, list[tuple[str, float]]] = {} clipped = series[series.index.year >= latest_year - 8] for dt, val in clipped.items(): yearly.setdefault(str(int(dt.year)), []).append((dt.strftime("%m-%d"), float(val))) return { "latest_date": latest_date.strftime("%Y-%m-%d"), "latest_value": latest_value, "raw_percentile": round(raw_pct, 1), "sample_size": int(len(sample)), "window": window, "years": yearly, "chart_transform": "linear_z_score_in_browser", } def commodity_boards(name: str) -> list[str]: boards = [board for board, names in BOARD_MAP.items() if name in names] return boards or ["其他"] def score_series_at(series: pd.Series, date: pd.Timestamp, inverse: bool = False) -> float | None: series = pd.to_numeric(series, errors="coerce").dropna().sort_index() available = series[series.index <= date] if available.empty: return None value_date = available.index.max() value = float(available.loc[value_date]) history = series[series.index < value_date].dropna() if history.empty: return None band = band_score(value, history, int(value_date.dayofyear), inverse) if band: return float(band["score"]) sample, _ = seasonal_band_sample(history, int(value_date.dayofyear)) if sample.empty: return None less = float((sample < value).sum()) equal = float((sample == value).sum()) raw = 100.0 * (less + 0.5 * equal) / len(sample) return 100.0 - raw if inverse else raw def build_score_history(dim_series: dict[str, pd.Series]) -> list[dict[str, Any]]: available = [s.dropna().sort_index() for s in dim_series.values() if s is not None and not s.dropna().empty] if not available: return [] latest = max(s.index.max() for s in available) start = pd.Timestamp(HISTORY_START_DEFAULT) if latest < start: start = min(s.index.min() for s in available) dates = pd.date_range(start=start, end=latest, freq="D") history: list[dict[str, Any]] = [] for date in dates: scores: dict[str, float] = {} for dim, series in dim_series.items(): score = score_series_at(series, date, dim in INVERSE_DIMS) if score is not None and math.isfinite(score): scores[dim] = score if not scores: continue dims_with_demand = [d for d in DIMENSIONS if d in scores] dims_without_demand = [d for d in DIMENSIONS if d != "需求" and d in scores] total = float(np.mean([scores[d] for d in dims_with_demand])) if dims_with_demand else None total_no_demand = float(np.mean([scores[d] for d in dims_without_demand])) if dims_without_demand else None history.append( { "date": date.strftime("%Y-%m-%d"), "total": round(total, 1) if total is not None else None, "totalNoDemand": round(total_no_demand, 1) if total_no_demand is not None else None, } ) return history def make_records() -> dict[str, Any]: metrics = [] metrics.extend(read_metrics(WIND_FILE, "wind", 1, 3, 2, 8, "Wind")) metrics.extend(read_metrics(STEEL_FILE, "钢联", 1, 2, 5, 10, "钢联")) metric_index, metric_names = build_metric_index(metrics) confirm = pd.read_excel(CONFIRM_FILE).fillna("") commodities: list[dict[str, Any]] = [] for idx, row in confirm.iterrows(): name = str(row["品种"]).strip() if not name: continue dims: dict[str, Any] = {} dim_series: dict[str, pd.Series] = {} valid_scores: list[float] = [] for dim in DIMENSIONS: series, missing, used_metrics = eval_expression(row.get(dim, ""), metric_index, metric_names) if series is not None: dim_series[dim] = series stat = seasonal_percentile(series) if series is not None else None if stat: latest_dt = pd.Timestamp(stat["latest_date"]) history = series[series.index < latest_dt].dropna() band = band_score(stat["latest_value"], history, int(latest_dt.dayofyear), dim in INVERSE_DIMS) if band: score = band["score"] stat.update(band) else: raw = stat["raw_percentile"] score = 100.0 - raw if dim in INVERSE_DIMS else raw stat["score"] = round(score, 1) stat["direction"] = "低库存更优,按正常区间反向计分" if dim in INVERSE_DIMS else "高于正常区间更优" stat["expr"] = str(row.get(dim, "")).strip() stat["metrics"] = [ {"name": m.name, "source": m.source, "unit": m.unit, "freq": m.freq} for m in used_metrics ] valid_scores.append(float(score)) dims[dim] = stat or { "score": None, "raw_percentile": None, "direction": "缺数据", "expr": str(row.get(dim, "")).strip(), "missing": missing, "metrics": [{"name": m.name, "source": m.source, "unit": m.unit, "freq": m.freq} for m in used_metrics], } total = round(float(np.mean(valid_scores)), 1) if valid_scores else None boards = commodity_boards(name) commodities.append( { "rank": None, "name": name, "board": " / ".join(boards), "boards": boards, "total": total, "valid_count": len(valid_scores), "dims": dims, "scoreHistory": build_score_history(dim_series), } ) ranked = sorted([c for c in commodities if c["total"] is not None], key=lambda x: x["total"], reverse=True) for i, item in enumerate(ranked, 1): item["rank"] = i no_score = [c for c in commodities if c["total"] is None] commodities = ranked + no_score return { "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "source_files": { "wind": WIND_FILE.name, "steel": STEEL_FILE.name, "confirm": CONFIRM_FILE.name, }, "dimensions": DIMENSIONS + ["月差"], "boards": sorted(set(board for c in commodities for board in c.get("boards", [c["board"]]))), "commodities": commodities, } def html_template(data: dict[str, Any]) -> str: payload = json.dumps(data, ensure_ascii=False, allow_nan=False) return f"""