#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2018 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Unit tests for IASI L2 reader."""
import math
import os
import unittest
import numpy as np
import pytest
import xarray as xr
SCAN_WIDTH = 120
NUM_LEVELS = 138
NUM_SCANLINES = 1
FNAME = "W_XX-EUMETSAT-kan,iasi,metopb+kan_C_EUMS_20170920103559_IASI_PW3_02_M01_20170920102217Z_20170920102912Z.hdf"
# Structure for the test data, to be written to HDF5 file
TEST_DATA = {
# Not implemented in the reader
"Amsu": {
"FLG_AMSUBAD": {"data": np.zeros((NUM_SCANLINES, 30), dtype=np.uint8),
"attrs": {}}
},
# Not implemented in the reader
"INFO": {
"OmC": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"long_name": "Cloud signal. Predicted average window channel 'Obs minus Calc",
"units": "K"}},
"mdist": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {}}
},
"L1C": {
"Latitude": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"units": "degrees_north"}},
"Longitude": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"units": "degrees_north"}},
"SatAzimuth": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"units": "degrees"}},
"SatZenith": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"units": "degrees"}},
"SensingTime_day": {"data": np.array([6472], dtype=np.uint16),
"attrs": {}},
"SensingTime_msec": {"data": np.array([37337532], dtype=np.uint32),
"attrs": {}},
"SunAzimuth": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"units": "degrees"}},
"SunZenith": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"units": "degrees"}},
},
# Not implemented in the reader
"Maps": {
"Height": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"units": "m"}},
"HeightStd": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"units": "m"}},
},
# Not implemented in the reader
"Mhs": {
"FLG_MHSBAD": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.uint8),
"attrs": {}}
},
"PWLR": {
"E": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH, 10), dtype=np.float32),
"attrs": {"emissivity_wavenumbers": np.array([699.3, 826.4,
925.9, 1075.2,
1204.8, 1315.7,
1724.1, 2000.0,
2325.5, 2702.7],
dtype=np.float32)}},
"O": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH, NUM_LEVELS), dtype=np.float32),
"attrs": {"long_name": "Ozone mixing ratio vertical profile",
"units": "kg/kg"}},
"OC": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {}},
"P": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH, NUM_LEVELS), dtype=np.float32),
"attrs": {"long_name": "Atmospheric pressures at which the vertical profiles are given. "
"Last value is the surface pressure",
"units": "hpa"}},
"QE": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {}},
"QO": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {}},
"QP": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {}},
"QT": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {}},
"QTs": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {}},
"QW": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {}},
"T": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH, NUM_LEVELS), dtype=np.float32),
"attrs": {"long_name": "Temperature vertical profile", "units": "K"}},
"Ts": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"long_name": "Surface skin temperature", "units": "K"}},
"W": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH, NUM_LEVELS), dtype=np.float32),
"attrs": {"long_name": "Water vapour mixing ratio vertical profile", "units": "kg/kg"}},
"WC": {"data": np.zeros((NUM_SCANLINES, SCAN_WIDTH), dtype=np.float32),
"attrs": {"long_name": "Water vapour total columnar amount", "units": "mm"}},
}
}
[docs]
def save_test_data(path):
"""Save the test to the indicated directory."""
import h5py
with h5py.File(os.path.join(path, FNAME), "w") as fid:
# Create groups
for grp in TEST_DATA:
fid.create_group(grp)
# Write datasets
for dset in TEST_DATA[grp]:
fid[grp][dset] = TEST_DATA[grp][dset]["data"]
# Write dataset attributes
for attr in TEST_DATA[grp][dset]["attrs"]:
fid[grp][dset].attrs[attr] = \
TEST_DATA[grp][dset]["attrs"][attr]
[docs]
class TestIasiL2(unittest.TestCase):
"""Test IASI L2 reader."""
[docs]
def setUp(self):
"""Create temporary data to test on."""
import datetime as dt
import tempfile
from satpy.readers.iasi_l2 import IASIL2HDF5
self.base_dir = tempfile.mkdtemp()
save_test_data(self.base_dir)
self.fname = os.path.join(self.base_dir, FNAME)
self.fname_info = {"start_time": dt.datetime(2017, 9, 20, 10, 22, 17),
"end_time": dt.datetime(2017, 9, 20, 10, 29, 12),
"processing_time": dt.datetime(2017, 9, 20, 10, 35, 59),
"processing_location": "kan",
"long_platform_id": "metopb",
"instrument": "iasi",
"platform_id": "M01"}
self.ftype_info = {"file_reader": IASIL2HDF5,
"file_patterns": ["{fname}.hdf"],
"file_type": "iasi_l2_hdf5"}
self.reader = IASIL2HDF5(self.fname, self.fname_info, self.ftype_info)
[docs]
def tearDown(self):
"""Remove the temporary directory created for a test."""
try:
import shutil
shutil.rmtree(self.base_dir, ignore_errors=True)
except OSError:
pass
[docs]
def test_scene(self):
"""Test scene creation."""
from satpy import Scene
fname = os.path.join(self.base_dir, FNAME)
scn = Scene(reader="iasi_l2", filenames=[fname])
assert scn.start_time is not None
assert scn.end_time is not None
assert scn.sensor_names
assert "iasi" in scn.sensor_names
[docs]
def test_scene_load_available_datasets(self):
"""Test that all datasets are available."""
from satpy import Scene
fname = os.path.join(self.base_dir, FNAME)
scn = Scene(reader="iasi_l2", filenames=[fname])
scn.load(scn.available_dataset_names())
[docs]
def test_scene_load_pressure(self):
"""Test loading pressure data."""
from satpy import Scene
fname = os.path.join(self.base_dir, FNAME)
scn = Scene(reader="iasi_l2", filenames=[fname])
scn.load(["pressure"])
pres = scn["pressure"].compute()
self.check_pressure(pres, scn.attrs)
[docs]
def test_scene_load_emissivity(self):
"""Test loading emissivity data."""
from satpy import Scene
fname = os.path.join(self.base_dir, FNAME)
scn = Scene(reader="iasi_l2", filenames=[fname])
scn.load(["emissivity"])
emis = scn["emissivity"].compute()
self.check_emissivity(emis)
[docs]
def test_scene_load_sensing_times(self):
"""Test loading sensing times."""
from satpy import Scene
fname = os.path.join(self.base_dir, FNAME)
scn = Scene(reader="iasi_l2", filenames=[fname])
scn.load(["sensing_time"])
times = scn["sensing_time"].compute()
self.check_sensing_times(times)
[docs]
def test_init(self):
"""Test reader initialization."""
assert self.reader.filename == self.fname
assert self.reader.finfo == self.fname_info
assert self.reader.lons is None
assert self.reader.lats is None
assert self.reader.mda["platform_name"] == "Metop-B"
assert self.reader.mda["sensor"] == "iasi"
[docs]
def test_time_properties(self):
"""Test time properties."""
import datetime as dt
assert isinstance(self.reader.start_time, dt.datetime)
assert isinstance(self.reader.end_time, dt.datetime)
[docs]
def test_get_dataset(self):
"""Test get_dataset() for different datasets."""
from satpy.tests.utils import make_dataid
info = {"eggs": "spam"}
key = make_dataid(name="pressure")
data = self.reader.get_dataset(key, info).compute()
self.check_pressure(data)
assert "eggs" in data.attrs
assert data.attrs["eggs"] == "spam"
key = make_dataid(name="emissivity")
data = self.reader.get_dataset(key, info).compute()
self.check_emissivity(data)
key = make_dataid(name="sensing_time")
data = self.reader.get_dataset(key, info).compute()
assert data.shape == (NUM_SCANLINES, SCAN_WIDTH)
[docs]
def check_pressure(self, pres, attrs=None):
"""Test reading pressure dataset.
Helper function.
"""
assert np.all(pres == 0.0)
assert pres.x.size == SCAN_WIDTH
assert pres.y.size == NUM_SCANLINES
assert pres.level.size == NUM_LEVELS
if attrs:
assert pres.attrs["start_time"] == attrs["start_time"]
assert pres.attrs["end_time"] == attrs["end_time"]
assert "long_name" in pres.attrs
assert "units" in pres.attrs
[docs]
def check_emissivity(self, emis):
"""Test reading emissivity dataset.
Helper function.
"""
assert np.all(emis == 0.0)
assert emis.x.size == SCAN_WIDTH
assert emis.y.size == NUM_SCANLINES
assert "emissivity_wavenumbers" in emis.attrs
[docs]
def check_sensing_times(self, times):
"""Test reading sensing times.
Helper function.
"""
# Times should be equal in blocks of four, but not beyond, so
# there should be SCAN_WIDTH/4 different values
for i in range(int(SCAN_WIDTH / 4)):
assert np.unique(times[0, i * 4:i * 4 + 4]).size == 1
assert np.unique(times[0, :]).size == SCAN_WIDTH / 4
[docs]
def test_read_dataset(self):
"""Test read_dataset() function."""
import h5py
from satpy.readers.iasi_l2 import read_dataset
from satpy.tests.utils import make_dataid
with h5py.File(self.fname, "r") as fid:
key = make_dataid(name="pressure")
data = read_dataset(fid, key).compute()
self.check_pressure(data)
key = make_dataid(name="emissivity")
data = read_dataset(fid, key).compute()
self.check_emissivity(data)
# This dataset doesn't have any attributes
key = make_dataid(name="ozone_total_column")
data = read_dataset(fid, key).compute()
assert len(data.attrs) == 0
[docs]
def test_read_geo(self):
"""Test read_geo() function."""
import h5py
from satpy.readers.iasi_l2 import read_geo
from satpy.tests.utils import make_dataid
with h5py.File(self.fname, "r") as fid:
key = make_dataid(name="sensing_time")
data = read_geo(fid, key).compute()
assert data.shape == (NUM_SCANLINES, SCAN_WIDTH)
key = make_dataid(name="latitude")
data = read_geo(fid, key).compute()
assert data.shape == (NUM_SCANLINES, SCAN_WIDTH)
[docs]
@pytest.fixture
def fake_iasi_l2_cdr_nc_dataset():
"""Create minimally fake IASI L2 CDR NC dataset."""
shp = (3, 4, 5)
fv = -999
dims = ("scan_lines", "pixels", "vertical_levels")
coords2 = "latitude longitude"
coords3 = "latitude longitude pressure_levels"
lons = xr.DataArray(
np.array([[0, 0, 0, 0], [1, 1, 1, 1],
[2, 2, 2, 2]], dtype="float32"),
dims=dims[:2],
attrs={"coordinates": coords2,
"standard_name": "longitude"})
lats = xr.DataArray(
np.array([[3, 3, 3, 3], [2, 2, 2, 2],
[1, 1, 1, 1]], dtype="float32"),
dims=dims[:2],
attrs={"coordinates": coords2,
"standard_name": "latitude"})
pres = xr.DataArray(
np.linspace(0, 1050, math.prod(shp), dtype="float32").reshape(shp),
dims=dims,
attrs={"coordinates": coords3})
temps = np.linspace(100, 400, math.prod(shp), dtype="float32").reshape(shp)
temps[0, 0, 0] = fv
temp = xr.DataArray(
temps, dims=dims,
attrs={"coordinates": coords3, "_FillValue": fv, "units": "K"})
iasibad = xr.DataArray(
np.zeros(shp[:2], dtype="uint8"),
dims=dims[:2],
attrs={"coordinates": coords2,
"standard_name": "flag_information_IASI_L1c"})
iasibad[0, 0] = 1
cf = xr.DataArray(
np.zeros(shp[:2], dtype="uint8"),
dims=dims[:2],
attrs={"coordinates": coords2,
"standard_name": "cloud_area_fraction",
"_FillValue": 255,
"valid_min": 0,
"valid_max": 100})
return xr.Dataset(
{"T": temp, "FLG_IASIBAD": iasibad, "CloudFraction": cf},
coords={
"longitude": lons,
"latitude": lats,
"pressure_levels": pres})
[docs]
@pytest.fixture
def fake_iasi_l2_cdr_nc_file(fake_iasi_l2_cdr_nc_dataset, tmp_path):
"""Write a NetCDF file with minimal fake IASI L2 CDR NC data."""
fn = ("W_XX-EUMETSAT-Darmstadt,HYPERSPECT+SOUNDING,METOPA+PW3+"
"IASI_C_EUMP_19210624090000Z_19210623090100Z_eps_r_l2_0101.nc")
of = tmp_path / fn
fake_iasi_l2_cdr_nc_dataset.to_netcdf(of)
return os.fspath(of)
[docs]
def test_iasi_l2_cdr_nc(fake_iasi_l2_cdr_nc_file):
"""Test the IASI L2 CDR NC reader."""
from satpy import Scene
sc = Scene(filenames=[fake_iasi_l2_cdr_nc_file], reader=["iasi_l2_cdr_nc"])
sc.load(["T", "FLG_IASIBAD", "CloudFraction"])
assert sc["T"].dims == ("y", "x", "vertical_levels")
assert sc["T"].shape == (3, 4, 5)
assert sc["T"].attrs["area"].shape == (3, 4)
(lons, lats) = sc["T"].attrs["area"].get_lonlats()
np.testing.assert_array_equal(
lons,
np.array([[0, 0, 0, 0], [1, 1, 1, 1],
[2, 2, 2, 2]]))
assert np.isnan(sc["T"][0, 0, 0])
assert sc["FLG_IASIBAD"][0, 0] == 1
assert sc["CloudFraction"].dtype == np.dtype("uint8")
assert sc["T"].attrs["units"] == "K"