import scipy.io
import pandas as pd
import numpy as np
import csep
from csep.utils import datasets, time_utils
from datetime import datetime, timezone
from tqdm import tqdm
import os
# ----------------------------------------------------------------------
# Function definitions (From your dataset (2).py, with modifications)
# ----------------------------------------------------------------------
# --- This is the new, corrected create_subgrids function ---
# [docs] — leftover Sphinx HTML documentation-link artifact, not executable code
def create_subgrids_spatial(data, grid_resolution=0.1):
    """
    Divide large grid cells (possibly irregular lon/lat boundaries) into sub-grids.

    Uses the 'Centroid-in-Bounding-Box' method for spatial downscaling: a
    candidate sub-cell belongs to a coarse cell when the sub-cell's center
    point falls inside the coarse cell's bounding box, and the coarse cell's
    RATE is divided evenly among all such sub-cells.

    Args:
        data: Coarse grid forecast data (pd.DataFrame) with columns
            LON_0/LON_1/LAT_0/LAT_1/RATE/MAG_0/MAG_1 (PERIOD optional,
            defaults to 1 when absent).
        grid_resolution: Sub-grid resolution in degrees (float, default 0.1).

    Returns:
        pd.DataFrame
            Downscaled forecast data with one row per sub-grid cell.
    """
    print(f"Downscaling large grids to {grid_resolution}°×{grid_resolution}° sub-grids (Using spatial method)...")
    new_rows = []
    # Tqdm for displaying processing progress
    for _, row in tqdm(data.iterrows(), total=len(data), desc="Processing sub-grids"):
        # Get lon/lat bounding box of the coarse cell
        coarse_lon_0 = row['LON_0']
        coarse_lon_1 = row['LON_1']
        coarse_lat_0 = row['LAT_0']
        coarse_lat_1 = row['LAT_1']
        coarse_rate = row['RATE']
        mag_0 = row['MAG_0']
        mag_1 = row['MAG_1']
        period = row.get('PERIOD', 1)
        # Snap the bounding box outward onto the sub-grid lattice.
        # FIX 1: the original multiplied/divided by a hard-coded 10, which is
        # only correct for the default grid_resolution of 0.1; dividing by
        # grid_resolution honors the parameter for any resolution.
        # FIX 2: round(..., 6) before floor/ceil guards against binary float
        # artifacts (e.g. 8.2 / 0.1 -> 81.99999999999999), which would
        # otherwise make floor() snap the start one whole cell too low.
        sub_lon_start_grid = np.floor(round(coarse_lon_0 / grid_resolution, 6)) * grid_resolution
        sub_lon_end_grid = np.ceil(round(coarse_lon_1 / grid_resolution, 6)) * grid_resolution
        sub_lat_start_grid = np.floor(round(coarse_lat_0 / grid_resolution, 6)) * grid_resolution
        sub_lat_end_grid = np.ceil(round(coarse_lat_1 / grid_resolution, 6)) * grid_resolution
        # Generate candidate starting points for sub-grids
        sub_lons = np.arange(sub_lon_start_grid, sub_lon_end_grid, grid_resolution)
        sub_lats = np.arange(sub_lat_start_grid, sub_lat_end_grid, grid_resolution)
        # Find all sub-grids whose center points fall inside the coarse cell
        sub_cells_in_this_coarse_cell = []
        for lon_0 in sub_lons:
            for lat_0 in sub_lats:
                # Center point of the candidate sub-grid cell
                lon_center = lon_0 + (grid_resolution / 2)
                lat_center = lat_0 + (grid_resolution / 2)
                # Lon/lat ranges are treated as left-closed, right-open [min, max)
                if (coarse_lon_0 <= lon_center < coarse_lon_1) and \
                        (coarse_lat_0 <= lat_center < coarse_lat_1):
                    sub_cells_in_this_coarse_cell.append((lon_0, lat_0))
        num_subcells = len(sub_cells_in_this_coarse_cell)
        if num_subcells == 0:
            # Coarse cell too small: no sub-grid center point falls inside it
            continue
        # Divide the coarse cell's RATE evenly among all found sub-grids
        subcell_rate = coarse_rate / num_subcells
        # Add one output row per valid sub-grid
        for lon_0, lat_0 in sub_cells_in_this_coarse_cell:
            new_rows.append({
                'LON_0': round(lon_0, 2),  # Round to avoid floating point issues
                'LON_1': round(lon_0 + grid_resolution, 2),
                'LAT_0': round(lat_0, 2),
                'LAT_1': round(lat_0 + grid_resolution, 2),
                'Z_0': 0.0,   # depth bounds are hard-coded to the 0-30 km slab
                'Z_1': 30.0,
                'MAG_0': mag_0,
                'MAG_1': mag_1,
                'RATE': subcell_rate,
                'FLAG': 1,
                'PERIOD': period
            })
    return pd.DataFrame(new_rows)
def generate_all_periods_forecast(forecast_data, num_magnitude_steps, magnitude_bins, num_regions, cell_bounds, start_period=1, end_period=None):
    """
    Generate forecast data for all periods and sum the rates across periods
    for the same grid cell and magnitude bin.

    (Calls create_subgrids_spatial for the spatial downscaling step.)

    Args:
        forecast_data: Complete forecast data matrix (ndarray).
        num_magnitude_steps: Number of magnitude bins (int).
        magnitude_bins: List of magnitude bin tuples (list).
        num_regions: Number of spatial regions (int).
        cell_bounds: Grid boundary definitions (DataFrame).
        start_period: Starting period number (int, optional, default 1).
        end_period: Ending period number (int, optional; default None uses
            the maximum number of periods available in forecast_data).

    Returns:
        pd.DataFrame
            Combined and summed forecast data across all periods
            (empty DataFrame if every period failed to process).
    """
    if end_period is None:
        # Each period occupies num_magnitude_steps consecutive rows
        end_period = forecast_data.shape[0] // num_magnitude_steps
    print(f"Generating forecast data for periods {start_period} to {end_period}...")

    per_period_frames = []
    for period in range(start_period, end_period + 1):
        print(f"Processing period {period} of {end_period}...")
        try:
            coarse = extract_period_forecast(period, forecast_data, num_magnitude_steps, magnitude_bins, num_regions, cell_bounds)
            # Downscale this period onto the fine grid before combining
            per_period_frames.append(create_subgrids_spatial(coarse))
        except Exception as e:
            # Best-effort: report the failure and continue with other periods
            print(f"Error occurred while processing period {period}: {str(e)}")

    if not per_period_frames:
        return pd.DataFrame()

    combined_data = pd.concat(per_period_frames, ignore_index=True)
    print("Summing forecast rates across periods...")
    # Round before grouping so float noise cannot split identical cells
    for coord in ('LON_0', 'LAT_0'):
        combined_data[coord] = combined_data[coord].round(2)

    # Sum RATE per (cell, magnitude bin); the remaining columns are simply
    # taken from the first row of each group.
    summed_data = combined_data.groupby(['LON_0', 'LAT_0', 'MAG_0'], as_index=False).agg({
        'RATE': 'sum',
        'LON_1': 'first',
        'LAT_1': 'first',
        'Z_0': 'first',
        'Z_1': 'first',
        'MAG_1': 'first',
        'FLAG': 'first',
    })
    print(f"Original data points: {len(combined_data)}, Summed data points: {len(summed_data)}")
    return summed_data
# Create CSEP-compatible output file
def create_csep_forecast_file(data, output_file):
    """
    Create a CSEP-compatible forecast file.

    Writes one tab-separated line per forecast cell in the column order
    LON_0, LON_1, LAT_0, LAT_1, Z_0, Z_1, MAG_0, MAG_1, RATE, FLAG, with
    RATE in scientific notation and FLAG as an integer.

    Args:
        data: Forecast data to write (pd.DataFrame).
        output_file: Path to output file (str).

    Returns:
        None
    """
    df = data.copy()
    # Ensure correct numeric format
    float_cols = ['LON_0', 'LON_1', 'LAT_0', 'LAT_1', 'Z_0', 'Z_1', 'MAG_0', 'MAG_1', 'RATE']
    df[float_cols] = df[float_cols].astype(float)
    df['FLAG'] = df['FLAG'].astype(int)
    order = ['LON_0', 'LON_1', 'LAT_0', 'LAT_1', 'Z_0', 'Z_1', 'MAG_0', 'MAG_1', 'RATE', 'FLAG']
    with open(output_file, 'w') as f:
        # BUG FIX: iterrows() builds one float64 Series per row (int FLAG is
        # upcast with the float columns), so FLAG was written as "1.0".
        # itertuples() preserves per-column dtypes, keeping FLAG an integer.
        for t in df[order].itertuples(index=False):
            line = f"{t.LON_0}\t{t.LON_1}\t{t.LAT_0}\t{t.LAT_1}\t{t.Z_0}\t{t.Z_1}\t{t.MAG_0}\t{t.MAG_1}\t{t.RATE:.16e}\t{t.FLAG}\n"
            f.write(line)
    print(f"Created CSEP-compatible file: {output_file}")
# Calculate date range
def calculate_date_range(start_year, period):
    """
    Calculate start and end dates based on starting year and period number.

    Periods are consecutive calendar quarters: period 1 is Q1 of
    start_year, period 5 is Q1 of start_year + 1, and so on.

    Args:
        start_year: Starting year of forecast (int).
        period: Period number, starting from 1 (int).

    Returns:
        tuple
            (start_date, end_date) UTC datetimes spanning the 3-month period.
    """
    # Map the 1-based period onto (year offset, starting month of the quarter)
    zero_based = period - 1
    year = start_year + zero_based // 4
    month = (zero_based % 4) * 3 + 1
    # Quarter start: first day of its first month
    start_date = time_utils.strptime_to_utc_datetime(f'{year}-{month:02d}-01 00:00:00.0')
    # Quarter end: last day of its third month, keyed by the starting month
    quarter_end_by_month = {1: '03-31', 4: '06-30', 7: '09-30', 10: '12-31'}
    end_date = time_utils.strptime_to_utc_datetime(f'{year}-{quarter_end_by_month[month]} 23:59:59.0')
    return start_date, end_date
def get_top_earthquakes(catalog_file, start_date, end_date, n=3):
    """
    Find the largest n earthquakes within a time range from a .mat catalog.

    *** WARNING: ***
    The hard-coded field indices below (quake[6], quake[7], quake[9], ...)
    assume the row layout Y, M, D, H, Min, Sec, Lat, Lon, Depth, Mag.
    If "Found 0 earthquakes" is reported for data that should match, the
    indices most likely do not fit the actual .mat layout (e.g.
    'HORUS_Italy_filtered.mat') and must be adjusted manually.

    Args:
        catalog_file: Path to earthquake catalog .mat file (str).
        start_date: Start of time range (datetime or parseable str).
        end_date: End of time range (datetime or parseable str).
        n: Number of top earthquakes to return (int, optional, default 3).

    Returns:
        list
            Tuples (lon, lat, magnitude, time, depth) for the top quakes,
            sorted by magnitude descending; empty list on any failure.
    """
    print(f"Reading earthquake catalog: {catalog_file}")
    if not os.path.exists(catalog_file):
        print(f"*** ERROR: Cannot find earthquake catalog file {catalog_file}. Skipping earthquake marking. ***")
        return []
    try:
        mat_data = scipy.io.loadmat(catalog_file)
        print(f"MATLAB file variable names: {list(mat_data.keys())}")
        # Prefer the canonical 'HORUS' variable; otherwise fall back to the
        # largest non-metadata ndarray in the file.
        if 'HORUS' in mat_data:
            catalog_key = 'HORUS'
        else:
            candidates = [k for k in mat_data.keys()
                          if not k.startswith('__') and isinstance(mat_data[k], np.ndarray)]
            if not candidates:
                print("ERROR: Cannot find any earthquake catalog data array.")
                return []
            catalog_key = max(candidates, key=lambda k: mat_data[k].size)
            print(f"Warning: 'HORUS' not found. Auto-using largest array: '{catalog_key}'")
        catalog = mat_data[catalog_key]
        print(f"Using variable '{catalog_key}' (Dimensions: {catalog.shape})")

        # Accept either datetimes or parseable strings for the time window
        if isinstance(start_date, str):
            start_date = time_utils.strptime_to_utc_datetime(start_date)
        if isinstance(end_date, str):
            end_date = time_utils.strptime_to_utc_datetime(end_date)

        # --- !!! You likely need to modify the indices here !!! ---
        # Assumed format: [0:Y, 1:M, 2:D, 3:H, 4:Min, 5:Sec, 6:Lat, 7:Lon, 8:Depth, 9:Mag]
        print("Iterating through catalog (assuming format: Y,M,D,H,M,S,Lat,Lon,Depth,Mag)...")
        print("...If format differs, please modify the quake[i] indices in this function.")
        filtered_quakes = []
        for quake in catalog:
            try:
                seconds = float(quake[5])
                whole_seconds = int(seconds)
                quake_time = datetime(
                    int(quake[0]), int(quake[1]), int(quake[2]),
                    int(quake[3]), int(quake[4]), whole_seconds,
                    microsecond=int((seconds - whole_seconds) * 1000000),
                    tzinfo=timezone.utc,
                )
                lat = float(quake[6])  # latitude assumed in column index 6
                lon = float(quake[7])  # longitude assumed in column index 7
                if start_date <= quake_time <= end_date:
                    filtered_quakes.append((lon, lat, float(quake[9]), quake_time, float(quake[8])))
            except (ValueError, IndexError):
                # Skip rows with invalid dates or a layout that does not match
                continue

        print(f"Found {len(filtered_quakes)} earthquakes within the specified time range")
        # Largest magnitudes first, truncated to the requested count
        top_earthquakes = sorted(filtered_quakes, key=lambda x: x[2], reverse=True)[:n]
        print(f"Top {len(top_earthquakes)} earthquakes:")
        for i, eq in enumerate(top_earthquakes):
            print(f" {i+1}. Magnitude {eq[2]:.1f}, Location: ({eq[0]:.4f}°E, {eq[1]:.4f}°N), Depth: {eq[4]:.1f}km")
        return top_earthquakes
    except Exception as e:
        print(f"Serious error occurred while reading earthquake catalog: {str(e)}")
        import traceback
        traceback.print_exc()
        return []
def my_custom_loader_function(filename):
    """
    Custom loader function for HORUS-style .mat catalog data.

    Assumed row layout: Y, M, D, H, Min, Sec, Lat, Lon, Depth, Mag.

    Args:
        filename: Path to the .mat file containing the earthquake catalog (str).

    Returns:
        eventlist: list
            List of events compatible with PyCSEP, each event as a tuple:
            (event_id, origin_time, latitude, longitude, depth, magnitude).

    Raises:
        IOError: If the file cannot be loaded or contains no usable array.
    """
    # Load the .mat file
    try:
        mat_data = scipy.io.loadmat(filename)
        if 'HORUS' in mat_data:
            horus_data = mat_data['HORUS']
        else:
            # Fallback: use the largest non-metadata ndarray in the file
            largest_var = None
            largest_size = 0
            for key, value in mat_data.items():
                if key.startswith('__'):  # Skip metadata variables
                    continue
                if isinstance(value, np.ndarray) and value.size > largest_size:
                    largest_var = key
                    largest_size = value.size
            if largest_var is None:
                # BUG FIX: this message (and the two below) previously
                # contained the literal text "(unknown)" instead of
                # interpolating the filename.
                raise ValueError(f"No suitable data array found in {filename}")
            horus_data = mat_data[largest_var]
    except Exception as e:
        raise IOError(f"Failed to load {filename}: {str(e)}")

    # Process each earthquake event
    eventlist = []
    skipped = 0
    for i, event in enumerate(horus_data):
        try:
            # Extract data from columns according to the assumed layout
            year = int(event[0])
            month = int(event[1])
            day = int(event[2])
            hour = int(event[3])
            minute = int(event[4])
            # Properly handle floating-point seconds
            second_float = float(event[5])
            second = int(second_float)
            microsecond = int((second_float - second) * 1000000)
            latitude = float(event[6])
            longitude = float(event[7])
            depth = float(event[8])
            magnitude = round(float(event[9]), 1)  # Round magnitude to one decimal place
            try:
                dt = datetime(year, month, day, hour, minute, second, microsecond, tzinfo=timezone.utc)
                # Convert to PyCSEP-compatible format (epoch timestamp)
                origin_time = time_utils.datetime_to_utc_epoch(dt)
            except (ValueError, OverflowError) as e:
                # Skip events with invalid dates
                if skipped < 10:  # Only show first 10 errors to avoid excessive output
                    print(f"Skipping event {i}: Invalid datetime ({year}-{month}-{day} {hour}:{minute}:{second_float}) - {e}")
                elif skipped == 10:
                    print("More invalid events found. Suppressing further messages...")
                skipped += 1
                continue
            # Generate a unique, stable event ID (string)
            event_id = f'HORUS-{i+1:06d}'
            eventlist.append((event_id, origin_time, latitude, longitude, depth, magnitude))
        except (ValueError, IndexError) as e:
            # Row did not match the assumed layout / had unparseable fields
            if skipped < 10:
                print(f"Skipping event {i}: {e}")
            elif skipped == 10:
                print("More errors found. Suppressing further messages...")
            skipped += 1
            continue
    print(f"Successfully loaded {len(eventlist)} events from {filename}")
    if skipped > 0:
        print(f"Note: {skipped} events were skipped due to errors")
    return eventlist