Source code for satpy.readers.mws_l1b

# Copyright (c) 2022 Pytroll Developers

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <>.
"""Reader for the EPS-SG Microwave Sounder (MWS) level-1b data.


import logging
from datetime import datetime

import dask.array as da
import numpy as np
from netCDF4 import default_fillvals

from .netcdf_utils import NetCDF4FileHandler

logger = logging.getLogger(__name__)

# dict containing all available auxiliary data parameters to be read using the index map. Keys are the
# parameter name and values are the paths to the variable inside the netcdf

    'scantime_utc': 'data/navigation/mws_scantime_utc',
    'solar_azimuth': 'data/navigation/mws_solar_azimuth_angle',
    'solar_zenith': 'data/navigation/mws_solar_zenith_angle',
    'satellite_azimuth': 'data/navigation/mws_satellite_azimuth_angle',
    'satellite_zenith': 'data/navigation/mws_satellite_zenith_angle',
    'surface_type': 'data/navigation/mws_surface_type',
    'terrain_elevation': 'data/navigation/mws_terrain_elevation',
    'mws_lat': 'data/navigation/mws_lat',
    'mws_lon': 'data/navigation/mws_lon',

MWS_CHANNEL_NAMES_TO_NUMBER = {'1': 1, '2': 2, '3': 3, '4': 4,
                               '5': 5, '6': 6, '7': 7, '8': 8,
                               '9': 9, '10': 10, '11': 11, '12': 12,
                               '13': 13, '14': 14, '15': 15, '16': 16,
                               '17': 17, '18': 18, '19': 19, '20': 20,
                               '21': 21, '22': 22, '23': 23, '24': 24}


[docs] def get_channel_index_from_name(chname): """Get the MWS channel index from the channel name.""" chindex = MWS_CHANNEL_NAMES_TO_NUMBER.get(chname, 0) - 1 if 0 <= chindex < 24: return chindex raise AttributeError(f"Channel name {chname!r} not supported")
[docs] def _get_aux_data_name_from_dsname(dsname): aux_data_name = [key for key in AUX_DATA.keys() if key in dsname] if len(aux_data_name) > 0: return aux_data_name[0]
[docs] class MWSL1BFile(NetCDF4FileHandler): """Class implementing the EPS-SG-A1 MWS L1b Filehandler. This class implements the European Polar System Second Generation (EPS-SG) Microwave Sounder (MWS) Level-1b NetCDF reader. It is designed to be used through the :class:`~satpy.Scene` class using the :mod:`~satpy.Scene.load` method with the reader ``"mws_l1b_nc"``. """ _platform_name_translate = { "SGA1": "Metop-SG-A1", "SGA2": "Metop-SG-A2", "SGA3": "Metop-SG-A3"} def __init__(self, filename, filename_info, filetype_info): """Initialize file handler.""" super().__init__(filename, filename_info, filetype_info, cache_var_size=10000, cache_handle=True) logger.debug('Reading: {}'.format(self.filename)) logger.debug('Start: {}'.format(self.start_time)) logger.debug('End: {}'.format(self.end_time)) self._cache = {} self._channel_names = MWS_CHANNEL_NAMES @property def start_time(self): """Get start time.""" return datetime.strptime(self['/attr/sensing_start_time_utc'], '%Y-%m-%d %H:%M:%S.%f') @property def end_time(self): """Get end time.""" return datetime.strptime(self['/attr/sensing_end_time_utc'], '%Y-%m-%d %H:%M:%S.%f') @property def sensor(self): """Get the sensor name.""" return self['/attr/instrument'] @property def platform_name(self): """Get the platform name.""" return self._platform_name_translate.get(self['/attr/spacecraft']) @property def sub_satellite_longitude_start(self): """Get the longitude of sub-satellite point at start of the product.""" return self['status/satellite/subsat_longitude_start'].data.item() @property def sub_satellite_latitude_start(self): """Get the latitude of sub-satellite point at start of the product.""" return self['status/satellite/subsat_latitude_start'].data.item() @property def sub_satellite_longitude_end(self): """Get the longitude of sub-satellite point at end of the product.""" return self['status/satellite/subsat_longitude_end'].data.item() @property def sub_satellite_latitude_end(self): """Get the latitude of sub-satellite point at end of the product.""" return self['status/satellite/subsat_latitude_end'].data.item()
[docs] def get_dataset(self, dataset_id, dataset_info): """Get dataset using file_key in dataset_info.""" logger.debug('Reading {} from {}'.format(dataset_id['name'], self.filename)) var_key = dataset_info['file_key'] if _get_aux_data_name_from_dsname(dataset_id['name']) is not None: variable = self._get_dataset_aux_data(dataset_id['name']) elif any(lb in dataset_id['name'] for lb in MWS_CHANNELS): logger.debug(f'Reading in file to get dataset with key {var_key}.') variable = self._get_dataset_channel(dataset_id, dataset_info) else: logger.warning(f'Could not find key {var_key} in NetCDF file, no valid Dataset created') # noqa: E501 return None variable = self._manage_attributes(variable, dataset_info) variable = self._drop_coords(variable) variable = self._standardize_dims(variable) return variable
[docs] @staticmethod def _standardize_dims(variable): """Standardize dims to y, x.""" if 'n_scans' in variable.dims: variable = variable.rename({'n_fovs': 'x', 'n_scans': 'y'}) if variable.dims[0] == 'x': variable = variable.transpose('y', 'x') return variable
[docs] @staticmethod def _drop_coords(variable): """Drop coords that are not in dims.""" for coord in variable.coords: if coord not in variable.dims: variable = variable.drop_vars(coord) return variable
[docs] def _manage_attributes(self, variable, dataset_info): """Manage attributes of the dataset.""" variable.attrs.setdefault('units', None) variable.attrs.update(dataset_info) variable.attrs.update(self._get_global_attributes()) return variable
[docs] def _get_dataset_channel(self, key, dataset_info): """Load dataset corresponding to channel measurement. Load a dataset when the key refers to a measurand, whether uncalibrated (counts) or calibrated in terms of brightness temperature or radiance. """ # Get the dataset # Get metadata for given dataset grp_pth = dataset_info['file_key'] channel_index = get_channel_index_from_name(key['name']) data = self[grp_pth][:, :, channel_index] attrs = data.attrs.copy() fv = attrs.pop( "FillValue", default_fillvals.get(data.dtype.str[1:], np.nan)) vr = attrs.get("valid_range", [-np.inf, np.inf]) if key['calibration'] == "counts": attrs["_FillValue"] = fv nfv = fv else: nfv = np.nan data = data.where(data >= vr[0], nfv) data = data.where(data <= vr[1], nfv) # Manage the attributes of the dataset data.attrs.setdefault('units', None) data.attrs.update(dataset_info) dataset_attrs = getattr(data, 'attrs', {}) dataset_attrs.update(dataset_info) dataset_attrs.update({ "platform_name": self.platform_name, "sensor": self.sensor, "orbital_parameters": {'sub_satellite_latitude_start': self.sub_satellite_latitude_start, 'sub_satellite_longitude_start': self.sub_satellite_longitude_start, 'sub_satellite_latitude_end': self.sub_satellite_latitude_end, 'sub_satellite_longitude_end': self.sub_satellite_longitude_end}, }) try: dataset_attrs.update(key.to_dict()) except AttributeError: dataset_attrs.update(key) data.attrs.update(dataset_attrs) return data
[docs] def _get_dataset_aux_data(self, dsname): """Get the auxiliary data arrays using the index map.""" # Geolocation and navigation data: if dsname in ['mws_lat', 'mws_lon', 'solar_azimuth', 'solar_zenith', 'satellite_azimuth', 'satellite_zenith', 'surface_type', 'terrain_elevation']: var_key = AUX_DATA.get(dsname) else: raise NotImplementedError(f"Dataset {dsname!r} not supported!") try: variable = self[var_key] except KeyError: logger.exception("Could not find key %s in NetCDF file, no valid Dataset created", var_key) raise # Scale the data: if 'scale_factor' in variable.attrs and 'add_offset' in variable.attrs: missing_value = variable.attrs['missing_value'] = da.where( == missing_value, np.nan, * variable.attrs['scale_factor'] + variable.attrs['add_offset']) return variable
[docs] def _get_global_attributes(self): """Create a dictionary of global attributes.""" return { 'filename': self.filename, 'start_time': self.start_time, 'end_time': self.end_time, 'spacecraft_name': self.platform_name, 'sensor': self.sensor, 'filename_start_time': self.filename_info['start_time'], 'filename_end_time': self.filename_info['end_time'], 'platform_name': self.platform_name, 'quality_group': self._get_quality_attributes(), }
[docs] def _get_quality_attributes(self): """Get quality attributes.""" quality_group = self['quality'] quality_dict = {} for key in quality_group: # Add the values (as Numpy array) of each variable in the group # where possible try: quality_dict[key] = quality_group[key].values except ValueError: quality_dict[key] = None quality_dict.update(quality_group.attrs) return quality_dict