1 change: 0 additions & 1 deletion radclss/config/default_config.py
@@ -6,7 +6,6 @@
"""
# Define variables to drop from RadCLss from the respective datastreams
DEFAULT_DISCARD_VAR = {'radar' : ['classification_mask',
'censor_mask',
'uncorrected_copol_correlation_coeff',
'uncorrected_differential_phase',
'uncorrected_differential_reflectivity',
141 changes: 95 additions & 46 deletions radclss/core/radclss_core.py
@@ -3,9 +3,9 @@
import xarray as xr
import act
import xradar as xd
import numpy as np

from ..util.column_utils import subset_points, match_datasets_act
from ..util.dod import adjust_radclss_dod
from ..config.default_config import DEFAULT_DISCARD_VAR
from ..config.output_config import get_output_config
from dask.distributed import Client, as_completed
@@ -68,7 +68,7 @@ def radclss(volumes, input_site_dict, serial=True, dod_version='', discard_var={
current_client = Client.current()
if current_client is None:
raise RuntimeError("No Dask client found. Please start a Dask client before running in parallel mode.")
results = current_client.map(subset_points, volumes["radar"], input_site_dict=input_site_dict)
results = current_client.map(subset_points, volumes["radar"], sonde=volumes["sonde"], input_site_dict=input_site_dict)
for done_work in as_completed(results, with_results=False):
try:
columns.append(done_work.result())
@@ -78,45 +78,78 @@
for rad in volumes['radar']:
if verbose:
print(f"Processing file: {rad}")
columns.append(subset_points(rad, input_site_dict=input_site_dict))
columns.append(subset_points(rad, sonde=volumes["sonde"], input_site_dict=input_site_dict))
if verbose:
print("Processed file: ", rad)
print("Current number of successful columns: ", len(columns))
print("Last processed file results: ")
print(columns[-1])

# Assemble individual columns into single DataSet
try:
# Concatenate all extracted columns across time dimension to form daily timeseries
output_config = get_output_config()
output_platform = output_config['platform']
output_level = output_config['level']
ds_concat = xr.concat([data for data in columns if data], dim="time")
ds = act.io.create_ds_from_arm_dod(f'{output_platform}-{output_level}',
{'time': ds_concat.sizes['time'],
'height': ds_concat.sizes['height'],
'station': ds_concat.sizes['station']},
version=dod_version)


ds['time'] = ds_concat.sel(station=base_station).base_time
ds['time_offset'] = ds_concat.sel(station=base_station).base_time
ds['base_time'] = ds_concat.sel(station=base_station).isel(time=0).base_time
ds['lat'] = ds_concat.isel(time=0).lat
ds['lon'] = ds_concat.isel(time=0).lon
ds['alt'] = ds_concat.isel(time=0).alt
for var in ds_concat.data_vars:
if var not in ['time', 'time_offset', 'base_time', 'lat', 'lon', 'alt']:
#try:
# Concatenate all extracted columns across time dimension to form daily timeseries
output_config = get_output_config()
output_platform = output_config['platform']
output_level = output_config['level']
ds_concat = xr.concat([data for data in columns if data], dim="time")
if verbose:
print("Grabbing DOD for platform/level: ", f'{output_platform}.{output_level}')
ds = act.io.create_ds_from_arm_dod(f'{output_platform}.{output_level}',
{'time': ds_concat.sizes['time'],
'height': ds_concat.sizes['height'],
'station': ds_concat.sizes['station']},
version=dod_version)


ds['time'] = ds_concat.sel(station=base_station).base_time
ds['time_offset'] = ds_concat.sel(station=base_station).base_time
ds['base_time'] = ds_concat.sel(station=base_station).isel(time=0).base_time
ds['station'] = ds_concat['station']
ds['height'] = ds_concat['height']
ds['lat'][:] = ds_concat.isel(time=0)["lat"][:]
ds['lon'][:] = ds_concat.isel(time=0)["lon"][:]
ds['alt'][:] = ds_concat.isel(time=0)["alt"][:]

for var in ds_concat.data_vars:
if var not in ['time', 'time_offset', 'base_time', 'lat', 'lon', 'alt']:
if var in ds.data_vars:
if verbose:
print(f"Adding variable to output dataset: {var}")
print(f"Original dtype: {ds[var].dtype}, New dtype: {ds_concat[var].dtype}")
old_type = ds[var].dtype

# Assign data and convert to original dtype
ds[var][:] = ds_concat[var][:]
ds[var] = ds[var].astype(old_type)
if "_FillValue" in ds[var].attrs:
if isinstance(ds[var].attrs["_FillValue"], str):
if ds[var].dtype == 'float32':
ds[var].attrs["_FillValue"] = np.float32(ds[var].attrs["_FillValue"])
elif ds[var].dtype == 'float64':
ds[var].attrs["_FillValue"] = np.float64(ds[var].attrs["_FillValue"])
elif ds[var].dtype == 'int32':
ds[var].attrs["_FillValue"] = np.int32(ds[var].attrs["_FillValue"])
elif ds[var].dtype == 'int64':
ds[var].attrs["_FillValue"] = np.int64(ds[var].attrs["_FillValue"])
ds[var] = ds[var].fillna(ds[var].attrs["_FillValue"]).astype(float)
if "missing_value" in ds[var].attrs:
if isinstance(ds[var].attrs["missing_value"], str):
if ds[var].dtype == 'float32':
ds[var].attrs["missing_value"] = np.float32(ds[var].attrs["missing_value"])
elif ds[var].dtype == 'float64':
ds[var].attrs["missing_value"] = np.float64(ds[var].attrs["missing_value"])
elif ds[var].dtype == 'int32':
ds[var].attrs["missing_value"] = np.int32(ds[var].attrs["missing_value"])
elif ds[var].dtype == 'int64':
ds[var].attrs["missing_value"] = np.int64(ds[var].attrs["missing_value"])
ds[var] = ds[var].fillna(ds[var].attrs["missing_value"]).astype(float)

# Remove all the unused CMAC variables
ds = ds.drop_vars(discard_var["radar"])
# Drop duplicate latitude and longitude
ds = ds.drop_vars(['latitude', 'longitude'])
del ds_concat
except ValueError as e:
print(f"Error concatenating columns: {e}")
ds = None
# Remove all the unused CMAC variables
# Drop duplicate latitude and longitude
del ds_concat
#except ValueError as e:
# print(f"Error concatenating columns: {e}")
# ds = None

# Free up Memory
del columns
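
The `_FillValue` / `missing_value` branches added above repeat the same pattern per dtype: cast a string-valued attribute to the variable's dtype, then fill NaNs with it. A minimal sketch of that pattern, using a hypothetical helper name that is not part of this change:

```python
import numpy as np
import xarray as xr

def _coerce_fill_attr(da: xr.DataArray, attr: str) -> xr.DataArray:
    """Illustrative helper: cast a string-valued fill attribute to the array's dtype and apply it."""
    if attr in da.attrs and isinstance(da.attrs[attr], str):
        # np.dtype(...).type is the matching scalar constructor (np.float32, np.int32, ...)
        da.attrs[attr] = np.dtype(da.dtype).type(da.attrs[attr])
        da = da.fillna(da.attrs[attr]).astype(float)
    return da
```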
@@ -130,14 +163,24 @@ def radclss(volumes, input_site_dict, serial=True, dod_version='', discard_var={
# Find all of the met stations and match to columns
vol_keys = list(volumes.keys())
for k in vol_keys:
instrument, site = k.split("_", 1)

if len(volumes[k]) == 0:
if verbose:
print(f"No files found for instrument/site: {k}")
continue
if "_" in k:
instrument, site = k.split("_", 1)
else:
instrument = k
site = base_station
if instrument == "met":
if verbose:
print(f"Matching MET data for site: {site}")
ds = match_datasets_act(ds,
volumes[k][0],
site.upper(),
resample="mean",
discard=discard_var['met'])
discard=discard_var['met'],
verbose=verbose)

# Radiosonde
if instrument == "sonde":
@@ -156,45 +199,51 @@ def radclss(volumes, input_site_dict, serial=True, dod_version='', discard_var={
site.upper(),
discard=discard_var[instrument],
DataSet=True,
resample="mean")
resample="mean",
verbose=verbose)
# clean up
del grd_ds

if instrument == "pluvio":
# Weighing Bucket Rain Gauge
ds = match_datasets_act(ds,
volumes[k][0],
volumes[k],
site.upper(),
discard=discard_var["pluvio"])
discard=discard_var["pluvio"],
verbose=verbose)

if instrument == "ld":
ds = match_datasets_act(ds,
volumes[k][0],
volumes[k],
site.upper(),
discard=discard_var['ldquants'],
resample="mean",
prefix="ldquants_")

prefix="ldquants_",
verbose=verbose)

if instrument == "vd":
# Video Disdrometer - Supplemental Site
ds = match_datasets_act(ds,
volumes[k][0],
volumes[k],
site.upper(),
discard=discard_var['vdisquants'],
resample="mean",
prefix="vdisquants_")
prefix="vdisquants_",
verbose=verbose)

if instrument == "wxt":
# Weather Transmitter (WXT) - Supplemental Site
ds = match_datasets_act(ds,
volumes[k][0],
volumes[k],
site.upper(),
discard=discard_var['wxt'],
resample="mean")
resample="mean",
verbose=verbose)

else:
# There is no column extraction
raise RuntimeError(": RadCLss FAILURE (All Columns Failed to Extract): ")

del ds["base_time"].attrs["units"]
del ds["time_offset"].attrs["units"]
del ds["time"].attrs["units"]
return ds
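
Since the parallel path resolves the active scheduler with `Client.current()`, a client has to exist before the call. A minimal usage sketch; the `volumes` and `input_site_dict` inputs are placeholders assembled elsewhere in the pipeline:

```python
from dask.distributed import Client
from radclss.core.radclss_core import radclss

client = Client(n_workers=4)                           # start workers before the parallel run
ds = radclss(volumes, input_site_dict, serial=False)   # placeholders for real inputs
```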
1 change: 1 addition & 0 deletions radclss/io/__init__.py
@@ -0,0 +1 @@
from .write import write_radclss_output
52 changes: 52 additions & 0 deletions radclss/io/write.py
@@ -0,0 +1,52 @@
import json
import urllib.request
import warnings

def write_radclss_output(ds, output_filename, process, version=None):
"""Write the RadCLSS output dataset to a NetCDF file.

Parameters
----------
ds : xarray.Dataset
The RadCLSS output dataset.
output_filename : str
The path to the output NetCDF file.
process : str
The process name (e.g., 'radclss').
version : str
The version of the process used. Set to None to use the latest version.
"""
# Write the dataset to a NetCDF file
base_url = 'https://pcm.arm.gov/pcm/api/dods/'

# Get data from DOD api
with urllib.request.urlopen(base_url + process) as url:
data = json.loads(url.read().decode())
keys = list(data['versions'].keys())
#if version not in keys:
# warnings.warn(
# ' '.join(
# ['Version:', version, 'not available or not specified. Using Version:', keys[-1]]
# ),
# UserWarning,
# )
version = keys[-1]
variables = data['versions'][version]['vars']
encoding = {}
for v in variables:
type_str = v['type']
if v['name'] in ds.variables:
if type_str == 'float':
encoding[v['name']] = {'dtype': 'float32'}
elif type_str == 'double':
encoding[v['name']] = {'dtype': 'float64'}
elif type_str == 'short':
encoding[v['name']] = {'dtype': 'int16'}
elif type_str == 'int':
encoding[v['name']] = {'dtype': 'int32'}
elif type_str == "char":
encoding[v['name']] = {'dtype': 'S1'}
elif type_str == "byte":
encoding[v['name']] = {'dtype': 'int8'}

ds.to_netcdf(output_filename, format='NETCDF4_CLASSIC', encoding=encoding)
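
A short usage sketch for the new writer; the dataset, output filename, and process name below are placeholders, not values taken from this PR:

```python
from radclss.io import write_radclss_output

# `ds` is the dataset returned by radclss(); the filename is a placeholder.
write_radclss_output(ds, "radclss_output.c2.nc", process="radclss", version=None)
```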
3 changes: 1 addition & 2 deletions radclss/util/__init__py
@@ -1,2 +1 @@
from .column_utils import subset_points, match_datasets_act
from .dod import adjust_radclss_dod
from .column_utils import subset_points, match_datasets_act
39 changes: 26 additions & 13 deletions radclss/util/column_utils.py
@@ -2,7 +2,6 @@
import act
import numpy as np
import xarray as xr
import xradar as xd
import datetime

from ..config import DEFAULT_DISCARD_VAR
@@ -51,15 +50,16 @@ def subset_points(nfile, input_site_dict, sonde=None, height_bins=np.arange(500,
#try:
# Read in the file

xradar_ds = xd.io.open_cfradial1_datatree(nfile)
for var in DEFAULT_DISCARD_VAR['radar']:
if var in xradar_ds.data_vars:
xradar_ds = xradar_ds.drop_vars(var)
if xradar_ds["/sweep_0"]["sweep_mode"] == "rhi":
xradar_ds.close()
return None
radar = xradar_ds.pyart.to_radar()
xradar_ds.close()
#xradar_ds = xd.io.open_cfradial1_datatree(nfile)
#for var in DEFAULT_DISCARD_VAR['radar']:
# if var in xradar_ds.data_vars:
# xradar_ds = xradar_ds.drop_vars(var)
#if xradar_ds["/sweep_0"]["sweep_mode"] == "rhi":
# xradar_ds.close()
# return None
#radar = xradar_ds.pyart.to_radar()
#xradar_ds.close()
radar = pyart.io.read(nfile, exclude_fields=DEFAULT_DISCARD_VAR['radar'])
# Check for single sweep scans
if np.ma.is_masked(radar.sweep_start_ray_index["data"][1:]):
radar.sweep_start_ray_index["data"] = np.ma.array([0])
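
The datatree-based reader is replaced here with a direct Py-ART read, where `exclude_fields` drops the discard list before the fields are allocated. A minimal sketch of that call with a placeholder file path:

```python
import pyart
from radclss.config import DEFAULT_DISCARD_VAR

# Placeholder path; excluded fields are skipped at read time rather than dropped afterwards.
radar = pyart.io.read("example_cfradial_volume.nc", exclude_fields=DEFAULT_DISCARD_VAR["radar"])
```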
@@ -160,7 +160,8 @@ def match_datasets_act(column,
discard,
resample='sum',
DataSet=False,
prefix=None):
prefix=None,
verbose=False):
"""
Time synchronization of a Ground Instrumentation Dataset to
a Radar Column for Specific Locations using the ARM ACT package
@@ -195,6 +196,9 @@ def match_datasets_act(column,
prefix : str
prefix for the desired spelling of variable names for the input
datastream (to fix duplicate variable names between instruments)

verbose : boolean
Boolean flag to set verbose output during processing. Default is False.

Returns
-------
@@ -255,6 +259,15 @@ def match_datasets_act(column,
matched[var].attrs.update(source=matched.datastream)

# Merge the two DataSets
column = xr.merge([column, matched])

for k in matched.data_vars:
if k in column.data_vars:
column[k].sel(station=site)[:] = matched.sel(station=site)[k][:].astype(column[k].dtype)
if "_FillValue" in column[k].attrs:
if isinstance(column[k].attrs["_FillValue"], str):
column[k].attrs["_FillValue"] = float(column[k].attrs["_FillValue"])
column[k] = column[k].fillna(column[k].attrs["_FillValue"]).astype(float)
if "missing_value" in column[k].attrs:
if isinstance(column[k].attrs["missing_value"], str):
column[k].attrs["missing_value"] = float(column[k].attrs["missing_value"])
column[k] = column[k].fillna(column[k].attrs["missing_value"]).astype(float)
return column
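
The merge call is replaced above by per-station, in-place assignment so the DOD-defined variables keep their dimensions, dtype, and attributes. A toy illustration of that assignment pattern; the variable and station names are made up:

```python
import numpy as np
import xarray as xr

# DOD-style output variable pre-allocated over (time, station) with NaNs.
column = xr.Dataset(
    {"temp_mean": (("time", "station"), np.full((3, 2), np.nan))},
    coords={"time": range(3), "station": ["M1", "S2"]},
)
# Matched ground-instrument timeseries for a single station.
matched = xr.DataArray([1.0, 2.0, 3.0], dims="time")

# Write into the existing variable at that station instead of merging datasets,
# which preserves the variable's shape, dtype, and DOD attributes.
column["temp_mean"].loc[{"station": "M1"}] = matched.values.astype(column["temp_mean"].dtype)
```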