
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2022, 2023 Satpy Developers

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Common utilities for reading VIIRS and ATMS SDR data."""

import logging
from datetime import datetime, timedelta

import dask.array as da
import numpy as np
import xarray as xr

from satpy.readers.hdf5_utils import HDF5FileHandler

NO_DATE = datetime(1958, 1, 1)
EPSILON_TIME = timedelta(days=2)
LOG = logging.getLogger(__name__)


VIIRS_DATASET_KEYS = {"GDNBO": "VIIRS-DNB-GEO",
                      "SVDNB": "VIIRS-DNB-SDR",
                      "GITCO": "VIIRS-IMG-GEO-TC",
                      "GIMGO": "VIIRS-IMG-GEO",
                      "SVI01": "VIIRS-I1-SDR",
                      "SVI02": "VIIRS-I2-SDR",
                      "SVI03": "VIIRS-I3-SDR",
                      "SVI04": "VIIRS-I4-SDR",
                      "SVI05": "VIIRS-I5-SDR",
                      "GMTCO": "VIIRS-MOD-GEO-TC",
                      "GMODO": "VIIRS-MOD-GEO",
                      "SVM01": "VIIRS-M1-SDR",
                      "SVM02": "VIIRS-M2-SDR",
                      "SVM03": "VIIRS-M3-SDR",
                      "SVM04": "VIIRS-M4-SDR",
                      "SVM05": "VIIRS-M5-SDR",
                      "SVM06": "VIIRS-M6-SDR",
                      "SVM07": "VIIRS-M7-SDR",
                      "SVM08": "VIIRS-M8-SDR",
                      "SVM09": "VIIRS-M9-SDR",
                      "SVM10": "VIIRS-M10-SDR",
                      "SVM11": "VIIRS-M11-SDR",
                      "SVM12": "VIIRS-M12-SDR",
                      "SVM13": "VIIRS-M13-SDR",
                      "SVM14": "VIIRS-M14-SDR",
                      "SVM15": "VIIRS-M15-SDR",
                      "SVM16": "VIIRS-M16-SDR",
                      "IVCDB": "VIIRS-DualGain-Cal-IP"}
ATMS_DATASET_KEYS = {"SATMS": "ATMS-SDR",
                     "GATMO": "ATMS-SDR-GEO",
                     "TATMS": "ATMS-TDR"}

DATASET_KEYS = {}
DATASET_KEYS.update(VIIRS_DATASET_KEYS)
DATASET_KEYS.update(ATMS_DATASET_KEYS)
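
# Illustrative note (not part of the reader logic): DATASET_KEYS maps the file
# name prefix of an SDR/GEO file to the HDF5 dataset group name used inside the
# file, e.g. DATASET_KEYS["SVM05"] == "VIIRS-M5-SDR" and
# DATASET_KEYS["GATMO"] == "ATMS-SDR-GEO".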


def _get_scale_factors_for_units(factors, file_units, output_units):
    """Adjust scale factors to convert from the file units to the requested output units."""
    if file_units == "W cm-2 sr-1" and output_units == "W m-2 sr-1":
        LOG.debug("Adjusting scaling factors to convert '%s' to '%s'",
                  file_units, output_units)
        factors = factors * 10000.
    elif file_units == "1" and output_units == "%":
        LOG.debug("Adjusting scaling factors to convert '%s' to '%s'",
                  file_units, output_units)
        factors = factors * 100.
    else:
        raise ValueError("Don't know how to convert '{}' to '{}'".format(
            file_units, output_units))
    return factors

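# Illustrative sketch (hypothetical factor values, not taken from any file):
# converting a (gain, offset) pair from 'W cm-2 sr-1' to 'W m-2 sr-1' simply
# multiplies both values by 10000, e.g.
#   >>> _get_scale_factors_for_units(np.array([2e-5, 0.1]), "W cm-2 sr-1", "W m-2 sr-1")
#   # -> approximately array([0.2, 1000.0])
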
def _get_file_units(dataset_id, ds_info):
    """Get file units from metadata."""
    file_units = ds_info.get("file_units")
    if file_units is None:
        LOG.debug("Unknown units for file key '%s'", dataset_id)
    return file_units

class JPSS_SDR_FileHandler(HDF5FileHandler):
    """Base class for reading JPSS VIIRS & ATMS SDR HDF5 Files."""

    def __init__(self, filename, filename_info, filetype_info, **kwargs):
        """Initialize file handler."""
        super().__init__(filename, filename_info, filetype_info, **kwargs)

    def _parse_datetime(self, datestr, timestr):
        """Parse the aggregate date and time strings into a datetime object."""
        if not isinstance(datestr, str):
            datestr = str(datestr.data.compute().astype(str))
        if not isinstance(timestr, str):
            timestr = str(timestr.data.compute().astype(str))
        datetime_str = datestr + timestr
        time_val = datetime.strptime(datetime_str, "%Y%m%d%H%M%S.%fZ")
        if abs(time_val - NO_DATE) < EPSILON_TIME:
            # catch rare case when SDR files have incorrect date
            raise ValueError("Datetime invalid {}".format(time_val))
        return time_val

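    # Illustrative example (made-up timestamp): the aggregate date and time
    # attributes concatenate to a string such as "20230101" + "123456.123456Z",
    # which strptime parses with "%Y%m%d%H%M%S.%fZ" into
    # datetime(2023, 1, 1, 12, 34, 56, 123456).
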
    @property
    def start_time(self):
        """Get start time."""
        date_var_path = self._get_aggr_path("start_date", "AggregateBeginningDate")
        time_var_path = self._get_aggr_path("start_time", "AggregateBeginningTime")
        return self._parse_datetime(self[date_var_path], self[time_var_path])

    @property
    def end_time(self):
        """Get end time."""
        date_var_path = self._get_aggr_path("end_date", "AggregateEndingDate")
        time_var_path = self._get_aggr_path("end_time", "AggregateEndingTime")
        return self._parse_datetime(self[date_var_path], self[time_var_path])

    @property
    def start_orbit_number(self):
        """Get start orbit number."""
        start_orbit_path = self._get_aggr_path("start_orbit", "AggregateBeginningOrbitNumber")
        return int(self[start_orbit_path])

    @property
    def end_orbit_number(self):
        """Get end orbit number."""
        end_orbit_path = self._get_aggr_path("end_orbit", "AggregateEndingOrbitNumber")
        return int(self[end_orbit_path])

    def _get_aggr_path(self, fileinfo_key, aggr_default):
        """Get the path to an aggregate attribute, preferring a filetype_info override."""
        dataset_group = DATASET_KEYS[self.datasets[0]]
        default = "Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/" + aggr_default
        return self.filetype_info.get(fileinfo_key, default).format(dataset_group=dataset_group)

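    # Illustrative example: for a file whose first dataset key is "SVM01",
    # _get_aggr_path("start_date", "AggregateBeginningDate") resolves (absent a
    # filetype_info override) to
    # "Data_Products/VIIRS-M1-SDR/VIIRS-M1-SDR_Aggr/attr/AggregateBeginningDate".
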
    @property
    def platform_name(self):
        """Get platform name."""
        default = "/attr/Platform_Short_Name"
        platform_path = self.filetype_info.get(
            "platform_name", default).format(**self.filetype_info)
        platform_dict = {"NPP": "Suomi-NPP",
                         "JPSS-1": "NOAA-20",
                         "J01": "NOAA-20",
                         "JPSS-2": "NOAA-21",
                         "J02": "NOAA-21"}
        return platform_dict.get(self[platform_path], self[platform_path])

    @property
    def sensor_name(self):
        """Get sensor name."""
        dataset_group = DATASET_KEYS[self.datasets[0]]
        default = "Data_Products/{dataset_group}/attr/Instrument_Short_Name"
        sensor_path = self.filetype_info.get(
            "sensor_name", default).format(dataset_group=dataset_group)
        return self[sensor_path].lower()

    def scale_swath_data(self, data, scaling_factors, dataset_group):
        """Scale swath data using scaling factors and offsets.

        Multi-granule (a.k.a. aggregated) files will have more than the usual two values.
        """
        rows_per_gran = self._get_rows_per_granule(dataset_group)
        factors = self._mask_and_reshape_factors(scaling_factors)
        data = self._map_and_apply_factors(data, factors, rows_per_gran)
        return data

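    # Illustrative sketch (hypothetical numbers): for a three-granule aggregated
    # M-band file with 48 scans per granule, rows_per_gran == [768, 768, 768]
    # (48 scans * 16 rows per scan) and the scaling factors are reshaped into
    # three (gain, offset) pairs that are applied one granule at a time.
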
    def scale_data_to_specified_unit(self, data, dataset_id, ds_info):
        """Get scale and offset factors and convert/scale data to the given physical unit."""
        var_path = self._generate_file_key(dataset_id, ds_info)
        dataset_group = ds_info["dataset_group"]
        file_units = _get_file_units(dataset_id, ds_info)
        output_units = ds_info.get("units", file_units)

        factor_var_path = ds_info.get("factors_key", var_path + "Factors")
        factors = self.get(factor_var_path)
        factors = self._adjust_scaling_factors(factors, file_units, output_units)

        if factors is not None:
            return self.scale_swath_data(data, factors, dataset_group)

        LOG.debug("No scaling factors found for %s", dataset_id)
        return data

    @staticmethod
    def _mask_and_reshape_factors(factors):
        factors = factors.where(factors > -999, np.float32(np.nan))
        # make it so map_blocks happens per factor
        return factors.data.reshape((-1, 2)).rechunk((1, 2))

    @staticmethod
    def _map_and_apply_factors(data, factors, rows_per_gran):
        # The user may have requested a different chunking scheme, but we need
        # per granule chunking right now so factor chunks map 1:1 to data chunks
        old_chunks = data.chunks
        dask_data = data.data.rechunk((tuple(rows_per_gran), data.data.chunks[1]))
        dask_data = da.map_blocks(_apply_factors, dask_data, factors,
                                  chunks=dask_data.chunks, dtype=data.dtype,
                                  meta=np.array([[]], dtype=data.dtype))
        data = xr.DataArray(dask_data.rechunk(old_chunks),
                            dims=data.dims, coords=data.coords,
                            attrs=data.attrs)
        return data

    @staticmethod
    def _scale_factors_for_units(factors, file_units, output_units):
        return _get_scale_factors_for_units(factors, file_units, output_units)

    @staticmethod
    def _get_valid_scaling_factors(factors):
        if factors is None:
            factors = np.array([1, 0], dtype=np.float32)
            factors = xr.DataArray(da.from_array(factors, chunks=1))
        else:
            factors = factors.where(factors != -999., np.float32(np.nan))
        return factors

    def _adjust_scaling_factors(self, factors, file_units, output_units):
        """Adjust scaling factors to convert from file units to output units."""
        if file_units == output_units:
            LOG.debug("File units and output units are the same (%s)", file_units)
            return factors
        factors = self._get_valid_scaling_factors(factors)
        return self._scale_factors_for_units(factors, file_units, output_units)

    @staticmethod
    def expand_single_values(var, scans):
        """Expand a single-valued variable to the full scan lengths."""
        if scans.size == 1:
            return var
        else:
            expanded = np.repeat(var, scans)
            expanded.attrs = var.attrs
            # rename returns a new object, so keep the result
            expanded = expanded.rename({expanded.dims[0]: "y"})
            return expanded

    def _scan_size(self, dataset_group_name):
        """Get how many rows of data constitute one scanline."""
        if "ATM" in dataset_group_name:
            scan_size = 1
        elif "I" in dataset_group_name:
            scan_size = 32
        else:
            scan_size = 16
        return scan_size

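    # Illustrative summary of the mapping above, given dataset group keys such
    # as "SATMS", "SVI01" or "SVM01": ATMS groups use 1 row per scan, VIIRS
    # I-band groups (key contains an "I") use 32, and the remaining VIIRS
    # M-band and DNB groups use 16.
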
    def _generate_file_key(self, ds_id, ds_info, factors=False):
        var_path = ds_info.get("file_key", "All_Data/{dataset_group}_All/{calibration}")
        calibration = {
            "radiance": "Radiance",
            "reflectance": "Reflectance",
            "brightness_temperature": "BrightnessTemperature",
        }.get(ds_id.get("calibration"))
        var_path = var_path.format(calibration=calibration,
                                   dataset_group=DATASET_KEYS[ds_info["dataset_group"]])
        if ds_id["name"] in ["dnb_longitude", "dnb_latitude"]:
            if self.use_tc is True:
                return var_path + "_TC"
            if self.use_tc is None and var_path + "_TC" in self.file_content:
                return var_path + "_TC"
        return var_path

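    # Illustrative example: for an M05 reflectance dataset with
    # ds_info["dataset_group"] == "SVM05", the default file_key expands to
    # "All_Data/VIIRS-M5-SDR_All/Reflectance". For DNB longitude/latitude the
    # terrain-corrected "_TC" variant is used when use_tc is True, or when
    # use_tc is None and the "_TC" variable exists in the file.
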
    def _update_data_attributes(self, data, dataset_id, ds_info):
        file_units = _get_file_units(dataset_id, ds_info)
        output_units = ds_info.get("units", file_units)
        i = getattr(data, "attrs", {})
        i.update(ds_info)
        i.update({
            "platform_name": self.platform_name,
            "sensor": self.sensor_name,
            "start_orbit": self.start_orbit_number,
            "end_orbit": self.end_orbit_number,
            "units": output_units,
            "rows_per_scan": self._scan_size(ds_info["dataset_group"]),
        })
        i.update(dataset_id.to_dict())
        data.attrs.update(i)
        return data

    def _get_variable(self, var_path, **kwargs):
        return self[var_path]

    def concatenate_dataset(self, dataset_group, var_path, **kwargs):
        """Concatenate dataset."""
        scan_size = self._scan_size(dataset_group)
        scans = self._get_scans_per_granule(dataset_group)
        start_scan = 0
        data_chunks = []
        scans = xr.DataArray(scans)

        variable = self._get_variable(var_path, **kwargs)
        # check if these are single per-granule values
        if variable.size != scans.size:
            for gscans in scans.values:
                data_chunks.append(variable.isel(y=slice(start_scan, start_scan + gscans * scan_size)))
                start_scan += gscans * scan_size
            return xr.concat(data_chunks, "y")
        else:
            # This is not tested - Not sure this code is ever going to be used?  A. Dybbroe
            # Mon Jan  2 13:31:21 2023
            return self.expand_single_values(variable, scans)

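    # Illustrative sketch (hypothetical scan counts): for an aggregated M-band
    # file with per-granule scan counts [48, 48, 47], the variable is sliced
    # along "y" into 768, 768 and 752 rows respectively and then concatenated
    # back together along "y".
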
    def _get_rows_per_granule(self, dataset_group):
        scan_size = self._scan_size(dataset_group)
        scans_per_gran = self._get_scans_per_granule(dataset_group)
        return [scan_size * gran_scans for gran_scans in scans_per_gran]

    def _get_scans_per_granule(self, dataset_group):
        number_of_granules_path = "Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/AggregateNumberGranules"
        nb_granules_path = number_of_granules_path.format(dataset_group=DATASET_KEYS[dataset_group])
        scans = []
        for granule in range(self[nb_granules_path]):
            scans_path = "Data_Products/{dataset_group}/{dataset_group}_Gran_{granule}/attr/N_Number_Of_Scans"
            scans_path = scans_path.format(dataset_group=DATASET_KEYS[dataset_group], granule=granule)
            scans.append(self[scans_path])
        return scans

    def mask_fill_values(self, data, ds_info):
        """Mask fill values."""
        is_floating = np.issubdtype(data.dtype, np.floating)

        if is_floating:
            # If the data is a float then we mask everything <= -999.0
            fill_max = np.float32(ds_info.pop("fill_max_float", -999.0))
            return data.where(data > fill_max, np.float32(np.nan))
        else:
            # If the data is an integer then we mask everything >= fill_min_int
            fill_min = int(ds_info.pop("fill_min_int", 65528))
            return data.where(data < fill_min, np.float32(np.nan))

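    # Illustrative note on the defaults above: floating point data at or below
    # -999.0 is replaced with NaN, while integer data at or above 65528 (the
    # reserved fill range of a 16-bit unsigned variable) is masked, which also
    # promotes the result to a floating point type.
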
    def available_datasets(self, configured_datasets=None):
        """Generate dataset info and their availability.

        See
        :meth:`satpy.readers.file_handlers.BaseFileHandler.available_datasets`
        for details.
        """
        for is_avail, ds_info in (configured_datasets or []):
            if is_avail is not None:
                yield is_avail, ds_info
                continue
            dataset_group = [ds_group for ds_group in ds_info["dataset_groups"]
                             if ds_group in self.datasets]
            if dataset_group:
                yield True, ds_info
            elif is_avail is None:
                yield is_avail, ds_info

def _apply_factors(data, factor_set):
    return data * factor_set[0, 0] + factor_set[0, 1]
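
# Illustrative example (hypothetical numbers): with factor_set == [[0.002, 1.5]]
# a raw value of 1000 is scaled to 1000 * 0.002 + 1.5 == 3.5.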