Source code for satpy.readers.viirs_compact

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Compact viirs format.

This is a reader for the Compact VIIRS format shipped on Eumetcast for the
VIIRS SDR. The format is compressed in multiple ways, notably by shipping only
tie-points for geographical data. The interpolation of this data is done using
dask operations, so it should be relatively performant.

For more information on this format, the reader can refer to the
`Compact VIIRS SDR Product Format User Guide` that can be found on this EARS_ page.

.. _EARS: https://www.eumetsat.int/media/45988
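
A minimal usage sketch, assuming some compact SDR granules sit in the current
directory (the file name pattern below is illustrative, not prescriptive)::

    from glob import glob

    from satpy import Scene

    scn = Scene(filenames=glob("SVMC_npp_*.h5"), reader="viirs_compact")
    scn.load(["M05"])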

"""

import datetime as dt
import logging
from contextlib import suppress

import dask.array as da
import h5py
import numpy as np
import xarray as xr

from satpy.readers.file_handlers import BaseFileHandler
from satpy.readers.utils import np2str
from satpy.utils import angle2xyz, get_legacy_chunk_size, lonlat2xyz, xyz2angle, xyz2lonlat

CHUNK_SIZE = get_legacy_chunk_size()
_channels_dict = {"M01": "M1",
                  "M02": "M2",
                  "M03": "M3",
                  "M04": "M4",
                  "M05": "M5",
                  "M06": "M6",
                  "M07": "M7",
                  "M08": "M8",
                  "M09": "M9",
                  "M10": "M10",
                  "M11": "M11",
                  "M12": "M12",
                  "M13": "M13",
                  "M14": "M14",
                  "M15": "M15",
                  "M16": "M16",
                  "DNB": "DNB"}

logger = logging.getLogger(__name__)

# Physical constants used by the inverse Planck function in
# `VIIRSCompactFileHandler.read_dataset` (brightness temperature calibration).
c = 299792458  # speed of light, m.s-1
h = 6.6260755e-34  # Planck constant, m2kg.s-1
k = 1.380658e-23  # Boltzmann constant, m2kg.s-2.K-1

short_names = {"NPP": "Suomi-NPP",
               "J01": "NOAA-20",
               "J02": "NOAA-21"}


class VIIRSCompactFileHandler(BaseFileHandler):
    """A file handler class for VIIRS compact format."""

    def __init__(self, filename, filename_info, filetype_info):
        """Initialize the reader."""
        super(VIIRSCompactFileHandler, self).__init__(filename, filename_info,
                                                      filetype_info)
        self.h5f = h5py.File(self.filename, "r")
        self.finfo = filename_info
        self.lons = None
        self.lats = None
        if filetype_info["file_type"] == "compact_m":
            self.ch_type = "MOD"
        elif filetype_info["file_type"] == "compact_dnb":
            self.ch_type = "DNB"
        else:
            raise IOError("Compact Viirs file type not recognized.")

        geo_data = (self.h5f["Data_Products"]["VIIRS-%s-GEO" % self.ch_type]
                    ["VIIRS-%s-GEO_Gran_0" % self.ch_type])
        self.min_lat = geo_data.attrs["South_Bounding_Coordinate"].item()
        self.max_lat = geo_data.attrs["North_Bounding_Coordinate"].item()
        self.min_lon = geo_data.attrs["West_Bounding_Coordinate"].item()
        self.max_lon = geo_data.attrs["East_Bounding_Coordinate"].item()
        self.switch_to_cart = ((abs(self.max_lon - self.min_lon) > 90) or
                               (max(abs(self.min_lat), abs(self.max_lat)) > 60))

        self.scans = self.h5f["All_Data"]["NumberOfScans"][0]
        self.geography = self.h5f["All_Data"]["VIIRS-%s-GEO_All" % self.ch_type]

        for key in self.h5f["All_Data"].keys():
            if key.startswith("VIIRS") and key.endswith("SDR_All"):
                channel = key.split("-")[1]
                break

        # This supposes there is only one tiepoint zone in the track direction.
        channel_path = f"All_Data/VIIRS-{channel}-SDR_All"
        self.scan_size = self.h5f[channel_path].attrs["TiePointZoneSizeTrack"].item()
        self.track_offset = self.h5f[channel_path].attrs["PixelOffsetTrack"][()]
        self.scan_offset = self.h5f[channel_path].attrs["PixelOffsetScan"][()]

        try:
            self.group_locations = self.geography["TiePointZoneGroupLocationScanCompact"][()]
        except KeyError:
            self.group_locations = [0]
        self.tpz_sizes = da.from_array(self.h5f[channel_path].attrs["TiePointZoneSizeScan"],
                                       chunks=1)
        if len(self.tpz_sizes.shape) == 2:
            if self.tpz_sizes.shape[1] != 1:
                raise NotImplementedError("Can't handle 2 dimensional tiepoint zones.")
            self.tpz_sizes = self.tpz_sizes.squeeze(1)
        self.nb_tiepoint_zones = self.geography["NumberOfTiePointZonesScan"][()]
        self.c_align = da.from_array(self.geography["AlignmentCoefficient"],
                                     chunks=tuple(self.nb_tiepoint_zones))
        self.c_exp = da.from_array(self.geography["ExpansionCoefficient"],
                                   chunks=tuple(self.nb_tiepoint_zones))
        self.nb_tiepoint_zones = da.from_array(self.nb_tiepoint_zones, chunks=1)
        self._expansion_coefs = None

        self.cache = {}

        self.mda = {}
        short_name = np2str(self.h5f.attrs["Platform_Short_Name"])
        self.mda["platform_name"] = short_names.get(short_name, short_name)
        self.mda["sensor"] = "viirs"

    def __del__(self):
        """Close file handlers when we are done."""
        with suppress(OSError):
            self.h5f.close()

    def get_dataset(self, key, info):
        """Load a dataset."""
        logger.debug("Reading %s.", key["name"])
        if key["name"] in _channels_dict:
            m_data = self.read_dataset(key, info)
        else:
            m_data = self.read_geo(key, info)
        m_data.attrs.update(info)
        m_data.attrs["rows_per_scan"] = self.scan_size
        return m_data

    def get_bounding_box(self):
        """Get the bounding box of the data."""
        for key in self.h5f["Data_Products"].keys():
            if key.startswith("VIIRS") and key.endswith("GEO"):
                lats = self.h5f["Data_Products"][key][key + "_Gran_0"].attrs["G-Ring_Latitude"][()]
                lons = self.h5f["Data_Products"][key][key + "_Gran_0"].attrs["G-Ring_Longitude"][()]
                break
        else:
            raise KeyError("Cannot find bounding coordinates!")
        return lons.ravel(), lats.ravel()

    @property
    def start_time(self):
        """Get the start time."""
        return self.finfo["start_time"]

    @property
    def end_time(self):
        """Get the end time."""
        end_time = dt.datetime.combine(self.start_time.date(),
                                       self.finfo["end_time"].time())
        if end_time < self.start_time:
            end_time += dt.timedelta(days=1)
        return end_time

    def read_geo(self, key, info):
        """Read angles."""
        pairs = {("satellite_azimuth_angle", "satellite_zenith_angle"):
                 ("SatelliteAzimuthAngle", "SatelliteZenithAngle"),
                 ("solar_azimuth_angle", "solar_zenith_angle"):
                 ("SolarAzimuthAngle", "SolarZenithAngle"),
                 ("dnb_solar_azimuth_angle", "dnb_solar_zenith_angle"):
                 ("SolarAzimuthAngle", "SolarZenithAngle"),
                 ("dnb_lunar_azimuth_angle", "dnb_lunar_zenith_angle"):
                 ("LunarAzimuthAngle", "LunarZenithAngle"),
                 }
        if self.lons is None or self.lats is None:
            self.lons, self.lats = self.navigate()
        for pair, fkeys in pairs.items():
            if key["name"] in pair:
                if (self.cache.get(pair[0]) is None
                        or self.cache.get(pair[1]) is None):
                    angles = self.angles(*fkeys)
                    self.cache[pair[0]], self.cache[pair[1]] = angles
                if key["name"] == pair[0]:
                    return xr.DataArray(self.cache[pair[0]], name=key["name"],
                                        attrs=self.mda, dims=("y", "x"))
                else:
                    return xr.DataArray(self.cache[pair[1]], name=key["name"],
                                        attrs=self.mda, dims=("y", "x"))

        if info.get("standard_name") in ["latitude", "longitude"]:
            mda = self.mda.copy()
            mda.update(info)
            if info["standard_name"] == "longitude":
                return xr.DataArray(self.lons, attrs=mda, dims=("y", "x"))
            else:
                return xr.DataArray(self.lats, attrs=mda, dims=("y", "x"))

        if key["name"] == "dnb_moon_illumination_fraction":
            mda = self.mda.copy()
            mda.update(info)
            return xr.DataArray(da.from_array(self.geography["MoonIllumFraction"]),
                                attrs=info)

    def read_dataset(self, dataset_key, info):
        """Read a dataset."""
        h5f = self.h5f
        channel = _channels_dict[dataset_key["name"]]
        chan_dict = dict([(key.split("-")[1], key)
                          for key in h5f["All_Data"].keys()
                          if key.startswith("VIIRS")])

        h5rads = h5f["All_Data"][chan_dict[channel]]["Radiance"]
        chunks = h5rads.chunks or CHUNK_SIZE
        rads = xr.DataArray(da.from_array(h5rads, chunks=chunks),
                            name=dataset_key["name"],
                            dims=["y", "x"]).astype(np.float32)
        h5attrs = h5rads.attrs
        scans = h5f["All_Data"]["NumberOfScans"][0]
        rads = rads[:scans * 16, :]
        rads = rads.where(rads <= 65526)
        try:
            rads = xr.where(rads <= h5attrs["Threshold"],
                            rads * h5attrs["RadianceScaleLow"] +
                            h5attrs["RadianceOffsetLow"],
                            rads * h5attrs["RadianceScaleHigh"] +
                            h5attrs["RadianceOffsetHigh"])
        except (KeyError, AttributeError):
            logger.info("Missing attribute for scaling of %s.", channel)
            pass
        unit = "W m-2 sr-1 μm-1"
        if dataset_key["calibration"] == "counts":
            raise NotImplementedError("Can't get counts from this data")
        if dataset_key["calibration"] in ["reflectance", "brightness_temperature"]:
            # do calibrate
            try:
                # First guess: VIS or NIR data
                a_vis = h5attrs["EquivalentWidth"]
                b_vis = h5attrs["IntegratedSolarIrradiance"]
                dse = h5attrs["EarthSunDistanceNormalised"]
                rads *= 100 * np.pi * a_vis / b_vis * (dse**2)
                unit = "%"
            except KeyError:
                # Maybe it's IR data?
                try:
                    a_ir = h5attrs["BandCorrectionCoefficientA"]
                    b_ir = h5attrs["BandCorrectionCoefficientB"]
                    lambda_c = h5attrs["CentralWaveLength"]
                    rads *= 1e6
                    rads = (h * c) / (k * lambda_c *
                                      np.log(1 + (2 * h * c ** 2) /
                                             ((lambda_c ** 5) * rads)))
                    rads *= a_ir
                    rads += b_ir
                    unit = "K"
                except KeyError:
                    logger.warning("Calibration failed.")

        elif dataset_key["calibration"] != "radiance":
            raise ValueError("Calibration parameter should be radiance, "
                             "reflectance or brightness_temperature")
        rads = rads.clip(min=0)
        rads.attrs = self.mda
        rads.attrs["units"] = unit
        return rads

    def expand_angle_and_nav(self, arrays):
        """Expand angle and navigation datasets."""
        res = []
        for array in arrays:
            res.append(da.map_blocks(expand, array[:, :, np.newaxis],
                                     self.expansion_coefs,
                                     scans=self.scans, scan_size=self.scan_size,
                                     dtype=array.dtype, drop_axis=2,
                                     chunks=self.expansion_coefs.chunks[:-1]))
        return res

    @property
    def expansion_coefs(self):
        """Compute the expansion coefficients."""
        if self._expansion_coefs is not None:
            return self._expansion_coefs
        v_track = (np.arange(self.scans * self.scan_size) %
                   self.scan_size + self.track_offset) / self.scan_size
        self.tpz_sizes = self.tpz_sizes.persist()
        self.nb_tiepoint_zones = self.nb_tiepoint_zones.persist()
        col_chunks = (self.tpz_sizes * self.nb_tiepoint_zones).compute()
        self._expansion_coefs = da.map_blocks(get_coefs, self.c_align, self.c_exp,
                                              self.tpz_sizes, self.nb_tiepoint_zones,
                                              v_track=v_track, scans=self.scans,
                                              scan_size=self.scan_size,
                                              scan_offset=self.scan_offset,
                                              dtype=np.float64, new_axis=[0, 2],
                                              chunks=(self.scans * self.scan_size,
                                                      tuple(col_chunks), 4))

        return self._expansion_coefs

    def navigate(self):
        """Generate the navigation datasets."""
        chunks = self._get_geographical_chunks()

        lon = da.from_array(self.geography["Longitude"], chunks=chunks)
        lat = da.from_array(self.geography["Latitude"], chunks=chunks)
        if self.switch_to_cart:
            arrays = lonlat2xyz(lon, lat)
        else:
            arrays = (lon, lat)

        expanded = self.expand_angle_and_nav(arrays)
        if self.switch_to_cart:
            return xyz2lonlat(*expanded)
        return expanded

    def _get_geographical_chunks(self):
        """Compute the chunking matching the tie-point zones of the file."""
        shape = self.geography["Longitude"].shape
        horizontal_chunks = (self.nb_tiepoint_zones + 1).compute()
        chunks = (shape[0], tuple(horizontal_chunks))
        return chunks

    def angles(self, azi_name, zen_name):
        """Generate the angle datasets."""
        chunks = self._get_geographical_chunks()

        azi = self.geography[azi_name]
        zen = self.geography[zen_name]

        switch_to_cart = ((np.max(azi) - np.min(azi) > 5)
                          or (np.min(zen) < 10)
                          or (max(abs(self.min_lat), abs(self.max_lat)) > 80))

        azi = da.from_array(azi, chunks=chunks)
        zen = da.from_array(zen, chunks=chunks)

        if switch_to_cart:
            arrays = convert_from_angles(azi, zen)
        else:
            arrays = (azi, zen)

        expanded = self.expand_angle_and_nav(arrays)
        if switch_to_cart:
            return convert_to_angles(*expanded)
        return expanded


def convert_from_angles(azi, zen):
    """Convert the angles to cartesian coordinates."""
    x, y, z, = angle2xyz(azi, zen)
    # Conversion to ECEF is recommended by the provider, but no significant
    # difference has been seen.
    # x, y, z = (-np.sin(lon) * x + np.cos(lon) * y,
    #            -np.sin(lat) * np.cos(lon) * x - np.sin(lat) * np.sin(lon) * y + np.cos(lat) * z,
    #            np.cos(lat) * np.cos(lon) * x + np.cos(lat) * np.sin(lon) * y + np.sin(lat) * z)
    return x, y, z


def convert_to_angles(x, y, z):
    """Convert the cartesian coordinates to angles."""
    # Conversion to ECEF is recommended by the provider, but no significant
    # difference has been seen.
    # x, y, z = (-np.sin(lon) * x - np.sin(lat) * np.cos(lon) * y + np.cos(lat) * np.cos(lon) * z,
    #            np.cos(lon) * x - np.sin(lat) * np.sin(lon) * y + np.cos(lat) * np.sin(lon) * z,
    #            np.cos(lat) * y + np.sin(lat) * z)
    azi, zen = xyz2angle(x, y, z, acos=True)
    return azi, zen
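

# The helpers below expand the compacted tie-point grids to the full pixel
# grid: `get_coefs` computes, for every output pixel, the four bilinear
# weights of its surrounding tie points, with the along-scan coordinate
# warped by the per-zone alignment and expansion coefficients from the file,
# and `expand` applies those weights to the tie-point data.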
def get_coefs(c_align, c_exp, tpz_size, nb_tpz, v_track, scans, scan_size,
              scan_offset):
    """Compute the coeffs in numpy domain."""
    nties = nb_tpz.item()
    tpz_size = tpz_size.item()
    v_scan = (np.arange(nties * tpz_size) % tpz_size + scan_offset) / tpz_size
    s_scan, s_track = np.meshgrid(v_scan, v_track)
    s_track = s_track.reshape(scans, scan_size, nties, tpz_size)
    s_scan = s_scan.reshape(scans, scan_size, nties, tpz_size)

    c_align = c_align[np.newaxis, np.newaxis, :, np.newaxis]
    c_exp = c_exp[np.newaxis, np.newaxis, :, np.newaxis]

    a_scan = s_scan + s_scan * (1 - s_scan) * c_exp + s_track * (
        1 - s_track) * c_align
    a_track = s_track
    coef_a = (1 - a_track) * (1 - a_scan)
    coef_b = (1 - a_track) * a_scan
    coef_d = a_track * (1 - a_scan)
    coef_c = a_track * a_scan
    res = np.stack([coef_a, coef_b, coef_c, coef_d],
                   axis=4).reshape(scans * scan_size, -1, 4)
    return res


def expand(data, coefs, scans, scan_size):
    """Perform the expansion in numpy domain."""
    data = data.reshape(data.shape[:-1])
    coefs = coefs.reshape(scans, scan_size, data.shape[1] - 1, -1, 4)

    coef_a = coefs[:, :, :, :, 0]
    coef_b = coefs[:, :, :, :, 1]
    coef_c = coefs[:, :, :, :, 2]
    coef_d = coefs[:, :, :, :, 3]

    corner_coefficients = (coef_a, coef_b, coef_c, coef_d)
    fdata = _interpolate_data(data, corner_coefficients, scans)
    return fdata.reshape(scans * scan_size, -1)


def _interpolate_data(data, corner_coefficients, scans):
    """Interpolate the data using the provided coefficients."""
    coef_a, coef_b, coef_c, coef_d = corner_coefficients
    data_a = data[:scans * 2:2, np.newaxis, :-1, np.newaxis]
    data_b = data[:scans * 2:2, np.newaxis, 1:, np.newaxis]
    data_c = data[1:scans * 2:2, np.newaxis, 1:, np.newaxis]
    data_d = data[1:scans * 2:2, np.newaxis, :-1, np.newaxis]
    fdata = (coef_a * data_a + coef_b * data_b
             + coef_d * data_d + coef_c * data_c)
    return fdata


def expand_arrays(arrays, scans, c_align, c_exp,
                  scan_size=16, tpz_size=16, nties=200,
                  track_offset=0.5, scan_offset=0.5):
    """Expand *data* according to alignment and expansion."""
    nties = nties.item()
    tpz_size = tpz_size.item()
    s_scan, s_track = da.meshgrid(da.arange(nties * tpz_size),
                                  da.arange(scans * scan_size))
    s_track = (s_track.reshape(scans, scan_size, nties, tpz_size) % scan_size
               + track_offset) / scan_size
    s_scan = (s_scan.reshape(scans, scan_size, nties, tpz_size) % tpz_size
              + scan_offset) / tpz_size

    a_scan = s_scan + s_scan * (1 - s_scan) * c_exp + s_track * (
        1 - s_track) * c_align
    a_track = s_track
    expanded = []
    coef_a = (1 - a_track) * (1 - a_scan)
    coef_b = (1 - a_track) * a_scan
    coef_d = a_track * (1 - a_scan)
    coef_c = a_track * a_scan
    corner_coefficients = (coef_a, coef_b, coef_c, coef_d)
    for data in arrays:
        fdata = _interpolate_data(data, corner_coefficients, scans)
        expanded.append(fdata.reshape(scans * scan_size, nties * tpz_size))
    return expanded