#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2022, 2023 Satpy Developers
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Common utilities for reading VIIRS and ATMS SDR data."""
import datetime as dt
import logging
import dask.array as da
import numpy as np
import xarray as xr
from satpy.readers.hdf5_utils import HDF5FileHandler
# Timestamps within EPSILON_TIME of this epoch (1 January 1958, the time
# reference used by the JPSS/IDPS ground system) indicate an unset or
# corrupt date in the file.
NO_DATE = dt.datetime(1958, 1, 1)
EPSILON_TIME = dt.timedelta(days=2)
LOG = logging.getLogger(__name__)
VIIRS_DATASET_KEYS = {"GDNBO": "VIIRS-DNB-GEO",
"SVDNB": "VIIRS-DNB-SDR",
"GITCO": "VIIRS-IMG-GEO-TC",
"GIMGO": "VIIRS-IMG-GEO",
"SVI01": "VIIRS-I1-SDR",
"SVI02": "VIIRS-I2-SDR",
"SVI03": "VIIRS-I3-SDR",
"SVI04": "VIIRS-I4-SDR",
"SVI05": "VIIRS-I5-SDR",
"GMTCO": "VIIRS-MOD-GEO-TC",
"GMODO": "VIIRS-MOD-GEO",
"SVM01": "VIIRS-M1-SDR",
"SVM02": "VIIRS-M2-SDR",
"SVM03": "VIIRS-M3-SDR",
"SVM04": "VIIRS-M4-SDR",
"SVM05": "VIIRS-M5-SDR",
"SVM06": "VIIRS-M6-SDR",
"SVM07": "VIIRS-M7-SDR",
"SVM08": "VIIRS-M8-SDR",
"SVM09": "VIIRS-M9-SDR",
"SVM10": "VIIRS-M10-SDR",
"SVM11": "VIIRS-M11-SDR",
"SVM12": "VIIRS-M12-SDR",
"SVM13": "VIIRS-M13-SDR",
"SVM14": "VIIRS-M14-SDR",
"SVM15": "VIIRS-M15-SDR",
"SVM16": "VIIRS-M16-SDR",
"IVCDB": "VIIRS-DualGain-Cal-IP"}
ATMS_DATASET_KEYS = {"SATMS": "ATMS-SDR",
"GATMO": "ATMS-SDR-GEO",
"TATMS": "ATMS-TDR"}
DATASET_KEYS = {}
DATASET_KEYS.update(VIIRS_DATASET_KEYS)
DATASET_KEYS.update(ATMS_DATASET_KEYS)
def _get_scale_factors_for_units(factors, file_units, output_units):
    """Rescale *factors* so that applying them converts data from file units to output units."""
if file_units == "W cm-2 sr-1" and output_units == "W m-2 sr-1":
LOG.debug("Adjusting scaling factors to convert '%s' to '%s'",
file_units, output_units)
factors = factors * 10000.
elif file_units == "1" and output_units == "%":
LOG.debug("Adjusting scaling factors to convert '%s' to '%s'",
file_units, output_units)
factors = factors * 100.
else:
raise ValueError("Don't know how to convert '{}' to '{}'".format(
file_units, output_units))
return factors
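# Illustrative example (values are hypothetical): radiance factors stored
# against "W cm-2 sr-1" are multiplied by 1e4 to yield "W m-2 sr-1"
# (1 m^2 == 1e4 cm^2), so
#   _get_scale_factors_for_units(np.array([2.0, 0.5]), "W cm-2 sr-1", "W m-2 sr-1")
# would return array([20000., 5000.]).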
def _get_file_units(dataset_id, ds_info):
"""Get file units from metadata."""
file_units = ds_info.get("file_units")
if file_units is None:
LOG.debug("Unknown units for file key '%s'", dataset_id)
return file_units
class JPSS_SDR_FileHandler(HDF5FileHandler):
"""Base class for reading JPSS VIIRS & ATMS SDR HDF5 Files."""
def __init__(self, filename, filename_info, filetype_info, **kwargs):
"""Initialize file handler."""
super().__init__(filename, filename_info, filetype_info, **kwargs)
    def _parse_datetime(self, datestr, timestr):
        """Parse a date/time attribute pair into a datetime object."""
if not isinstance(datestr, str):
datestr = str(datestr.data.compute().astype(str))
if not isinstance(timestr, str):
timestr = str(timestr.data.compute().astype(str))
datetime_str = datestr + timestr
time_val = dt.datetime.strptime(datetime_str, "%Y%m%d%H%M%S.%fZ")
if abs(time_val - NO_DATE) < EPSILON_TIME:
# catch rare case when SDR files have incorrect date
raise ValueError("Datetime invalid {}".format(time_val))
return time_val
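    # Illustrative example (hypothetical attribute values): the aggregate
    # date and time strings concatenate to the "%Y%m%d%H%M%S.%fZ" pattern,
    # e.g. self._parse_datetime("20230102", "123456.789012Z")
    # -> datetime.datetime(2023, 1, 2, 12, 34, 56, 789012)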
@property
def start_time(self):
"""Get start time."""
date_var_path = self._get_aggr_path("start_date", "AggregateBeginningDate")
time_var_path = self._get_aggr_path("start_time", "AggregateBeginningTime")
return self._parse_datetime(self[date_var_path], self[time_var_path])
@property
def end_time(self):
"""Get end time."""
date_var_path = self._get_aggr_path("end_date", "AggregateEndingDate")
time_var_path = self._get_aggr_path("end_time", "AggregateEndingTime")
return self._parse_datetime(self[date_var_path], self[time_var_path])
@property
def start_orbit_number(self):
"""Get start orbit number."""
start_orbit_path = self._get_aggr_path("start_orbit", "AggregateBeginningOrbitNumber")
return int(self[start_orbit_path])
@property
def end_orbit_number(self):
"""Get end orbit number."""
end_orbit_path = self._get_aggr_path("end_orbit", "AggregateEndingOrbitNumber")
return int(self[end_orbit_path])
    def _get_aggr_path(self, fileinfo_key, aggr_default):
        """Get the metadata path of an aggregate attribute, honoring any filetype_info override."""
dataset_group = DATASET_KEYS[self.datasets[0]]
default = "Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/" + aggr_default
return self.filetype_info.get(fileinfo_key, default).format(dataset_group=dataset_group)
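    # For example, for an SVM05 file with no override in filetype_info,
    # _get_aggr_path("start_date", "AggregateBeginningDate") resolves to
    # "Data_Products/VIIRS-M5-SDR/VIIRS-M5-SDR_Aggr/attr/AggregateBeginningDate".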
@property
def platform_name(self):
"""Get platform name."""
default = "/attr/Platform_Short_Name"
platform_path = self.filetype_info.get(
"platform_name", default).format(**self.filetype_info)
platform_dict = {"NPP": "Suomi-NPP",
"JPSS-1": "NOAA-20",
"J01": "NOAA-20",
"JPSS-2": "NOAA-21",
"J02": "NOAA-21"}
return platform_dict.get(self[platform_path], self[platform_path])
@property
def sensor_name(self):
"""Get sensor name."""
dataset_group = DATASET_KEYS[self.datasets[0]]
default = "Data_Products/{dataset_group}/attr/Instrument_Short_Name"
sensor_path = self.filetype_info.get(
"sensor_name", default).format(dataset_group=dataset_group)
return self[sensor_path].lower()
def scale_swath_data(self, data, scaling_factors, dataset_group):
"""Scale swath data using scaling factors and offsets.
Multi-granule (a.k.a. aggregated) files will have more than the usual two values.
"""
rows_per_gran = self._get_rows_per_granule(dataset_group)
factors = self._mask_and_reshape_factors(scaling_factors)
data = self._map_and_apply_factors(data, factors, rows_per_gran)
return data
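    # Sketch of the assumed factor layout: an aggregated file with N granules
    # stores its factors as a flat [scale_1, offset_1, ..., scale_N, offset_N]
    # array, and each (scale, offset) pair is applied to its granule's rows
    # as data * scale + offset.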
def scale_data_to_specified_unit(self, data, dataset_id, ds_info):
"""Get sscale and offset factors and convert/scale data to given physical unit."""
var_path = self._generate_file_key(dataset_id, ds_info)
dataset_group = ds_info["dataset_group"]
file_units = _get_file_units(dataset_id, ds_info)
output_units = ds_info.get("units", file_units)
factor_var_path = ds_info.get("factors_key", var_path + "Factors")
factors = self.get(factor_var_path)
factors = self._adjust_scaling_factors(factors, file_units, output_units)
if factors is not None:
return self.scale_swath_data(data, factors, dataset_group)
LOG.debug("No scaling factors found for %s", dataset_id)
return data
@staticmethod
def _mask_and_reshape_factors(factors):
factors = factors.where(factors > -999, np.float32(np.nan))
return factors.data.reshape((-1, 2)).rechunk((1, 2)) # make it so map_blocks happens per factor
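    # e.g. a two-granule factor array [m1, b1, m2, b2] becomes
    # [[m1, b1], [m2, b2]] with dask chunks of (1, 2), so map_blocks later
    # sees exactly one (scale, offset) pair per data block.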
@staticmethod
def _map_and_apply_factors(data, factors, rows_per_gran):
# The user may have requested a different chunking scheme, but we need
# per granule chunking right now so factor chunks map 1:1 to data chunks
old_chunks = data.chunks
dask_data = data.data.rechunk((tuple(rows_per_gran), data.data.chunks[1]))
dask_data = da.map_blocks(_apply_factors, dask_data, factors,
chunks=dask_data.chunks, dtype=data.dtype,
meta=np.array([[]], dtype=data.dtype))
data = xr.DataArray(dask_data.rechunk(old_chunks),
dims=data.dims, coords=data.coords,
attrs=data.attrs)
return data
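    # For example, two aggregated 48-scan M-band granules give
    # rows_per_gran == [768, 768] (48 scans * 16 rows per scan), so the data
    # is temporarily rechunked to one dask block per granule and the matching
    # factor pair is applied to each block.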
@staticmethod
def _scale_factors_for_units(factors, file_units, output_units):
return _get_scale_factors_for_units(factors, file_units, output_units)
@staticmethod
def _get_valid_scaling_factors(factors):
if factors is None:
factors = np.array([1, 0], dtype=np.float32)
factors = xr.DataArray(da.from_array(factors, chunks=1))
else:
factors = factors.where(factors != -999., np.float32(np.nan))
return factors
def _adjust_scaling_factors(self, factors, file_units, output_units):
"""Adjust scaling factors ."""
if file_units == output_units:
LOG.debug("File units and output units are the same (%s)", file_units)
return factors
factors = self._get_valid_scaling_factors(factors)
return self._scale_factors_for_units(factors, file_units, output_units)
@staticmethod
def expand_single_values(var, scans):
"""Expand single valued variable to full scan lengths."""
        if scans.size == 1:
            return var
        expanded = np.repeat(var, scans)
        expanded.attrs = var.attrs
        # xarray's rename() returns a new object instead of modifying in
        # place, so the result must be reassigned
        expanded = expanded.rename({expanded.dims[0]: "y"})
        return expanded
def _scan_size(self, dataset_group_name):
"""Get how many rows of data constitute one scanline."""
if "ATM" in dataset_group_name:
scan_size = 1
elif "I" in dataset_group_name:
scan_size = 32
else:
scan_size = 16
return scan_size
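    # The strings checked here are the short file prefixes, not the long
    # dataset group names: e.g. "SVI01"/"GITCO" -> 32 rows per scan (I-bands),
    # "SVM05"/"GDNBO" -> 16 (M-bands and DNB), "SATMS" -> 1 (ATMS).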
    def _generate_file_key(self, ds_id, ds_info, factors=False):
        """Generate the HDF5 variable path for the requested dataset and calibration."""
var_path = ds_info.get("file_key", "All_Data/{dataset_group}_All/{calibration}")
calibration = {
"radiance": "Radiance",
"reflectance": "Reflectance",
"brightness_temperature": "BrightnessTemperature",
}.get(ds_id.get("calibration"))
var_path = var_path.format(calibration=calibration, dataset_group=DATASET_KEYS[ds_info["dataset_group"]])
if ds_id["name"] in ["dnb_longitude", "dnb_latitude"]:
if self.use_tc is True:
return var_path + "_TC"
if self.use_tc is None and var_path + "_TC" in self.file_content:
return var_path + "_TC"
return var_path
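    # For example, requesting M5 radiance with the default file_key template
    # resolves to "All_Data/VIIRS-M5-SDR_All/Radiance"; the DNB lon/lat keys
    # gain a "_TC" suffix when terrain-corrected geolocation is requested or
    # found in the file.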
    def _update_data_attributes(self, data, dataset_id, ds_info):
        """Update the metadata (attrs) of *data* from the file content and dataset config."""
        file_units = _get_file_units(dataset_id, ds_info)
        output_units = ds_info.get("units", file_units)
        new_attrs = getattr(data, "attrs", {})
        new_attrs.update(ds_info)
        new_attrs.update({
            "platform_name": self.platform_name,
            "sensor": self.sensor_name,
            "start_orbit": self.start_orbit_number,
            "end_orbit": self.end_orbit_number,
            "units": output_units,
            "rows_per_scan": self._scan_size(ds_info["dataset_group"]),
        })
        new_attrs.update(dataset_id.to_dict())
        data.attrs.update(new_attrs)
return data
def _get_variable(self, var_path, **kwargs):
return self[var_path]
def concatenate_dataset(self, dataset_group, var_path, **kwargs):
"""Concatenate dataset."""
scan_size = self._scan_size(dataset_group)
scans = self._get_scans_per_granule(dataset_group)
start_scan = 0
data_chunks = []
scans = xr.DataArray(scans)
variable = self._get_variable(var_path, **kwargs)
# check if these are single per-granule value
if variable.size != scans.size:
for gscans in scans.values:
data_chunks.append(variable.isel(y=slice(start_scan,
start_scan + gscans * scan_size)))
start_scan += gscans * scan_size
return xr.concat(data_chunks, "y")
else:
            # Note (A. Dybbroe, 2023-01-02): this branch is not covered by
            # tests and it is unclear whether it is ever exercised.
return self.expand_single_values(variable, scans)
def _get_rows_per_granule(self, dataset_group):
scan_size = self._scan_size(dataset_group)
scans_per_gran = self._get_scans_per_granule(dataset_group)
return [scan_size * gran_scans for gran_scans in scans_per_gran]
def _get_scans_per_granule(self, dataset_group):
number_of_granules_path = "Data_Products/{dataset_group}/{dataset_group}_Aggr/attr/AggregateNumberGranules"
nb_granules_path = number_of_granules_path.format(dataset_group=DATASET_KEYS[dataset_group])
scans = []
for granule in range(self[nb_granules_path]):
scans_path = "Data_Products/{dataset_group}/{dataset_group}_Gran_{granule}/attr/N_Number_Of_Scans"
scans_path = scans_path.format(dataset_group=DATASET_KEYS[dataset_group], granule=granule)
scans.append(self[scans_path])
return scans
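    # e.g. for a two-granule aggregated SVM05 file this reads
    # "Data_Products/VIIRS-M5-SDR/VIIRS-M5-SDR_Gran_0/attr/N_Number_Of_Scans"
    # and the corresponding _Gran_1 attribute (typically 48 scans each).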
def mask_fill_values(self, data, ds_info):
"""Mask fill values."""
is_floating = np.issubdtype(data.dtype, np.floating)
if is_floating:
# If the data is a float then we mask everything <= -999.0
fill_max = np.float32(ds_info.pop("fill_max_float", -999.0))
return data.where(data > fill_max, np.float32(np.nan))
else:
# If the data is an integer then we mask everything >= fill_min_int
fill_min = int(ds_info.pop("fill_min_int", 65528))
return data.where(data < fill_min, np.float32(np.nan))
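    # The defaults follow the JPSS fill-value conventions (an assumption worth
    # checking against the product format documents): float fills are -999.x
    # sentinels, while unsigned 16-bit fills occupy the top of the integer
    # range (65528-65535), so everything at or above 65528 is masked.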
def available_datasets(self, configured_datasets=None):
"""Generate dataset info and their availablity.
See
:meth:`satpy.readers.file_handlers.BaseFileHandler.available_datasets`
for details.
"""
for is_avail, ds_info in (configured_datasets or []):
if is_avail is not None:
yield is_avail, ds_info
continue
dataset_group = [ds_group for ds_group in ds_info["dataset_groups"] if ds_group in self.datasets]
if dataset_group:
yield True, ds_info
elif is_avail is None:
yield is_avail, ds_info
def _apply_factors(data, factor_set):
    """Apply a granule's (scale, offset) factor pair to a block of data."""
    return data * factor_set[0, 0] + factor_set[0, 1]