#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2021 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Interface to NUCAPS Retrieval NetCDF files.
NUCAPS stands for NOAA Unique Combined Atmospheric Processing System.
NUCAPS retrievals include temperature, moisture, trace gas, and cloud-cleared
radiance profiles. Product details can be found at:
https://www.ospo.noaa.gov/Products/atmosphere/soundings/nucaps/
This reader supports both standard NOAA NUCAPS EDRs, and Science EDRs,
which are essentially a subset of the standard EDRs with some additional
parameters such as relative humidity and boundary layer temperature.
NUCAPS data is derived from Cross-track Infrared Sounder (CrIS) data, and
from Advanced Technology Microwave Sounder (ATMS) data, instruments
onboard Joint Polar Satellite System spacecraft.
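
A minimal usage sketch; the glob pattern is only a placeholder for real
NUCAPS EDR file names, and the dataset names assume the ``Temperature``
and ``H2O_MR`` products (with the pressure-level suffix described further
below) are present in the files::

    from glob import glob

    from satpy import Scene

    scn = Scene(filenames=glob("/path/to/NUCAPS-EDR*.nc"), reader="nucaps")
    scn.load(["Temperature_300mb", "H2O_MR_300mb"])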
"""
import logging
from collections import defaultdict
import numpy as np
import pandas as pd
import xarray as xr
from satpy.readers.netcdf_utils import NetCDF4FileHandler
from satpy.readers.yaml_reader import FileYAMLReader
LOG = logging.getLogger(__name__)
# It's difficult to do processing without knowing the pressure levels beforehand
ALL_PRESSURE_LEVELS = [
0.0161, 0.0384, 0.0769, 0.137, 0.2244, 0.3454, 0.5064, 0.714,
0.9753, 1.2972, 1.6872, 2.1526, 2.7009, 3.3398, 4.077, 4.9204,
5.8776, 6.9567, 8.1655, 9.5119, 11.0038, 12.6492, 14.4559, 16.4318,
18.5847, 20.9224, 23.4526, 26.1829, 29.121, 32.2744, 35.6505,
39.2566, 43.1001, 47.1882, 51.5278, 56.126, 60.9895, 66.1253,
71.5398, 77.2396, 83.231, 89.5204, 96.1138, 103.017, 110.237,
117.777, 125.646, 133.846, 142.385, 151.266, 160.496, 170.078,
180.018, 190.32, 200.989, 212.028, 223.441, 235.234, 247.408,
259.969, 272.919, 286.262, 300, 314.137, 328.675, 343.618, 358.966,
374.724, 390.893, 407.474, 424.47, 441.882, 459.712, 477.961,
496.63, 515.72, 535.232, 555.167, 575.525, 596.306, 617.511, 639.14,
661.192, 683.667, 706.565, 729.886, 753.628, 777.79, 802.371,
827.371, 852.788, 878.62, 904.866, 931.524, 958.591, 986.067,
1013.95, 1042.23, 1070.92, 1100
]
class NUCAPSFileHandler(NetCDF4FileHandler):
"""File handler for NUCAPS netCDF4 format."""
def __init__(self, *args, **kwargs):
"""Initialize file handler."""
# remove kwargs that the reader instance uses but the file handler does not
kwargs.pop('mask_surface', None)
kwargs.pop('mask_quality', None)
kwargs.setdefault('xarray_kwargs', {}).setdefault(
'decode_times', False)
super(NUCAPSFileHandler, self).__init__(*args, **kwargs)
def __contains__(self, item):
"""Return item from file content."""
return item in self.file_content
def _parse_datetime(self, datestr):
"""Parse NUCAPS datetime string."""
return pd.to_datetime(datestr).to_pydatetime()
@property
def start_time(self):
"""Get start time."""
try:
return self._parse_datetime(self['/attr/time_coverage_start'])
except KeyError:
# If attribute not present, use time from file name
return self.filename_info['start_time']
@property
def end_time(self):
"""Get end time."""
try:
return self._parse_datetime(self['/attr/time_coverage_end'])
except KeyError:
# If attribute not present, use time from file name
return self.filename_info['end_time']
@property
def start_orbit_number(self):
"""Return orbit number for the beginning of the swath."""
try:
return int(self['/attr/start_orbit_number'])
except KeyError:
return 0
@property
def end_orbit_number(self):
"""Return orbit number for the end of the swath."""
try:
return int(self['/attr/end_orbit_number'])
except KeyError:
return 0
@property
def platform_name(self):
"""Return standard platform name for the file's data."""
try:
res = self['/attr/platform_name']
if isinstance(res, np.ndarray):
return str(res.astype(str))
return res
except KeyError:
return self.filename_info['platform_shortname']
@property
def sensor_names(self):
"""Return standard sensor or instrument name for the file's data."""
try:
res = self['/attr/instrument_name']
res = [x.strip() for x in res.split(',')]
if len(res) == 1:
return res[0].lower()
except KeyError:
res = ['CrIS', 'ATMS', 'VIIRS']
return set(name.lower() for name in res)
def get_shape(self, ds_id, ds_info):
"""Return data array shape for item specified."""
var_path = ds_info.get('file_key', '{}'.format(ds_id['name']))
if var_path + '/shape' not in self:
# loading a scalar value
shape = 1
else:
shape = self[var_path + "/shape"]
if "index" in ds_info:
shape = shape[1:]
if "pressure_index" in ds_info:
shape = shape[:-1]
return shape
def get_dataset(self, dataset_id, ds_info):
"""Load data array and metadata for specified dataset."""
var_path = ds_info.get('file_key', '{}'.format(dataset_id['name']))
metadata = self.get_metadata(dataset_id, ds_info)
valid_min, valid_max = self[var_path + '/attr/valid_range']
fill_value = self.get(var_path + '/attr/_FillValue')
d_tmp = self[var_path]
if "index" in ds_info:
d_tmp = d_tmp[int(ds_info["index"])]
if "pressure_index" in ds_info:
d_tmp = d_tmp[..., int(ds_info["pressure_index"])]
# this is a pressure based field
# include surface_pressure as metadata
sp = self['Surface_Pressure']
# Older format
if 'number_of_FORs' in sp.dims:
sp = sp.rename({'number_of_FORs': 'y'})
# Newer format
if 'Number_of_CrIS_FORs' in sp.dims:
sp = sp.rename({'Number_of_CrIS_FORs': 'y'})
if 'surface_pressure' in ds_info:
ds_info['surface_pressure'] = xr.concat((ds_info['surface_pressure'], sp), dim='y')
else:
ds_info['surface_pressure'] = sp
# include all the pressure levels
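# (assumption: the levels are identical for every CrIS field of regard,
#  so the first record is representative of the whole file)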
ds_info.setdefault('pressure_levels', self['Pressure'][0])
data = d_tmp
if valid_min is not None and valid_max is not None:
# the original .cfg/INI based reader only checked valid_max
data = data.where((data <= valid_max)) # | (data >= valid_min))
if fill_value is not None:
data = data.where(data != fill_value)
# this _FillValue is no longer valid
metadata.pop('_FillValue', None)
data.attrs.pop('_FillValue', None)
data.attrs.update(metadata)
# Older format
if 'number_of_FORs' in data.dims:
data = data.rename({'number_of_FORs': 'y'})
# Newer format
if 'Number_of_CrIS_FORs' in data.dims:
data = data.rename({'Number_of_CrIS_FORs': 'y'})
return data
class NUCAPSReader(FileYAMLReader):
"""Reader for NUCAPS NetCDF4 files."""
def __init__(self, config_files, mask_surface=True, mask_quality=True, **kwargs):
"""Configure reader behavior.
Args:
mask_surface (boolean): mask anything below the surface pressure
mask_quality (boolean): mask anything where the `Quality_Flag` metadata is ``!= 1``.
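
A hedged example of disabling both masks through the ``Scene`` API,
assuming ``reader_kwargs`` is used to hand these options to the reader::

    scn = Scene(filenames=filenames, reader="nucaps",
                reader_kwargs={"mask_surface": False, "mask_quality": False})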
"""
self.pressure_dataset_names = defaultdict(list)
super(NUCAPSReader, self).__init__(config_files,
**kwargs)
self.mask_surface = self.info.get('mask_surface', mask_surface)
self.mask_quality = self.info.get('mask_quality', mask_quality)
def load_ds_ids_from_config(self):
"""Convert config dataset entries to DataIDs.
Special handling is done to provide level specific datasets
for any pressured based datasets. For example, a dataset is
added for each pressure level of 'Temperature' with each
new dataset being named 'Temperature_Xmb' where X is the
pressure level.
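
The suffix formatting mirrors the code below; for illustration::

    >>> "_{:0.03f}mb".format(0.0161)
    '_0.016mb'
    >>> "_{:0.0f}mb".format(300.0)
    '_300mb'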
"""
super(NUCAPSReader, self).load_ds_ids_from_config()
for ds_id in list(self.all_ids.keys()):
ds_info = self.all_ids[ds_id]
if ds_info.get('pressure_based', False):
for idx, lvl_num in enumerate(ALL_PRESSURE_LEVELS):
if lvl_num < 5.0:
suffix = "_{:0.03f}mb".format(lvl_num)
else:
suffix = "_{:0.0f}mb".format(lvl_num)
new_info = ds_info.copy()
new_info['pressure_level'] = lvl_num
new_info['pressure_index'] = idx
new_info['file_key'] = '{}'.format(ds_id['name'])
new_info['name'] = ds_id['name'] + suffix
new_ds_id = ds_id._replace(name=new_info['name'])
new_info['id'] = new_ds_id
self.all_ids[new_ds_id] = new_info
self.pressure_dataset_names[ds_id['name']].append(new_info['name'])
def load(self, dataset_keys, previous_datasets=None, pressure_levels=None):
"""Load data from one or more set of files.
:param pressure_levels: mask out certain pressure levels:
True for all levels
(min, max) for a range of pressure levels
[...] list of levels to include
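
A hedged usage sketch, assuming the keyword argument is forwarded to this
method (for example through ``Scene.load``; forwarding behavior may vary
between Satpy versions)::

    # keep only the levels between 100 hPa and 150 hPa (inclusive)
    scn.load(["Temperature"], pressure_levels=(100., 150.))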
"""
dataset_keys = set(self.get_dataset_key(x) for x in dataset_keys)
if pressure_levels is not None:
self._filter_dataset_keys_outside_pressure_levels(dataset_keys, pressure_levels)
# Add pressure levels to the datasets to load if needed so
# we can do further filtering after loading
plevels_ds_id = self.get_dataset_key('Pressure_Levels')
remove_plevels = False
if plevels_ds_id not in dataset_keys:
dataset_keys.add(plevels_ds_id)
remove_plevels = True
datasets_loaded = super(NUCAPSReader, self).load(
dataset_keys, previous_datasets=previous_datasets)
if pressure_levels is not None:
if remove_plevels:
plevels_ds = datasets_loaded.pop(plevels_ds_id)
dataset_keys.remove(plevels_ds_id)
else:
plevels_ds = datasets_loaded[plevels_ds_id]
_remove_data_at_pressure_levels(datasets_loaded, plevels_ds, pressure_levels)
if self.mask_surface:
_mask_data_below_surface_pressure(datasets_loaded, dataset_keys)
if self.mask_quality:
_mask_data_with_quality_flag(datasets_loaded, dataset_keys)
return datasets_loaded
def _filter_dataset_keys_outside_pressure_levels(self, dataset_keys, pressure_levels):
for ds_id in dataset_keys.copy():
ds_info = self.all_ids[ds_id]
ds_level = ds_info.get("pressure_level")
if ds_level is not None:
if pressure_levels is True:
# they want all pressure levels
continue
elif len(pressure_levels) == 2 and pressure_levels[0] <= ds_level <= pressure_levels[1]:
# given a min and a max pressure level
continue
elif np.isclose(pressure_levels, ds_level).any():
# they asked for this specific pressure level
continue
else:
# they don't want this dataset at this pressure level
LOG.debug("Removing dataset to load: %s", ds_id)
dataset_keys.remove(ds_id)
continue
def _remove_data_at_pressure_levels(datasets_loaded, plevels_ds, pressure_levels):
cond = _get_pressure_level_condition(plevels_ds, pressure_levels)
if cond is not None:
new_plevels = plevels_ds.where(cond, drop=True)
else:
new_plevels = plevels_ds
for ds_id in datasets_loaded.keys():
ds_obj = datasets_loaded[ds_id]
if plevels_ds.dims[0] not in ds_obj.dims:
continue
if cond is not None:
datasets_loaded[ds_id] = ds_obj.where(cond, drop=True)
datasets_loaded[ds_id].attrs['pressure_levels'] = new_plevels
def _get_pressure_level_condition(plevels_ds, pressure_levels):
if pressure_levels is True:
return None
if len(pressure_levels) == 2:
cond = (plevels_ds >= pressure_levels[0]) & (plevels_ds <= pressure_levels[1])
else:
cond = plevels_ds == pressure_levels
# convert dask-based DataArray to a computed numpy-based DataArray to
# avoid unknown shapes of dask arrays when this condition is used for masking
return cond.compute()
def _mask_data_below_surface_pressure(datasets_loaded, dataset_keys):
LOG.debug("Filtering pressure levels at or below the surface pressure")
for ds_id in sorted(dataset_keys):
ds = datasets_loaded[ds_id]
if "surface_pressure" not in ds.attrs or "pressure_levels" not in ds.attrs:
continue
data_pressure = ds.attrs["pressure_levels"]
surface_pressure = ds.attrs["surface_pressure"]
if isinstance(surface_pressure, float):
# scalar needs to become array for each record
surface_pressure = np.repeat(surface_pressure, ds.shape[0])
if surface_pressure.ndim == 1 and surface_pressure.shape[0] == ds.shape[0]:
# surface is one element per record
LOG.debug("Filtering %s at and below the surface pressure", ds_id)
if ds.ndim == 2:
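# broadcast the per-record surface pressure and the shared level axis to
# a common (records, levels) shape so they can be compared elementwise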
surface_pressure = np.repeat(surface_pressure[:, None], data_pressure.shape[0], axis=1)
data_pressure = np.repeat(data_pressure[None, :], surface_pressure.shape[0], axis=0)
datasets_loaded[ds_id] = ds.where(data_pressure < surface_pressure)
else:
# entire dataset represents one pressure level
data_pressure = ds.attrs["pressure_level"]
datasets_loaded[ds_id] = ds.where(data_pressure < surface_pressure)
else:
LOG.warning("Not sure how to handle shape of 'surface_pressure' metadata")
def _mask_data_with_quality_flag(datasets_loaded, dataset_keys):
LOG.debug("Filtering data based on quality flags")
for ds_id in sorted(dataset_keys):
ds = datasets_loaded[ds_id]
quality_flag = [
x for x in ds.attrs.get('ancillary_variables', [])
if x.attrs.get('name') == 'Quality_Flag']
if not quality_flag:
continue
quality_flag = quality_flag[0]
if quality_flag.dims[0] not in ds.dims:
continue
LOG.debug("Masking %s where quality flag doesn't equal 1", ds_id)
datasets_loaded[ds_id] = ds.where(quality_flag == 0)