diff --git a/gmprocess/data/testdata/demo_steps/compute_metrics/ci38457511/workspace.h5 b/gmprocess/data/testdata/demo_steps/compute_metrics/ci38457511/workspace.h5 index 5280ee26f..0702b83d5 100644 Binary files a/gmprocess/data/testdata/demo_steps/compute_metrics/ci38457511/workspace.h5 and b/gmprocess/data/testdata/demo_steps/compute_metrics/ci38457511/workspace.h5 differ diff --git a/gmprocess/subcommands/base.py b/gmprocess/subcommands/base.py index 0d75c1a71..da4a57c00 100644 --- a/gmprocess/subcommands/base.py +++ b/gmprocess/subcommands/base.py @@ -1,10 +1,12 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import atexit +import logging import os import sys + from abc import ABC, abstractmethod -import logging from gmprocess.subcommands.lazy_loader import LazyLoader @@ -33,8 +35,16 @@ def __init__(self): """Dictionary instance variable to track files created by module.""" self.files_created = {} + self.workspace = None + + # Make sure we close the workspace on exit + atexit.register(self.close_workspace) + def open_workspace(self, eventid): """Open workspace, add as attribute.""" + + self.close_workspace() + event_dir = os.path.join(self.gmrecords.data_path, eventid) workname = os.path.join(event_dir, const.WORKSPACE_NAME) if not os.path.isfile(workname): @@ -46,10 +56,23 @@ def open_workspace(self, eventid): return eventid self.workspace = ws.StreamWorkspace.open(workname) + self.gmrecords.conf = self._get_config() + + return self.workspace.dataset + + def event_dir(self, event_id): + return os.path.normpath( + os.path.join(self.gmrecords.data_path, event_id) + ) def close_workspace(self): """Close workspace.""" - self.workspace.close() + try: + logging.debug("Closing the workspace.") + self.workspace.close() + except AttributeError: + # Not dataset to close + pass @property @abstractmethod @@ -198,7 +221,12 @@ def _get_labels(self): def _get_config(self): if hasattr(self, "workspace") and hasattr(self.workspace, "config"): + logging.info('Loading config 
from workspace.') config = self.workspace.config else: - config = confmod.get_config() + logging.info('Loading config from file.') + config = confmod.get_config(config_path=self.gmrecords.conf_path) + # Update the config in the workspace + self.workspace.addConfig(config=config) + return config diff --git a/gmprocess/subcommands/compute_station_metrics.py b/gmprocess/subcommands/compute_station_metrics.py index b2c157c9d..1064892fe 100644 --- a/gmprocess/subcommands/compute_station_metrics.py +++ b/gmprocess/subcommands/compute_station_metrics.py @@ -61,23 +61,10 @@ def main(self, gmrecords): def _event_station_metrics(self, event): self.eventid = event.id logging.info(f"Computing station metrics for event {self.eventid}...") - event_dir = os.path.join(self.gmrecords.data_path, self.eventid) - workname = os.path.normpath( - os.path.join(event_dir, utils.constants.WORKSPACE_NAME) - ) - if not os.path.isfile(workname): - logging.info( - "No workspace file found for event %s. Please run " - "subcommand 'assemble' to generate workspace file." % self.eventid - ) - logging.info("Continuing to next event.") - return event.id - self.workspace = ws.StreamWorkspace.open(workname) - ds = self.workspace.dataset + ds = self.open_workspace(event.id) self._get_labels() - - config = self._get_config() + config = self.gmrecords.conf if not hasattr(self, "vs30_grids"): vs30_grids = None @@ -92,10 +79,9 @@ def _event_station_metrics(self, event): station_list = ds.waveforms.list() if not len(station_list): - self.workspace.close() return event.id - rupture_file = rupt_utils.get_rupture_file(event_dir) + rupture_file = rupt_utils.get_rupture_file(self.event_dir(self.eventid)) origin = rupt.origin.Origin( { "id": self.eventid, @@ -257,7 +243,6 @@ def _event_station_metrics(self, event): "with tag '%s'." 
% self.gmrecords.args.label ) - self.workspace.close() return event.id def _get_ps2ff_splines(self): @@ -292,6 +277,11 @@ def _get_ps2ff_splines(self): kind="linear", copy=False, assume_sorted=True, + bounds_error=False, + fill_value=( + (Rjb_hat[0], Rjb_var[0]), + (Rjb_hat[-1], Rjb_var[-1]) + ) ) self.rrup_spline = spint.interp1d( repi, @@ -299,4 +289,9 @@ def _get_ps2ff_splines(self): kind="linear", copy=False, assume_sorted=True, + bounds_error=False, + fill_value=( + (Rrup_hat[0], Rrup_var[0]), + (Rrup_hat[-1], Rrup_var[-1]) + ) ) diff --git a/gmprocess/subcommands/compute_waveform_metrics.py b/gmprocess/subcommands/compute_waveform_metrics.py index 556af9345..3b567382c 100644 --- a/gmprocess/subcommands/compute_waveform_metrics.py +++ b/gmprocess/subcommands/compute_waveform_metrics.py @@ -53,21 +53,12 @@ def main(self, gmrecords): def _compute_event_waveform_metrics(self, event): self.eventid = event.id logging.info(f"Computing waveform metrics for event {self.eventid}...") - event_dir = os.path.join(self.gmrecords.data_path, self.eventid) - workname = os.path.normpath(os.path.join(event_dir, const.WORKSPACE_NAME)) - if not os.path.isfile(workname): - logging.info( - "No workspace file found for event %s. Please run " - "subcommand 'assemble' to generate workspace file." 
% self.eventid - ) - logging.info("Continuing to next event.") - return event.id - self.workspace = ws.StreamWorkspace.open(workname) - ds = self.workspace.dataset - station_list = ds.waveforms.list() + ds = self.open_workspace(event.id) self._get_labels() - config = self._get_config() + config = self.gmrecords.conf + + station_list = ds.waveforms.list() summaries = [] metricpaths = [] @@ -148,5 +139,4 @@ def _compute_event_waveform_metrics(self, event): overwrite=self.gmrecords.args.overwrite, ) - self.workspace.close() return event.id diff --git a/gmprocess/subcommands/export_failure_tables.py b/gmprocess/subcommands/export_failure_tables.py index 485fb6bfe..84fde044f 100644 --- a/gmprocess/subcommands/export_failure_tables.py +++ b/gmprocess/subcommands/export_failure_tables.py @@ -59,19 +59,11 @@ def main(self, gmrecords): for event in self.events: self.eventid = event.id logging.info(f"Creating failure tables for event {self.eventid}...") - event_dir = os.path.join(self.gmrecords.data_path, self.eventid) - workname = os.path.normpath(os.path.join(event_dir, const.WORKSPACE_NAME)) - if not os.path.isfile(workname): - logging.info( - "No workspace file found for event %s. Please run " - "subcommand 'assemble' to generate workspace file." 
% self.eventid - ) - logging.info("Continuing to next event.") - continue - self.workspace = ws.StreamWorkspace.open(workname) + self.open_workspace(event.id) + self._get_labels() + self._get_pstreams() - self.workspace.close() if not (hasattr(self, "pstreams") and len(self.pstreams) > 0): logging.info( @@ -84,7 +76,7 @@ def main(self, gmrecords): base_file_name = os.path.normpath( os.path.join( - event_dir, + self.event_dir(event.id), "%s_%s_failure_reasons_%s" % ( gmrecords.project, diff --git a/gmprocess/subcommands/export_metric_tables.py b/gmprocess/subcommands/export_metric_tables.py index 6afd0fddc..d6209dec9 100644 --- a/gmprocess/subcommands/export_metric_tables.py +++ b/gmprocess/subcommands/export_metric_tables.py @@ -44,19 +44,10 @@ def main(self, gmrecords): for event in self.events: self.eventid = event.id logging.info(f"Creating tables for event {self.eventid}...") - event_dir = os.path.join(gmrecords.data_path, self.eventid) - workname = os.path.normpath(os.path.join(event_dir, const.WORKSPACE_NAME)) - if not os.path.isfile(workname): - logging.info( - "No workspace file found for event %s. Please run " - "subcommand 'assemble' to generate workspace file." 
% self.eventid - ) - logging.info("Continuing to next event.") - continue - self.workspace = ws.StreamWorkspace.open(workname) + self.open_workspace(event.id) self._get_labels() - config = self._get_config() + config = self.gmrecords.conf event_table, imc_tables, readmes = self.workspace.getTables( self.gmrecords.args.label, config @@ -72,7 +63,6 @@ def main(self, gmrecords): snr_table, snr_readme = self.workspace.getSNRTable( self.eventid, self.gmrecords.args.label, config ) - self.workspace.close() outdir = gmrecords.data_path diff --git a/gmprocess/subcommands/export_provenance_tables.py b/gmprocess/subcommands/export_provenance_tables.py index f0f18d43a..35b21c4c8 100644 --- a/gmprocess/subcommands/export_provenance_tables.py +++ b/gmprocess/subcommands/export_provenance_tables.py @@ -41,41 +41,31 @@ def main(self, gmrecords): for event in self.events: self.eventid = event.id logging.info(f"Creating provenance tables for event {self.eventid}...") - event_dir = os.path.normpath( - os.path.join(gmrecords.data_path, self.eventid) - ) - workname = os.path.join(event_dir, const.WORKSPACE_NAME) - if not os.path.isfile(workname): - logging.info( - "No workspace file found for event %s. Please run " - "subcommand 'assemble' to generate workspace file." % self.eventid - ) - logging.info("Continuing to next event.") - continue - self.workspace = ws.StreamWorkspace.open(workname) + self.open_workspace(event.id) self._get_pstreams() if not (hasattr(self, "pstreams") and len(self.pstreams) > 0): logging.info( "No processed waveforms available. No provenance tables created." 
) - self.workspace.close() continue provdata = self.workspace.getProvenance( self.eventid, labels=self.gmrecords.args.label ) - self.workspace.close() - basename = f"{gmrecords.project}_{gmrecords.args.label}_provenance" + basename = os.path.join( + self.event_dir(event.id), + f"{gmrecords.project}_{gmrecords.args.label}_provenance" + ) if gmrecords.args.output_format == "csv": - csvfile = os.path.join(event_dir, f"{basename}.csv") - self.append_file("Provenance", csvfile) - provdata.to_csv(csvfile, index=False) + fname = basename + '.csv' + self.append_file("Provenance", fname) + provdata.to_csv(fname, index=False) else: - excelfile = os.path.join(event_dir, f"{basename}.xlsx") - self.append_file("Provenance", excelfile) - provdata.to_excel(excelfile, index=False) + fname = basename + '.xlsx' + self.append_file("Provenance", fname) + provdata.to_excel(fname, index=False) self._summarize_files_created() diff --git a/gmprocess/subcommands/export_shakemap.py b/gmprocess/subcommands/export_shakemap.py index 576a1554c..22808e4fd 100644 --- a/gmprocess/subcommands/export_shakemap.py +++ b/gmprocess/subcommands/export_shakemap.py @@ -54,31 +54,19 @@ def main(self, gmrecords): self.eventid = event.id logging.info(f"Creating shakemap files for event {self.eventid}...") - event_dir = os.path.normpath(os.path.join(gmrecords.data_path, event.id)) - workname = os.path.join(event_dir, const.WORKSPACE_NAME) - if not os.path.isfile(workname): - logging.info( - "No workspace file found for event %s. Please run " - "subcommand 'assemble' to generate workspace file." 
% event.id - ) - logging.info("Continuing to next event.") - continue - - self.workspace = ws.StreamWorkspace.open(workname) + self.open_workspace(event.id) self._get_labels() - config = self._get_config() expanded_imts = self.gmrecords.args.expand_imts jsonfile, stationfile, _ = sm_utils.create_json( self.workspace, event, - event_dir, + self.event_dir(event.id), self.gmrecords.args.label, - config=config, + config=self.gmrecords.conf, expanded_imts=expanded_imts, ) - self.workspace.close() if jsonfile is not None: self.append_file("shakemap", jsonfile) if stationfile is not None: diff --git a/gmprocess/subcommands/generate_report.py b/gmprocess/subcommands/generate_report.py index 62903da0c..a4f550adc 100644 --- a/gmprocess/subcommands/generate_report.py +++ b/gmprocess/subcommands/generate_report.py @@ -44,25 +44,27 @@ def main(self, gmrecords): logging.info(f"Running subcommand '{self.command_name}'") self.gmrecords = gmrecords + self._check_arguments() self._get_events() for event in self.events: - event_dir = os.path.join(self.gmrecords.data_path, event.id) + self.open_workspace(event.id) + config = self.gmrecords.conf + pstreams = self.generate_diagnostic_plots(event) if pstreams is None: return logging.info(f"Generating summary report for event {event.id}...") - config = self._get_config() build_conf = config["build_report"] if build_conf["enabled"]: report_format = build_conf["format"] if report_format == "latex": report_file, success = report.build_report_latex( pstreams, - event_dir, + self.event_dir(event.id), event, prefix=f"{gmrecords.project}_{gmrecords.args.label}", config=config, @@ -77,18 +79,6 @@ def main(self, gmrecords): self._summarize_files_created() def generate_diagnostic_plots(self, event): - event_dir = os.path.join(self.gmrecords.data_path, event.id) - workname = os.path.join(event_dir, const.WORKSPACE_NAME) - if not os.path.isfile(workname): - logging.info( - "No workspace file found for event %s. 
Please run " - "subcommand 'assemble' to generate workspace file." % event.id - ) - logging.info("Continuing to next event.") - return False - - self.workspace = ws.StreamWorkspace.open(workname) - config = self._get_config() ds = self.workspace.dataset station_list = ds.waveforms.list() if len(station_list) == 0: @@ -103,6 +93,7 @@ def generate_diagnostic_plots(self, event): ) logging.info(f"Creating diagnostic plots for event {event.id}...") + event_dir = self.event_dir(event.id) plot_dir = os.path.join(event_dir, "plots") if not os.path.isdir(plot_dir): os.makedirs(plot_dir) @@ -114,7 +105,7 @@ def generate_diagnostic_plots(self, event): event.id, stations=[station_id], labels=[self.gmrecords.args.label], - config=config, + config=self.gmrecords.conf, ) if not len(streams): logging.info("No matching streams found. Cannot generate report.") @@ -128,12 +119,12 @@ def generate_diagnostic_plots(self, event): stream, plot_dir, event, - config=config, + config=self.gmrecords.conf, ) futures.append(future) else: results.append( - plot.summary_plots(stream, plot_dir, event, config=config) + plot.summary_plots(stream, plot_dir, event, config=self.gmrecords.conf) ) if self.gmrecords.args.num_processes > 0: @@ -145,6 +136,4 @@ def generate_diagnostic_plots(self, event): plot.plot_moveout(pstreams, event.latitude, event.longitude, file=moveoutfile) self.append_file("Moveout plot", moveoutfile) - self.workspace.close() - return pstreams diff --git a/gmprocess/subcommands/generate_station_maps.py b/gmprocess/subcommands/generate_station_maps.py index 0a793a52d..d4f653813 100644 --- a/gmprocess/subcommands/generate_station_maps.py +++ b/gmprocess/subcommands/generate_station_maps.py @@ -41,25 +41,15 @@ def main(self, gmrecords): self._get_events() for event in self.events: - event_dir = os.path.join(self.gmrecords.data_path, event.id) - workname = os.path.join(event_dir, const.WORKSPACE_NAME) - if not os.path.isfile(workname): - logging.info( - "No workspace file found for 
event %s. Please run " - "subcommand 'assemble' to generate workspace file." % event.id - ) - logging.info("Continuing to next event.") - return False + ds = self.open_workspace(event.id) - self.workspace = ws.StreamWorkspace.open(workname) - ds = self.workspace.dataset station_list = ds.waveforms.list() if len(station_list) == 0: logging.info("No processed waveforms available. No report generated.") return False self._get_labels() - config = self.workspace.config + config = self.gmrecords.conf logging.info(f"Generating station maps for event {event.id}...") pstreams = [] @@ -76,7 +66,8 @@ def main(self, gmrecords): for stream in streams: pstreams.append(stream) - mapfile = report_utils.draw_stations_map(pstreams, event, event_dir) + mapfile = report_utils.draw_stations_map( + pstreams, event, self.event_dir(event.id)) self.append_file("Station map", mapfile) self._summarize_files_created() diff --git a/gmprocess/subcommands/process_waveforms.py b/gmprocess/subcommands/process_waveforms.py index bd07ba6fd..d6b0534c0 100644 --- a/gmprocess/subcommands/process_waveforms.py +++ b/gmprocess/subcommands/process_waveforms.py @@ -82,12 +82,11 @@ def _process_event(self, event): for station_id in station_list: # Cannot parallelize IO to ASDF file - config = self._get_config() raw_streams = self.workspace.getStreams( event.id, stations=[station_id], labels=["unprocessed"], - config=config, + config=self.gmrecords.conf, ) if self.gmrecords.args.reprocess: # Don't use "processed_streams" variable name because that is what is @@ -98,7 +97,7 @@ def _process_event(self, event): event.id, stations=[station_id], labels=[self.process_tag], - config=config, + config=self.gmrecords.conf, ) logging.debug(old_streams.describe()) else: @@ -119,14 +118,14 @@ def _process_event(self, event): processing.process_streams, raw_streams, event, - config, + self.gmrecords.conf, old_streams, ) futures.append(future) else: processed_streams.append( processing.process_streams( - raw_streams, 
event, config, old_streams + raw_streams, event, self.gmrecords.conf, old_streams ) ) diff --git a/gmprocess/version.py b/gmprocess/version.py index 0e0a8889b..20514be44 100644 --- a/gmprocess/version.py +++ b/gmprocess/version.py @@ -1 +1 @@ -__version__ = "1.1.11.dev554+g586c33a7.d20220726" \ No newline at end of file +__version__ = "1.1.11.dev563+g0a3a6fc6.d20220801" \ No newline at end of file