Source code for satpy.readers.iasi_l2

# Copyright (c) 2017-2023 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""IASI L2 files."""

import datetime as dt
import logging

import dask.array as da
import h5py
import numpy as np
import xarray as xr

from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import get_legacy_chunk_size

from .netcdf_utils import NetCDF4FsspecFileHandler

CHUNK_SIZE = get_legacy_chunk_size()


# Scan timing values taken from
# http://oiswww.eumetsat.org/WEBOPS/eps-pg/IASI-L1/IASIL1-PG-4ProdOverview.htm
# Time between each scan in one scanline [ms]
SCAN_STEP_TIME = 8. / 37.
# Duration of one measurement [ms]
SCAN_STARE_DURATION = 151.0
# Time correction used between each 4-footprint measurements
VIEW_TIME_ADJUSTMENT = SCAN_STEP_TIME + SCAN_STARE_DURATION / 2.

VALUES_PER_SCAN_LINE = 120

# Epoch for the dates
EPOCH = dt.datetime(2000, 1, 1)

SHORT_NAMES = {"M01": "Metop-B",
               "M02": "Metop-A",
               "M03": "Metop-C"}

DSET_NAMES = {"ozone_mixing_ratio": "O",
              "ozone_mixing_ratio_quality": "QO",
              "pressure": "P",
              "pressure_quality": "QP",
              "temperature": "T",
              "temperature_quality": "QT",
              "water_mixing_ratio": "W",
              "water_mixing_ratio_quality": "QW",
              "water_total_column": "WC",
              "ozone_total_column": "OC",
              "surface_skin_temperature": "Ts",
              "surface_skin_temperature_quality": "QTs",
              "emissivity": "E",
              "emissivity_quality": "QE"}

GEO_NAMES = {"latitude": "Latitude",
             "longitude": "Longitude",
             "satellite_azimuth_angle": "SatAzimuth",
             "satellite_zenith_angle": "SatZenith",
             "sensing_time": {"day": "SensingTime_day",
                              "msec": "SensingTime_msec"},
             "solar_azimuth_angle": "SunAzimuth",
             "solar_zenith_angle": "SunZenith"}


LOGGER = logging.getLogger(__name__)


[docs] class IASIL2HDF5(BaseFileHandler): """File handler for IASI L2 HDF5 files.""" def __init__(self, filename, filename_info, filetype_info): """Init the file handler.""" super(IASIL2HDF5, self).__init__(filename, filename_info, filetype_info) self.finfo = filename_info self.lons = None self.lats = None self.sensor = "iasi" self.mda = {} short_name = filename_info["platform_id"] self.mda["platform_name"] = SHORT_NAMES.get(short_name, short_name) self.mda["sensor"] = "iasi" @property def start_time(self): """Get the start time.""" return self.finfo["start_time"] @property def end_time(self): """Get the end time.""" end_time = dt.datetime.combine(self.start_time.date(), self.finfo["end_time"].time()) if end_time < self.start_time: end_time += dt.timedelta(days=1) return end_time
[docs] def get_dataset(self, key, info): """Load a dataset.""" with h5py.File(self.filename, "r") as fid: LOGGER.debug("Reading %s.", key["name"]) if key["name"] in DSET_NAMES: m_data = read_dataset(fid, key) else: m_data = read_geo(fid, key) m_data.attrs.update(info) m_data.attrs["sensor"] = self.sensor return m_data
[docs] def read_dataset(fid, key): """Read dataset.""" dsid = DSET_NAMES[key["name"]] dset = fid["/PWLR/" + dsid] if dset.ndim == 3: dims = ["y", "x", "level"] else: dims = ["y", "x"] data = xr.DataArray(da.from_array(dset[()], chunks=CHUNK_SIZE), name=key["name"], dims=dims).astype(np.float32) data = xr.where(data > 1e30, np.nan, data) dset_attrs = dict(dset.attrs) data.attrs.update(dset_attrs) return data
[docs] def read_geo(fid, key): """Read geolocation and related datasets.""" dsid = GEO_NAMES[key["name"]] add_epoch = False if "time" in key["name"]: days = fid["/L1C/" + dsid["day"]][()] msecs = fid["/L1C/" + dsid["msec"]][()] data = _form_datetimes(days, msecs) add_epoch = True dtype = np.float64 else: data = fid["/L1C/" + dsid][()] dtype = np.float32 data = xr.DataArray(da.from_array(data, chunks=CHUNK_SIZE), name=key["name"], dims=["y", "x"]).astype(dtype) if add_epoch: data.attrs["sensing_time_epoch"] = EPOCH return data
[docs] def _form_datetimes(days, msecs): """Calculate seconds since EPOCH from days and milliseconds for each of IASI scan.""" all_datetimes = [] for i in range(days.size): day = int(days[i]) msec = msecs[i] scanline_datetimes = [] for j in range(int(VALUES_PER_SCAN_LINE / 4)): usec = 1000 * (j * VIEW_TIME_ADJUSTMENT + msec) delta = (dt.timedelta(days=day, microseconds=usec)) for _k in range(4): scanline_datetimes.append(delta.total_seconds()) all_datetimes.append(scanline_datetimes) return np.array(all_datetimes, dtype=np.float64)
[docs] class IASIL2CDRNC(NetCDF4FsspecFileHandler): """Reader for IASI L2 CDR in NetCDF format. Reader for IASI All Sky Temperature and Humidity Profiles - Climate Data Record Release 1.1 - Metop-A and -B. Data and documentation are available from http://doi.org/10.15770/EUM_SEC_CLM_0063. Data are also available from the EUMETSAT Data Store under ID EO:EUM:DAT:0576. """
[docs] def get_dataset(self, data_id, ds_info): """Obtain dataset.""" ds = self[data_id["name"]] if "scan_lines" in ds.dims: ds = ds.rename(scan_lines="y") if "pixels" in ds.dims: ds = ds.rename(pixels="x") if "_FillValue" in ds.attrs and ds.dtype.kind == "f": with xr.set_options(keep_attrs=True): # have to inverse the logic due to https://github.com/pydata/xarray/issues/7581 return xr.where(ds != ds.attrs["_FillValue"], ds, np.nan) return ds
[docs] def available_datasets(self, configured_datasets=None): """Get available datasets based on what's in the file. Returns all datasets in the root group. """ yield from super().available_datasets(configured_datasets) common = {"file_type": "iasi_l2_cdr_nc", "resolution": 12000} for key in self.file_content: if "/" in key: # not a dataset continue yield (True, {"name": key} | common | self[key].attrs)