# Source code for analysis.plot_relations

# -*- coding: utf-8 -*-
"""
Ψ Phenomenon Scaling Relations Analysis

This module analyzes the scaling relations from Ψ identifications using
fixed-effects regression to remove within-mainshock AP-TP trade-offs.

Based on:
    - Christophersen et al. (2024, SRL 95(6):3464-3481) Section 2.4
    - The paper Section 5.2 (two-stage estimation)

Scaling Relations:
    1. log₁₀(AP) vs MP  (precursor area vs precursor magnitude)
    2. log₁₀(TP) vs MP  (precursor time vs precursor magnitude)
    3. Mm vs MP         (mainshock magnitude vs precursor magnitude)
"""
import numpy as np
import pandas as pd
import math

# Matplotlib: use basic interface only
import matplotlib.pyplot as plt

# Use scipy.stats.linregress if available; fallback to numpy version
try:
    from scipy import stats
except Exception:
    stats = None

# ---- Shared: Column names ----
# Canonical 14-column layout of a Ψ identification record (.ou4 file):
# eq_name       — mainshock identifier
# mm, mp        — mainshock and precursor magnitudes
# mminus, sloperatio — auxiliary identification diagnostics (semantics not
#                 shown here; see Christophersen et al. 2024 — TODO confirm)
# tmin, time_onset, tmax — precursor time-window bounds
# tp, ap        — precursor time (years, per the *365.25 conversion below)
#                 and precursor area (km²)
# rn_max..re_min — north/east extents of the precursor region
CANONICAL_14 = [
    "eq_name","mm","mp","mminus","sloperatio",
    "tmin","time_onset","tmax","tp","ap",
    "rn_max","rn_min","re_max","re_min"
]

# ---- Utility: Check if string is numeric ----
def _isfloat(v):
    """Check if value can be converted to float."""
    try:
        float(v); return True
    except Exception:
        return False

# ---- File reading: Compatible with 14 or wider columns ----
def _read_psi_any_layout(psi_file: str) -> pd.DataFrame:
    """
    Read Ψ identification file with 14 or more columns.

    Compatible with 14-column (canonical) and wider .ou4 files.
    Only keeps first 14 columns, converts to standard names, filters out ap<=0 or tp<=0.

    Args:
        psi_file: Path to whitespace-separated .ou4 file (no header row)

    Returns:
        DataFrame with the 14 standardized columns of ``CANONICAL_14``;
        all columns except ``eq_name`` coerced to numeric, rows with
        non-positive ``ap`` or ``tp`` removed.

    Raises:
        ValueError: If file has fewer than 14 columns
    """
    # Everything is read as str first so the name-column heuristic below
    # can inspect raw tokens before numeric coercion.
    raw = pd.read_csv(psi_file, sep=r"\s+", header=None, dtype=str)
    ncol = raw.shape[1]
    if ncol < 14:
        raise ValueError(f"{psi_file} has {ncol} columns < 14, incorrect format.")

    # Assume first 14 columns are canonical
    df = raw.iloc[:, :14].copy()
    df.columns = CANONICAL_14

    # Heuristic: if the assumed eq_name column is almost entirely numeric
    # (>95% of the first 100 rows parse as float), the file probably has a
    # different column order — scan the first 20 columns and pick the one
    # with the MOST non-numeric tokens as the true name column.
    sample = df["eq_name"].astype(str).head(100)
    num_ratio = sum(1 for v in sample if _isfloat(v)) / max(1, len(sample))
    if num_ratio > 0.95:
        best_j = 0; best_nonnum = -1
        for j in range(min(ncol, 20)):
            s = raw.iloc[:, j].astype(str).head(100)
            nonnum = sum(1 for v in s if not _isfloat(v))
            if nonnum > best_nonnum:
                best_nonnum = nonnum; best_j = j
        name_col = raw.iloc[:, best_j].astype(str)
        # Remaining columns (name column dropped) are assumed to hold the
        # 13 numeric fields in canonical order — TODO confirm for exotic layouts.
        num_cols = raw.drop(raw.columns[best_j], axis=1).iloc[:, :13]
        num_cols.columns = [c for c in CANONICAL_14 if c != "eq_name"]
        # concat aligns on the shared row index of `raw`, so rows stay paired.
        df = pd.concat([name_col, num_cols], axis=1)
        df.columns = CANONICAL_14

    # Type conversion; unparseable tokens become NaN rather than raising
    for c in CANONICAL_14[1:]:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    # Filter invalid rows (non-positive area/time; NaN compares False, so
    # rows with failed numeric conversion are dropped too)
    df = df[(df["ap"] > 0) & (df["tp"] > 0)].copy()
    df["eq_name"] = df["eq_name"].astype(str).str.strip()
    return df

# ---- Fixed-effects (common slope) estimation; fallback to pooled OLS if denominator is 0 ----
[docs] def _fixed_effects_slope_safe(x, y, group, min_group_size=2): """ Estimate common slope using fixed-effects regression (within-group demeaning). Removes within-mainshock trade-off by demeaning x and y within each mainshock, then computing pooled slope across all demeaned observations. Args: x: Independent variable (array-like) y: Dependent variable (array-like) group: Group identifiers (mainshock names, array-like) min_group_size: Minimum group size to include (default: 2) Returns: Common slope estimate (b_AT or b_TA). Falls back to pooled OLS if denominator is zero or no groups meet minimum size. """ tmp = pd.DataFrame({'x': x, 'y': y, 'g': group}) sz = tmp.groupby('g')['g'].transform('size') tmp = tmp[sz >= min_group_size].copy() if len(tmp) == 0: # Fallback to pooled OLS x_all = np.asarray(x, float); y_all = np.asarray(y, float) den = np.sum((x_all - x_all.mean())**2) return float(np.sum((x_all - x_all.mean())*(y_all - y_all.mean())) / den) if den>0 else np.nan x_dm = tmp['x'] - tmp.groupby('g')['x'].transform('mean') y_dm = tmp['y'] - tmp.groupby('g')['y'].transform('mean') den = float(np.sum(x_dm**2)) if den <= 0: x_all = np.asarray(x, float); y_all = np.asarray(y, float) den2 = np.sum((x_all - x_all.mean())**2) return float(np.sum((x_all - x_all.mean())*(y_all - y_all.mean())) / den2) if den2>0 else np.nan num = float(np.sum(x_dm * y_dm)) return num / den
# ---- Numpy version of linregress (if no SciPy) ---- def _linregress_np(x, y): """ Simple linear regression using NumPy (fallback when SciPy unavailable). Args: x: Independent variable (array-like) y: Dependent variable (array-like) Returns: Result object with slope, intercept, rvalue attributes. Returns None if insufficient data. """ x = np.asarray(x, float); y = np.asarray(y, float) mask = np.isfinite(x) & np.isfinite(y) x = x[mask]; y = y[mask] if len(x) < 2: return None xm = x.mean(); ym = y.mean() cov = np.sum((x - xm)*(y - ym)) var = np.sum((x - xm)**2) if var <= 0: return None slope = cov / var intercept = ym - slope*xm r_num = cov r_den = np.sqrt(var*np.sum((y - ym)**2)) r = r_num / r_den if r_den>0 else np.nan class Res: pass res = Res() res.slope = float(slope) res.intercept = float(intercept) res.rvalue = float(r) res.pvalue = np.nan res.stderr = np.nan return res # ---- For plotting: 95% prediction interval (accurate version) ---- def prediction_interval(x_data, x_plot_range, slope, intercept, std_err): """ Compute 95% prediction interval for linear regression. Args: x_data: Original x values used for fitting (array-like) x_plot_range: X coordinates for plotting (array-like) slope: Regression slope (float) intercept: Regression intercept (float) std_err: Standard error of residuals, sigma = sqrt(SSE/(n-2)) (float) Returns: Tuple (y_pred, y_low, y_high) - predicted values and 95% prediction interval bounds. Returns (y_pred, None, None) if insufficient data. 
""" x_data = np.asarray(x_data, float) x_plot_range = np.asarray(x_plot_range, float) n = len(x_data) if n < 2: return None, None, None x_mean = np.mean(x_data) y_pred = intercept + slope * x_plot_range denom = np.sum((x_data - x_mean)**2) if denom <= 0: return y_pred, None, None se_pred = std_err * np.sqrt(1.0 + 1.0/n + ((x_plot_range - x_mean)**2)/denom) z = 1.96 # 95% confidence level return y_pred, y_pred - z*se_pred, y_pred + z*se_pred # ---- Main function: File reading, fixed-effects projection, regression, plotting ----
def analyze_scaling_relations(psi_file="optimized_psi.ou4", output_prefix="scaling_final"):
    """
    Analyze Ψ phenomenon scaling relations using fixed-effects regression.

    Implements the two-stage estimation procedure to estimate initial values
    for EEPAS model parameters. Removes the within-mainshock AP-TP trade-off
    using fixed-effects regression, then fits scaling relations.

    Args:
        psi_file: Input file with deduplicated Ψ identifications (.ou4 format)
        output_prefix: Prefix for output files (plots and CSV)

    Algorithm:
        1. Read all Ψ identifications from file
        2. Convert units (years → days, km²) and compute logarithms
        3. Estimate fixed-effects common slopes (b_AT, b_TA) to remove trade-off
        4. Project each mainshock's identifications to a representative point
        5. Fit linear regressions on representative points:
           - log₁₀(AP) vs MP → b_A (spatial scaling), sigma_A
           - log₁₀(TP) vs MP → b_T, a_T (temporal scaling), sigma_T
           - Mm vs MP → b_M, a_M (magnitude scaling), sigma_M
        6. Generate plots with 95% prediction intervals

    Output Files:
        - {output_prefix}_projected_points.csv: Representative points per mainshock
        - {output_prefix}_mp_relations.png: Plots of AP and TP vs MP (semi-log)
        - {output_prefix}_mm_mp.png: Plot of Mm vs MP (linear scale)

    Returns:
        None (writes output files directly)

    References:
        Christophersen et al. (2024). Algorithmic Identification of the
        Precursory Scale Increase Phenomenon in Earthquake Catalogs.
        SRL 95(6):3464-3481.
    """
    print("="*60)
    print("=== Ψ Scaling Relations Analysis (Projected Means + Plots) ===")

    # 1) Read file (all identifications)
    df = _read_psi_any_layout(psi_file)
    print(f"✅ Successfully read {len(df)} total Ψ identification records.")
    n_eq = df['eq_name'].nunique()
    print(f"ℹ️ Number of mainshocks (unique eq_name) = {n_eq}")
    print("ℹ️ Number of identifications per mainshock (top 10):")
    print(df['eq_name'].value_counts().head(10))

    # 2) Unit conversion and logarithms (tp is in years; require >= 1 day)
    df['tp_days'] = df['tp'].astype(float) * 365.25
    df = df[(df['ap'] > 0) & (df['tp_days'] >= 1.0)].copy()
    if len(df) == 0:
        print("⚠️ All rows filtered out (ap<=0 or tp_days<1).")
        return
    df['log_ap_km2'] = np.log10(df['ap'].astype(float))
    df['log_tp'] = np.log10(df['tp_days'].astype(float))

    # 3) Fixed-effects common slopes (remove the AP-TP trade-off)
    b_AT = _fixed_effects_slope_safe(df['log_tp'].values, df['log_ap_km2'].values,
                                     df['eq_name'].values, min_group_size=2)
    b_TA = _fixed_effects_slope_safe(df['log_ap_km2'].values, df['log_tp'].values,
                                     df['eq_name'].values, min_group_size=2)
    xbar = float(df['log_tp'].mean())
    ybar = float(df['log_ap_km2'].mean())

    # 4) Representative point per mainshock (projected means, Eqs 13 & 14)
    rows = []
    mm_first = df.groupby('eq_name', sort=False)['mm'].first()
    mp_mean = df.groupby('eq_name', sort=False)['mp'].mean()
    for g, dfg in df.groupby('eq_name', sort=False):
        x_i = float(dfg['log_tp'].mean())
        y_i = float(dfg['log_ap_km2'].mean())
        a_i = y_i - b_AT * x_i
        logA_p = a_i + b_AT * xbar       # Eq (13): projected mean of log10(A)
        a_i_p = x_i - b_TA * y_i
        logT_p = a_i_p + b_TA * ybar     # Eq (14): projected mean of log10(T)
        rows.append({
            'eq_name': g,
            'mm': float(mm_first.loc[g]),
            'mp': float(mp_mean.loc[g]),
            'log_ap_km2_proj': float(logA_p),
            'log_tp_proj': float(logT_p),
        })
    proj = pd.DataFrame(rows)
    print(f"✅ Generated {len(proj)} mainshock representative points with trade-off removed (should equal number of mainshocks).")
    proj.to_csv(f"{output_prefix}_projected_points.csv", index=False)

    # 5) Linear regressions on the projected representative points
    def _linreg(x, y):
        """Perform linear regression and compute residual standard error."""
        x = np.asarray(x, float); y = np.asarray(y, float)
        mask = np.isfinite(x) & np.isfinite(y)
        x = x[mask]; y = y[mask]
        if len(x) < 2:
            return None, None
        if np.all(x == x[0]):
            # Zero-variance predictor: scipy.stats.linregress raises and the
            # numpy fallback returns None — bail out consistently instead.
            return None, None
        if stats is not None:
            res = stats.linregress(x, y)
            slope, intercept, r2, pval = res.slope, res.intercept, res.rvalue**2, getattr(res, "pvalue", np.nan)
        else:
            res = _linregress_np(x, y)
            slope, intercept, r2, pval = res.slope, res.intercept, res.rvalue**2, np.nan
        # Residual standard deviation (for prediction interval)
        yhat = intercept + slope * x
        sse = float(np.sum((y - yhat)**2))
        std_err = math.sqrt(sse / max(1, len(x) - 2))
        return (slope, intercept, r2, pval, std_err), mask

    # A: log10(A[km2]) vs MP
    regA, _ = _linreg(proj['mp'], proj['log_ap_km2_proj'])
    # T: log10(T[days]) vs MP
    regT, _ = _linreg(proj['mp'], proj['log_tp_proj'])
    # M: Mm vs MP
    regM, _ = _linreg(proj['mp'], proj['mm'])

    print("\n--- Initial Value Estimates for MLE ---")
    if regA:
        slope_A, intercept_A, r2_A, p_A, std_err_A = regA
        # Calculate sigma_A initial value using simplified estimation method
        c_for_sigma_A = 0.33
        a_A = intercept_A
        sigma_A_initial = c_for_sigma_A * (10**(a_A / 2.0))
        print(f"Relation 1 (Area): log10(A_P[km²]) = {intercept_A:.4f} + {slope_A:.4f}*M_P "
              f"(R² = {r2_A:.4f}, p-value = {p_A:.6f})")
        print(f"sigma_A initial value (from simplified estimation): {sigma_A_initial:.4f}")
        print(f" (Formula: 0.33 * 10^(a_A / 2), where a_A = {a_A:.4f})")
    else:
        print("sigma_A initial value: Insufficient samples.")
    if regT:
        slope_T, intercept_T, r2_T, p_T, std_err_T = regT
        print(f"Relation 2 (Time): log10(T_P[days]) = {intercept_T:.4f} + {slope_T:.4f}*M_P "
              f"(R² = {r2_T:.4f}, p-value = {p_T:.6f})")
        print(f"sigma_T initial value (residual std from log(T_p) vs M_P): {std_err_T:.4f}")
    else:
        print("sigma_T initial value: Insufficient samples.")
    if regM:
        slope_M, intercept_M, r2_M, p_M, std_err_M = regM
        print(f"Relation 3 (Mag): M_m = {intercept_M:.4f} + {slope_M:.4f}*M_P "
              f"(R² = {r2_M:.4f}, p-value = {p_M:.6f})")
        print(f"sigma_M initial value (residual std from M_m vs M_P): {std_err_M:.4f}")
    else:
        print("sigma_M initial value: Insufficient samples.")

    # 6) Prepare plotting data
    # All identifications: use original df
    data_all = pd.DataFrame({
        "mp": df["mp"].astype(float),
        "ap_1000km2": df["ap"].astype(float) / 1000.0,  # Convert to 10^3 km²
        "tp_days": df["tp_days"].astype(float),
        "mm": df["mm"].astype(float)
    })
    # Representative points per mainshock (after projection): back to linear scale
    mean_data = pd.DataFrame({
        "eq_name": proj["eq_name"].values,
        "mp": proj["mp"].astype(float).values,
        "ap_1000km2": (10.0**proj["log_ap_km2_proj"].astype(float).values) / 1000.0,
        "tp_days": 10.0**proj["log_tp_proj"].astype(float).values,
        "mm": proj["mm"].astype(float).values
    })

    # 7) X range for prediction interval (based on MP range of representative points)
    if len(mean_data) >= 2:
        x_range_mp = np.linspace(mean_data['mp'].min() - 0.2, mean_data['mp'].max() + 0.2, 100)
    else:
        x_range_mp = np.linspace(df['mp'].min() - 0.2, df['mp'].max() + 0.2, 100)

    # 8) Plot (a), (b): semi-log y (gray=all, blue=representative, black=fit)
    plt.figure(figsize=(15, 6))

    # (a) MP vs AP (10^3 km²) — log y-axis
    ax1 = plt.subplot(1, 2, 1)
    ax1.scatter(data_all['mp'], data_all['ap_1000km2'], alpha=0.15, s=10,
                color='tab:gray', label='All Identifications')
    ax1.scatter(mean_data['mp'], mean_data['ap_1000km2'], alpha=0.8, s=25,
                color='tab:blue', label='Mean per Mainshock')
    if regA:
        y_fit_A, lower_A, upper_A = prediction_interval(mean_data['mp'], x_range_mp,
                                                        slope_A, intercept_A, std_err_A)
        # Regression was fitted on log10(A[km²]); convert back for the plot
        ax1.semilogy(x_range_mp, (10**y_fit_A)/1000.0, color='k', linewidth=2,
                     label=f'$\\log_{{10}}A_P = {intercept_A:.2f} + {slope_A:.2f}M_P$')
        if lower_A is not None:
            ax1.semilogy(x_range_mp, (10**lower_A)/1000.0, color='k',
                         linestyle='--', linewidth=1.5, label='95% PI')
            ax1.semilogy(x_range_mp, (10**upper_A)/1000.0, color='k',
                         linestyle='--', linewidth=1.5)
    ax1.set_xlabel('$M_P$', fontsize=12)
    ax1.set_ylabel('$A_P$ (10³ km²)', fontsize=12)
    ax1.grid(True, which='both', ls='-', alpha=0.2)
    ax1.legend(loc='best', fontsize=10)
    ax1.set_title('(a) $A_P$ vs $M_P$', fontsize=14)

    # (b) MP vs TP (days) — log y-axis
    ax2 = plt.subplot(1, 2, 2)
    ax2.scatter(data_all['mp'], data_all['tp_days'], alpha=0.15, s=10,
                color='tab:gray', label='All Identifications')
    ax2.scatter(mean_data['mp'], mean_data['tp_days'], alpha=0.8, s=25,
                color='tab:blue', label='Mean per Mainshock')
    if regT:
        y_fit_T, lower_T, upper_T = prediction_interval(mean_data['mp'], x_range_mp,
                                                        slope_T, intercept_T, std_err_T)
        ax2.semilogy(x_range_mp, 10**y_fit_T, color='k', linewidth=2,
                     label=f'$\\log_{{10}}T_P = {intercept_T:.2f} + {slope_T:.2f}M_P$')
        if lower_T is not None:
            ax2.semilogy(x_range_mp, 10**lower_T, color='k',
                         linestyle='--', linewidth=1.5, label='95% PI')
            ax2.semilogy(x_range_mp, 10**upper_T, color='k',
                         linestyle='--', linewidth=1.5)
    ax2.set_xlabel('$M_P$', fontsize=12)
    ax2.set_ylabel('$T_P$ (days)', fontsize=12)
    ax2.grid(True, which='both', ls='-', alpha=0.2)
    ax2.legend(loc='best', fontsize=10)
    ax2.set_title('(b) $T_P$ vs $M_P$', fontsize=14)

    plt.tight_layout()
    plt.savefig(f"{output_prefix}_mp_relations.png", dpi=300, bbox_inches='tight')
    print(f"\nSaved M_P relation plots to {output_prefix}_mp_relations.png")

    # 9) Plot (c): Mm vs MP (linear y)
    plt.figure(figsize=(7, 6))
    ax3 = plt.gca()
    ax3.scatter(data_all['mp'], data_all['mm'], alpha=0.15, s=10,
                color='tab:gray', label='All Identifications')
    ax3.scatter(mean_data['mp'], mean_data['mm'], alpha=0.8, s=25,
                color='tab:blue', label='Mean per Mainshock')
    if regM:
        # Extract regression parameters from regM tuple
        slope_M, intercept_M, _, _, std_err_M = regM
        y_fit_M, lower_M, upper_M = prediction_interval(mean_data['mp'], x_range_mp,
                                                        slope_M, intercept_M, std_err_M)
        ax3.plot(x_range_mp, y_fit_M, color='k', linewidth=2,
                 label=f'$M_m = {intercept_M:.2f} + {slope_M:.2f}M_P$')
        if lower_M is not None:
            ax3.plot(x_range_mp, lower_M, color='k',
                     linestyle='--', linewidth=1.5, label='95% PI')
            ax3.plot(x_range_mp, upper_M, color='k',
                     linestyle='--', linewidth=1.5)
    ax3.set_xlabel('$M_P$', fontsize=12)
    ax3.set_ylabel('$M_m$', fontsize=12)
    ax3.grid(True, alpha=0.2)
    ax3.legend(loc='best', fontsize=10)
    ax3.set_title('(c) $M_m$ vs $M_P$', fontsize=14)
    plt.tight_layout()
    plt.savefig(f"{output_prefix}_mm_mp.png", dpi=300, bbox_inches='tight')
    print(f"Saved M_m vs M_P plot to {output_prefix}_mm_mp.png")