Source code for utils.coordinate_transform

#!/usr/bin/env python3
"""
Coordinate Transformation Utility for Earthquake Catalogs

Converts HORUS earthquake catalog and CELLE grid coordinates between different
coordinate reference systems (CRS). Supports multiple regional projections.

Supported CRS:
    - WGS84 (EPSG:4326): Global lat/lon coordinate system
    - RDN2008 (EPSG:7794): Italy zone projected system (default)
    - TWD97 (EPSG:3826): Taiwan TM2 121°E projected system
    - Custom: Any EPSG code supported by pyproj

Transformed coordinates are in kilometers and replace original lat/lon columns
in the output MATLAB .mat files.

Usage:
    # Default: WGS84 to RDN2008 (Italy)
    python coordinate_transform.py \\
        --horus-in HORUS_Italy.mat \\
        --celle-in CELLE_Italy.mat \\
        --horus-out HORUS_Italy_RDN2008.mat \\
        --celle-out CELLE_Italy_RDN2008.mat

    # Taiwan: WGS84 to TWD97
    python coordinate_transform.py \\
        --horus-in HORUS_Taiwan.mat \\
        --celle-in CELLE_Taiwan.mat \\
        --horus-out HORUS_Taiwan_TWD97.mat \\
        --celle-out CELLE_Taiwan_TWD97.mat \\
        --target-crs epsg:3826 \\
        --region Taiwan

    # Custom EPSG code
    python coordinate_transform.py \\
        --horus-in input.mat \\
        --celle-in grids.mat \\
        --horus-out output.mat \\
        --celle-out grids_out.mat \\
        --target-crs epsg:32633 \\
        --region "Custom Region"
"""

from pathlib import Path
from scipy.io import loadmat, savemat
from pyproj import Transformer
import numpy as np
import argparse
from typing import Tuple, Optional


# Predefined coordinate systems and validation bounds
CRS_PRESETS = {
    'rdn2008': {
        'epsg': 'epsg:7794',
        'name': 'RDN2008 Italy Zone',
        'bounds': {'lat': (35, 48), 'lon': (6, 20)},
        'description': 'Italy national projected coordinate system'
    },
    'twd97': {
        'epsg': 'epsg:3826',
        'name': 'TWD97 TM2 121°E',
        'bounds': {'lat': (21, 26), 'lon': (119, 123)},
        'description': 'Taiwan projected coordinate system (Transverse Mercator 2-degree)'
    },
    'wgs84': {
        'epsg': 'epsg:4326',
        'name': 'WGS84',
        'bounds': {'lat': (-90, 90), 'lon': (-180, 180)},
        'description': 'Global geographic coordinate system (lat/lon)'
    }
}


[docs] def get_crs_info(crs_key: str) -> dict: """ Get coordinate system information from preset or custom EPSG code. Args: crs_key: CRS preset key ('rdn2008', 'twd97') or EPSG code ('epsg:3826') Returns: dict: CRS information including EPSG code, name, and bounds """ crs_lower = crs_key.lower() # Check if it's a preset if crs_lower in CRS_PRESETS: return CRS_PRESETS[crs_lower] # If starts with 'epsg:', treat as custom EPSG code if crs_lower.startswith('epsg:'): return { 'epsg': crs_lower, 'name': f'Custom {crs_key.upper()}', 'bounds': {'lat': (-90, 90), 'lon': (-180, 180)}, 'description': 'Custom coordinate reference system' } raise ValueError(f'Unknown coordinate system: {crs_key}. Supported presets: {list(CRS_PRESETS.keys())} or custom EPSG code (e.g., epsg:3826)')
[docs] def validate_coordinates(lon: np.ndarray, lat: np.ndarray, bounds: dict, region_name: str = "region") -> Tuple[bool, int]: """ Validate coordinates are within expected regional bounds. Args: lon: Longitude array (degrees East) lat: Latitude array (degrees North) bounds: Dictionary with 'lat' and 'lon' tuples (min, max) region_name: Name of region for warning messages Returns: Tuple[bool, int]: (all_valid, invalid_count) """ lat_min, lat_max = bounds['lat'] lon_min, lon_max = bounds['lon'] valid_mask = (lat >= lat_min) & (lat <= lat_max) & (lon >= lon_min) & (lon <= lon_max) invalid_count = np.sum(~valid_mask) if invalid_count > 0: print(f'⚠️ Warning: {invalid_count} coordinates out of bounds for {region_name}, transformation may be inaccurate') invalid_lats = lat[~valid_mask] invalid_lons = lon[~valid_mask] print(f' Invalid range: lat [{invalid_lats.min():.2f}, {invalid_lats.max():.2f}]°N, ' f'lon [{invalid_lons.min():.2f}, {invalid_lons.max():.2f}]°E') print(f' Expected range: lat [{lat_min}, {lat_max}]°N, lon [{lon_min}, {lon_max}]°E') return False, invalid_count return True, 0
[docs] def convert_coordinates(array: np.ndarray, lon_idx: int, lat_idx: int, transformer: Transformer, bounds: dict, region_name: str = "region", meters_per_km: float = 1000.0) -> np.ndarray: """ Convert lat/lon coordinates in array to projected coordinates. Args: array: Array containing lat/lon data lon_idx: Longitude column index lat_idx: Latitude column index transformer: pyproj Transformer object bounds: Coordinate validation bounds region_name: Region name for validation messages meters_per_km: Meters to kilometers conversion factor (default: 1000.0) Returns: np.ndarray: Transformed array with coordinates in km Note: Validates coordinates are within regional bounds and prints warnings for out-of-range coordinates. """ # Extract lon/lat lon = array[:, lon_idx] lat = array[:, lat_idx] # Validate coordinate range validate_coordinates(lon, lat, bounds, region_name) # Transform coordinates (lon, lat) -> (easting x, northing y) x_m, y_m = transformer.transform(lon, lat) x_km = x_m / meters_per_km y_km = y_m / meters_per_km # Create modified array copy result = array.copy() result[:, lon_idx] = x_km # Replace lon column with x (easting, km) result[:, lat_idx] = y_km # Replace lat column with y (northing, km) return result
[docs] def convert_celle_coordinates(celle: np.ndarray, transformer: Transformer, bounds: dict, region_name: str = "region", meters_per_km: float = 1000.0) -> np.ndarray: """ Convert CELLE grid boundary coordinates. Args: celle: CELLE array (each row: lon_min, lon_max, lat_min, lat_max) transformer: pyproj Transformer object bounds: Coordinate validation bounds region_name: Region name for validation messages meters_per_km: Meters to kilometers conversion factor (default: 1000.0) Returns: np.ndarray: Transformed CELLE array (x_min, x_max, y_min, y_max in km) Note: CELLE format columns: - Column 0: Minimum longitude (°E) - Column 1: Maximum longitude (°E) - Column 2: Minimum latitude (°N) - Column 3: Maximum latitude (°N) """ result = celle.copy() lat_min_bound, lat_max_bound = bounds['lat'] lon_min_bound, lon_max_bound = bounds['lon'] # Transform four corners of each grid cell for i in range(celle.shape[0]): # Extract grid boundaries lon_min = celle[i, 0] lon_max = celle[i, 1] lat_min = celle[i, 2] lat_max = celle[i, 3] # Check coordinate range if not (lon_min_bound <= lon_min <= lon_max_bound and lon_min_bound <= lon_max <= lon_max_bound and lat_min_bound <= lat_min <= lat_max_bound and lat_min_bound <= lat_max <= lat_max_bound): print(f'⚠️ Warning: Grid {i+1} coordinates out of bounds for {region_name}, transformation may be inaccurate') print(f' Grid range: lon [{lon_min:.2f}, {lon_max:.2f}]°E, ' f'lat [{lat_min:.2f}, {lat_max:.2f}]°N') # Transform min lon, min lat corner x1_m, y1_m = transformer.transform(lon_min, lat_min) # Transform max lon, max lat corner x2_m, y2_m = transformer.transform(lon_max, lat_max) # Convert to km and store back to array result[i, 0] = x1_m / meters_per_km # Min lon -> Min easting (x) result[i, 1] = x2_m / meters_per_km # Max lon -> Max easting (x) result[i, 2] = y1_m / meters_per_km # Min lat -> Min northing (y) result[i, 3] = y2_m / meters_per_km # Max lat -> Max northing (y) return result
[docs] def main(horus_infile: Optional[Path], celle_infile: Optional[Path], horus_outfile: Optional[Path], celle_outfile: Optional[Path], target_crs: str = 'rdn2008', region_name: Optional[str] = None) -> None: """ Main function to convert HORUS and CELLE coordinates between CRS. Args: horus_infile: Input HORUS catalog .mat file path (optional) celle_infile: Input CELLE grid .mat file path (optional) horus_outfile: Output HORUS .mat file path (optional) celle_outfile: Output CELLE .mat file path (optional) target_crs: Target coordinate system ('rdn2008', 'twd97', or 'epsg:XXXX') region_name: Custom region name for messages (optional) Returns: None: Writes transformed data to output files and prints status Note: HORUS format columns: - Column 7 (index 6): Latitude (°N) - Column 8 (index 7): Longitude (°E) After transformation, these columns contain: - Column 7 (index 6): Northing Y (km) - Column 8 (index 7): Easting X (km) """ # Get CRS information crs_info = get_crs_info(target_crs) if region_name is None: region_name = crs_info['name'] print('='*60) print('Coordinate Transformation Tool') print('='*60) print(f'Target Coordinate System: {crs_info["name"]} ({crs_info["epsg"]})') print(f'Description: {crs_info["description"]}') print(f'Region: {region_name}') print('='*60) # Data processing constants HORUS_VAR = 'HORUS' # Variable name in HORUS.mat CELLE_VAR = 'CELLE' # Variable name in CELLE.mat # HORUS catalog column indices LAT_COL_IDX = 6 # Latitude column index (column 7) LON_COL_IDX = 7 # Longitude column index (column 8) SRC_CRS = 'epsg:4326' # WGS 84 coordinate system (source) DST_CRS = crs_info['epsg'] # Target coordinate system METERS_PER_KM = 1000.0 # Meters per kilometer # Create coordinate transformer try: transformer = Transformer.from_crs(SRC_CRS, DST_CRS, always_xy=True) print(f'\n✓ Coordinate transformer created: {SRC_CRS} -> {DST_CRS}') except Exception as e: print(f'❌ Error creating coordinate transformer: {e}') return # Process HORUS earthquake catalog if horus_infile and horus_outfile: if horus_infile.exists(): try: print(f'\nProcessing HORUS file: {horus_infile}') # Read MATLAB file data = loadmat(horus_infile) if HORUS_VAR not in data: raise KeyError(f'Variable `{HORUS_VAR}` not found in {horus_infile}') horus = data[HORUS_VAR] print(f' Original data shape: {horus.shape}') print(f' Coordinate range:') print(f' Latitude: [{horus[:, LAT_COL_IDX].min():.2f}, {horus[:, LAT_COL_IDX].max():.2f}]°N') print(f' Longitude: [{horus[:, LON_COL_IDX].min():.2f}, {horus[:, LON_COL_IDX].max():.2f}]°E') # Transform coordinates horus_transformed = convert_coordinates( horus, LON_COL_IDX, LAT_COL_IDX, transformer, crs_info['bounds'], region_name, METERS_PER_KM) print(f' Transformed coordinate range (km):') print(f' Easting (X): [{horus_transformed[:, LON_COL_IDX].min():.2f}, {horus_transformed[:, LON_COL_IDX].max():.2f}] km') print(f' Northing (Y): [{horus_transformed[:, LAT_COL_IDX].min():.2f}, {horus_transformed[:, LAT_COL_IDX].max():.2f}] km') # Save transformed data savemat(horus_outfile, {HORUS_VAR: horus_transformed}) print(f'✓ Data written to {horus_outfile}') print(f' Variable name `{HORUS_VAR}` preserved, original lat/lon replaced with projected coordinates (km)') except Exception as e: print(f'❌ Error processing HORUS file: {e}') else: print(f'⚠️ HORUS file not found: {horus_infile}, skipping') else: if not horus_infile: print('\nNo HORUS input file specified, skipping HORUS transformation') # Process CELLE grid definition if celle_infile and celle_outfile: if celle_infile.exists(): try: print(f'\nProcessing CELLE file: {celle_infile}') # Read MATLAB file data = loadmat(celle_infile) if CELLE_VAR not in data: raise KeyError(f'Variable `{CELLE_VAR}` not found in {celle_infile}') celle = data[CELLE_VAR] print(f' Original data shape: {celle.shape}') print(f' Number of grids: {celle.shape[0]}') print(f' Coordinate range:') print(f' Latitude: [{celle[:, 2].min():.2f}, {celle[:, 3].max():.2f}]°N') print(f' Longitude: [{celle[:, 0].min():.2f}, {celle[:, 1].max():.2f}]°E') # Transform coordinates celle_transformed = convert_celle_coordinates( celle, transformer, crs_info['bounds'], region_name, METERS_PER_KM) print(f' Transformed coordinate range (km):') print(f' Easting (X): [{celle_transformed[:, 0].min():.2f}, {celle_transformed[:, 1].max():.2f}] km') print(f' Northing (Y): [{celle_transformed[:, 2].min():.2f}, {celle_transformed[:, 3].max():.2f}] km') # Save transformed data savemat(celle_outfile, {CELLE_VAR: celle_transformed}) print(f'✓ Data written to {celle_outfile}') print(f' Variable name `{CELLE_VAR}` preserved, original lat/lon replaced with projected coordinates (km)') except Exception as e: print(f'❌ Error processing CELLE file: {e}') else: print(f'⚠️ CELLE file not found: {celle_infile}, skipping') else: if not celle_infile: print('\nNo CELLE input file specified, skipping CELLE transformation') print('\n' + '='*60) print('✓ Coordinate transformation completed!') print('='*60)
if __name__ == '__main__': parser = argparse.ArgumentParser( description='Transform coordinate systems for HORUS and CELLE files (supports multiple projected coordinate systems)', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Usage examples: 1. Italy (WGS84 -> RDN2008, default): python coordinate_transform.py \\ --horus-in HORUS_Italy.mat \\ --celle-in CELLE_Italy.mat \\ --horus-out HORUS_Italy_RDN2008.mat \\ --celle-out CELLE_Italy_RDN2008.mat 2. Taiwan (WGS84 -> TWD97): python coordinate_transform.py \\ --horus-in HORUS_Taiwan.mat \\ --celle-in CELLE_Taiwan.mat \\ --horus-out HORUS_Taiwan_TWD97.mat \\ --celle-out CELLE_Taiwan_TWD97.mat \\ --target-crs twd97 \\ --region Taiwan 3. Custom EPSG code: python coordinate_transform.py \\ --horus-in input.mat \\ --horus-out output.mat \\ --target-crs epsg:32633 \\ --region "Custom Region" Supported default coordinate systems: - rdn2008: RDN2008 Italy Zone (EPSG:7794) [default] - twd97: TWD97 TM2 121°E (EPSG:3826) - or any EPSG code (e.g., epsg:32633) Notes: - Input file coordinates must be WGS84 lat/lon (°N, °E) - Output file coordinates will be in projected coordinates (km) - HORUS file column 7 (index 6) is latitude, column 8 (index 7) is longitude - CELLE file format: [lon_min, lon_max, lat_min, lat_max] - Can transform HORUS or CELLE separately (omit the other file parameter) """ ) parser.add_argument('--horus-in', type=Path, help='Input HORUS earthquake catalog file path (.mat format)') parser.add_argument('--celle-in', type=Path, help='Input CELLE grid definition file path (.mat format)') parser.add_argument('--horus-out', type=Path, help='Output HORUS MAT file path') parser.add_argument('--celle-out', type=Path, help='Output CELLE MAT file path') parser.add_argument('--target-crs', type=str, default='rdn2008', help='Target coordinate system (rdn2008, twd97, or epsg:XXXX) [default: rdn2008]') parser.add_argument('--region', type=str, help='Region name (for message display)') args = parser.parse_args() # Validate input/output pairs if args.horus_in and not args.horus_out: parser.error('--horus-in must be paired with --horus-out') if args.horus_out and not args.horus_in: parser.error('--horus-out must be paired with --horus-in') if args.celle_in and not args.celle_out: parser.error('--celle-in must be paired with --celle-out') if args.celle_out and not args.celle_in: parser.error('--celle-out must be paired with --celle-in') if not args.horus_in and not args.celle_in: parser.error('At least --horus-in or --celle-in must be specified') main(args.horus_in, args.celle_in, args.horus_out, args.celle_out, args.target_crs, args.region)