Source code for utils.numerical_integration

"""
Numerical Integration Functions for EEPAS.

This module provides low-level numerical integration functions with Numba acceleration.

Functions:
    PPE Spatial Integration:
        - fast_kernel_sum_2d: Cauchy kernel summation (Numba parallel)
        - trapezoidal_2d: 2D trapezoidal integration (Numba parallel)

    EEPAS Magnitude Integration:
        - fast_magnitude_integral: Midpoint rectangular rule (Numba JIT)
        - magnitude_integral_accurate: quad_vec wrapper
"""

from typing import Union
import math
import numpy as np
from numba import jit, prange
from scipy.integrate import quad_vec


# ============================================================================
# PPE Spatial Integration: Shared Numba-Accelerated Functions
# ============================================================================

[docs] @jit(nopython=True, parallel=True, fastmath=True, cache=True) def fast_kernel_sum_2d(x_grid, y_grid, d, xj, yj, weights): """ Calculate PPE spatial kernel summation on 2D grid (vectorized version). Seismological Significance: For each grid point (x,y), calculate the Cauchy kernel summation from all historical earthquakes: h₀(x,y) = Σⱼ [wⱼ/(π(d²+rⱼ²))] Mathematical Principle: Cauchy kernel κ(r) = 1/(π(d²+r²)) is the spatial distribution function in PPE model. By summing contributions from all historical earthquakes, obtain spatial background rate. Optimization Strategy: 1. Numba JIT compilation: boost pure Python loop performance 2. Parallelization (prange): multi-core computation for grid points 3. fastmath: allow floating-point optimizations (tiny precision loss for speed gain) Args: x_grid, y_grid: 2D grid coordinate matrices (meshgrid format) [nx × ny] d: Characteristic distance (km) xj, yj: Historical earthquake location arrays [n_events] weights: Magnitude weight array wⱼ = a·(mⱼ-mT)·t_integral [n_events] Returns: numpy.ndarray: Kernel function summation on 2D grid [nx × ny] Usage Scenarios: - ppe_optimization.py: PPE Learning spatial integration - neg_log_like_aftershock.py: Fit Aftershock spatial integration - ppe_make_forecast.py: PPE Forecast grid integration """ nx, ny = x_grid.shape result = np.zeros((nx, ny)) # Parallelize grid point computation (each CPU core processes some grid points) for i in prange(nx): for j in range(ny): val = 0.0 # Sum Cauchy kernel for all historical earthquakes for k in range(len(xj)): dx = x_grid[i, j] - xj[k] dy = y_grid[i, j] - yj[k] # Cauchy kernel: wⱼ / [π(d²+r²)] val += weights[k] / (np.pi * (d**2 + dx**2 + dy**2)) result[i, j] = val return result
[docs] @jit(nopython=True, parallel=True, fastmath=True, cache=True) def trapezoidal_2d(f_values, dx, dy): """ 2D trapezoidal integration method. Mathematical Principle: Divide integration region into regular grid, approximate each small rectangle using trapezoidal rule: ∫∫ f(x,y) dx dy ≈ Σᵢⱼ [(f₀₀+f₁₀+f₀₁+f₁₁)/4] · Δx·Δy where f₀₀, f₁₀, f₀₁, f₁₁ are function values at the four vertices of rectangle Error Analysis: For smooth functions, error is O(Δx² + Δy²). PPE Cauchy kernel becomes relatively smooth after summing many earthquakes, trapezoidal method has good accuracy. Optimization Strategy: 1. Numba parallelization: multi-core computation for grid cell summation 2. Vectorization-friendly: can be used together with fast_kernel_sum_2d Args: f_values: 2D grid of function values (shape: nx × ny) dx, dy: Grid spacing (unit: km) Returns: float: Integral value (scalar) Accuracy Test Results: Trapezoidal 50×50 vs Gauss-Legendre 10×10: relative error 0.0000%, speed improvement 2.9x """ nx, ny = f_values.shape integral = 0.0 # Parallelize grid cell summation for i in prange(nx - 1): for j in range(ny - 1): # Trapezoidal rule: average of four vertices integral += 0.25 * (f_values[i, j] + f_values[i+1, j] + f_values[i, j+1] + f_values[i+1, j+1]) return integral * dx * dy
[docs] def ppe_spatial_integral_accurate(x_bounds, y_bounds, event_data, d, s, s_coeff, a_coeff=1.0): """ PPE spatial integration: accurate mode (dblquad adaptive integration). Seismological Significance: Calculate spatial integral of PPE background rate: ∫∫ h(x,y) dx dy where h(x,y) = a_coeff · [Σⱼ wⱼ/(π(d²+rⱼ²))] + s·s_coeff Mathematical Principle: Uses scipy.dblquad for adaptive double integration. Higher accuracy (relative error <0.001%) but slower (about 10-20x). Applicable Scenarios: - PPE Learning accurate integration mode - Fit Aftershock accurate integration mode - PPE Forecast accurate integration mode - Paper validation or scenarios requiring extremely high precision Args: x_bounds: (x_min, x_max) integration range y_bounds: (y_min, y_max) integration range event_data: Event list, each event is (weight, x, y) tuple where weight already includes all coefficients (e.g., (m-mT)·t_integral) d: PPE characteristic distance (km) s: PPE background rate s_coeff: Coefficient for s (e.g., mag_integral * t_sum) a_coeff: Coefficient for kernel summation term (default 1.0) - PPE Learning/Fit Aftershock: a_coeff = a (PPE parameter) - PPE Forecast: a_coeff = 1.0 (weight already includes a) Returns: float: Integral value (scalar) Note: This function is suitable for calling separately for each grid cell in a loop. Not suitable for overall vectorization (dblquad doesn't support it). """ from scipy.integrate import dblquad x_min, x_max = x_bounds y_min, y_max = y_bounds def integrand(y, x): """Integrand function h(x,y)""" val = 0.0 # PPE kernel summation term: a_coeff · Σⱼ [wⱼ/(π(d²+rⱼ²))] for weight, xj, yj in event_data: denom = d**2 + (x - xj)**2 + (y - yj)**2 val += a_coeff * weight / (np.pi * denom) # PPE background term: s·s_coeff val += s * s_coeff return val # dblquad adaptive integration result, error = dblquad( integrand, x_min, x_max, # x range y_min, y_max, # y range (can be a function) epsabs=1e-6, # absolute error tolerance epsrel=1e-3 # relative error tolerance ) return result
# ============================================================================ # Usage Notes # ============================================================================ # Functions provided by this module should be used as follows: # # 1. In ppe_optimization.py, neg_log_like_aftershock.py, etc.: # from utils.numerical_integration import fast_kernel_sum_2d, trapezoidal_2d # # # Then use these low-level functions in fast integration functions # kernel_grid = fast_kernel_sum_2d(x_grid, y_grid, d, xj, yj, weights) # integral = trapezoidal_2d(kernel_grid, dx, dy) # # 2. In eepas_likelihood.py, eepas_make_forecast.py: # from utils.numerical_integration import fast_magnitude_integral, magnitude_integral_accurate # # # Fast mode # result = fast_magnitude_integral(m1, m2, mee, am, bm, Sm, m0, B, magnitude_samples=20) # # # Accurate mode # result = magnitude_integral_accurate(m1, m2, mee, am, bm, Sm, m0, B) # # Advantages of this design: # - Eliminates code duplication (fast_kernel_sum_2d, trapezoidal_2d repeated in two files) # - Each file retains its own high-level logic and business rules # - Minimizes modification scope and complexity # ============================================================================ # EEPAS Magnitude Integration: Shared Functions # ============================================================================
[docs] @jit(nopython=True, cache=True) def fast_magnitude_integral(m1, m2, mee, am, bm, Sm, m0, B, magnitude_samples=20): """ EEPAS magnitude integration: fast mode (midpoint rectangular rule). Calculate magnitude integral: ∫[m1,m2] fGme(m, me) dm, for all me values Where: - gₑ(m|mₑ): Normal distribution N(am + bm·mₑ, Sm²) - Δ(m): Normalization factor (calculated using erf function) Fast Mode Characteristics: 1. Midpoint rectangular rule: divide integral interval equally, approximate using midpoint values 2. Numba acceleration: JIT compilation 3. Fixed sampling: predictable computational cost Error Analysis: 20 sample points: relative error ~3%, suitable for Forecast (speed priority) Args: m1, m2: Integration range (scalar) mee: Precursor magnitude array (must be array) am, bm, Sm: Magnitude distribution parameters m0: Completeness magnitude B: GR b-value magnitude_samples: Number of sample points (default 20) Returns: numpy.ndarray: Integration result array of length len(mee) """ n_events = len(mee) result = np.zeros(n_events) # Midpoint rectangular rule dm = (m2 - m1) / magnitude_samples sqrt_2 = np.sqrt(2.0) sqrt_2pi = np.sqrt(2.0 * np.pi) for i in range(n_events): me = mee[i] integral = 0.0 # Midpoint integration for j in range(magnitude_samples): m = m1 + (j + 0.5) * dm # Numerator: Gaussian PDF diff = (m - am - bm * me) / Sm numerator = (1.0 / (Sm * sqrt_2pi)) * np.exp(-0.5 * diff * diff) # Denominator: normalization factor (depends on m, not me!) # Use erf function to calculate cumulative distribution erf_arg = (m - am - bm * m0 - Sm**2 * B) / (sqrt_2 * Sm) denominator = 0.5 * (math.erf(erf_arg) + 1.0) # Safe division (avoid dividing by extremely small values) integral += numerator / np.maximum(denominator, 1e-100) result[i] = integral * dm return result
[docs] def magnitude_integral_accurate(m1, m2, mee, am, bm, Sm, m0, B): """ EEPAS magnitude integration: accurate mode (quad_vec vectorized adaptive integration). Uses scipy.quad_vec for adaptive integration, supports vectorization. Accurate Mode Characteristics: 1. Adaptive sampling: adjust integration points based on function characteristics 2. Vectorization: can calculate multiple integrals simultaneously 3. Extremely high precision: <0.001% Applicable Scenarios: - EEPAS Learning (parameter learning requires high precision) - Paper validation Args: m1, m2: Integration range (scalar) mee: Precursor magnitude array am, bm, Sm: Magnitude distribution parameters m0: Completeness magnitude B: GR b-value Returns: float or numpy.ndarray: Integral value (can be scalar or array, depending on input) """ if np.any(m1 >= m2): # Handle invalid integration range if np.isscalar(m1): return 0.0 else: result = np.zeros_like(m1) valid = m1 < m2 if not np.any(valid): return result # Only calculate valid range m1_valid = m1[valid] m2_valid = m2[valid] result[valid] = magnitude_integral_accurate(m1_valid, m2_valid, mee, am, bm, Sm, m0, B) return result sqrt_2pi_Sm = Sm * np.sqrt(2.0 * np.pi) def integrand(m): """Integrand function gₑ(m)/Δ(m)""" # gₑ(m): Normal distribution mag_diff = m - am - bm * mee g_e = (1.0 / sqrt_2pi_Sm) * np.exp(-0.5 * (mag_diff / Sm)**2) # Δ(m): GR distribution integral delta_m = np.exp(-B * (m - m0)) / B return g_e / delta_m # Use quad_vec for vectorized integration result, error = quad_vec(integrand, m1, m2) return result
# Note: Does not provide high-level unified interface (like eepas_magnitude_integral) # Each file directly calls fast_magnitude_integral or magnitude_integral_accurate as needed # ============================================================================ # Version Information # ============================================================================ __version__ = "0.4.0" __date__ = "2025-11-03" # Exported public interface __all__ = [ # PPE spatial integration: shared functions 'fast_kernel_sum_2d', # Fast mode: kernel summation (Numba) 'trapezoidal_2d', # Fast mode: trapezoidal integration (Numba) 'ppe_spatial_integral_accurate', # Accurate mode: dblquad integration # EEPAS magnitude integration: shared functions 'fast_magnitude_integral', # Fast mode: midpoint rectangular rule 'magnitude_integral_accurate', # Accurate mode: quad_vec ]