Source code for analysis.optimize_psi_results

# -*- coding: utf-8 -*-
"""
PSI Step 9 Optimization (Deduplication) - Robust Implementation

Supports two methods for identifying "identical" Ψ identifications:
  (1) round mode: Group by rounding to decimal places
  (2) tolerance mode: Group by binning with tolerance (±t_tol, ±mp_tol, ±r_tol)  ← RECOMMENDED

Features:
  - Fixed 14-column output format matching paper specification
  - Detailed reporting of 9.1/9.2 counts before/after
  - Per-mainshock identification counts

Usage Example (tolerance mode, RECOMMENDED):
    python optimize_psi_results.py \
        --input testopt.ou4a --output optimized_psi.ou4 \
        --mode tolerance --t-tol 0.03 --mp-tol 0.05 --r-tol 0.25

Usage Example (round mode, more conservative):
    python optimize_psi_results.py \
        --input testopt.ou4a --output optimized_psi.ou4 \
        --mode round --t-decimals 2 --mp-decimals 1 --r-decimals 0

Algorithm Steps (from Christophersen et al. 2024, SRL 95(6):3464-3481 Section 2.3):
    Step 9.1: For identical (eq_name, tmin, tp, mp) → keep max sloperatio (ties: min ap)
    Step 9.2: For identical (eq_name, tp, r) → keep min ap
"""

import argparse
from collections import OrderedDict
from typing import Tuple

import numpy as np
import pandas as pd

# === Paper-specified 14 columns (fixed output order) ===
CANONICAL_14 = [
    "eq_name", "mm", "mp", "mminus", "sloperatio",
    "tmin", "time_onset", "tmax", "tp", "ap",
    "rn_max", "rn_min", "re_max", "re_min",
]


# -------------------------------
# File reading and column normalization
# -------------------------------
def _read_ou4a_like(input_file: str) -> pd.DataFrame:
    """
    Read first 14 columns from .ou4a file and assign CANONICAL_14 column names.

    Does not rely on file's built-in column names to avoid misalignment from
    debug columns or inconsistent naming.

    Args:
        input_file: Path to .ou4a file (str)

    Returns:
    pd.DataFrame
        DataFrame with 14 standardized columns and proper types

    Raises:
    ValueError
        If file has fewer than 14 columns
    """
    raw = pd.read_csv(input_file, sep=r"\s+", header=None, dtype=str)
    if raw.shape[1] < 14:
        raise ValueError(f"{input_file} has {raw.shape[1]} columns < 14, incorrect format.")

    df = raw.iloc[:, :14].copy()
    df.columns = CANONICAL_14

    # Type conversion (eq_name remains string)
    for c in CANONICAL_14[1:]:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    df["eq_name"] = df["eq_name"].astype(str).str.strip()
    return df


def _ensure_canonical_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Ensure DataFrame contains only CANONICAL_14 columns in fixed order.

    Supports legacy column name 'time_mina' -> 'time_onset' for compatibility.

    Args:
        df: Input DataFrame (pd.DataFrame)

    Returns:
    pd.DataFrame
        DataFrame with standardized column order

    Raises:
    ValueError
        If required columns are missing
    """
    cols = list(df.columns)
    if "time_onset" not in cols and "time_mina" in cols:
        df = df.rename(columns={"time_mina": "time_onset"})
    missing = [c for c in CANONICAL_14 if c not in df.columns]
    if missing:
        raise ValueError(f"Step 9 missing required columns: {missing}")
    return df.reindex(columns=CANONICAL_14).copy()


# -------------------------------
# Step 9.1/9.2: Two grouping implementations
# -------------------------------
def _run_once_round(df_in: pd.DataFrame,
                    t_decimals: int, mp_decimals: int, r_decimals: int) -> Tuple[pd.DataFrame, int, int]:
    """
    Round mode: discretize keys by rounding to decimal places, then apply Steps 9.1/9.2.

    Args:
        df_in: Input Ψ identifications (pd.DataFrame)
        t_decimals: Decimal places for time rounding (int)
        mp_decimals: Decimal places for magnitude rounding (int)
    r_decimals : int
        Decimal places for slope ratio rounding

    Returns:
    tuple
        (df_after_9.2, count_after_9.1, count_after_9.2)
    """
    df1 = df_in.copy()
    # Quantize by rounding
    df1["tmin_q"] = np.round(df1["tmin"].astype(float), t_decimals)
    df1["tp_q"]   = np.round(df1["tp"].astype(float),   t_decimals)
    df1["mp_q"]   = np.round(df1["mp"].astype(float),   mp_decimals)
    df1["r_q"]    = np.round(df1["sloperatio"].astype(float), r_decimals)

    # Step 9.1: For same (eq_name, tmin_q, tp_q, mp_q) → max sloperatio; ties: min ap
    g91 = ["eq_name", "tmin_q", "tp_q", "mp_q"]
    df_91 = (
        df1.sort_values(by=["sloperatio", "ap"], ascending=[False, True])
           .groupby(g91, as_index=False, sort=False)
           .head(1)
    )
    n91 = len(df_91)

    # Step 9.2: For same (eq_name, tp_q, r_q) → min ap
    g92 = ["eq_name", "tp_q", "r_q"]
    df_92 = (
        df_91.sort_values(by=["ap"], ascending=True)
             .groupby(g92, as_index=False, sort=False)
             .head(1)
    )
    n92 = len(df_92)

    # Remove auxiliary columns
    df_92 = df_92.drop(columns=["tmin_q", "tp_q", "mp_q", "r_q"], errors="ignore")
    return df_92, n91, n92


def _bin_with_tolerance(x: np.ndarray, tol: float) -> np.ndarray:
    """
    Bin values with tolerance: discretize using bin width tol.

    Equivalent to round(x/tol) * tol, but uses floor((x/tol)+0.5)*tol
    to avoid system-dependent rounding at 0.5 boundaries.

    Args:
        x: Input values (np.ndarray)
        tol: Tolerance (bin width) (float)

    Returns:
    np.ndarray
        Binned values
    """
    x = np.asarray(x, float)
    return np.floor((x / tol) + 0.5) * tol


def _run_once_tolerance(df_in: pd.DataFrame,
                        t_tol: float, mp_tol: float, r_tol: float) -> Tuple[pd.DataFrame, int, int]:
    """
    Tolerance mode: discretize keys by binning with tolerance, then apply Steps 9.1/9.2.

    Args:
        df_in: Input Ψ identifications (pd.DataFrame)
        t_tol: Time tolerance (years) (float)
        mp_tol: Magnitude tolerance (float)
    r_tol : float
        Slope ratio tolerance

    Returns:
    tuple
        (df_after_9.2, count_after_9.1, count_after_9.2)
    """
    df1 = df_in.copy()
    df1["tmin_b"] = _bin_with_tolerance(df1["tmin"].values, t_tol)
    df1["tp_b"]   = _bin_with_tolerance(df1["tp"].values,   t_tol)
    df1["mp_b"]   = _bin_with_tolerance(df1["mp"].values,   mp_tol)
    df1["r_b"]    = _bin_with_tolerance(df1["sloperatio"].values, r_tol)

    # Step 9.1: For same (eq_name, tmin_b, tp_b, mp_b) → max sloperatio; ties: min ap
    g91 = ["eq_name", "tmin_b", "tp_b", "mp_b"]
    df_91 = (
        df1.sort_values(by=["sloperatio", "ap"], ascending=[False, True])
           .groupby(g91, as_index=False, sort=False)
           .head(1)
    )
    n91 = len(df_91)

    # Step 9.2: For same (eq_name, tp_b, r_b) → min ap
    g92 = ["eq_name", "tp_b", "r_b"]
    df_92 = (
        df_91.sort_values(by=["ap"], ascending=True)
             .groupby(g92, as_index=False, sort=False)
             .head(1)
    )
    n92 = len(df_92)

    # Remove auxiliary columns
    df_92 = df_92.drop(columns=["tmin_b", "tp_b", "mp_b", "r_b"], errors="ignore")
    return df_92, n91, n92


# -------------------------------
# Main function
# -------------------------------
[docs] def optimize_psi_results( input_file: str = "psi.ou4a", output_file: str = "optimized_psi.ou4", # Round mode parameters t_decimals: int = 3, mp_decimals: int = 1, r_decimals: int = 1, # Auto-relax precision (only for round mode; tolerance mode usually doesn't need this) auto_relax: bool = True, verbose: bool = True, # Mode selection mode: str = "round", # "round" or "tolerance" # Tolerance mode parameters (years, magnitude, slope ratio) t_tol: float = 0.03, # ≈ 11 days mp_tol: float = 0.05, r_tol: float = 0.25, ): """ Apply Step 9 two-stage deduplication to Ψ identification results. This implements the deduplication procedure from Christophersen et al. (2024, SRL 95(6):3464-3481): Step 9.1: For same (eq_name, tmin, tp, mp) → keep max sloperatio (ties: min ap) Step 9.2: For same (eq_name, tp, r) → keep min ap Two discretization modes: - mode="round": Discretize by rounding to decimal places (simple, controllable) - mode="tolerance": Discretize by binning with tolerance (RECOMMENDED, closer to physical resolution) Output format: Fixed 14 columns in CANONICAL_14 order. Args: input_file: Input .ou4a file with raw Ψ identifications output_file: Output .ou4 file with deduplicated results t_decimals: Decimal places for time (round mode) mp_decimals: Decimal places for magnitude (round mode) r_decimals: Decimal places for slope ratio (round mode) auto_relax: Automatically relax precision if no reduction (round mode only) verbose: Print detailed progress information mode: "round" or "tolerance" t_tol: Time tolerance in years (tolerance mode, default 0.03 ≈ 11 days) mp_tol: Magnitude tolerance (tolerance mode) r_tol: Slope ratio tolerance (tolerance mode) Returns: Tuple (n_initial, n_after_9.1, n_after_9.2) - counts at each stage """ df = _read_ou4a_like(input_file) n0 = len(df) if verbose: print("=== PSI Result Optimization (Step 9) ===") print("Following the exact algorithm from the paper") print(f"Read {n0} raw Ψ identification records") print(f"Mode: {mode}") # ---- Execute Steps 9.1/9.2 ---- if mode.lower() == "tolerance": df2, n91, n92 = _run_once_tolerance(df, t_tol=t_tol, mp_tol=mp_tol, r_tol=r_tol) else: df2, n91, n92 = _run_once_round(df, t_decimals, mp_decimals, r_decimals) # Auto-relax precision for round mode (fault tolerance) if auto_relax and n91 == n0 and n92 == n91: if verbose: print("\n⚠️ No reduction in both steps, enabling auto-relax precision retry...") tried = OrderedDict() for (td, md, rd) in [(2, mp_decimals, r_decimals), (2, 0, r_decimals), (2, 0, 0)]: key = (td, md, rd) if key in tried: continue tried[key] = True df_try, n91_try, n92_try = _run_once_round(df, td, md, rd) if verbose: print(f" → Precision (t={td}, mp={md}, r={rd}): after 9.1={n91_try}, after 9.2={n92_try}") if (n91_try < n0) or (n92_try < n91_try): df2, n91, n92 = df_try, n91_try, n92_try t_decimals, mp_decimals, r_decimals = td, md, rd break if verbose: print("\nStep 9.1: For identical tmin, tp, mp → keep max sloperatio...") print(f" After first filter: {n91} records") print("\nStep 9.2: For identical tp, sloperatio → keep min ap...") print(f" After second filter: {n92} records") # ---- Fixed 14-column output ---- df2 = _ensure_canonical_columns(df2) df2["eq_name"] = df2["eq_name"].astype(str).str.strip() df2.to_csv(output_file, sep=" ", header=False, index=False, float_format="%.4f") if verbose: print(f"\nAfter optimization: {df2['eq_name'].nunique()} earthquakes have Ψ identifications") print("Number of Ψ identifications per earthquake (top 10):") print(df2["eq_name"].value_counts().head(10)) print(f"\nSaved to {output_file} (fixed 14 columns)") return n0, n91, n92