Source code for satpy.readers.sgli_l1b

# Copyright (c) 2020 Satpy developers
# This file is part of satpy.
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <>.
"""GCOM-C SGLI L1b reader.

GCOM-C has an imager instrument: SGLI

Test data is available here:
The live data is available from here:
And the format description is here:


import logging
from datetime import datetime

import dask.array as da
import h5py
import numpy as np
import xarray as xr
from dask.array.core import normalize_chunks

# from satpy import CHUNK_SIZE
from satpy.readers.file_handlers import BaseFileHandler

logger = logging.getLogger(__name__)

resolutions = {"Q": 250,
               "K": 1000,
               "L": 1000}

polarization_keys = {0: "0",
                     -60: "m60",
                     60: "60"}

[docs] class HDF5SGLI(BaseFileHandler): """File handler for the SGLI l1b data.""" def __init__(self, filename, filename_info, filetype_info): """Initialize the filehandler.""" super().__init__(filename, filename_info, filetype_info) self.resolution = resolutions[self.filename_info["resolution"]] self.h5file = h5py.File(self.filename, "r") @property def start_time(self): """Get the start time.""" the_time = self.h5file["Global_attributes"].attrs["Scene_start_time"].item() return datetime.strptime(the_time.decode("ascii"), "%Y%m%d %H:%M:%S.%f") @property def end_time(self): """Get the end time.""" the_time = self.h5file["Global_attributes"].attrs["Scene_end_time"].item() return datetime.strptime(the_time.decode("ascii"), "%Y%m%d %H:%M:%S.%f")
[docs] def get_dataset(self, key, info): """Get the dataset from the file.""" if key["resolution"] != self.resolution: return file_key = info["file_key"] if key["name"].startswith("P"): file_key = file_key.format(polarization=polarization_keys[key["polarization"]]) h5dataset = self.h5file[file_key] chunks = normalize_chunks(("auto", "auto"), h5dataset.shape, previous_chunks=h5dataset.chunks, dtype=np.float32) dataset = da.from_array(h5dataset, chunks=chunks) attrs = h5dataset.attrs dataset = xr.DataArray(dataset, attrs=attrs, dims=["y", "x"]) dataset = self.prepare_dataset(key, dataset) dataset.attrs["platform_name"] = "GCOM-C1" dataset.attrs["sensor"] = "sgli" dataset.attrs["units"] = info["units"] dataset.attrs["standard_name"] = info["standard_name"] return dataset
[docs] def prepare_dataset(self, key, dataset): """Prepare the dataset according to key.""" with xr.set_options(keep_attrs=True): if key["name"].startswith(("VN", "SW", "P")): dataset = self.get_visible_dataset(key, dataset) elif key["name"].startswith("TI"): dataset = self.get_ir_dataset(key, dataset) elif key["name"].startswith(("longitude", "latitude")): dataset = self.get_lon_lats(key) elif "angle" in key["name"]: dataset = self.get_angles(key) else: raise KeyError(f"Unrecognized dataset {key['name']}") return dataset
[docs] def get_visible_dataset(self, key, dataset): """Produce a DataArray with a visible channel data in it.""" dataset = self.mask_to_14_bits(dataset) dataset = self.calibrate_vis(dataset, key["calibration"]) return dataset
[docs] def mask_to_14_bits(self, dataset): """Mask data to 14 bits.""" return dataset & dataset.attrs["Mask"].item()
[docs] def calibrate_vis(self, dataset, calibration): """Calibrate visible data.""" attrs = dataset.attrs if calibration == "counts": return dataset if calibration == "reflectance": calibrated = (dataset * attrs["Slope_reflectance"] + attrs["Offset_reflectance"]) * 100 elif calibration == "radiance": calibrated = dataset * attrs["Slope"] + attrs["Offset"] missing, _ = self.get_missing_and_saturated(attrs) return calibrated.where(dataset < missing)
[docs] def get_missing_and_saturated(self, attrs): """Get the missing and saturation values.""" missing_and_saturated = attrs["Bit00(LSB)-13"].item() mask_vals = missing_and_saturated.split(b"\n")[1:] missing = int(mask_vals[0].split(b":")[0].strip()) saturation = int(mask_vals[1].split(b":")[0].strip()) return missing, saturation
[docs] def get_ir_dataset(self, key, dataset): """Produce a DataArray with an IR channel data in it.""" dataset = self.mask_to_14_bits(dataset) dataset = self.calibrate_ir(dataset, key["calibration"]) return dataset
[docs] def calibrate_ir(self, dataset, calibration): """Calibrate IR channel.""" attrs = dataset.attrs if calibration == "counts": return dataset elif calibration in ["radiance", "brightness_temperature"]: calibrated = dataset * attrs["Slope"] + attrs["Offset"] if calibration == "brightness_temperature": raise NotImplementedError("Cannot calibrate to brightness temperatures.") # from pyspectral.radiance_tb_conversion import radiance2tb # calibrated = radiance2tb(calibrated, attrs["Center_wavelength"] * 1e-9) missing, _ = self.get_missing_and_saturated(attrs) return calibrated.where(dataset < missing)
[docs] def get_lon_lats(self, key): """Get lon/lats from the file.""" lons = self.h5file["Geometry_data/Longitude"] lats = self.h5file["Geometry_data/Latitude"] attrs = lons.attrs resampling_interval = attrs["Resampling_interval"] if resampling_interval != 1: lons, lats = self.interpolate_spherical(lons, lats, resampling_interval) if key["name"].startswith("longitude"): dataset = lons else: dataset = lats return xr.DataArray(dataset, attrs=attrs, dims=["y", "x"])
[docs] def interpolate_spherical(self, azimuthal_angle, polar_angle, resampling_interval): """Interpolate spherical coordinates.""" from geotiepoints.geointerpolator import GeoGridInterpolator full_shape = (self.h5file["Image_data"].attrs["Number_of_lines"], self.h5file["Image_data"].attrs["Number_of_pixels"]) tie_lines = np.arange(0, polar_angle.shape[0] * resampling_interval, resampling_interval) tie_cols = np.arange(0, polar_angle.shape[1] * resampling_interval, resampling_interval) interpolator = GeoGridInterpolator((tie_lines, tie_cols), azimuthal_angle, polar_angle, method="slinear") new_azi, new_pol = interpolator.interpolate_to_shape(full_shape, chunks="auto") return new_azi, new_pol
[docs] def get_angles(self, key): """Get angles from the file.""" if "solar" in key["name"]: azi, zen, attrs = self.get_solar_angles() elif "satellite" in key["name"]: azi, zen, attrs = self.get_sensor_angles() if "azimuth" in key["name"]: dataset = azi else: dataset = zen dataset = xr.DataArray(dataset, attrs=attrs, dims=["y", "x"]) return dataset
[docs] def get_solar_angles(self): """Get the solar angles.""" azi = self.h5file["Geometry_data/Solar_azimuth"] zen = self.h5file["Geometry_data/Solar_zenith"] attrs = zen.attrs azi = self.scale_array(azi) zen = self.scale_array(zen) return *self.get_full_angles(azi, zen, attrs), attrs
[docs] def get_sensor_angles(self): """Get the solar angles.""" azi = self.h5file["Geometry_data/Sensor_azimuth"] zen = self.h5file["Geometry_data/Sensor_zenith"] attrs = zen.attrs azi = self.scale_array(azi) zen = self.scale_array(zen) return *self.get_full_angles(azi, zen, attrs), attrs
[docs] def scale_array(self, array): """Scale an array with its attributes `Slope` and `Offset` if available.""" try: return array * array.attrs["Slope"] + array.attrs["Offset"] except KeyError: return array
[docs] def get_full_angles(self, azi, zen, attrs): """Interpolate angle arrays.""" resampling_interval = attrs["Resampling_interval"] if resampling_interval != 1: zen = zen[:] - 90 new_azi, new_zen = self.interpolate_spherical(azi, zen, resampling_interval) return new_azi, new_zen + 90 return azi, zen