Source code for satpy.readers.seadas_l2

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.

"""Reader for SEADAS L2 products.

This reader currently only supports MODIS and VIIRS Chlorophyll A from SEADAS.

The reader includes an additional keyword argument ``apply_quality_flags``
which can be used to mask out low-quality pixels based on quality flags
contained in the file (``l2_flags``). This option defaults to ``False``, but
when set to ``True`` the "CHLWARN" pixels of the ``l2_flags`` variable
are masked out. These pixels represent data where the chlorophyll algorithm
warned about the quality of the result.

"""

import datetime as dt

from satpy.readers.core.hdf4 import HDF4FileHandler
from satpy.readers.core.netcdf import NetCDF4FileHandler


class _SEADASL2Base:
    """Simple handler of SEADAS L2 files.

    This class holds the logic shared by the format-specific handlers and is
    meant to be combined with a file handler base class that accepts
    ``(filename, filename_info, filetype_info)`` in ``__init__`` and supports
    ``self[key]`` and ``key in self``.

    Concrete classes must define the class attributes
    ``start_time_attr_name``, ``end_time_attr_name``, ``time_format``,
    ``platform_attr_name``, ``sensor_attr_name`` and ``l2_flags_var_name``.
    """

    def __init__(self, filename, filename_info, filetype_info, apply_quality_flags=False):
        """Initialize file handler and determine if data quality flags should be applied.

        Args:
            filename: Forwarded unchanged to the next ``__init__`` in the MRO.
            filename_info: Forwarded unchanged to the next ``__init__`` in the MRO.
            filetype_info: Forwarded unchanged to the next ``__init__`` in the MRO.
            apply_quality_flags: Request masking based on the quality flags
                variable. The request only takes effect when the variable
                named by ``l2_flags_var_name`` is present in the file.

        """
        super().__init__(filename, filename_info, filetype_info)
        self.apply_quality_flags = apply_quality_flags and self.l2_flags_var_name in self

    def _add_satpy_metadata(self, data):
        """Set ``sensor``, ``platform_name`` and ``rows_per_scan`` on ``data.attrs`` and return ``data``."""
        data.attrs["sensor"] = self.sensor_names
        data.attrs["platform_name"] = self._platform_name()
        data.attrs["rows_per_scan"] = self._rows_per_scan()
        return data

    def _rows_per_scan(self):
        """Return the ``rows_per_scan`` value used for this file's sensor.

        Raises:
            ValueError: If the sensor is not one of "modis", "viirs" or "oci".

        """
        if "modis" in self.sensor_names:
            return 10
        if "viirs" in self.sensor_names:
            return 16
        if "oci" in self.sensor_names:
            return 0
        raise ValueError(f"Don't know how to read data for sensors: {self.sensor_names}")

    def _platform_name(self):
        """Return the platform name, translating known aliases.

        Names without an entry in the alias table are returned as found in
        the file.
        """
        platform = self[self.platform_attr_name]
        platform_dict = {"NPP": "Suomi-NPP", "JPSS-1": "NOAA-20", "JPSS-2": "NOAA-21"}
        return platform_dict.get(platform, platform)

    @property
    def start_time(self):
        """Get the starting observation time of this file's data."""
        start_time = self[self.start_time_attr_name]
        # The last three characters are discarded before parsing with ``time_format``.
        return dt.datetime.strptime(start_time[:-3], self.time_format)

    @property
    def end_time(self):
        """Get the ending observation time of this file's data."""
        end_time = self[self.end_time_attr_name]
        # The last three characters are discarded before parsing with ``time_format``.
        return dt.datetime.strptime(end_time[:-3], self.time_format)

    @property
    def sensor_names(self):
        """Get sensor for the current file's data.

        Returns a one-element set. Names starting with "modis" or "viirs"
        (case-insensitive) collapse to that prefix; any other name is returned
        lower-cased.
        """
        # Example: MODISA or VIIRSN or VIIRSJ1
        sensor_name = self[self.sensor_attr_name].lower()
        if sensor_name.startswith("modis"):
            return {"modis"}
        if sensor_name.startswith("viirs"):
            return {"viirs"}
        # Example: OCI
        return {sensor_name}

    def get_dataset(self, data_id, dataset_info):
        """Get DataArray for the specified DataID.

        The variable is looked up, filtered by its valid range, given
        ``("y", "x")`` dimension names when it is 2D, optionally masked by the
        quality flags and finally annotated with sensor/platform metadata.

        Args:
            data_id: Identifier of the requested dataset; ``data_id["name"]``
                is the fallback file key.
            dataset_info: Mapping that may provide ``"file_key"`` (a single
                key or a list of candidate keys).

        Raises:
            KeyError: If none of the candidate keys exists in the file.

        """
        _, data = self._get_file_key_and_variable(data_id, dataset_info)
        data = self._filter_by_valid_min_max(data)
        data = self._rename_2d_dims_if_necessary(data)
        data = self._mask_based_on_l2_flags(data)
        for attr_name in ("standard_name", "long_name", "units"):
            # These attributes are optional: a missing, empty or non-string
            # value is left alone instead of raising KeyError/IndexError.
            val = data.attrs.get(attr_name)
            if isinstance(val, str) and val.endswith("\x00"):
                # Drop the single trailing NUL character.
                data.attrs[attr_name] = val[:-1]
        data = self._add_satpy_metadata(data)
        return data

    def _get_file_key_and_variable(self, data_id, dataset_info):
        """Return ``(file_key, variable)`` for the first candidate key found in the file.

        Raises:
            KeyError: If none of the candidate keys can be loaded.

        """
        file_keys = dataset_info.get("file_key", data_id["name"])
        if not isinstance(file_keys, list):
            file_keys = [file_keys]
        for file_key in file_keys:
            try:
                data = self[file_key]
                return file_key, data
            except KeyError:
                continue
        raise KeyError(f"Unable to find any of the possible keys for {data_id}: {file_keys}")

    def _rename_2d_dims_if_necessary(self, data_arr):
        """Rename the dimensions of a 2D array to ``("y", "x")``; return other arrays unchanged."""
        if data_arr.ndim != 2 or data_arr.dims == ("y", "x"):
            return data_arr
        return data_arr.rename(dict(zip(data_arr.dims, ("y", "x"))))

    def _filter_by_valid_min_max(self, data_arr):
        """Keep only values inside the inclusive valid range, using ``where`` for the rest."""
        valid_range = self._valid_min_max(data_arr)
        data_arr = data_arr.where(valid_range[0] <= data_arr)
        data_arr = data_arr.where(data_arr <= valid_range[1])
        return data_arr

    def _valid_min_max(self, data_arr):
        """Return the valid range from ``valid_range``, else from ``valid_min``/``valid_max``.

        Raises:
            KeyError: If neither ``valid_range`` nor both ``valid_min`` and
                ``valid_max`` are present in ``data_arr.attrs``.

        """
        try:
            return data_arr.attrs["valid_range"]
        except KeyError:
            return data_arr.attrs["valid_min"], data_arr.attrs["valid_max"]

    def _mask_based_on_l2_flags(self, data_arr):
        """Mask pixels whose quality flag bit is set, when quality flags are applied.

        Arrays whose ``standard_name`` contains "lon" or "lat" are never
        masked. When quality flags are not applied the input is returned
        unchanged.
        """
        standard_name = data_arr.attrs.get("standard_name", "")
        if self.apply_quality_flags and not ("lon" in standard_name or "lat" in standard_name):
            l2_flags = self[self.l2_flags_var_name]
            l2_flags = self._rename_2d_dims_if_necessary(l2_flags)
            # Single-bit mask; the module docstring describes the masked
            # pixels as "CHLWARN".
            # NOTE(review): confirm the bit position against the product's flag definitions.
            mask = (l2_flags & 0b00000000010000000000000000000000) != 0
            data_arr = data_arr.where(~mask)
        return data_arr
class SEADASL2NetCDFFileHandler(_SEADASL2Base, NetCDF4FileHandler):
    """Simple handler of SEADAS L2 NetCDF4 files.

    Only supplies the NetCDF4-specific lookup keys and time format that the
    shared logic in ``_SEADASL2Base`` reads.
    """

    # Lookup keys for the platform and the instrument.
    platform_attr_name = "/attr/platform"
    sensor_attr_name = "/attr/instrument"

    # ``strptime`` format and lookup keys for the observation time range.
    time_format = "%Y-%m-%dT%H:%M:%S.%f"
    start_time_attr_name = "/attr/time_coverage_start"
    end_time_attr_name = "/attr/time_coverage_end"

    # Lookup key of the quality flags variable.
    l2_flags_var_name = "geophysical_data/l2_flags"
class SEADASL2HDFFileHandler(_SEADASL2Base, HDF4FileHandler):
    """Simple handler of SEADAS L2 HDF4 files.

    Only supplies the HDF4-specific lookup keys and time format that the
    shared logic in ``_SEADASL2Base`` reads.
    """

    # Lookup keys for the platform and the instrument.
    platform_attr_name = "/attr/Mission"
    sensor_attr_name = "/attr/Sensor Name"

    # ``strptime`` format and lookup keys for the observation time range.
    time_format = "%Y%j%H%M%S"
    start_time_attr_name = "/attr/Start Time"
    end_time_attr_name = "/attr/End Time"

    # Lookup key of the quality flags variable.
    l2_flags_var_name = "l2_flags"