#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2020 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""The HRIT msg reader tests package."""
import datetime as dt
from unittest import mock
import numpy as np
import pytest
import xarray as xr
from satpy.readers.seviri_l1b_nc import NCSEVIRIFileHandler
from satpy.tests.reader_tests.test_seviri_base import ORBIT_POLYNOMIALS
from satpy.tests.reader_tests.test_seviri_l1b_calibration import TestFileHandlerCalibrationBase
from satpy.tests.utils import assert_attrs_equal, make_dataid
channel_keys_dict = {"VIS006": "ch1", "IR_108": "ch9"}
[docs]
def to_cds_time(time):
"""Convert datetime to (days, msecs) since 1958-01-01."""
if isinstance(time, dt.datetime):
time = np.datetime64(time)
t0 = np.datetime64("1958-01-01 00:00")
delta = time - t0
days = (delta / np.timedelta64(1, "D")).astype(int)
msecs = delta / np.timedelta64(1, "ms") - days * 24 * 3600 * 1E3
return days, msecs
[docs]
class TestNCSEVIRIFileHandler(TestFileHandlerCalibrationBase):
"""Unit tests for SEVIRI netCDF reader."""
[docs]
def _get_fake_dataset(self, counts, h5netcdf):
"""Create a fake dataset.
Args:
counts (xr.DataArray):
Array with data.
h5netcdf (boolean):
If True an array attribute will be created which is common
for the h5netcdf backend in xarray for scalar values.
"""
acq_time_day = np.repeat([1, 1], 11).reshape(2, 11)
acq_time_msec = np.repeat([1000, 2000], 11).reshape(2, 11)
line_validity = np.repeat([3, 3], 11).reshape(2, 11)
line_geom_radio_quality = np.repeat([4, 4], 11).reshape(2, 11)
orbit_poly_start_day, orbit_poly_start_msec = to_cds_time(
np.array([dt.datetime(2019, 12, 31, 18),
dt.datetime(2019, 12, 31, 22)],
dtype="datetime64")
)
orbit_poly_end_day, orbit_poly_end_msec = to_cds_time(
np.array([dt.datetime(2019, 12, 31, 22),
dt.datetime(2020, 1, 1, 2)],
dtype="datetime64")
)
counts = counts.rename({
"y": "num_rows_vis_ir",
"x": "num_columns_vis_ir"
})
scan_time_days, scan_time_msecs = to_cds_time(self.scan_time)
ds = xr.Dataset(
{
"ch1": counts.copy(),
"ch9": counts.copy(),
"HRV": (("num_rows_hrv", "num_columns_hrv"), [[1, 2, 3],
[4, 5, 6],
[7, 8, 9]]),
"planned_chan_processing": self.radiance_types,
"channel_data_visir_data_l10_line_mean_acquisition_time_day": (
("num_rows_vis_ir", "channels_vis_ir_dim"),
acq_time_day
),
"channel_data_visir_data_l10_line_mean_acquisition_msec": (
("num_rows_vis_ir", "channels_vis_ir_dim"),
acq_time_msec
),
"channel_data_visir_data_line_validity": (
("num_rows_vis_ir", "channels_vis_ir_dim"),
line_validity
),
"channel_data_visir_data_line_geometric_quality": (
("num_rows_vis_ir", "channels_vis_ir_dim"),
line_geom_radio_quality
),
"channel_data_visir_data_line_radiometric_quality": (
("num_rows_vis_ir", "channels_vis_ir_dim"),
line_geom_radio_quality
),
"orbit_polynomial_x": (
("orbit_polynomial_dim_row",
"orbit_polynomial_dim_col"),
ORBIT_POLYNOMIALS["X"][0:2]
),
"orbit_polynomial_y": (
("orbit_polynomial_dim_row",
"orbit_polynomial_dim_col"),
ORBIT_POLYNOMIALS["Y"][0:2]
),
"orbit_polynomial_z": (
("orbit_polynomial_dim_row",
"orbit_polynomial_dim_col"),
ORBIT_POLYNOMIALS["Z"][0:2]
),
"orbit_polynomial_start_time_day": (
"orbit_polynomial_dim_row",
orbit_poly_start_day
),
"orbit_polynomial_start_time_msec": (
"orbit_polynomial_dim_row",
orbit_poly_start_msec
),
"orbit_polynomial_end_time_day": (
"orbit_polynomial_dim_row",
orbit_poly_end_day
),
"orbit_polynomial_end_time_msec": (
"orbit_polynomial_dim_row",
orbit_poly_end_msec
),
},
attrs={
"equatorial_radius": 6378.169,
"north_polar_radius": 6356.5838,
"south_polar_radius": 6356.5838,
"longitude_of_SSP": 0.0,
"nominal_longitude": -3.5,
"satellite_id": self.platform_id,
"true_repeat_cycle_start_day": scan_time_days,
"true_repeat_cycle_start_mi_sec": scan_time_msecs,
"planned_repeat_cycle_end_day": scan_time_days,
"planned_repeat_cycle_end_mi_sec": scan_time_msecs,
"north_most_line": 3712,
"east_most_pixel": 1,
"west_most_pixel": 3712,
"south_most_line": 1,
"vis_ir_grid_origin": 0,
"vis_ir_column_dir_grid_step": 3.0004032,
"vis_ir_line_dir_grid_step": 3.0004032,
"type_of_earth_model": "0x02",
"nominal_image_scanning": "T",
}
)
if h5netcdf:
nattrs = {"equatorial_radius": np.array([6378.169]),
"north_polar_radius": np.array([6356.5838]),
"south_polar_radius": np.array([6356.5838]),
"longitude_of_SSP": np.array([0.0]),
"vis_ir_column_dir_grid_step": np.array([3.0004032]),
"vis_ir_line_dir_grid_step": np.array([3.0004032])
}
ds.attrs.update(nattrs)
ds["ch1"].attrs.update({
"scale_factor": self.gains_nominal[0],
"add_offset": self.offsets_nominal[0]
})
# IR_108 is dataset with key ch9
ds["ch9"].attrs.update({
"scale_factor": self.gains_nominal[8],
"add_offset": self.offsets_nominal[8],
})
# Add some attributes so that the reader can strip them
strip_attrs = {
"comment": None,
"long_name": None,
"valid_min": None,
"valid_max": None
}
for name in ["ch1", "ch9"]:
ds[name].attrs.update(strip_attrs)
return ds
[docs]
@pytest.fixture
def h5netcdf(self):
"""Fixture for xr backend choice."""
return False
[docs]
@pytest.fixture(name="file_handler")
def file_handler(self, counts, h5netcdf):
"""Create a mocked file handler."""
with mock.patch(
"satpy.readers.seviri_l1b_nc.open_dataset",
return_value=self._get_fake_dataset(counts=counts, h5netcdf=h5netcdf)
):
return NCSEVIRIFileHandler(
"filename",
{"platform_shortname": "MSG3",
"start_time": self.scan_time,
"service": "MSG"},
{"filetype": "info"}
)
[docs]
@pytest.mark.parametrize(
("channel", "calibration", "use_ext_coefs"),
[
# VIS channel, internal coefficients
("VIS006", "counts", False),
("VIS006", "radiance", False),
("VIS006", "reflectance", False),
# VIS channel, external coefficients
("VIS006", "radiance", True),
("VIS006", "reflectance", True),
# IR channel, internal coefficients
("IR_108", "counts", False),
("IR_108", "radiance", False),
("IR_108", "brightness_temperature", False),
# IR channel, external coefficients
("IR_108", "radiance", True),
("IR_108", "brightness_temperature", True),
# FUTURE: Enable once HRV reading has been fixed.
# # HRV channel, internal coefficiens
# ('HRV', 'counts', False),
# ('HRV', 'radiance', False),
# ('HRV', 'reflectance', False),
# # HRV channel, external coefficients (mode should have no effect)
# ('HRV', 'radiance', True),
# ('HRV', 'reflectance', True),
]
)
def test_calibrate(
self, file_handler, channel, calibration, use_ext_coefs
):
"""Test the calibration."""
external_coefs = self.external_coefs if use_ext_coefs else {}
expected = self._get_expected(
channel=channel,
calibration=calibration,
calib_mode="NOMINAL",
use_ext_coefs=use_ext_coefs
)
fh = file_handler
fh.ext_calib_coefs = external_coefs
dataset_id = make_dataid(name=channel, calibration=calibration)
key = channel_keys_dict[channel]
res = fh.calibrate(fh.nc[key], dataset_id)
xr.testing.assert_allclose(res, expected)
[docs]
def test_mask_bad_quality(self, file_handler):
"""Test masking of bad quality scan lines."""
channel = "VIS006"
key = channel_keys_dict[channel]
dataset_info = {
"nc_key": key,
"units": "units",
"wavelength": "wavelength",
"standard_name": "standard_name"
}
expected = self._get_expected(
channel=channel,
calibration="radiance",
calib_mode="NOMINAL",
use_ext_coefs=False
)
fh = file_handler
res = fh._mask_bad_quality(fh.nc[key], dataset_info)
new_data = np.zeros_like(expected.data).astype("float32")
new_data[:, :] = np.nan
expected = expected.copy(data=new_data)
xr.testing.assert_allclose(res, expected)
[docs]
@pytest.mark.parametrize(
("channel", "calibration", "mask_bad_quality_scan_lines"),
[
("VIS006", "reflectance", True),
("VIS006", "reflectance", False),
("IR_108", "brightness_temperature", True)
]
)
def test_get_dataset(self, file_handler, channel, calibration, mask_bad_quality_scan_lines):
"""Test getting the dataset."""
dataset_id = make_dataid(name=channel, calibration=calibration)
key = channel_keys_dict[channel]
dataset_info = {
"nc_key": key,
"units": "units",
"wavelength": "wavelength",
"standard_name": "standard_name"
}
file_handler.mask_bad_quality_scan_lines = mask_bad_quality_scan_lines
res = file_handler.get_dataset(dataset_id, dataset_info)
# Test scanline acquisition times
expected = self._get_expected(
channel=channel,
calibration=calibration,
calib_mode="NOMINAL",
use_ext_coefs=False
)
expected.attrs = {
"orbital_parameters": {
"satellite_actual_longitude": -3.541742131915741,
"satellite_actual_latitude": -0.5203765167594427,
"satellite_actual_altitude": 35783419.16135868,
"satellite_nominal_longitude": -3.5,
"satellite_nominal_latitude": 0.0,
"projection_longitude": 0.0,
"projection_latitude": 0.0,
"projection_altitude": 35785831.0
},
"time_parameters": {
"nominal_start_time": dt.datetime(2020, 1, 1, 0, 0),
"nominal_end_time": dt.datetime(2020, 1, 1, 0, 0),
"observation_start_time": dt.datetime(2020, 1, 1, 0, 0),
"observation_end_time": dt.datetime(2020, 1, 1, 0, 0),
},
"georef_offset_corrected": True,
"platform_name": "Meteosat-11",
"sensor": "seviri",
"units": "units",
"wavelength": "wavelength",
"standard_name": "standard_name"
}
expected["acq_time"] = ("y", [np.datetime64("1958-01-02 00:00:01").astype("datetime64[ns]"),
np.datetime64("1958-01-02 00:00:02").astype("datetime64[ns]")])
expected = expected[::-1] # reader flips data upside down
if mask_bad_quality_scan_lines:
expected = file_handler._mask_bad_quality(expected, dataset_info)
xr.testing.assert_allclose(res, expected)
for key in ["sun_earth_distance_correction_applied",
"sun_earth_distance_correction_factor"]:
res.attrs.pop(key, None)
assert_attrs_equal(res.attrs, expected.attrs, tolerance=1e-4)
[docs]
def test_time(self, file_handler):
"""Test start/end nominal/observation time handling."""
assert dt.datetime(2020, 1, 1, 0, 0) == file_handler.observation_start_time
assert dt.datetime(2020, 1, 1, 0, 0) == file_handler.start_time
assert file_handler.start_time == file_handler.nominal_start_time
assert dt.datetime(2020, 1, 1, 0, 0) == file_handler.observation_end_time
assert file_handler.end_time == file_handler.nominal_end_time
assert dt.datetime(2020, 1, 1, 0, 0) == file_handler.end_time
[docs]
def test_repeat_cycle_duration(self, file_handler):
"""Test repeat cycle handling for FD or ReduscedScan."""
assert 15 == file_handler._repeat_cycle_duration
# Change the reducescan scenario to test the repeat cycle duration handling
file_handler.nc.attrs["nominal_image_scanning"] = ""
file_handler.nc.attrs["reduced_scanning"] = "T"
# file_handler.trailer['15TRAILER']['ImageProductionStats']['ActualScanningSummary']['ReducedScan'] = 1
assert 5 == file_handler._repeat_cycle_duration
[docs]
def test_satpos_no_valid_orbit_polynomial(self, file_handler):
"""Test satellite position if there is no valid orbit polynomial."""
dataset_id = make_dataid(name="VIS006", calibration="counts")
dataset_info = {
"name": "VIS006",
"nc_key": "ch1",
"units": "units",
"wavelength": "wavelength",
"standard_name": "standard_name"
}
file_handler.nc["orbit_polynomial_start_time_day"] = 0
file_handler.nc["orbit_polynomial_end_time_day"] = 0
with pytest.warns(UserWarning, match=r"No orbit polynomial valid for"):
res = file_handler.get_dataset(dataset_id, dataset_info)
assert "satellite_actual_longitude" not in res.attrs[
"orbital_parameters"]
[docs]
@pytest.mark.parametrize("h5netcdf", [True])
def test_h5netcdf_pecularity(self, file_handler, h5netcdf):
"""Test conversion of attributes when xarray is used with h5netcdf backend."""
fh = file_handler
assert isinstance(fh.mda["projection_parameters"]["a"], float)