"""
CatalogProcessor - Seismic Catalog Preprocessing Utility Class
Provides unified earthquake catalog quality control, time conversion, and filtering functions.
Format Support:
This module uses HORUS format (10 columns) as the internal standard:
year, month, day, hour, minute, second, lat, lon, depth, mag
Extended format conversion support (via catalog_processor_extensions):
- Input formats: ZMAP, CSEP, QuakeML, DataFrame, SeismoStats, pyCSEP, INGV HORUS
- Output formats: MATLAB .mat, SeismoStats Catalog, pyCSEP CSEPCatalog
- Methods: from_zmap(), from_csep(), from_quakeml(), from_dataframe(),
from_seismostats(), from_pycsep(), from_ingv_horus(),
to_mat(), to_seismostats(), to_pycsep()
All conversion methods are available as static methods on CatalogProcessor class.
"""
import numpy as np
from datetime import datetime
# Import all conversion functions from extensions module
try:
from . import catalog_processor_extensions as ext
_EXTENSIONS_AVAILABLE = True
except ImportError:
_EXTENSIONS_AVAILABLE = False
[docs]
def decyear(date_array):
"""
Convert date to decimal year.
Args:
date_array: Nx6 matrix [year, month, day, hour, minute, second]
Returns:
np.array: Decimal year
"""
result = np.zeros(date_array.shape[0])
for i in range(date_array.shape[0]):
year = int(date_array[i, 0])
month = int(date_array[i, 1])
day = int(date_array[i, 2])
hour = int(date_array[i, 3])
minute = int(date_array[i, 4])
second = float(date_array[i, 5])
# Handle anomalous second values (some datasets may have second >= 60)
# Convert seconds over 60 to minutes
if second >= 60.0:
extra_minutes = int(second / 60.0)
minute += extra_minutes
second = second - extra_minutes * 60.0
# Handle anomalous minute values
if minute >= 60:
extra_hours = int(minute / 60)
hour += extra_hours
minute = minute - extra_hours * 60
# Handle anomalous hour values
if hour >= 24:
extra_days = int(hour / 24)
day += extra_days
hour = hour - extra_days * 24
# Calculate day of year (using total seconds method to handle month overflow)
try:
dt = datetime(year, month, day, hour, minute, int(second))
except ValueError:
# If date is still invalid (e.g. month overflow), use approximation
# This is fault-tolerant handling to ensure program continues
dt = datetime(year, 1, 1, 0, 0, 0)
# Add total seconds
total_seconds = ((month - 1) * 30 * 86400 + (day - 1) * 86400 +
hour * 3600 + minute * 60 + second)
from datetime import timedelta
dt = dt + timedelta(seconds=total_seconds)
start_of_year = datetime(year, 1, 1)
days_in_year = (datetime(year + 1, 1, 1) - start_of_year).days
day_of_year = (dt - start_of_year).total_seconds() / 86400.0
result[i] = year + day_of_year / days_in_year
return result
class CatalogProcessor:
    """Seismic catalog preprocessing utility class.

    Operates on catalogs in HORUS format (10 columns):
        year, month, day, hour, minute, second, lat, lon, depth, mag
    preprocess_catalog() appends an 11th column holding event time in days
    relative to the catalog start year.
    """

    @staticmethod
    def preprocess_catalog(HORUS, catalog_start_year, learn_start_year, learn_end_year,
                           completeness_threshold=2.0, max_depth=40, target_magnitude=None,
                           filter_target_mag=False, verbose=False):
        """
        General preprocessing pipeline for earthquake catalogs.

        Standardizes magnitudes, filters quality, converts time coordinates.

        Args:
            HORUS: Raw seismic catalog matrix
            catalog_start_year: Catalog start year
            learn_start_year: Learning start year
            learn_end_year: Learning end year
            completeness_threshold: Completeness magnitude threshold
            max_depth: Maximum depth (km)
            target_magnitude: PPE target magnitude
            filter_target_mag: Whether to enable target magnitude filtering
            verbose: Verbose output mode

        Returns:
            tuple: (HORUS_processed, T1, T2) where T1/T2 are the learning
            period bounds in days relative to catalog_start_year.
        """
        HORUS = HORUS.copy()  # Avoid modifying original data
        if verbose:
            print(f'Starting catalog preprocessing (initial size: {HORUS.shape[0]})...')

        # 1. Round earthquake magnitudes to first decimal place.
        # MATLAB's round(x,1) rounds ties away from zero; numpy.round uses
        # bankers rounding, so MATLAB behavior is emulated explicitly.
        mags = HORUS[:, 9]
        HORUS[:, 9] = np.sign(mags) * np.floor(np.abs(mags) * 10 + 0.5) / 10.0
        if verbose:
            print(f'After magnitude rounding: {HORUS.shape[0]} earthquakes')

        # 2. Filter depth events (remove earthquakes with depth > max_depth)
        # Note: MATLAB indexing starts from 1, Python from 0, so column indices subtract 1
        depth_filter = HORUS[:, 8] <= max_depth  # Column 9 is depth (index 8)
        HORUS = HORUS[depth_filter, :]
        if verbose:
            print(f'After depth filtering (depth<={max_depth}km): {HORUS.shape[0]} earthquakes')

        # 3. Filter events below completeness threshold
        mag_filter = HORUS[:, 9] >= completeness_threshold  # Column 10 is magnitude (index 9)
        HORUS = HORUS[mag_filter, :]
        if verbose:
            print(f'After completeness filtering (magnitude>={completeness_threshold}): {HORUS.shape[0]} earthquakes')

        # 4. Convert time to decimal year and relative days
        date_array = HORUS[:, 0:6]  # First 6 columns: year/month/day/hour/minute/second
        DECY = decyear(date_array)
        # If HORUS only has 10 columns, append column 11 for relative time
        if HORUS.shape[1] == 10:
            HORUS = np.column_stack([HORUS, np.zeros(HORUS.shape[0])])
        HORUS[:, 10] = DECY  # Column 11 stores decimal year
        HORUS[:, 10] = (HORUS[:, 10] - catalog_start_year) * 365.2425  # Convert to relative days
        if verbose:
            print(f'Time conversion completed (relative to year {catalog_start_year})')

        # 5. Filter events outside time range (only the upper bound is applied here)
        time_filter = HORUS[:, 0] < learn_end_year  # Column 1 is year (index 0)
        HORUS = HORUS[time_filter, :]
        if verbose:
            print(f'After time range filtering ({catalog_start_year}~{learn_end_year}): {HORUS.shape[0]} earthquakes')

        # Calculate time conversion constants (learning period bounds in days)
        T1 = (learn_start_year - catalog_start_year) * 365.2425
        T2 = (learn_end_year - catalog_start_year) * 365.2425

        # 6. Optional: filter earthquakes below target magnitude
        if filter_target_mag and target_magnitude is not None:
            target_mag_filter = HORUS[:, 9] >= target_magnitude
            HORUS = HORUS[target_mag_filter, :]
            if verbose:
                print(f'After target magnitude filtering (magnitude>={target_magnitude}): {HORUS.shape[0]} earthquakes')

        if verbose:
            print(f'✅ Preprocessing completed, final size: {HORUS.shape[0]} earthquakes')
        return HORUS, T1, T2

    @staticmethod
    def create_catalogs(HORUS, params, learn_start_year, learn_end_year, catalog_start_year):
        """
        Create sub-catalogs from preprocessed earthquake catalog (without spatial filtering).

        This function creates three sub-catalogs for EEPAS learning. When no RegionManager
        is provided, all events are treated as being within both the Neighborhood Region
        and Testing Region (no spatial filtering applied).

        Naming conventions (from main_gji.tex paper):
            - Index i: Historical source events (CatI), from Neighborhood Region N
            - Index j: Target events (CatJ), from Testing Region R
            - NLL = -Σ_j log λ(t_j, m_j, x_j, y_j | CatI) + ∫∫∫∫_R λ(...) dt dm dx dy

        Args:
            HORUS: Preprocessed earthquake catalog (numpy array, N×11)
            params: Model parameters dictionary, must contain 'mT' (target magnitude threshold)
            learn_start_year: Learning start year
            learn_end_year: Learning end year (kept for interface symmetry; not used here)
            catalog_start_year: Catalog start year (kept for interface symmetry; not used here)

        Returns:
            tuple: (CatE, CatI, CatJ)
                - CatE: EEPAS precursor catalog (M≥m₀, all time)
                - CatI: Historical source events (M≥mT, all time) - paper's index i
                - CatJ: Target events (M≥mT, learning period) - paper's index j

        Note:
            Return order is (CatE, CatI, CatJ), corresponding to (precursor, i, j) in paper.
            This differs from versions before v1.1.0 (old order was CatE, CatJ, CatI).
        """
        print('Creating sub-catalogs for different purposes...')

        # 1. EEPAS catalog - contains all preprocessed earthquakes
        CatE = HORUS.copy()

        # 2. Historical source catalog (paper's i) - events with magnitude >= mT
        CatI = HORUS.copy()
        target_mag_filter = CatI[:, 9] >= params['mT']  # Column 10 is magnitude (index 9)
        CatI = CatI[target_mag_filter, :]

        # 3. Target catalog (paper's j) - learning-period events with magnitude >= mT
        CatJ = HORUS.copy()
        # Filter events before learning start time
        learning_start_filter = CatJ[:, 0] >= learn_start_year  # Column 1 is year (index 0)
        CatJ = CatJ[learning_start_filter, :]
        # Filter events below target magnitude
        target_mag_filter_J = CatJ[:, 9] >= params['mT']
        CatJ = CatJ[target_mag_filter_J, :]

        # Output statistics
        print('Catalog creation completed:')
        print(f'  EEPAS catalog (CatE): {CatE.shape[0]} events')
        print(f'  Historical source catalog (CatI): {CatI.shape[0]} events (magnitude >= {params["mT"]}) - paper\'s i')
        print(f'  Target catalog (CatJ): {CatJ.shape[0]} events (learning period, magnitude >= {params["mT"]}) - paper\'s j')
        return CatE, CatI, CatJ

    @staticmethod
    def filter_by_region(catalog, region_manager, region_type='testing', x_col=7, y_col=6):
        """
        Filter earthquake catalog by spatial region.

        Integrates with RegionManager for spatial filtering; the manager
        handles both grid and polygon region formats.

        Args:
            catalog: numpy array (N×M) earthquake catalog
                HORUS format: [..., y(col6), x(col7), depth(col8), mag(col9), ...]
            region_manager: RegionManager instance
            region_type: 'testing' or 'neighborhood'
                - 'testing': filter events within testing region (for target events)
                - 'neighborhood': filter events within neighborhood region (for source events)
            x_col: X coordinate column (default 7)
            y_col: Y coordinate column (default 6)

        Returns:
            numpy.ndarray: Filtered earthquake catalog

        Raises:
            ValueError: If region_type is not 'testing' or 'neighborhood'.
        """
        if region_type == 'testing':
            return region_manager.filter_events_for_targets(catalog, x_col, y_col)
        elif region_type == 'neighborhood':
            return region_manager.filter_events_for_learning(catalog, x_col, y_col)
        else:
            raise ValueError(
                f"Unsupported region_type: {region_type}. "
                f"Must be 'testing' or 'neighborhood'"
            )

    @staticmethod
    def create_catalogs_with_regions(HORUS, params, learn_start_year, learn_end_year,
                                     catalog_start_year, region_manager=None):
        """
        Create sub-catalogs from preprocessed earthquake catalog (with optional spatial filtering).

        When region_manager is None, all events are used without spatial filtering
        (identical behavior to create_catalogs()). When region_manager is provided,
        events are filtered by Neighborhood and Testing Regions:
            * CatE: All events within Neighborhood Region N (for edge compensation)
            * CatI: M≥mT events within Neighborhood Region N (historical sources, paper's index i)
            * CatJ: M≥mT events within Testing Region R during learning period (targets, paper's index j)

        Args:
            HORUS: Preprocessed earthquake catalog (numpy array, N×11)
            params: Model parameters dictionary, must contain 'mT' (target magnitude threshold)
            learn_start_year: Learning start year
            learn_end_year: Learning end year
            catalog_start_year: Catalog start year (for time conversion)
            region_manager: RegionManager instance (optional); None disables spatial filtering

        Returns:
            tuple: (CatE, CatI, CatJ) — precursor catalog, historical sources (paper's i),
            and targets (paper's j).

        Note:
            Return order is (CatE, CatI, CatJ), corresponding to (precursor, i, j) in paper.
            This differs from versions before v1.1.0 (old order was CatE, CatJ, CatI).
        """
        print('Creating sub-catalogs for different purposes...')
        if region_manager is None:
            # No spatial filtering: backward compatible, use original logic
            return CatalogProcessor.create_catalogs(
                HORUS, params, learn_start_year, learn_end_year, catalog_start_year
            )

        # With spatial filtering: use RegionManager to filter events by spatial regions
        print('  Using spatial region filtering')

        # 1. EEPAS catalog - all events within neighborhood region (for edge compensation)
        CatE = CatalogProcessor.filter_by_region(
            HORUS, region_manager, region_type='neighborhood'
        )

        # 2. Historical source catalog (paper's i) - M >= mT within neighborhood region
        CatI = CatE.copy()
        target_mag_filter = CatI[:, 9] >= params['mT']
        CatI = CatI[target_mag_filter, :]

        # 3. Target catalog (paper's j) - M >= mT, learning period, testing region.
        # First filter by time range
        CatJ_time_filtered = HORUS.copy()
        learning_start_filter = CatJ_time_filtered[:, 0] >= learn_start_year
        CatJ_time_filtered = CatJ_time_filtered[learning_start_filter, :]
        # Then filter by magnitude
        target_mag_filter_J = CatJ_time_filtered[:, 9] >= params['mT']
        CatJ_time_filtered = CatJ_time_filtered[target_mag_filter_J, :]
        # Finally filter by spatial region (testing region)
        CatJ = CatalogProcessor.filter_by_region(
            CatJ_time_filtered, region_manager, region_type='testing'
        )

        # Output statistics
        print('Catalog creation completed:')
        print(f'  EEPAS catalog (CatE): {CatE.shape[0]} events '
              f'(neighborhood region, all magnitudes)')
        print(f'  Historical source catalog (CatI): {CatI.shape[0]} events '
              f'(neighborhood region, magnitude >= {params["mT"]}) - paper\'s i')
        print(f'  Target catalog (CatJ): {CatJ.shape[0]} events '
              f'(testing region, learning period, magnitude >= {params["mT"]}) - paper\'s j')
        return CatE, CatI, CatJ

    @staticmethod
    def filter_by_magnitude(catalog, min_magnitude, mag_col=9):
        """
        Filter earthquake catalog by magnitude.

        Args:
            catalog: numpy array earthquake catalog
            min_magnitude: Minimum magnitude threshold (inclusive)
            mag_col: Magnitude column (default 9)

        Returns:
            numpy array: Filtered catalog
        """
        mag_filter = catalog[:, mag_col] >= min_magnitude
        return catalog[mag_filter]

    @staticmethod
    def filter_by_time_range(catalog, start_year, end_year, year_col=0):
        """
        Filter earthquake catalog by time range [start_year, end_year).

        Args:
            catalog: numpy array earthquake catalog
            start_year: Start year (inclusive)
            end_year: End year (exclusive)
            year_col: Year column (default 0)

        Returns:
            numpy array: Filtered catalog
        """
        time_filter = (catalog[:, year_col] >= start_year) & (catalog[:, year_col] < end_year)
        return catalog[time_filter]

    # ========================================================================
    # Format Conversion Methods (delegated to catalog_processor_extensions)
    # ========================================================================

    @staticmethod
    def _require_extensions():
        """Raise ImportError when the optional extensions module is unavailable."""
        if not _EXTENSIONS_AVAILABLE:
            raise ImportError("catalog_processor_extensions module not available")

    @staticmethod
    def from_zmap(file_path, delimiter=None, skiprows=0):
        """Read ZMAP format and convert to HORUS format. See catalog_processor_extensions.from_zmap()"""
        CatalogProcessor._require_extensions()
        return ext.from_zmap(file_path, delimiter, skiprows)

    @staticmethod
    def from_csep(file_path):
        """Read CSEP ASCII format and convert to HORUS format. See catalog_processor_extensions.from_csep()"""
        CatalogProcessor._require_extensions()
        return ext.from_csep(file_path)

    @staticmethod
    def from_quakeml(file_path):
        """Read QuakeML format and convert to HORUS format. See catalog_processor_extensions.from_quakeml()"""
        CatalogProcessor._require_extensions()
        return ext.from_quakeml(file_path)

    @staticmethod
    def from_dataframe(df, column_mapping=None):
        """Convert Pandas DataFrame to HORUS format. See catalog_processor_extensions.from_dataframe()"""
        CatalogProcessor._require_extensions()
        return ext.from_dataframe(df, column_mapping)

    @staticmethod
    def from_horus_text(file_path, delimiter=None, skiprows=0):
        """Read HORUS text format. See catalog_processor_extensions.from_horus_text()"""
        CatalogProcessor._require_extensions()
        return ext.from_horus_text(file_path, delimiter, skiprows)

    @staticmethod
    def from_ingv_horus(file_path, delimiter='\t', skiprows=1, filter_events=True):
        """Read INGV HORUS catalog format. See catalog_processor_extensions.from_ingv_horus()"""
        CatalogProcessor._require_extensions()
        return ext.from_ingv_horus(file_path, delimiter, skiprows, filter_events)

    @staticmethod
    def from_seismostats(catalog):
        """Convert SeismoStats Catalog object to HORUS format. See catalog_processor_extensions.from_seismostats()"""
        CatalogProcessor._require_extensions()
        return ext.from_seismostats(catalog)

    @staticmethod
    def from_pycsep(catalog):
        """Convert pyCSEP CSEPCatalog object to HORUS format. See catalog_processor_extensions.from_pycsep()"""
        CatalogProcessor._require_extensions()
        return ext.from_pycsep(catalog)

    @staticmethod
    def from_mat(file_path, variable_name='HORUS'):
        """Read HORUS catalog from MATLAB .mat file. See catalog_processor_extensions.from_mat()"""
        CatalogProcessor._require_extensions()
        return ext.from_mat(file_path, variable_name)

    @staticmethod
    def to_seismostats(horus_catalog, mc=None, delta_m=0.1, b_value=None, a_value=None):
        """Convert HORUS format to SeismoStats Catalog object. See catalog_processor_extensions.to_seismostats()"""
        CatalogProcessor._require_extensions()
        return ext.to_seismostats(horus_catalog, mc, delta_m, b_value, a_value)

    @staticmethod
    def to_pycsep(horus_catalog, name='EEPAS_catalog', region=None):
        """Convert HORUS format to pyCSEP CSEPCatalog object. See catalog_processor_extensions.to_pycsep()"""
        CatalogProcessor._require_extensions()
        return ext.to_pycsep(horus_catalog, name, region)

    @staticmethod
    def to_mat(horus_catalog, output_file, variable_name='HORUS', matlab_compatible=True):
        """Write HORUS format catalog to MATLAB .mat file. See catalog_processor_extensions.to_mat()"""
        CatalogProcessor._require_extensions()
        return ext.to_mat(horus_catalog, output_file, variable_name, matlab_compatible)