From 8a6d0ebf0e1f935d0834b4fdf9185b7c0b9bae36 Mon Sep 17 00:00:00 2001
From: Robert Jackson
Date: Wed, 7 Jan 2026 10:45:44 -0600
Subject: [PATCH] ADD: First working package, no unit tests yet.

---
 radclss/config/default_config.py |   1 -
 radclss/core/radclss_core.py     | 141 +++++++++++++++++++++----------
 radclss/io/__init__.py           |   1 +
 radclss/io/write.py              |  52 ++++++++++++
 radclss/util/__init__py          |   3 +-
 radclss/util/column_utils.py     |  39 ++++++---
 radclss/util/dod.py              | 106 -----------------------
 radclss/vis/quicklooks.py        |  69 ++++++---------
 8 files changed, 202 insertions(+), 210 deletions(-)
 create mode 100644 radclss/io/__init__.py
 create mode 100644 radclss/io/write.py
 delete mode 100644 radclss/util/dod.py

diff --git a/radclss/config/default_config.py b/radclss/config/default_config.py
index 9698199..67f5c95 100644
--- a/radclss/config/default_config.py
+++ b/radclss/config/default_config.py
@@ -6,7 +6,6 @@
 """
 # Define variables to drop from RadCLss from the respective datastreams
 DEFAULT_DISCARD_VAR = {'radar' : ['classification_mask',
-                                  'censor_mask',
                                   'uncorrected_copol_correlation_coeff',
                                   'uncorrected_differential_phase',
                                   'uncorrected_differential_reflectivity',

diff --git a/radclss/core/radclss_core.py b/radclss/core/radclss_core.py
index 23ae3db..bed70cc 100644
--- a/radclss/core/radclss_core.py
+++ b/radclss/core/radclss_core.py
@@ -3,9 +3,9 @@
 import xarray as xr
 import act
 import xradar as xd
+import numpy as np
 
 from ..util.column_utils import subset_points, match_datasets_act
-from ..util.dod import adjust_radclss_dod
 from ..config.default_config import DEFAULT_DISCARD_VAR
 from ..config.output_config import get_output_config
 from dask.distributed import Client, as_completed
@@ -68,7 +68,7 @@ def radclss(volumes, input_site_dict, serial=True, dod_version='', discard_var={
         current_client = Client.current()
         if current_client is None:
             raise RuntimeError("No Dask client found. Please start a Dask client before running in parallel mode.")
-        results = current_client.map(subset_points, volumes["radar"], input_site_dict=input_site_dict)
+        results = current_client.map(subset_points, volumes["radar"], sonde=volumes["sonde"], input_site_dict=input_site_dict)
         for done_work in as_completed(results, with_results=False):
             try:
                 columns.append(done_work.result())
@@ -78,7 +78,7 @@ def radclss(volumes, input_site_dict, serial=True, dod_version='', discard_var={
         for rad in volumes['radar']:
             if verbose:
                 print(f"Processing file: {rad}")
-            columns.append(subset_points(rad, input_site_dict=input_site_dict))
+            columns.append(subset_points(rad, sonde=volumes["sonde"], input_site_dict=input_site_dict))
             if verbose:
                 print("Processed file: ", rad)
                 print("Current number of successful columns: ", len(columns))
@@ -86,37 +86,70 @@ def radclss(volumes, input_site_dict, serial=True, dod_version='', discard_var={
                 print(columns[-1])
 
     # Assemble individual columns into single DataSet
-    try:
-        # Concatenate all extracted columns across time dimension to form daily timeseries
-        output_config = get_output_config()
-        output_platform = output_config['platform']
-        output_level = output_config['level']
-        ds_concat = xr.concat([data for data in columns if data], dim="time")
-        ds = act.io.create_ds_from_arm_dod(f'{output_platform}-{output_level}',
-                                           {'time': ds_concat.sizes['time'],
-                                            'height': ds_concat.sizes['height'],
-                                            'station': ds_concat.sizes['station']},
-                                           version=dod_version)
-
-
-        ds['time'] = ds_concat.sel(station=base_station).base_time
-        ds['time_offset'] = ds_concat.sel(station=base_station).base_time
-        ds['base_time'] = ds_concat.sel(station=base_station).isel(time=0).base_time
-        ds['lat'] = ds_concat.isel(time=0).lat
-        ds['lon'] = ds_concat.isel(time=0).lon
-        ds['alt'] = ds_concat.isel(time=0).alt
-        for var in ds_concat.data_vars:
-            if var not in ['time', 'time_offset', 'base_time', 'lat', 'lon', 'alt']:
+    #try:
+    # Concatenate all extracted columns across time dimension to form daily timeseries
+    output_config = get_output_config()
+    output_platform = output_config['platform']
+    output_level = output_config['level']
+    ds_concat = xr.concat([data for data in columns if data], dim="time")
+    if verbose:
+        print("Grabbing DOD for platform/level: ", f'{output_platform}.{output_level}')
+    ds = act.io.create_ds_from_arm_dod(f'{output_platform}.{output_level}',
+                                       {'time': ds_concat.sizes['time'],
+                                        'height': ds_concat.sizes['height'],
+                                        'station': ds_concat.sizes['station']},
+                                       version=dod_version)
+
+
+    ds['time'] = ds_concat.sel(station=base_station).base_time
+    ds['time_offset'] = ds_concat.sel(station=base_station).base_time
+    ds['base_time'] = ds_concat.sel(station=base_station).isel(time=0).base_time
+    ds['station'] = ds_concat['station']
+    ds['height'] = ds_concat['height']
+    ds['lat'][:] = ds_concat.isel(time=0)["lat"][:]
+    ds['lon'][:] = ds_concat.isel(time=0)["lon"][:]
+    ds['alt'][:] = ds_concat.isel(time=0)["alt"][:]
+
+    for var in ds_concat.data_vars:
+        if var not in ['time', 'time_offset', 'base_time', 'lat', 'lon', 'alt']:
+            if var in ds.data_vars:
+                if verbose:
+                    print(f"Adding variable to output dataset: {var}")
+                    print(f"Original dtype: {ds[var].dtype}, New dtype: {ds_concat[var].dtype}")
+                old_type = ds[var].dtype
+
+                # Assign data and convert to original dtype
                 ds[var][:] = ds_concat[var][:]
+                ds[var] = ds[var].astype(old_type)
+                if "_FillValue" in ds[var].attrs:
+                    if isinstance(ds[var].attrs["_FillValue"], str):
+                        if ds[var].dtype == 'float32':
+                            ds[var].attrs["_FillValue"] = np.float32(ds[var].attrs["_FillValue"])
+                        elif ds[var].dtype == 'float64':
+                            ds[var].attrs["_FillValue"] = np.float64(ds[var].attrs["_FillValue"])
+                        elif ds[var].dtype == 'int32':
+                            ds[var].attrs["_FillValue"] = np.int32(ds[var].attrs["_FillValue"])
+                        elif ds[var].dtype == 'int64':
+                            ds[var].attrs["_FillValue"] = np.int64(ds[var].attrs["_FillValue"])
+                    ds[var] = ds[var].fillna(ds[var].attrs["_FillValue"]).astype(float)
+                if "missing_value" in ds[var].attrs:
+                    if isinstance(ds[var].attrs["missing_value"], str):
+                        if ds[var].dtype == 'float32':
+                            ds[var].attrs["missing_value"] = np.float32(ds[var].attrs["missing_value"])
+                        elif ds[var].dtype == 'float64':
+                            ds[var].attrs["missing_value"] = np.float64(ds[var].attrs["missing_value"])
+                        elif ds[var].dtype == 'int32':
+                            ds[var].attrs["missing_value"] = np.int32(ds[var].attrs["missing_value"])
+                        elif ds[var].dtype == 'int64':
+                            ds[var].attrs["missing_value"] = np.int64(ds[var].attrs["missing_value"])
+                    ds[var] = ds[var].fillna(ds[var].attrs["missing_value"]).astype(float)
 
-        # Remove all the unused CMAC variables
-        ds = ds.drop_vars(discard_var["radar"])
-        # Drop duplicate latitude and longitude
-        ds = ds.drop_vars(['latitude', 'longitude'])
-        del ds_concat
-    except ValueError as e:
-        print(f"Error concatenating columns: {e}")
-        ds = None
+    # Remove all the unused CMAC variables
+    # Drop duplicate latitude and longitude
+    del ds_concat
+    #except ValueError as e:
+    #    print(f"Error concatenating columns: {e}")
+    #    ds = None
 
     # Free up Memory
     del columns
@@ -130,14 +163,24 @@ def radclss(volumes, input_site_dict, serial=True, dod_version='', discard_var={
     # Find all of the met stations and match to columns
     vol_keys = list(volumes.keys())
     for k in vol_keys:
-        instrument, site = k.split("_", 1)
-
+        if len(volumes[k]) == 0:
+            if verbose:
+                print(f"No files found for instrument/site: {k}")
+            continue
+        if "_" in k:
+            instrument, site = k.split("_", 1)
+        else:
+            instrument = k
+            site = base_station
         if instrument == "met":
+            if verbose:
+                print(f"Matching MET data for site: {site}")
             ds = match_datasets_act(ds,
                                     volumes[k][0],
                                     site.upper(),
                                     resample="mean",
-                                    discard=discard_var['met'])
+                                    discard=discard_var['met'],
+                                    verbose=verbose)
 
         # Radiosonde
         if instrument == "sonde":
@@ -156,45 +199,51 @@ def radclss(volumes, input_site_dict, serial=True, dod_version='', discard_var={
                                     site.upper(),
                                     discard=discard_var[instrument],
                                     DataSet=True,
-                                    resample="mean")
+                                    resample="mean",
+                                    verbose=verbose)
             # clean up
             del grd_ds
 
         if instrument == "pluvio":
             # Weighing Bucket Rain Gauge
             ds = match_datasets_act(ds,
-                                    volumes[k][0],
+                                    volumes[k],
                                     site.upper(),
-                                    discard=discard_var["pluvio"])
+                                    discard=discard_var["pluvio"],
+                                    verbose=verbose)
 
         if instrument == "ld":
             ds = match_datasets_act(ds,
-                                    volumes[k][0],
+                                    volumes[k],
                                     site.upper(),
                                     discard=discard_var['ldquants'],
                                     resample="mean",
-                                    prefix="ldquants_")
-
+                                    prefix="ldquants_",
+                                    verbose=verbose)
         if instrument == "vd":
-            # Laser Disdrometer - Supplemental Site
+            # Video Disdrometer - Supplemental Site
             ds = match_datasets_act(ds,
-                                    volumes[k][0],
+                                    volumes[k],
                                     site.upper(),
                                     discard=discard_var['vdisquants'],
                                     resample="mean",
-                                    prefix="vdisquants_")
+                                    prefix="vdisquants_",
+                                    verbose=verbose)
 
         if instrument == "wxt":
-            # Laser Disdrometer - Supplemental Site
+            # Weather Transmitter - Supplemental Site
             ds = match_datasets_act(ds,
-                                    volumes[k][0],
+                                    volumes[k],
                                     site.upper(),
                                     discard=discard_var['wxt'],
-                                    resample="mean")
+                                    resample="mean",
+                                    verbose=verbose)
     else:
        # There is no column extraction
        raise RuntimeError(": RadCLss FAILURE (All Columns Failed to Extract): ")
-
+    del ds["base_time"].attrs["units"]
+    del ds["time_offset"].attrs["units"]
+    del ds["time"].attrs["units"]
     return ds
\ No newline at end of file
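Note on the dtype handling above: the four-way elif ladders for _FillValue and missing_value could be collapsed into a single dtype lookup. A minimal sketch, assuming an xarray DataArray input (the helper name cast_fill_attrs is illustrative and not part of this patch):

    import numpy as np

    # Map dtype names to NumPy casts for string-valued fill attributes.
    CASTS = {"float32": np.float32, "float64": np.float64,
             "int32": np.int32, "int64": np.int64}

    def cast_fill_attrs(da):
        # Cast string fill attributes to the array's dtype, then fill NaNs.
        for attr in ("_FillValue", "missing_value"):
            val = da.attrs.get(attr)
            if isinstance(val, str) and str(da.dtype) in CASTS:
                da.attrs[attr] = CASTS[str(da.dtype)](val)
                da = da.fillna(da.attrs[attr])
        return da

The same table-driven approach would also shorten the per-station loop added to match_datasets_act further below.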
diff --git a/radclss/io/__init__.py b/radclss/io/__init__.py
new file mode 100644
index 0000000..fc48cac
--- /dev/null
+++ b/radclss/io/__init__.py
@@ -0,0 +1 @@
+from .write import write_radclss_output

diff --git a/radclss/io/write.py b/radclss/io/write.py
new file mode 100644
index 0000000..6dd87d5
--- /dev/null
+++ b/radclss/io/write.py
@@ -0,0 +1,52 @@
+import json
+import urllib.request
+import warnings
+
+def write_radclss_output(ds, output_filename, process, version=None):
+    """Write the RadCLss output dataset to a NetCDF file.
+
+    Parameters
+    ----------
+    ds : xarray.Dataset
+        The RadCLss output dataset.
+    output_filename : str
+        The path to the output NetCDF file.
+    process : str
+        The process name (e.g., radclss).
+    version : str
+        The version of the process DOD to use. Set to None to use the latest version.
+    """
+    # Query the PCM DOD API for this process to build per-variable encodings
+    base_url = 'https://pcm.arm.gov/pcm/api/dods/'
+
+    # Get data from DOD api
+    with urllib.request.urlopen(base_url + process) as url:
+        data = json.loads(url.read().decode())
+    keys = list(data['versions'].keys())
+    if version not in keys:
+        warnings.warn(
+            ' '.join(
+                ['Version:', str(version), 'not available or not specified. Using Version:', keys[-1]]
+            ),
+            UserWarning,
+        )
+        version = keys[-1]
+    variables = data['versions'][version]['vars']
+    encoding = {}
+    for v in variables:
+        type_str = v['type']
+        if v['name'] in ds.variables:
+            if type_str == 'float':
+                encoding[v['name']] = {'dtype': 'float32'}
+            elif type_str == 'double':
+                encoding[v['name']] = {'dtype': 'float64'}
+            elif type_str == 'short':
+                encoding[v['name']] = {'dtype': 'int16'}
+            elif type_str == 'int':
+                encoding[v['name']] = {'dtype': 'int32'}
+            elif type_str == "char":
+                encoding[v['name']] = {'dtype': 'S1'}
+            elif type_str == "byte":
+                encoding[v['name']] = {'dtype': 'int8'}
+
+    ds.to_netcdf(output_filename, format='NETCDF4_CLASSIC', encoding=encoding)
\ No newline at end of file
diff --git a/radclss/util/__init__py b/radclss/util/__init__py
index 558da06..46a4235 100644
--- a/radclss/util/__init__py
+++ b/radclss/util/__init__py
@@ -1,2 +1 @@
-from .column_utils import subset_points, match_datasets_act
-from .dod import adjust_radclss_dod
+from .column_utils import subset_points, match_datasets_act

diff --git a/radclss/util/column_utils.py b/radclss/util/column_utils.py
index 0df4638..aa8da81 100644
--- a/radclss/util/column_utils.py
+++ b/radclss/util/column_utils.py
@@ -2,7 +2,6 @@
 import act
 import numpy as np
 import xarray as xr
-import xradar as xd
 import datetime
 
 from ..config import DEFAULT_DISCARD_VAR
@@ -51,15 +50,16 @@ def subset_points(nfile, input_site_dict, sonde=None, height_bins=np.arange(500,
 
     #try:
     # Read in the file
-    xradar_ds = xd.io.open_cfradial1_datatree(nfile)
-    for var in DEFAULT_DISCARD_VAR['radar']:
-        if var in xradar_ds.data_vars:
-            xradar_ds = xradar_ds.drop_vars(var)
-    if xradar_ds["/sweep_0"]["sweep_mode"] == "rhi":
-        xradar_ds.close()
-        return None
-    radar = xradar_ds.pyart.to_radar()
-    xradar_ds.close()
+    #xradar_ds = xd.io.open_cfradial1_datatree(nfile)
+    #for var in DEFAULT_DISCARD_VAR['radar']:
+    #    if var in xradar_ds.data_vars:
+    #        xradar_ds = xradar_ds.drop_vars(var)
+    #if xradar_ds["/sweep_0"]["sweep_mode"] == "rhi":
+    #    xradar_ds.close()
+    #    return None
+    #radar = xradar_ds.pyart.to_radar()
+    #xradar_ds.close()
+    radar = pyart.io.read(nfile, exclude_fields=DEFAULT_DISCARD_VAR['radar'])
 
     # Check for single sweep scans
     if np.ma.is_masked(radar.sweep_start_ray_index["data"][1:]):
         radar.sweep_start_ray_index["data"] = np.ma.array([0])
@@ -160,7 +160,8 @@ def match_datasets_act(column,
                        discard,
                        resample='sum',
                        DataSet=False,
-                       prefix=None):
+                       prefix=None,
+                       verbose=False):
     """
     Time synchronization of a Ground Instrumentation Dataset to a Radar Column
     for Specific Locations using the ARM ACT package
@@ -195,6 +196,9 @@ def match_datasets_act(column,
     prefix : str
         prefix for the desired spelling of variable names for the input
         datastream (to fix duplicate variable names between instruments)
+
+    verbose : boolean
+        Boolean flag to set verbose output during processing. Default is False.
 
     Returns
     -------
@@ -255,6 +259,15 @@ def match_datasets_act(column,
             matched[var].attrs.update(source=matched.datastream)
 
-    # Merge the two DataSets
-    column = xr.merge([column, matched])
-
+    # Copy matched variables into the column dataset, station by station
+    for k in matched.data_vars:
+        if k in column.data_vars:
+            column[k].sel(station=site)[:] = matched.sel(station=site)[k][:].astype(column[k].dtype)
+            if "_FillValue" in column[k].attrs:
+                if isinstance(column[k].attrs["_FillValue"], str):
+                    column[k].attrs["_FillValue"] = float(column[k].attrs["_FillValue"])
+                column[k] = column[k].fillna(column[k].attrs["_FillValue"]).astype(float)
+            if "missing_value" in column[k].attrs:
+                if isinstance(column[k].attrs["missing_value"], str):
+                    column[k].attrs["missing_value"] = float(column[k].attrs["missing_value"])
+                column[k] = column[k].fillna(column[k].attrs["missing_value"]).astype(float)
     return column
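A schematic single-file workflow with the updated utilities; the radar filename, site_dict, and met_files below are placeholders, and the exact structure of input_site_dict is project-specific:

    from radclss.util.column_utils import subset_points, match_datasets_act

    # Extract a column at each configured site from one CMAC radar file,
    # then time-match a MET datastream onto the M1 station.
    column = subset_points("bnfcsapr2cmacS3.c1.nc", input_site_dict=site_dict)
    column = match_datasets_act(column, met_files, "M1",
                                discard=[], resample="mean", verbose=True)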
diff --git a/radclss/util/dod.py b/radclss/util/dod.py
deleted file mode 100644
index 72152ea..0000000
--- a/radclss/util/dod.py
+++ /dev/null
@@ -1,106 +0,0 @@
-import numpy as np
-import xarray as xr
-import datetime
-import getpass
-import socket
-
-from ..config import get_output_config
-
-def adjust_radclss_dod(radclss, dod):
-    """
-    Adjust the RadCLss DataSet to include missing datastreams
-
-    Parameters
-    ----------
-    radclss : Xarray DataSet
-        extracted columns and in-situ sensor file
-    dod : Xarray DataSet
-        expected datastreams and data standards for RadCLss
-
-    returns
-    -------
-    radclss : Xarray DataSet
-        Corrected RadCLss that has all expected parmeters and attributes
-    """
-    # Supplied DOD has correct data attributes and all required parameters.
-    # Update the RadCLss dataset variable values with the DOD dataset.
-    for var in dod.variables:
-        # Make sure the variable isn't a dimension
-        if var not in dod.dims:
-            # check to see if variable is within RadCLss
-            # note: it may not be if file is missing.
-            if var not in radclss.variables:
-                new_size = []
-                for dim in dod[var].dims:
-                    if dim == "time":
-                        new_size.append(radclss.sizes['time'])
-                    else:
-                        new_size.append(dod.sizes[dim])
-                #new_data = np.full(())
-                # create a new array to hold the updated values
-                new_data = np.full(new_size, dod[var].data[0])
-                # create a new DataArray and add back into RadCLss
-                new_array = xr.DataArray(new_data, dims=dod[var].dims)
-                new_array.attrs = dod[var].attrs
-                radclss[var] = new_array
-
-                # clean up some saved values
-                del new_size, new_data, new_array
-
-    # Adjust the radclss time attributes
-    if hasattr(radclss['time'], "units"):
-        del radclss["time"].attrs["units"]
-    if hasattr(radclss['time_offset'], "units"):
-        del radclss["time_offset"].attrs["units"]
-    if hasattr(radclss['base_time'], "units"):
-        del radclss["base_time"].attrs["units"]
-
-    # reorder the DataArrays to match the ARM Data Object Identifier
-    first = ["base_time", "time_offset", "time", "height", "station", "gate_time"]
-    last = ["lat", "lon", "alt"]  # the three you want last
-
-    # Keep only data variables, preserve order, and drop the ones already in first/last
-    middle = [v for v in radclss.data_vars if v not in first + last]
-
-    ordered = first + middle + last
-    radclss = radclss[ordered]
-
-    # Update the global attributes (some) to match the DOD
-    config = get_output_config()
-    OUTPUT_SITE = config['site']
-    OUTPUT_FACILITY = config['facility']
-    OUTPUT_PLATFORM = config['platform']
-    OUTPUT_LEVEL = config['level']
-    radclss.drop_attrs()
-    radclss.attrs = dod.attrs
-    radclss.attrs['vap_name'] = ""
-    radclss.attrs['command_line'] = "python bnf_radclss.py --serial True --array True"
-    radclss.attrs['site_id'] = OUTPUT_SITE
-    radclss.attrs['platform_id'] = OUTPUT_PLATFORM
-    radclss.attrs['facility_id'] = OUTPUT_FACILITY
-    radclss.attrs['data_level'] = OUTPUT_LEVEL
-    radclss.attrs['location_description'] = "Southeast U.S. in Bankhead National Forest (BNF), Decatur, Alabama"
-    radclss.attrs['datastream'] = "bnfcsapr2radclssS3.c2"
-    radclss.attrs['input_datastreams'] = ["bnfcsapr2cmacS3.c1",
-                                          'bnfmetM1.b1',
-                                          'bnfmetS20.b1',
-                                          "bnfmetS30.b1",
-                                          "bnfmetS40.b1",
-                                          "bnfsondewnpnM1.b1",
-                                          "bnfwbpluvio2M1.a1",
-                                          "bnfldquantsM1.c1",
-                                          "bnfldquantsS30.c1",
-                                          "bnfvdisquantsM1.c1",
-                                          "bnfmetwxtS13.b1"
-                                          ]
-    username = getpass.getuser()
-    hostname = socket.gethostname()
-    radclss.attrs['history'] = ("created by user " + username + " on machine " + hostname + " at " +
-                                str(datetime.datetime.now()) +
-                                " using Py-ART and ACT"
-                                )
-    # return radclss, close DOD file
-    del dod
-
-    return radclss
\ No newline at end of file
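Most of what adjust_radclss_dod did is now covered by act.io.create_ds_from_arm_dod plus the explicit assignments in radclss_core.py. For readers of the old code, its backfill step amounted to the following condensed restatement of the deleted lines (not new behavior; the helper name is illustrative):

    import numpy as np
    import xarray as xr

    def backfill_from_dod(radclss, dod):
        # Create, at full size, any DOD variable missing from the output.
        for var in dod.variables:
            if var in dod.dims or var in radclss.variables:
                continue
            shape = [radclss.sizes["time"] if d == "time" else dod.sizes[d]
                     for d in dod[var].dims]
            radclss[var] = xr.DataArray(np.full(shape, dod[var].data[0]),
                                        dims=dod[var].dims,
                                        attrs=dod[var].attrs)
        return radclss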
diff --git a/radclss/vis/quicklooks.py b/radclss/vis/quicklooks.py
index 424cbd2..3cf4199 100644
--- a/radclss/vis/quicklooks.py
+++ b/radclss/vis/quicklooks.py
@@ -11,15 +11,14 @@ def create_radclss_columns(radclss,
                            field="corrected_reflectivity",
                            p_vmin=-5,
-                           p_vmax=65,
-                           outdir="./"):
+                           p_vmax=65):
     """
     With the RadCLss product, generate a figure of all extracted columns
 
     Input
     -----
-    radclss : str
-        Filepath to the RadCLss file.
+    radclss : str or xarray.Dataset
+        Filepath to the RadCLss file, or a Dataset containing the columns.
     field : str
         Specific CMAC field to display extracted column of
     vmin : int
@@ -28,14 +27,13 @@ def create_radclss_columns(radclss,
         Minimum value to display between all subplots for the specific radar
        parameter
     vmax : int
         Maximum value to display between all subplots for the specific radar
         parameter
-    outdir : str
-        Path to desired output directory. If not supplied, assumes current
-        working directory.
 
     Output
     ------
-    timeseries : png
-        Saved image of the RadCLss timeseris
+    fig : matplotlib figure
+        Figure containing the extracted columns.
+    axarr : matplotlib axes array
+        Array of matplotlib axes containing the extracted columns.
     """
 
     # Create the figure
@@ -43,19 +41,25 @@ def create_radclss_columns(radclss,
     plt.subplots_adjust(hspace=0.8)
 
     # read the RadCLss file
-    try:
-        ds = xr.open_dataset(radclss[0], decode_timedelta=False)
-    except Exception as e:
-        exc_type, exc_obj, exc_tb = sys.exc_info()
-        # 'e' will contain the error object
-        print("\nERROR - (create_radclss_timeseries):" +
-              f" \n\tOccured When Reading in RadCLss File: \n\t{e}")
-        print(f"\tError type: {type(e)}")
-        print(f"\tLine Number: ", exc_tb.tb_lineno)
-        print(f"\tFile Name: ", exc_tb.tb_frame.f_code.co_filename)
-        print("\n")
-        return
-
+    if isinstance(radclss, str):
+        try:
+            ds = xr.open_dataset(radclss, decode_timedelta=False)
+        except Exception as e:
+            exc_type, exc_obj, exc_tb = sys.exc_info()
+            # 'e' will contain the error object
+            print("\nERROR - (create_radclss_timeseries):" +
+                  f" \n\tOccurred When Reading in RadCLss File: \n\t{e}")
+            print(f"\tError type: {type(e)}")
+            print("\tLine Number: ", exc_tb.tb_lineno)
+            print("\tFile Name: ", exc_tb.tb_frame.f_code.co_filename)
+            print("\n")
+            return
+    elif isinstance(radclss, xr.Dataset):
+        ds = radclss
+    else:
+        raise TypeError("\nERROR - (create_radclss_timeseries):" +
+                        " \n\tRadCLss Input is Not a String or xarray Dataset\n")
 
     # Define the time of the radar file we are plotting against
     radar_time = datetime.datetime.strptime(np.datetime_as_string(ds['time'].data[0], unit="s"),
                                             "%Y-%m-%dT%H:%M:%S")
@@ -116,26 +120,7 @@ def create_radclss_columns(radclss,
                        cmap="ChaseSpectral"
                        )
 
-    # save the figure
-    try:
-        fig.savefig(outdir +
-                    'bnf-radclss-columns.' +
-                    radclss[0].split('.')[-3]+
-                    '.png')
-        plt.close(fig)
-        STATUS = "COLUMNS SUCCESS"
-    except Exception as e:
-        exc_type, exc_obj, exc_tb = sys.exc_info()
-        # 'e' will contain the error object
-        print("\nERROR - (create_radclss_timeseries):" +
-              f" \n\tOccured When Saving Figure to File: \n\t{e}")
-        print(f"\tError type: {type(e)}")
-        print(f"\tLine Number: ", exc_tb.tb_lineno)
-        print(f"\tFile Name: ", exc_tb.tb_frame.f_code.co_filename)
-        print("\n")
-        STATUS = "COLUMNS FAILED"
-
-    return STATUS
+    return fig, axarr
 
 def create_radclss_timeseries(radclss,
                               field="corrected_reflectivity",