Source code for satpy.readers.mws_l1b

# Copyright (c) 2022 Pytroll Developers

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <>.
"""Reader for the EPS-SG Microwave Sounder (MWS) level-1b data.


import logging
from datetime import datetime

import dask.array as da
import numpy as np
from netCDF4 import default_fillvals

from .netcdf_utils import NetCDF4FileHandler

logger = logging.getLogger(__name__)

# dict containing all available auxiliary data parameters to be read using the index map. Keys are the
# parameter name and values are the paths to the variable inside the netcdf

    "scantime_utc": "data/navigation/mws_scantime_utc",
    "solar_azimuth": "data/navigation/mws_solar_azimuth_angle",
    "solar_zenith": "data/navigation/mws_solar_zenith_angle",
    "satellite_azimuth": "data/navigation/mws_satellite_azimuth_angle",
    "satellite_zenith": "data/navigation/mws_satellite_zenith_angle",
    "surface_type": "data/navigation/mws_surface_type",
    "terrain_elevation": "data/navigation/mws_terrain_elevation",
    "mws_lat": "data/navigation/mws_lat",
    "mws_lon": "data/navigation/mws_lon",

MWS_CHANNEL_NAMES_TO_NUMBER = {"1": 1, "2": 2, "3": 3, "4": 4,
                               "5": 5, "6": 6, "7": 7, "8": 8,
                               "9": 9, "10": 10, "11": 11, "12": 12,
                               "13": 13, "14": 14, "15": 15, "16": 16,
                               "17": 17, "18": 18, "19": 19, "20": 20,
                               "21": 21, "22": 22, "23": 23, "24": 24}


[docs] def get_channel_index_from_name(chname): """Get the MWS channel index from the channel name.""" chindex = MWS_CHANNEL_NAMES_TO_NUMBER.get(chname, 0) - 1 if 0 <= chindex < 24: return chindex raise AttributeError(f"Channel name {chname!r} not supported")
[docs] def _get_aux_data_name_from_dsname(dsname): aux_data_name = [key for key in AUX_DATA.keys() if key in dsname] if len(aux_data_name) > 0: return aux_data_name[0]
[docs] class MWSL1BFile(NetCDF4FileHandler): """Class implementing the EPS-SG-A1 MWS L1b Filehandler. This class implements the European Polar System Second Generation (EPS-SG) Microwave Sounder (MWS) Level-1b NetCDF reader. It is designed to be used through the :class:`~satpy.Scene` class using the :mod:`~satpy.Scene.load` method with the reader ``"mws_l1b_nc"``. """ _platform_name_translate = { "SGA1": "Metop-SG-A1", "SGA2": "Metop-SG-A2", "SGA3": "Metop-SG-A3"} def __init__(self, filename, filename_info, filetype_info): """Initialize file handler.""" super().__init__(filename, filename_info, filetype_info, cache_var_size=10000, cache_handle=True) logger.debug("Reading: {}".format(self.filename)) logger.debug("Start: {}".format(self.start_time)) logger.debug("End: {}".format(self.end_time)) self._cache = {} self._channel_names = MWS_CHANNEL_NAMES @property def start_time(self): """Get start time.""" return datetime.strptime(self["/attr/sensing_start_time_utc"], "%Y-%m-%d %H:%M:%S.%f") @property def end_time(self): """Get end time.""" return datetime.strptime(self["/attr/sensing_end_time_utc"], "%Y-%m-%d %H:%M:%S.%f") @property def sensor(self): """Get the sensor name.""" return self["/attr/instrument"] @property def platform_name(self): """Get the platform name.""" return self._platform_name_translate.get(self["/attr/spacecraft"]) @property def sub_satellite_longitude_start(self): """Get the longitude of sub-satellite point at start of the product.""" return self["status/satellite/subsat_longitude_start"].data.item() @property def sub_satellite_latitude_start(self): """Get the latitude of sub-satellite point at start of the product.""" return self["status/satellite/subsat_latitude_start"].data.item() @property def sub_satellite_longitude_end(self): """Get the longitude of sub-satellite point at end of the product.""" return self["status/satellite/subsat_longitude_end"].data.item() @property def sub_satellite_latitude_end(self): """Get the latitude of sub-satellite point at end of the product.""" return self["status/satellite/subsat_latitude_end"].data.item()
[docs] def get_dataset(self, dataset_id, dataset_info): """Get dataset using file_key in dataset_info.""" logger.debug("Reading {} from {}".format(dataset_id["name"], self.filename)) var_key = dataset_info["file_key"] if _get_aux_data_name_from_dsname(dataset_id["name"]) is not None: variable = self._get_dataset_aux_data(dataset_id["name"]) elif any(lb in dataset_id["name"] for lb in MWS_CHANNELS): logger.debug(f"Reading in file to get dataset with key {var_key}.") variable = self._get_dataset_channel(dataset_id, dataset_info) else: logger.warning(f"Could not find key {var_key} in NetCDF file, no valid Dataset created") # noqa: E501 return None variable = self._manage_attributes(variable, dataset_info) variable = self._drop_coords(variable) variable = self._standardize_dims(variable) return variable
[docs] @staticmethod def _standardize_dims(variable): """Standardize dims to y, x.""" if "n_scans" in variable.dims: variable = variable.rename({"n_fovs": "x", "n_scans": "y"}) if variable.dims[0] == "x": variable = variable.transpose("y", "x") return variable
[docs] @staticmethod def _drop_coords(variable): """Drop coords that are not in dims.""" for coord in variable.coords: if coord not in variable.dims: variable = variable.drop_vars(coord) return variable
[docs] def _manage_attributes(self, variable, dataset_info): """Manage attributes of the dataset.""" variable.attrs.setdefault("units", None) variable.attrs.update(dataset_info) variable.attrs.update(self._get_global_attributes()) return variable
[docs] def _get_dataset_channel(self, key, dataset_info): """Load dataset corresponding to channel measurement. Load a dataset when the key refers to a measurand, whether uncalibrated (counts) or calibrated in terms of brightness temperature or radiance. """ # Get the dataset # Get metadata for given dataset grp_pth = dataset_info["file_key"] channel_index = get_channel_index_from_name(key["name"]) data = self[grp_pth][:, :, channel_index] attrs = data.attrs.copy() fv = attrs.pop( "FillValue", default_fillvals.get(data.dtype.str[1:], np.nan)) vr = attrs.get("valid_range", [-np.inf, np.inf]) if key["calibration"] == "counts": attrs["_FillValue"] = fv nfv = fv else: nfv = np.nan data = data.where(data >= vr[0], nfv) data = data.where(data <= vr[1], nfv) # Manage the attributes of the dataset data.attrs.setdefault("units", None) data.attrs.update(dataset_info) dataset_attrs = getattr(data, "attrs", {}) dataset_attrs.update(dataset_info) dataset_attrs.update({ "platform_name": self.platform_name, "sensor": self.sensor, "orbital_parameters": {"sub_satellite_latitude_start": self.sub_satellite_latitude_start, "sub_satellite_longitude_start": self.sub_satellite_longitude_start, "sub_satellite_latitude_end": self.sub_satellite_latitude_end, "sub_satellite_longitude_end": self.sub_satellite_longitude_end}, }) try: dataset_attrs.update(key.to_dict()) except AttributeError: dataset_attrs.update(key) data.attrs.update(dataset_attrs) return data
[docs] def _get_dataset_aux_data(self, dsname): """Get the auxiliary data arrays using the index map.""" # Geolocation and navigation data: if dsname in ["mws_lat", "mws_lon", "solar_azimuth", "solar_zenith", "satellite_azimuth", "satellite_zenith", "surface_type", "terrain_elevation"]: var_key = AUX_DATA.get(dsname) else: raise NotImplementedError(f"Dataset {dsname!r} not supported!") try: variable = self[var_key] except KeyError: logger.exception("Could not find key %s in NetCDF file, no valid Dataset created", var_key) raise # Scale the data: if "scale_factor" in variable.attrs and "add_offset" in variable.attrs: missing_value = variable.attrs["missing_value"] = da.where( == missing_value, np.nan, * variable.attrs["scale_factor"] + variable.attrs["add_offset"]) return variable
[docs] def _get_global_attributes(self): """Create a dictionary of global attributes.""" return { "filename": self.filename, "start_time": self.start_time, "end_time": self.end_time, "spacecraft_name": self.platform_name, "sensor": self.sensor, "filename_start_time": self.filename_info["start_time"], "filename_end_time": self.filename_info["end_time"], "platform_name": self.platform_name, "quality_group": self._get_quality_attributes(), }
[docs] def _get_quality_attributes(self): """Get quality attributes.""" quality_group = self["quality"] quality_dict = {} for key in quality_group: # Add the values (as Numpy array) of each variable in the group # where possible try: quality_dict[key] = quality_group[key].values except ValueError: quality_dict[key] = None quality_dict.update(quality_group.attrs) return quality_dict