# Copyright (c) 2017-2023 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""IASI L2 files."""
import datetime as dt
import logging
import dask.array as da
import h5py
import numpy as np
import xarray as xr
from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import get_legacy_chunk_size
from .netcdf_utils import NetCDF4FsspecFileHandler
CHUNK_SIZE = get_legacy_chunk_size()
# Scan timing values taken from
# http://oiswww.eumetsat.org/WEBOPS/eps-pg/IASI-L1/IASIL1-PG-4ProdOverview.htm
# Time between each scan in one scanline [ms]
SCAN_STEP_TIME = 8. / 37.
# Duration of one measurement [ms]
SCAN_STARE_DURATION = 151.0
# Time correction used between each 4-footprint measurements
VIEW_TIME_ADJUSTMENT = SCAN_STEP_TIME + SCAN_STARE_DURATION / 2.
VALUES_PER_SCAN_LINE = 120
# Epoch for the dates
EPOCH = dt.datetime(2000, 1, 1)
SHORT_NAMES = {"M01": "Metop-B",
"M02": "Metop-A",
"M03": "Metop-C"}
DSET_NAMES = {"ozone_mixing_ratio": "O",
"ozone_mixing_ratio_quality": "QO",
"pressure": "P",
"pressure_quality": "QP",
"temperature": "T",
"temperature_quality": "QT",
"water_mixing_ratio": "W",
"water_mixing_ratio_quality": "QW",
"water_total_column": "WC",
"ozone_total_column": "OC",
"surface_skin_temperature": "Ts",
"surface_skin_temperature_quality": "QTs",
"emissivity": "E",
"emissivity_quality": "QE"}
GEO_NAMES = {"latitude": "Latitude",
"longitude": "Longitude",
"satellite_azimuth_angle": "SatAzimuth",
"satellite_zenith_angle": "SatZenith",
"sensing_time": {"day": "SensingTime_day",
"msec": "SensingTime_msec"},
"solar_azimuth_angle": "SunAzimuth",
"solar_zenith_angle": "SunZenith"}
LOGGER = logging.getLogger(__name__)
[docs]
class IASIL2HDF5(BaseFileHandler):
"""File handler for IASI L2 HDF5 files."""
def __init__(self, filename, filename_info, filetype_info):
"""Init the file handler."""
super(IASIL2HDF5, self).__init__(filename, filename_info,
filetype_info)
self.finfo = filename_info
self.lons = None
self.lats = None
self.sensor = "iasi"
self.mda = {}
short_name = filename_info["platform_id"]
self.mda["platform_name"] = SHORT_NAMES.get(short_name, short_name)
self.mda["sensor"] = "iasi"
@property
def start_time(self):
"""Get the start time."""
return self.finfo["start_time"]
@property
def end_time(self):
"""Get the end time."""
end_time = dt.datetime.combine(self.start_time.date(),
self.finfo["end_time"].time())
if end_time < self.start_time:
end_time += dt.timedelta(days=1)
return end_time
[docs]
def get_dataset(self, key, info):
"""Load a dataset."""
with h5py.File(self.filename, "r") as fid:
LOGGER.debug("Reading %s.", key["name"])
if key["name"] in DSET_NAMES:
m_data = read_dataset(fid, key)
else:
m_data = read_geo(fid, key)
m_data.attrs.update(info)
m_data.attrs["sensor"] = self.sensor
return m_data
[docs]
def read_dataset(fid, key):
"""Read dataset."""
dsid = DSET_NAMES[key["name"]]
dset = fid["/PWLR/" + dsid]
if dset.ndim == 3:
dims = ["y", "x", "level"]
else:
dims = ["y", "x"]
data = xr.DataArray(da.from_array(dset[()], chunks=CHUNK_SIZE),
name=key["name"], dims=dims).astype(np.float32)
data = xr.where(data > 1e30, np.nan, data)
dset_attrs = dict(dset.attrs)
data.attrs.update(dset_attrs)
return data
[docs]
def read_geo(fid, key):
"""Read geolocation and related datasets."""
dsid = GEO_NAMES[key["name"]]
add_epoch = False
if "time" in key["name"]:
days = fid["/L1C/" + dsid["day"]][()]
msecs = fid["/L1C/" + dsid["msec"]][()]
data = _form_datetimes(days, msecs)
add_epoch = True
dtype = np.float64
else:
data = fid["/L1C/" + dsid][()]
dtype = np.float32
data = xr.DataArray(da.from_array(data, chunks=CHUNK_SIZE),
name=key["name"], dims=["y", "x"]).astype(dtype)
if add_epoch:
data.attrs["sensing_time_epoch"] = EPOCH
return data
[docs]
class IASIL2CDRNC(NetCDF4FsspecFileHandler):
"""Reader for IASI L2 CDR in NetCDF format.
Reader for IASI All Sky Temperature and Humidity Profiles - Climate
Data Record Release 1.1 - Metop-A and -B. Data and documentation are
available from http://doi.org/10.15770/EUM_SEC_CLM_0063. Data are
also available from the EUMETSAT Data Store under ID EO:EUM:DAT:0576.
"""
[docs]
def get_dataset(self, data_id, ds_info):
"""Obtain dataset."""
ds = self[data_id["name"]]
if "scan_lines" in ds.dims:
ds = ds.rename(scan_lines="y")
if "pixels" in ds.dims:
ds = ds.rename(pixels="x")
if "_FillValue" in ds.attrs and ds.dtype.kind == "f":
with xr.set_options(keep_attrs=True):
# have to inverse the logic due to https://github.com/pydata/xarray/issues/7581
return xr.where(ds != ds.attrs["_FillValue"], ds, np.nan)
return ds
[docs]
def available_datasets(self, configured_datasets=None):
"""Get available datasets based on what's in the file.
Returns all datasets in the root group.
"""
yield from super().available_datasets(configured_datasets)
common = {"file_type": "iasi_l2_cdr_nc", "resolution": 12000}
for key in self.file_content:
if "/" in key: # not a dataset
continue
yield (True, {"name": key} | common | self[key].attrs)