Source code for satpy.readers.core.viirs_atms_sdr

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2022, 2023 Satpy Developers

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Common utilities for reading VIIRS and ATMS SDR data."""

import datetime as dt
import logging

import dask.array as da
import numpy as np
import xarray as xr

from satpy.readers.core.hdf5 import HDF5FileHandler

NO_DATE = dt.datetime(1958, 1, 1)
EPSILON_TIME = dt.timedelta(days=2)
LOG = logging.getLogger(__name__)


VIIRS_DATASET_KEYS = {"GDNBO": "VIIRS-DNB-GEO",
                      "SVDNB": "VIIRS-DNB-SDR",
                      "GITCO": "VIIRS-IMG-GEO-TC",
                      "GIMGO": "VIIRS-IMG-GEO",
                      "SVI01": "VIIRS-I1-SDR",
                      "SVI02": "VIIRS-I2-SDR",
                      "SVI03": "VIIRS-I3-SDR",
                      "SVI04": "VIIRS-I4-SDR",
                      "SVI05": "VIIRS-I5-SDR",
                      "GMTCO": "VIIRS-MOD-GEO-TC",
                      "GMODO": "VIIRS-MOD-GEO",
                      "SVM01": "VIIRS-M1-SDR",
                      "SVM02": "VIIRS-M2-SDR",
                      "SVM03": "VIIRS-M3-SDR",
                      "SVM04": "VIIRS-M4-SDR",
                      "SVM05": "VIIRS-M5-SDR",
                      "SVM06": "VIIRS-M6-SDR",
                      "SVM07": "VIIRS-M7-SDR",
                      "SVM08": "VIIRS-M8-SDR",
                      "SVM09": "VIIRS-M9-SDR",
                      "SVM10": "VIIRS-M10-SDR",
                      "SVM11": "VIIRS-M11-SDR",
                      "SVM12": "VIIRS-M12-SDR",
                      "SVM13": "VIIRS-M13-SDR",
                      "SVM14": "VIIRS-M14-SDR",
                      "SVM15": "VIIRS-M15-SDR",
                      "SVM16": "VIIRS-M16-SDR",
                      "IVCDB": "VIIRS-DualGain-Cal-IP"}
ATMS_DATASET_KEYS = {"SATMS": "ATMS-SDR",
                     "GATMO": "ATMS-SDR-GEO",
                     "TATMS": "ATMS-TDR"}

DATASET_KEYS = {}
DATASET_KEYS.update(VIIRS_DATASET_KEYS)
DATASET_KEYS.update(ATMS_DATASET_KEYS)


[docs] def _get_scale_factors_for_units(factors, file_units, output_units): if file_units == "W cm-2 sr-1" and output_units == "W m-2 sr-1": LOG.debug("Adjusting scaling factors to convert '%s' to '%s'", file_units, output_units) factors = factors * 10000. elif file_units == "1" and output_units == "%": LOG.debug("Adjusting scaling factors to convert '%s' to '%s'", file_units, output_units) factors = factors * 100. else: raise ValueError("Don't know how to convert '{}' to '{}'".format( file_units, output_units)) return factors
[docs] def _get_file_units(dataset_id, ds_info): """Get file units from metadata.""" file_units = ds_info.get("file_units") if file_units is None: LOG.debug("Unknown units for file key '%s'", dataset_id) return file_units
[docs] class JPSS_SDR_FileHandler(HDF5FileHandler): """Base class for reading JPSS VIIRS & ATMS SDR HDF5 Files."""
[docs] def __init__(self, filename, filename_info, filetype_info, **kwargs): """Initialize file handler.""" super().__init__(filename, filename_info, filetype_info, **kwargs)
[docs] def _parse_datetime(self, datestr, timestr): if not isinstance(datestr, str): datestr = str(datestr.data.compute().astype(str)) if not isinstance(timestr, str): timestr = str(timestr.data.compute().astype(str)) datetime_str = datestr + timestr time_val = dt.datetime.strptime(datetime_str, "%Y%m%d%H%M%S.%fZ") if abs(time_val - NO_DATE) < EPSILON_TIME: # catch rare case when SDR files have incorrect date raise ValueError("Datetime invalid {}".format(time_val)) return time_val
@property def start_time(self): """Get start time.""" date_var_path = self._get_aggr_path("start_date", "AggregateBeginningDate") time_var_path = self._get_aggr_path("start_time", "AggregateBeginningTime") return self._parse_datetime(self[date_var_path], self[time_var_path]) @property def end_time(self): """Get end time.""" date_var_path = self._get_aggr_path("end_date", "AggregateEndingDate") time_var_path = self._get_aggr_path("end_time", "AggregateEndingTime") return self._parse_datetime(self[date_var_path], self[time_var_path]) @property def start_orbit_number(self): """Get start orbit number.""" start_orbit_path = self._get_aggr_path("start_orbit", "AggregateBeginningOrbitNumber") return int(self[start_orbit_path]) @property def end_orbit_number(self): """Get end orbit number.""" end_orbit_path = self._get_aggr_path("end_orbit", "AggregateEndingOrbitNumber") return int(self[end_orbit_path])
[docs] def _get_aggr_path(self, fileinfo_key, aggr_default): dataset_group = DATASET_KEYS[self.datasets[0]] default = "Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/" + aggr_default return self.filetype_info.get(fileinfo_key, default).format(dataset_group=dataset_group)
@property def platform_name(self): """Get platform name.""" default = "/attr/Platform_Short_Name" platform_path = self.filetype_info.get( "platform_name", default).format(**self.filetype_info) platform_dict = {"NPP": "Suomi-NPP", "JPSS-1": "NOAA-20", "J01": "NOAA-20", "JPSS-2": "NOAA-21", "J02": "NOAA-21"} return platform_dict.get(self[platform_path], self[platform_path]) @property def sensor_name(self): """Get sensor name.""" dataset_group = DATASET_KEYS[self.datasets[0]] default = "Data_Products/{dataset_group}/attr/Instrument_Short_Name" sensor_path = self.filetype_info.get( "sensor_name", default).format(dataset_group=dataset_group) return self[sensor_path].lower()
[docs] def scale_swath_data(self, data, scaling_factors, dataset_group): """Scale swath data using scaling factors and offsets. Multi-granule (a.k.a. aggregated) files will have more than the usual two values. """ rows_per_gran = self._get_rows_per_granule(dataset_group) factors = self._mask_and_reshape_factors(scaling_factors) data = self._map_and_apply_factors(data, factors, rows_per_gran) return data
[docs] def scale_data_to_specified_unit(self, data, dataset_id, ds_info): """Get sscale and offset factors and convert/scale data to given physical unit.""" var_path = self._generate_file_key(dataset_id, ds_info) dataset_group = ds_info["dataset_group"] file_units = _get_file_units(dataset_id, ds_info) output_units = ds_info.get("units", file_units) factor_var_path = ds_info.get("factors_key", var_path + "Factors") factors = self.get(factor_var_path) factors = self._adjust_scaling_factors(factors, file_units, output_units) if factors is not None: return self.scale_swath_data(data, factors, dataset_group) LOG.debug("No scaling factors found for %s", dataset_id) return data
[docs] @staticmethod def _mask_and_reshape_factors(factors): factors = factors.where(factors > -999, np.float32(np.nan)) return factors.data.reshape((-1, 2)).rechunk((1, 2)) # make it so map_blocks happens per factor
[docs] @staticmethod def _map_and_apply_factors(data, factors, rows_per_gran): # The user may have requested a different chunking scheme, but we need # per granule chunking right now so factor chunks map 1:1 to data chunks old_chunks = data.chunks dask_data = data.data.rechunk((tuple(rows_per_gran), data.data.chunks[1])) dask_data = da.map_blocks(_apply_factors, dask_data, factors, chunks=dask_data.chunks, dtype=data.dtype, meta=np.array([[]], dtype=data.dtype)) data = xr.DataArray(dask_data.rechunk(old_chunks), dims=data.dims, coords=data.coords, attrs=data.attrs) return data
[docs] @staticmethod def _scale_factors_for_units(factors, file_units, output_units): return _get_scale_factors_for_units(factors, file_units, output_units)
[docs] @staticmethod def _get_valid_scaling_factors(factors): if factors is None: factors = np.array([1, 0], dtype=np.float32) factors = xr.DataArray(da.from_array(factors, chunks=1)) else: factors = factors.where(factors != -999., np.float32(np.nan)) return factors
[docs] def _adjust_scaling_factors(self, factors, file_units, output_units): """Adjust scaling factors .""" if file_units == output_units: LOG.debug("File units and output units are the same (%s)", file_units) return factors factors = self._get_valid_scaling_factors(factors) return self._scale_factors_for_units(factors, file_units, output_units)
[docs] @staticmethod def expand_single_values(var, scans): """Expand single valued variable to full scan lengths.""" if scans.size == 1: return var else: expanded = np.repeat(var, scans) expanded.attrs = var.attrs expanded.rename({expanded.dims[0]: "y"}) return expanded
[docs] def _scan_size(self, dataset_group_name): """Get how many rows of data constitute one scanline.""" if "ATM" in dataset_group_name: scan_size = 1 elif "I" in dataset_group_name: scan_size = 32 else: scan_size = 16 return scan_size
[docs] def _generate_file_key(self, ds_id, ds_info, factors=False): var_path = ds_info.get("file_key", "All_Data/{dataset_group}_All/{calibration}") calibration = { "radiance": "Radiance", "reflectance": "Reflectance", "brightness_temperature": "BrightnessTemperature", }.get(ds_id.get("calibration")) var_path = var_path.format(calibration=calibration, dataset_group=DATASET_KEYS[ds_info["dataset_group"]]) if ds_id["name"] in ["dnb_longitude", "dnb_latitude"]: if self.use_tc is True: return var_path + "_TC" if self.use_tc is None and var_path + "_TC" in self.file_content: return var_path + "_TC" return var_path
[docs] def _update_data_attributes(self, data, dataset_id, ds_info): file_units = _get_file_units(dataset_id, ds_info) output_units = ds_info.get("units", file_units) i = getattr(data, "attrs", {}) i.update(ds_info) i.update({ "platform_name": self.platform_name, "sensor": self.sensor_name, "start_orbit": self.start_orbit_number, "end_orbit": self.end_orbit_number, "units": output_units, "rows_per_scan": self._scan_size(ds_info["dataset_group"]), }) i.update(dataset_id.to_dict()) data.attrs.update(i) return data
[docs] def _get_variable(self, var_path, **kwargs): return self[var_path]
[docs] def concatenate_dataset(self, dataset_group, var_path, **kwargs): """Concatenate dataset.""" scan_size = self._scan_size(dataset_group) scans = self._get_scans_per_granule(dataset_group) start_scan = 0 data_chunks = [] scans = xr.DataArray(scans) variable = self._get_variable(var_path, **kwargs) # check if these are single per-granule value if variable.size != scans.size: for gscans in scans.values: data_chunks.append(variable.isel(y=slice(start_scan, start_scan + gscans * scan_size))) start_scan += gscans * scan_size return xr.concat(data_chunks, "y") else: # This is not tested - Not sure this code is ever going to be used? A. Dybbroe # Mon Jan 2 13:31:21 2023 return self.expand_single_values(variable, scans)
[docs] def _get_rows_per_granule(self, dataset_group): scan_size = self._scan_size(dataset_group) scans_per_gran = self._get_scans_per_granule(dataset_group) return [scan_size * gran_scans for gran_scans in scans_per_gran]
[docs] def _get_scans_per_granule(self, dataset_group): number_of_granules_path = "Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/AggregateNumberGranules" nb_granules_path = number_of_granules_path.format(dataset_group=DATASET_KEYS[dataset_group]) scans = [] for granule in range(self[nb_granules_path]): scans_path = "Data_Products/{dataset_group}/{dataset_group}_Gran_{granule}/attr/N_Number_Of_Scans" scans_path = scans_path.format(dataset_group=DATASET_KEYS[dataset_group], granule=granule) scans.append(self[scans_path]) return scans
[docs] def mask_fill_values(self, data, ds_info): """Mask fill values.""" is_floating = np.issubdtype(data.dtype, np.floating) if is_floating: # If the data is a float then we mask everything <= -999.0 fill_max = np.float32(ds_info.pop("fill_max_float", -999.0)) return data.where(data > fill_max, np.float32(np.nan)) else: # If the data is an integer then we mask everything >= fill_min_int fill_min = int(ds_info.pop("fill_min_int", 65528)) return data.where(data < fill_min, np.float32(np.nan))
[docs] def available_datasets(self, configured_datasets=None): """Generate dataset info and their availablity. See :meth:`satpy.readers.core.file_handlers.BaseFileHandler.available_datasets` for details. """ for is_avail, ds_info in (configured_datasets or []): if is_avail is not None: yield is_avail, ds_info continue dataset_group = [ds_group for ds_group in ds_info["dataset_groups"] if ds_group in self.datasets] if dataset_group: yield True, ds_info elif is_avail is None: yield is_avail, ds_info
[docs] def _apply_factors(data, factor_set): return data * factor_set[0, 0] + factor_set[0, 1]