# Source code for utils.data_loader

"""
DataLoader - EEPAS System Data Loading Utility Class
Provides unified configuration file loading, model parameter management, and seismic catalog reading functionality
"""

import json
import os
import scipy.io as sio
import numpy as np


class DataLoader:
    """Unified data loading utility class for the EEPAS system.

    Provides static helpers for loading the JSON configuration, model
    parameters, seismic catalogs (MATLAB ``.mat`` files), spatial region
    definitions, and custom optimization-stage configurations.
    """

    @staticmethod
    def load_config(config_file='config.json'):
        """
        Load system configuration from a JSON config file.

        Missing keys are filled with default values so that older
        configuration files keep working (backward compatibility).

        Args:
            config_file: Configuration file path.

        Returns:
            dict: Configuration dictionary with defaults applied.

        Raises:
            FileNotFoundError: If the configuration file does not exist.
        """
        if not os.path.isfile(config_file):
            raise FileNotFoundError(f'Configuration file {config_file} not found')

        with open(config_file, 'r', encoding='utf-8') as f:
            cfg = json.load(f)

        # Fill in top-level default values to ensure backward compatibility.
        defaults = {
            'dataDir': 'data',
            'resultsDir': 'results',
            'catalogStartYear': 1991,
            'learnStartYear': 2002,
            'learnEndYear': 2016,
            'forecastStartYear': 2016,
            'forecastEndYear': 2025,
        }
        for key, value in defaults.items():
            cfg.setdefault(key, value)

        # Process inputFiles.
        if 'inputFiles' not in cfg:
            cfg['inputFiles'] = {}
        input_files = cfg['inputFiles']

        # Backward compatibility: map old key names onto the new ones
        # only when the new name is not already present.
        if 'horusFile' in input_files and 'catalogFile' not in input_files:
            input_files['catalogFile'] = input_files['horusFile']
        if 'cptiFile' in input_files and 'neighborhoodRegionFile' not in input_files:
            input_files['neighborhoodRegionFile'] = input_files['cptiFile']
        if 'celleFile' in input_files and 'testingRegionFile' not in input_files:
            input_files['testingRegionFile'] = input_files['celleFile']

        # Default input file names.
        input_files.setdefault('catalogFile', 'GDMScatalog_A_twd97.mat')
        input_files.setdefault('neighborhoodRegionFile', 'CPTI11.mat')
        input_files.setdefault('testingRegionFile', 'CELLE_ter_TW_twd97.mat')

        # Process outputFiles.
        if 'outputFiles' not in cfg:
            cfg['outputFiles'] = {}
        output_files = cfg['outputFiles']
        output_files.setdefault('EEPASParamPattern', 'Fitted_par_EEPAS_%d_%d.csv')
        output_files.setdefault('PPEParamPattern', 'Fitted_par_PPE_%d_%d.csv')
        output_files.setdefault('AftershockParamPattern', 'Fitted_par_aftershock_%d_%d.csv')
        output_files.setdefault('EEPASForecastPattern', 'PREVISIONI_3m_EEPAS_%d_%d.mat')
        output_files.setdefault('PPEForecastPattern', 'PREVISIONI_3m_PPE_%d_%d.mat')

        # Default three-stage optimization configuration.
        if 'optimization' not in cfg:
            cfg['optimization'] = {
                'stage1': {
                    'parameters': ['am', 'at', 'Sa', 'u'],
                    'initialValues': [2.32, 0.11, 0.80, 0.20],
                    'lowerBounds': [0.5, 0.001, 0.01, 0.0],
                    'upperBounds': [4.0, 2.5, 2.0, 0.75],
                    'fixedValues': {'bm': 0.75, 'Sm': 0.23, 'bt': 0.63,
                                    'St': 0.43, 'ba': 0.57},
                },
                'stage2': {
                    'parameters': ['Sm', 'bt', 'St', 'ba', 'u'],
                    # 'u_from_stage1' is a sentinel resolved by the optimizer:
                    # carry the fitted u value over from stage 1.
                    'initialValues': [0.23, 0.63, 0.43, 0.57, 'u_from_stage1'],
                    'lowerBounds': [0.05, 0.05, 0.05, 0.05, 0.0],
                    'upperBounds': [1.0, 1.0, 1.0, 1.0, 0.75],
                },
                'stage3': {
                    'parameters': ['am', 'Sm', 'at', 'bt', 'St', 'ba', 'Sa', 'u'],
                    'lowerBounds': [0.5, 0.05, 0.001, 0.05, 0.05, 0.05, 0.01, 0.0],
                    'upperBounds': [4.0, 1.0, 2.5, 1.0, 1.0, 1.0, 2.0, 0.75],
                    'fixedValues': {'bm': 0.75},
                },
            }

        return cfg

    @staticmethod
    def load_catalogs(input_arg):
        """
        Load seismic catalog files.

        Args:
            input_arg: Configuration file path (must end with ``.json``).
                Any other value raises ``ValueError``.

        Returns:
            tuple: ``(HORUS, CPTI11, CELLE)`` — catalog, neighborhood-region
            and testing-region data matrices read from MATLAB ``.mat`` files.

        Raises:
            ValueError: If ``input_arg`` is not a ``.json`` path, or a
                ``.mat`` file contains no non-private variables.
            FileNotFoundError: If the data directory or any file is missing.
        """
        # Only a JSON configuration file is accepted; file locations are
        # resolved from its dataDir / inputFiles entries.
        if input_arg.endswith('.json'):
            cfg = DataLoader.load_config(input_arg)
            data_path = cfg['dataDir']
            # If relative, resolve relative to the config file's directory.
            if not os.path.isabs(data_path):
                config_dir = os.path.dirname(os.path.abspath(input_arg))
                data_path = os.path.join(config_dir, data_path)
            catalog_file = os.path.join(data_path, cfg['inputFiles']['catalogFile'])
            neighborhood_file = os.path.join(data_path, cfg['inputFiles']['neighborhoodRegionFile'])
            testing_file = os.path.join(data_path, cfg['inputFiles']['testingRegionFile'])
        else:
            raise ValueError('Configuration file path required')

        if not os.path.isdir(data_path):
            raise FileNotFoundError(f'Data directory not found: {data_path}')
        if not os.path.isfile(catalog_file):
            raise FileNotFoundError(f'Catalog file not found: {catalog_file}')
        if not os.path.isfile(neighborhood_file):
            raise FileNotFoundError(f'Neighborhood region data file not found: {neighborhood_file}')
        if not os.path.isfile(testing_file):
            raise FileNotFoundError(f'Testing region data file not found: {testing_file}')

        # Load MATLAB .mat files.
        catalog_data = sio.loadmat(catalog_file)
        neighborhood_data = sio.loadmat(neighborhood_file)
        testing_data = sio.loadmat(testing_file)

        # Auto-detect the variable key in each file (backward compatibility):
        # take the first key that is not a MATLAB private entry ('__...').
        catalog_keys = [k for k in catalog_data.keys() if not k.startswith('__')]
        if len(catalog_keys) == 0:
            raise ValueError(f'No data found in catalog file: {catalog_file}')
        HORUS = catalog_data[catalog_keys[0]]

        # Neighborhood region file (could be CPTI11, CPTI15, CELLE, etc.).
        neighborhood_keys = [k for k in neighborhood_data.keys() if not k.startswith('__')]
        if len(neighborhood_keys) == 0:
            raise ValueError(f'No data found in neighborhood region file: {neighborhood_file}')
        CPTI11 = neighborhood_data[neighborhood_keys[0]]

        # Testing region file (could be CELLE, CELLESD, etc.).
        testing_keys = [k for k in testing_data.keys() if not k.startswith('__')]
        if len(testing_keys) == 0:
            raise ValueError(f'No data found in testing region file: {testing_file}')
        CELLE = testing_data[testing_keys[0]]

        return HORUS, CPTI11, CELLE

    @staticmethod
    def load_model_params(config_file='config.json'):
        """
        Load model parameters from configuration.

        Args:
            config_file: Configuration file path.

        Returns:
            dict: Model parameters — defaults overridden by any values in
            the config's ``modelParams`` section, plus any extra keys that
            section defines (e.g. ``timeComp``).
        """
        cfg = DataLoader.load_config(config_file)

        # Built-in default model parameters.
        defaults = {
            'delay': 10,
            'B': 0.93,
            'm0': 2.35,
            'mT': 5.0,
            'delta': 1.2,
            'p': 1.09,
            'c': 0.001,
            'sigmaU': 0.006,
            'weightFlag': 0,
            'useCausalEW': 1,
            'useRollingUpdate': True,
            'forecastPeriodDays': 91.31,
        }
        params = defaults.copy()

        # If the config file contains a modelParams section, read it.
        if 'modelParams' in cfg:
            model_params = cfg['modelParams']
            # First, override the known default keys.
            for key in defaults.keys():
                if key in model_params:
                    params[key] = model_params[key]
            # Also carry over additional keys not in defaults (e.g. timeComp).
            for key in model_params.keys():
                if key not in params:
                    params[key] = model_params[key]

        return params

    @staticmethod
    def load_spatial_regions(config_file='config.json'):
        """
        Load spatial region definitions (supports single or dual region configurations).

        Single region configuration uses the same grid for both testing and
        neighborhood regions. Dual region configuration uses a grid for the
        testing region and a polygon for the neighborhood region.

        Args:
            config_file: Configuration file path.

        Returns:
            dict: Dictionary with keys:
                - 'testing_region': Testing region data (numpy array)
                - 'testing_type': 'grid' or 'polygon'
                - 'neighborhood_region': Neighborhood region data (numpy array)
                - 'neighborhood_type': 'grid' or 'polygon'
                - 'config': Region configuration (if present in the file)

        Raises:
            FileNotFoundError: If a region data file is missing.
            ValueError: If a region file contains no non-private variables.
        """
        cfg = DataLoader.load_config(config_file)
        data_path = cfg['dataDir']

        # Resolve relative dataDir against the config file's directory.
        if not os.path.isabs(data_path):
            config_dir = os.path.dirname(os.path.abspath(config_file))
            data_path = os.path.join(config_dir, data_path)

        # Get file paths.
        neighborhood_file = os.path.join(data_path, cfg['inputFiles']['neighborhoodRegionFile'])
        testing_file = os.path.join(data_path, cfg['inputFiles']['testingRegionFile'])

        if not os.path.isfile(neighborhood_file):
            raise FileNotFoundError(f'Neighborhood region data file not found: {neighborhood_file}')
        if not os.path.isfile(testing_file):
            raise FileNotFoundError(f'Testing region data file not found: {testing_file}')

        # Load the testing region (always grid format); take the first
        # non-private key ('__...' entries are MATLAB metadata).
        testing_data = sio.loadmat(testing_file)
        testing_keys = [k for k in testing_data.keys() if not k.startswith('__')]
        if len(testing_keys) == 0:
            raise ValueError(f'No data found in testing region file: {testing_file}')
        testing_key = testing_keys[0]
        testing_region = testing_data[testing_key]

        # Load the neighborhood region (could be grid or polygon).
        neighborhood_data = sio.loadmat(neighborhood_file)
        neighborhood_keys = [k for k in neighborhood_data.keys() if not k.startswith('__')]
        if len(neighborhood_keys) == 0:
            raise ValueError(f'No data found in neighborhood region file: {neighborhood_file}')
        neighborhood_key = neighborhood_keys[0]
        neighborhood_region_raw = neighborhood_data[neighborhood_key]

        # The testing region is always treated as a grid.
        testing_type = 'grid'

        # Neighborhood region type: use an explicit regionConfig value when
        # present, otherwise auto-detect from the data shape.
        region_config = cfg.get('regionConfig', {})
        neighborhood_type = region_config.get('neighborhoodRegionType', None)
        if neighborhood_type is None:
            # Heuristic: few columns (<= 4) and few rows (< 100) -> polygon
            # outline; anything larger is assumed to be a grid.
            n_rows, n_cols = neighborhood_region_raw.shape
            if n_cols <= 4 and n_rows < 100:
                neighborhood_type = 'polygon'
            else:
                neighborhood_type = 'grid'

        # Handle multi-column polygon data (e.g. CPTI15 has 4 columns:
        # [latitude, longitude, Y_projected, X_projected]). Keep only the
        # projected columns, reordered to (X, Y).
        if neighborhood_type == 'polygon' and neighborhood_region_raw.shape[1] == 4:
            neighborhood_region_raw = neighborhood_region_raw[:, [3, 2]]  # X first, Y second

        # Single-region mode (backward compatibility): if the testing and
        # neighborhood data are identical, share the testing grid.
        if testing_key == neighborhood_key and testing_region.shape == neighborhood_region_raw.shape:
            if np.array_equal(testing_region, neighborhood_region_raw):
                neighborhood_region = testing_region
                neighborhood_type = 'grid'
            else:
                neighborhood_region = neighborhood_region_raw
        else:
            neighborhood_region = neighborhood_region_raw

        result = {
            'testing_region': testing_region,
            'testing_type': testing_type,
            'neighborhood_region': neighborhood_region,
            'neighborhood_type': neighborhood_type,
            'config': region_config,
        }
        return result

    @staticmethod
    def load_custom_stages(config_file='config.json'):
        """
        Load custom optimization stages configuration.

        Args:
            config_file: Configuration file path.

        Returns:
            dict or None: Custom stages configuration, or None if not enabled::

                {
                    'enable': bool,
                    'stages': [
                        {
                            'name': str,
                            'parameters': list[str],
                            'initialValues': list[float] or None,
                            'lowerBounds': list[float],
                            'upperBounds': list[float],
                            'fixedValues': dict
                        },
                        ...
                    ]
                }

        Raises:
            ValueError: If enabled but the stages definition is missing,
                empty, or fails validation.
        """
        cfg = DataLoader.load_config(config_file)

        # Check whether custom stages are enabled.
        custom_config = cfg.get('optimization', {}).get('customStages', {})
        if not custom_config.get('enable', False):
            return None

        # Validate custom stages configuration.
        if 'stages' not in custom_config:
            raise ValueError('customStages.enable is true but no stages defined')
        stages = custom_config['stages']
        if not isinstance(stages, list) or len(stages) == 0:
            raise ValueError('customStages.stages must be a non-empty list')

        # Validate each stage.
        DataLoader.validate_custom_stages(stages)

        return custom_config

    @staticmethod
    def validate_custom_stages(stages):
        """
        Validate custom stages configuration.

        Checks:
            1. All parameter names are valid EEPAS parameters.
            2. Bounds and initial values have correct lengths.
            3. Fixed parameters + optimized parameters = 9 parameters
               (first stage only; later stages may inherit).
            4. No duplicate optimization of the same parameter in one stage.

        Args:
            stages: List of stage configurations.

        Raises:
            ValueError: If validation fails.
        """
        # Valid EEPAS parameter names.
        valid_params = {'am', 'bm', 'Sm', 'at', 'bt', 'St', 'ba', 'Sa', 'u'}

        for stage_idx, stage in enumerate(stages):
            stage_num = stage_idx + 1
            stage_name = stage.get('name', f'Stage {stage_num}')

            # Check required fields.
            if 'parameters' not in stage:
                raise ValueError(f'{stage_name}: missing "parameters" field')
            if 'lowerBounds' not in stage:
                raise ValueError(f'{stage_name}: missing "lowerBounds" field')
            if 'upperBounds' not in stage:
                raise ValueError(f'{stage_name}: missing "upperBounds" field')

            parameters = stage['parameters']
            lower_bounds = stage['lowerBounds']
            upper_bounds = stage['upperBounds']
            initial_values = stage.get('initialValues', None)
            fixed_values = stage.get('fixedValues', {})

            # Check parameters list is well-formed.
            if not isinstance(parameters, list) or len(parameters) == 0:
                raise ValueError(f'{stage_name}: "parameters" must be a non-empty list')

            # Check all parameter names are valid.
            for param in parameters:
                if param not in valid_params:
                    raise ValueError(f'{stage_name}: invalid parameter name "{param}". Must be one of {valid_params}')

            # Check no duplicates in parameters.
            if len(parameters) != len(set(parameters)):
                raise ValueError(f'{stage_name}: duplicate parameters in optimization list')

            # Check bounds lengths match.
            if len(lower_bounds) != len(parameters):
                raise ValueError(f'{stage_name}: lowerBounds length ({len(lower_bounds)}) does not match parameters length ({len(parameters)})')
            if len(upper_bounds) != len(parameters):
                raise ValueError(f'{stage_name}: upperBounds length ({len(upper_bounds)}) does not match parameters length ({len(parameters)})')

            # Check initial values length (if provided; a string value such
            # as 'u_from_stage1' is a sentinel and skips the length check).
            if initial_values is not None and not isinstance(initial_values, str):
                if len(initial_values) != len(parameters):
                    raise ValueError(f'{stage_name}: initialValues length ({len(initial_values)}) does not match parameters length ({len(parameters)})')

            # Check fixed parameter names are valid.
            for param in fixed_values.keys():
                if param not in valid_params:
                    raise ValueError(f'{stage_name}: invalid fixed parameter name "{param}". Must be one of {valid_params}')

            # Check no overlap between optimized and fixed parameters.
            optimized_set = set(parameters)
            fixed_set = set(fixed_values.keys())
            overlap = optimized_set & fixed_set
            if overlap:
                raise ValueError(f'{stage_name}: parameters appear in both optimization and fixed lists: {overlap}')

            # Check total parameters = 9 (optimized + fixed).
            # First stage must specify all 9 parameters; later stages may
            # inherit unspecified parameters from previous stages.
            total_specified = optimized_set | fixed_set
            if stage_idx == 0:
                if len(total_specified) != 9:
                    missing = valid_params - total_specified
                    raise ValueError(f'{stage_name}: First stage must specify all 9 parameters. Missing: {missing}')
            else:
                # Later stages can inherit; inheritance is handled by
                # optimize_custom_stages, so nothing further to check here.
                pass

            print(f'✓ {stage_name} validation passed: optimizing {len(parameters)} parameters, fixing {len(fixed_values)} parameters')

    @staticmethod
    def detect_stage1_config_type(config_file='config.json'):
        """
        Detect whether stage1 configuration is for single-stage or three-stage optimization.

        Detection logic:
            - If stage1 optimizes all 8 free parameters
              (am, Sm, at, bt, St, ba, Sa, u) -> single-stage.
            - Otherwise -> three-stage (stage1 is part of three-stage optimization).

        Args:
            config_file: Configuration file path.

        Returns:
            str: 'single' if stage1 is a single-stage config, 'three' if it
            is part of a three-stage config.

        Raises:
            ValueError: If stage1 or its "parameters" field is missing.
        """
        cfg = DataLoader.load_config(config_file)

        # Check that stage1 exists.
        if 'optimization' not in cfg or 'stage1' not in cfg['optimization']:
            raise ValueError('No stage1 configuration found in optimization section')

        stage1 = cfg['optimization']['stage1']
        if 'parameters' not in stage1:
            raise ValueError('stage1 configuration missing "parameters" field')

        parameters = stage1['parameters']

        # Single-stage optimizes all 8 free parameters (bm is always fixed).
        single_stage_params = {'am', 'Sm', 'at', 'bt', 'St', 'ba', 'Sa', 'u'}
        if set(parameters) == single_stage_params:
            return 'single'
        return 'three'