# Copyright (c) 2017-2023 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Unit testing the enhancements functions, e.g. cira_stretch."""
import contextlib
import os
from tempfile import NamedTemporaryFile
from unittest import mock
import dask.array as da
import numpy as np
import pytest
import xarray as xr
from satpy.enhancements import create_colormap, on_dask_array, on_separate_bands, using_map_blocks
# NOTE:
# The following fixtures are not defined in this file, but are used and injected by Pytest:
# - tmp_path
[docs]
def run_and_check_enhancement(func, data, expected, **kwargs):
"""Perform basic checks that apply to multiple tests."""
pre_attrs = data.attrs
img = _get_enhanced_image(func, data, **kwargs)
_assert_image(img, pre_attrs, func.__name__, "palettes" in kwargs)
_assert_image_data(img, expected)
[docs]
def _get_enhanced_image(func, data, **kwargs):
from trollimage.xrimage import XRImage
img = XRImage(data)
func(img, **kwargs)
return img
[docs]
def _assert_image(img, pre_attrs, func_name, has_palette):
assert isinstance(img.data, xr.DataArray)
assert isinstance(img.data.data, da.Array)
old_keys = set(pre_attrs.keys())
# It is OK to have "enhancement_history" added
new_keys = set(img.data.attrs.keys()) - {"enhancement_history"}
# In case of palettes are used, _FillValue is added.
# Colorize doesn't add the fill value, so ignore that
if has_palette and func_name != "colorize":
assert "_FillValue" in new_keys
# Remove it from further comparisons
new_keys = new_keys - {"_FillValue"}
assert old_keys == new_keys
[docs]
def _assert_image_data(img, expected, dtype=None):
# Compute the data to mimic what xrimage geotiff writing does
res_data = img.data.data.compute()
assert not isinstance(res_data, da.Array)
np.testing.assert_allclose(res_data, expected, atol=1.e-6, rtol=0)
if dtype:
assert img.data.dtype == dtype
assert res_data.dtype == dtype
[docs]
def run_and_check_enhancement_with_dtype(func, data, expected, **kwargs):
"""Perform basic checks that apply to multiple tests."""
pre_attrs = data.attrs
img = _get_enhanced_image(func, data, **kwargs)
_assert_image(img, pre_attrs, func.__name__, "palettes" in kwargs)
_assert_image_data(img, expected, dtype=data.dtype)
[docs]
def identical_decorator(func):
"""Decorate but do nothing."""
return func
[docs]
class TestEnhancementStretch:
"""Class for testing enhancements in satpy.enhancements."""
[docs]
def setup_method(self):
"""Create test data used by every test."""
data = np.arange(-210, 790, 100).reshape((2, 5)) * 0.95
data[0, 0] = np.nan # one bad value for testing
crefl_data = np.arange(-210, 790, 100).reshape((2, 5)) * 0.95
crefl_data /= 5.605
crefl_data[0, 0] = np.nan # one bad value for testing
crefl_data[0, 1] = 0.
self.ch1 = xr.DataArray(da.from_array(data, chunks=2), dims=("y", "x"), attrs={"test": "test"})
self.ch2 = xr.DataArray(da.from_array(crefl_data, chunks=2), dims=("y", "x"), attrs={"test": "test"})
rgb_data = np.stack([data, data, data])
self.rgb = xr.DataArray(da.from_array(rgb_data, chunks=(3, 2, 2)),
dims=("bands", "y", "x"),
coords={"bands": ["R", "G", "B"]})
[docs]
@pytest.mark.parametrize(
("decorator", "exp_call_cls"),
[
(identical_decorator, xr.DataArray),
(on_dask_array, da.Array),
(using_map_blocks, np.ndarray),
],
)
@pytest.mark.parametrize("input_data_name", ["ch1", "ch2", "rgb"])
def test_apply_enhancement(self, input_data_name, decorator, exp_call_cls):
"""Test the 'apply_enhancement' utility function."""
def _enh_func(img):
def _calc_func(data):
assert isinstance(data, exp_call_cls)
return data
decorated_func = decorator(_calc_func)
return decorated_func(img.data)
in_data = getattr(self, input_data_name)
exp_data = in_data.values
if "bands" not in in_data.coords:
exp_data = exp_data[np.newaxis, :, :]
run_and_check_enhancement(_enh_func, in_data, exp_data)
[docs]
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
def test_cira_stretch(self, dtype):
"""Test applying the cira_stretch."""
from satpy.enhancements import cira_stretch
expected = np.array([[
[np.nan, -7.04045974, -7.04045974, 0.79630132, 0.95947296],
[1.05181359, 1.11651012, 1.16635571, 1.20691137, 1.24110186]]], dtype=dtype)
run_and_check_enhancement_with_dtype(cira_stretch, self.ch1.astype(dtype), expected)
[docs]
def test_reinhard(self):
"""Test the reinhard algorithm."""
from satpy.enhancements import reinhard_to_srgb
expected = np.array([[[np.nan, 0., 0., 0.93333793, 1.29432402],
[1.55428709, 1.76572249, 1.94738635, 2.10848544, 2.25432809]],
[[np.nan, 0., 0., 0.93333793, 1.29432402],
[1.55428709, 1.76572249, 1.94738635, 2.10848544, 2.25432809]],
[[np.nan, 0., 0., 0.93333793, 1.29432402],
[1.55428709, 1.76572249, 1.94738635, 2.10848544, 2.25432809]]])
run_and_check_enhancement(reinhard_to_srgb, self.rgb, expected)
[docs]
def test_lookup(self):
"""Test the lookup enhancement function."""
from satpy.enhancements import lookup
expected = np.array([[
[0., 0., 0., 0.333333, 0.705882],
[1., 1., 1., 1., 1.]]])
lut = np.arange(256.)
run_and_check_enhancement(lookup, self.ch1, expected, luts=lut)
expected = np.array([[[0., 0., 0., 0.333333, 0.705882],
[1., 1., 1., 1., 1.]],
[[0., 0., 0., 0.333333, 0.705882],
[1., 1., 1., 1., 1.]],
[[0., 0., 0., 0.333333, 0.705882],
[1., 1., 1., 1., 1.]]])
lut = np.arange(256.)
lut = np.vstack((lut, lut, lut)).T
run_and_check_enhancement(lookup, self.rgb, expected, luts=lut)
[docs]
def test_colorize(self):
"""Test the colorize enhancement function."""
from trollimage.colormap import brbg
from satpy.enhancements import colorize
expected = np.array([
[[np.nan, 3.29411723e-01, 3.29411723e-01, 3.21825881e-08, 3.21825881e-08],
[3.21825881e-08, 3.21825881e-08, 3.21825881e-08, 3.21825881e-08, 3.21825881e-08]],
[[np.nan, 1.88235327e-01, 1.88235327e-01, 2.35294109e-01, 2.35294109e-01],
[2.35294109e-01, 2.35294109e-01, 2.35294109e-01, 2.35294109e-01, 2.35294109e-01]],
[[np.nan, 1.96078164e-02, 1.96078164e-02, 1.88235281e-01, 1.88235281e-01],
[1.88235281e-01, 1.88235281e-01, 1.88235281e-01, 1.88235281e-01, 1.88235281e-01]]])
run_and_check_enhancement(colorize, self.ch1, expected, palettes=brbg)
[docs]
def test_palettize(self):
"""Test the palettize enhancement function."""
from trollimage.colormap import brbg
from satpy.enhancements import palettize
expected = np.array([[[10, 0, 0, 10, 10], [10, 10, 10, 10, 10]]])
run_and_check_enhancement(palettize, self.ch1, expected, palettes=brbg)
[docs]
def test_three_d_effect(self):
"""Test the three_d_effect enhancement function."""
from satpy.enhancements import three_d_effect
expected = np.array([[
[np.nan, np.nan, -389.5, -294.5, 826.5],
[np.nan, np.nan, 85.5, 180.5, 1301.5]]])
run_and_check_enhancement(three_d_effect, self.ch1, expected)
[docs]
def test_piecewise_linear_stretch(self):
"""Test the piecewise_linear_stretch enhancement function."""
from satpy.enhancements import piecewise_linear_stretch
expected = np.array([[
[np.nan, 0., 0., 0.44378, 0.631734],
[0.737562, 0.825041, 0.912521, 1., 1.]]])
run_and_check_enhancement(piecewise_linear_stretch,
self.ch2 / 100.0,
expected,
xp=[0., 25., 55., 100., 255.],
fp=[0., 90., 140., 175., 255.],
reference_scale_factor=255,
)
[docs]
def test_btemp_threshold(self):
"""Test applying the cira_stretch."""
from satpy.enhancements import btemp_threshold
expected = np.array([[
[np.nan, 0.946207, 0.892695, 0.839184, 0.785672],
[0.73216, 0.595869, 0.158745, -0.278379, -0.715503]]])
run_and_check_enhancement(btemp_threshold, self.ch1, expected,
min_in=-200, max_in=500, threshold=350)
[docs]
def test_merge_colormaps(self):
"""Test merging colormaps."""
from trollimage.colormap import Colormap
from satpy.enhancements import _merge_colormaps as mcp
from satpy.enhancements import create_colormap
ret_map = mock.MagicMock()
create_colormap_mock = mock.Mock(wraps=create_colormap)
cmap1 = Colormap((1, (1., 1., 1.)))
kwargs = {"palettes": cmap1}
with mock.patch("satpy.enhancements.create_colormap", create_colormap_mock):
res = mcp(kwargs)
assert res is cmap1
create_colormap_mock.assert_not_called()
create_colormap_mock.reset_mock()
ret_map.reset_mock()
cmap1 = {"colors": "blues", "min_value": 0,
"max_value": 1}
kwargs = {"palettes": [cmap1]}
with mock.patch("satpy.enhancements.create_colormap", create_colormap_mock), \
mock.patch("trollimage.colormap.blues", ret_map):
_ = mcp(kwargs)
create_colormap_mock.assert_called_once()
ret_map.reverse.assert_not_called()
ret_map.set_range.assert_called_with(0, 1)
create_colormap_mock.reset_mock()
ret_map.reset_mock()
cmap2 = {"colors": "blues", "min_value": 2,
"max_value": 3, "reverse": True}
kwargs = {"palettes": [cmap2]}
with mock.patch("trollimage.colormap.blues", ret_map):
_ = mcp(kwargs)
ret_map.reverse.assert_called_once()
ret_map.set_range.assert_called_with(2, 3)
create_colormap_mock.reset_mock()
ret_map.reset_mock()
kwargs = {"palettes": [cmap1, cmap2]}
with mock.patch("trollimage.colormap.blues", ret_map):
_ = mcp(kwargs)
ret_map.__add__.assert_called_once()
[docs]
def tearDown(self):
"""Clean up."""
[docs]
@contextlib.contextmanager
def closed_named_temp_file(**kwargs):
"""Named temporary file context manager that closes the file after creation.
This helps with Windows systems which can get upset with opening or
deleting a file that is already open.
"""
try:
with NamedTemporaryFile(delete=False, **kwargs) as tmp_cmap:
yield tmp_cmap.name
finally:
os.remove(tmp_cmap.name)
[docs]
def _write_cmap_to_file(cmap_filename, cmap_data):
ext = os.path.splitext(cmap_filename)[1]
if ext in (".npy",):
np.save(cmap_filename, cmap_data)
elif ext in (".npz",):
np.savez(cmap_filename, cmap_data)
else:
np.savetxt(cmap_filename, cmap_data, delimiter=",")
[docs]
def _generate_cmap_test_data(color_scale, colormap_mode):
cmap_data = np.array([
[1, 0, 0],
[1, 1, 0],
[1, 1, 1],
[0, 0, 1],
], dtype=np.float64)
if len(colormap_mode) != 3:
_cmap_data = cmap_data
cmap_data = np.empty((cmap_data.shape[0], len(colormap_mode)),
dtype=np.float64)
if colormap_mode.startswith("V") or colormap_mode.endswith("A"):
cmap_data[:, 0] = np.array([128, 130, 132, 134]) / 255.0
cmap_data[:, -3:] = _cmap_data
if colormap_mode.startswith("V") and colormap_mode.endswith("A"):
cmap_data[:, 1] = np.array([128, 130, 132, 134]) / 255.0
if color_scale is None or color_scale == 255:
cmap_data = (cmap_data * 255).astype(np.uint8)
return cmap_data
[docs]
class TestColormapLoading:
"""Test utilities used with colormaps."""
[docs]
@pytest.mark.parametrize("color_scale", [None, 1.0])
@pytest.mark.parametrize("colormap_mode", ["RGB", "VRGB", "VRGBA"])
@pytest.mark.parametrize("extra_kwargs",
[
{},
{"min_value": 50, "max_value": 100},
])
@pytest.mark.parametrize("filename_suffix", [".npy", ".npz", ".csv"])
def test_cmap_from_file(self, color_scale, colormap_mode, extra_kwargs, filename_suffix):
"""Test that colormaps can be loaded from a binary file."""
# create the colormap file on disk
with closed_named_temp_file(suffix=filename_suffix) as cmap_filename:
cmap_data = _generate_cmap_test_data(color_scale, colormap_mode)
_write_cmap_to_file(cmap_filename, cmap_data)
unset_first_value = 128.0 / 255.0 if colormap_mode.startswith("V") else 0.0
unset_last_value = 134.0 / 255.0 if colormap_mode.startswith("V") else 1.0
if (color_scale is None or color_scale == 255) and colormap_mode.startswith("V"):
unset_first_value *= 255
unset_last_value *= 255
if "min_value" in extra_kwargs:
unset_first_value = extra_kwargs["min_value"]
unset_last_value = extra_kwargs["max_value"]
first_color = [1.0, 0.0, 0.0]
if colormap_mode == "VRGBA":
first_color = [128.0 / 255.0] + first_color
kwargs1 = {"filename": cmap_filename}
kwargs1.update(extra_kwargs)
if color_scale is not None:
kwargs1["color_scale"] = color_scale
cmap = create_colormap(kwargs1)
assert cmap.colors.shape[0] == 4
np.testing.assert_equal(cmap.colors[0], first_color)
assert cmap.values.shape[0] == 4
assert cmap.values[0] == unset_first_value
assert cmap.values[-1] == unset_last_value
[docs]
def test_cmap_vrgb_as_rgba(self):
"""Test that data created as VRGB still reads as RGBA."""
with closed_named_temp_file(suffix=".npy") as cmap_filename:
cmap_data = _generate_cmap_test_data(None, "VRGB")
np.save(cmap_filename, cmap_data)
cmap = create_colormap({"filename": cmap_filename, "colormap_mode": "RGBA"})
assert cmap.colors.shape[0] == 4
assert cmap.colors.shape[1] == 4 # RGBA
np.testing.assert_equal(cmap.colors[0], [128 / 255., 1.0, 0, 0])
assert cmap.values.shape[0] == 4
assert cmap.values[0] == 0
assert cmap.values[-1] == 1.0
[docs]
def test_cmap_with_alpha_set(self):
"""Test that the min_alpha and max_alpha arguments set the alpha channel correctly."""
with closed_named_temp_file(suffix=".npy") as cmap_filename:
cmap_data = _generate_cmap_test_data(None, "RGB")
np.save(cmap_filename, cmap_data)
cmap = create_colormap({"filename": cmap_filename, "min_alpha": 100, "max_alpha": 255})
assert cmap.colors.shape[0] == 4
assert cmap.colors.shape[1] == 4 # RGBA
# check that we start from min_alpha
np.testing.assert_equal(cmap.colors[0], [1.0, 0, 0, 100/255.])
# two thirds of the linear scale
np.testing.assert_almost_equal(cmap.colors[2], [1., 1., 1., (100+(2/3)*(255-100))/255])
# check that we end at max_alpha
np.testing.assert_equal(cmap.colors[3], [0, 0, 1., 1.0])
# check that values have not been changed
assert cmap.values.shape[0] == 4
assert cmap.values[0] == 0
assert cmap.values[-1] == 1.0
[docs]
@pytest.mark.parametrize("alpha_arg", ["max_alpha", "min_alpha"])
def test_cmap_error_with_only_one_alpha_set(self, alpha_arg):
"""Test that when only min_alpha or max_alpha arguments are set an error is raised."""
with closed_named_temp_file(suffix=".npy") as cmap_filename:
cmap_data = _generate_cmap_test_data(None, "RGB")
np.save(cmap_filename, cmap_data)
# check that if a value is missing we raise a ValueError
with pytest.raises(ValueError, match="Both 'min_alpha' and 'max_alpha' must be specified*."):
create_colormap({"filename": cmap_filename, alpha_arg: 255})
[docs]
@pytest.mark.parametrize(
("real_mode", "forced_mode"),
[
("VRGBA", "RGBA"),
("VRGBA", "VRGB"),
("RGBA", "RGB"),
]
)
@pytest.mark.parametrize("filename_suffix", [".npy", ".csv"])
def test_cmap_bad_mode(self, real_mode, forced_mode, filename_suffix):
"""Test that reading colormaps with the wrong mode fails."""
with closed_named_temp_file(suffix=filename_suffix) as cmap_filename:
cmap_data = _generate_cmap_test_data(None, real_mode)
_write_cmap_to_file(cmap_filename, cmap_data)
# Force colormap_mode VRGBA to RGBA and we should see an exception
with pytest.raises(ValueError, match="Unexpected colormap shape for mode .*"):
create_colormap({"filename": cmap_filename, "colormap_mode": forced_mode})
[docs]
def test_cmap_from_file_bad_shape(self):
"""Test that unknown array shape causes an error."""
from satpy.enhancements import create_colormap
# create the colormap file on disk
with closed_named_temp_file(suffix=".npy") as cmap_filename:
np.save(cmap_filename, np.array([
[0],
[64],
[128],
[255],
]))
with pytest.raises(ValueError, match="Unexpected colormap shape for mode 'None'"):
create_colormap({"filename": cmap_filename})
[docs]
def test_cmap_from_config_path(self, tmp_path):
"""Test loading a colormap relative to a config path."""
import satpy
from satpy.enhancements import create_colormap
cmap_dir = tmp_path / "colormaps"
cmap_dir.mkdir()
cmap_filename = cmap_dir / "my_colormap.npy"
cmap_data = _generate_cmap_test_data(None, "RGBA")
np.save(cmap_filename, cmap_data)
with satpy.config.set(config_path=[tmp_path]):
rel_cmap_filename = os.path.join("colormaps", "my_colormap.npy")
cmap = create_colormap({"filename": rel_cmap_filename, "colormap_mode": "RGBA"})
assert cmap.colors.shape[0] == 4
assert cmap.colors.shape[1] == 4 # RGBA
np.testing.assert_equal(cmap.colors[0], [128 / 255., 1.0, 0, 0])
assert cmap.values.shape[0] == 4
assert cmap.values[0] == 0
assert cmap.values[-1] == 1.0
[docs]
def test_cmap_from_trollimage(self):
"""Test that colormaps in trollimage can be loaded."""
from satpy.enhancements import create_colormap
cmap = create_colormap({"colors": "pubu"})
from trollimage.colormap import pubu
np.testing.assert_equal(cmap.colors, pubu.colors)
np.testing.assert_equal(cmap.values, pubu.values)
[docs]
def test_cmap_no_colormap(self):
"""Test that being unable to create a colormap raises an error."""
from satpy.enhancements import create_colormap
with pytest.raises(ValueError, match="Unknown colormap format: .*"):
create_colormap({})
[docs]
def test_cmap_list(self):
"""Test that colors can be a list/tuple."""
from satpy.enhancements import create_colormap
colors = [
[0., 0., 1.],
[1., 0., 1.],
[0., 1., 1.],
[1., 1., 1.],
]
values = [2, 4, 6, 8]
cmap = create_colormap({"colors": colors, "color_scale": 1})
assert cmap.colors.shape[0] == 4
np.testing.assert_equal(cmap.colors[0], [0.0, 0.0, 1.0])
assert cmap.values.shape[0] == 4
assert cmap.values[0] == 0
assert cmap.values[-1] == 1.0
cmap = create_colormap({"colors": colors, "color_scale": 1, "values": values})
assert cmap.colors.shape[0] == 4
np.testing.assert_equal(cmap.colors[0], [0.0, 0.0, 1.0])
assert cmap.values.shape[0] == 4
assert cmap.values[0] == 2
assert cmap.values[-1] == 8
[docs]
def test_on_separate_bands():
"""Test the `on_separate_bands` decorator."""
def func(array, index, gain=2):
return xr.DataArray(np.ones(array.shape, dtype=array.dtype) * index * gain,
coords=array.coords, dims=array.dims, attrs=array.attrs)
separate_func = on_separate_bands(func)
arr = xr.DataArray(np.zeros((3, 10, 10)), dims=["bands", "y", "x"], coords={"bands": ["R", "G", "B"]})
assert separate_func(arr).shape == arr.shape
assert all(separate_func(arr, gain=1).values[:, 0, 0] == [0, 1, 2])
[docs]
def test_using_map_blocks():
"""Test the `using_map_blocks` decorator."""
def func(np_array, block_info=None):
value = block_info[0]["chunk-location"][-1]
return np.ones(np_array.shape) * value
map_blocked_func = using_map_blocks(func)
arr = xr.DataArray(da.zeros((3, 10, 10), dtype=int, chunks=5), dims=["bands", "y", "x"])
res = map_blocked_func(arr)
assert res.shape == arr.shape
assert res[0, 0, 0].compute() != res[0, 9, 9].compute()
[docs]
def test_on_dask_array():
"""Test the `on_dask_array` decorator."""
def func(dask_array):
if not isinstance(dask_array, da.core.Array):
pytest.fail("Array is not a dask array")
return dask_array
dask_func = on_dask_array(func)
arr = xr.DataArray(da.zeros((3, 10, 10), dtype=int, chunks=5), dims=["bands", "y", "x"])
res = dask_func(arr)
assert res.shape == arr.shape
[docs]
@pytest.fixture
def fake_area():
"""Return a fake 2×2 area."""
from pyresample.geometry import create_area_def
return create_area_def("wingertsberg", 4087, area_extent=[-2_000, -2_000, 2_000, 2_000], shape=(2, 2))
_nwcsaf_geo_props = {
"cma_geo": ("geo", "cma", None, "cma_pal", None, "cloudmask", "CMA", "uint8"),
"cma_pps": ("pps", "cma", None, "cma_pal", None, "cloudmask", "CMA", "uint8"),
"cma_extended_pps": ("pps", "cma_extended", None, "cma_extended_pal", None,
"cloudmask_extended", "CMA", "uint8"),
"cmaprob_pps": ("pps", "cmaprob", None, "cmaprob_pal", None, "cloudmask_probability",
"CMAPROB", "uint8"),
"ct_geo": ("geo", "ct", None, "ct_pal", None, "cloudtype", "CT", "uint8"),
"ct_pps": ("pps", "ct", None, "ct_pal", None, "cloudtype", "CT", "uint8"),
"ctth_alti_geo": ("geo", "ctth_alti", None, "ctth_alti_pal", None, "cloud_top_height",
"CTTH", "float64"),
"ctth_alti_pps": ("pps", "ctth_alti", None, "ctth_alti_pal", "ctth_status_flag",
"cloud_top_height", "CTTH", "float64"),
"ctth_pres_geo": ("geo", "ctth_pres", None, "ctth_pres_pal", None, "cloud_top_pressure",
"CTTH", "float64"),
"ctth_pres_pps": ("pps", "ctth_pres", None, "ctth_pres_pal", None, "cloud_top_pressure",
"CTTH", "float64"),
"ctth_tempe_geo": ("geo", "ctth_tempe", None, "ctth_tempe_pal", None, "cloud_top_temperature",
"CTTH", "float64"),
"ctth_tempe_pps": ("pps", "ctth_tempe", None, "ctth_tempe_pal", None, "cloud_top_temperature",
"CTTH", "float64"),
"cmic_phase_geo": ("geo", "cmic_phase", None, "cmic_phase_pal", None, "cloud_top_phase",
"CMIC", "uint8"),
"cmic_phase_pps": ("pps", "cmic_phase", None, "cmic_phase_pal", "cmic_status_flag", "cloud_top_phase",
"CMIC", "uint8"),
"cmic_reff_geo": ("geo", "cmic_reff", None, "cmic_reff_pal", None, "cloud_drop_effective_radius",
"CMIC", "float64"),
"cmic_reff_pps": ("pps", "cmic_reff", "cmic_cre", "cmic_cre_pal", "cmic_status_flag",
"cloud_drop_effective_radius", "CMIC", "float64"),
"cmic_cot_geo": ("geo", "cmic_cot", None, "cmic_cot_pal", None, "cloud_optical_thickness",
"CMIC", "float64"),
"cmic_cot_pps": ("pps", "cmic_cot", None, "cmic_cot_pal", None, "cloud_optical_thickness",
"CMIC", "float64"),
"cmic_cwp_pps": ("pps", "cmic_cwp", None, "cmic_cwp_pal", None, "cloud_water_path",
"CMIC", "float64"),
"cmic_lwp_geo": ("geo", "cmic_lwp", None, "cmic_lwp_pal", None, "cloud_liquid_water_path",
"CMIC", "float64"),
"cmic_lwp_pps": ("pps", "cmic_lwp", None, "cmic_lwp_pal", None, "liquid_water_path",
"CMIC", "float64"),
"cmic_iwp_geo": ("geo", "cmic_iwp", None, "cmic_iwp_pal", None, "cloud_ice_water_path",
"CMIC", "float64"),
"cmic_iwp_pps": ("pps", "cmic_iwp", None, "cmic_iwp_pal", None, "ice_water_path",
"CMIC", "float64"),
"pc": ("geo", "pc", None, "pc_pal", None, "precipitation_probability", "PC", "uint8"),
"crr": ("geo", "crr", None, "crr_pal", None, "convective_rain_rate", "CRR", "uint8"),
"crr_accum": ("geo", "crr_accum", None, "crr_pal", None,
"convective_precipitation_hourly_accumulation", "CRR", "uint8"),
"ishai_tpw": ("geo", "ishai_tpw", None, "ishai_tpw_pal", None, "total_precipitable_water",
"iSHAI", "float64"),
"ishai_shw": ("geo", "ishai_shw", None, "ishai_shw_pal", None, "showalter_index",
"iSHAI", "float64"),
"ishai_li": ("geo", "ishai_li", None, "ishai_li_pal", None, "lifted_index",
"iSHAI", "float64"),
"ci_prob30": ("geo", "ci_prob30", None, "ci_pal", None, "convection_initiation_prob30",
"CI", "float64"),
"ci_prob60": ("geo", "ci_prob60", None, "ci_pal", None, "convection_initiation_prob60",
"CI", "float64"),
"ci_prob90": ("geo", "ci_prob90", None, "ci_pal", None, "convection_initiation_prob90",
"CI", "float64"),
"asii_turb_trop_prob": ("geo", "asii_turb_trop_prob", None, "asii_turb_prob_pal", None,
"asii_prob", "ASII-NG", "float64"),
"MapCellCatType": ("geo", "MapCellCatType", None, "MapCellCatType_pal", None,
"rdt_cell_type", "RDT-CW", "uint8"),
}
[docs]
@pytest.mark.parametrize(
"data",
["cma_geo", "cma_pps", "cma_extended_pps", "cmaprob_pps", "ct_geo",
"ct_pps", "ctth_alti_geo", "ctth_alti_pps", "ctth_pres_geo",
"ctth_pres_pps", "ctth_tempe_geo", "ctth_tempe_pps",
"cmic_phase_geo", "cmic_phase_pps", "cmic_reff_geo",
"cmic_reff_pps", "cmic_cot_geo", "cmic_cot_pps", "cmic_cwp_pps",
"cmic_lwp_geo", "cmic_lwp_pps", "cmic_iwp_geo", "cmic_iwp_pps",
"pc", "crr", "crr_accum", "ishai_tpw", "ishai_shw", "ishai_li",
"ci_prob30", "ci_prob60", "ci_prob90", "asii_turb_trop_prob",
"MapCellCatType"]
)
def test_nwcsaf_comps(fake_area, tmp_path, data):
"""Test loading NWCSAF composites."""
from satpy import Scene
from satpy.writers import get_enhanced_image
(flavour, dvname, altname, palettename, statusname, comp, filelabel, dtp) = _nwcsaf_geo_props[data]
rng = (0, 100) if dtp == "uint8" else (-100, 1000)
if flavour == "geo":
fn = f"S_NWC_{filelabel:s}_MSG2_MSG-N-VISIR_20220124T094500Z.nc"
reader = "nwcsaf-geo"
id_ = {"satellite_identifier": "MSG4"}
else:
fn = f"S_NWC_{filelabel:s}_noaa20_00000_20230301T1200213Z_20230301T1201458Z.nc"
reader = "nwcsaf-pps_nc"
id_ = {"platform": "NOAA-20"}
fk = tmp_path / fn
# create a minimally fake netCDF file, otherwise satpy won't load the
# composite
ds = xr.Dataset(
coords={"nx": [0], "ny": [0]},
attrs={
"source": "satpy unit test",
"time_coverage_start": "0001-01-01T00:00:00Z",
"time_coverage_end": "0001-01-01T01:00:00Z",
}
)
ds.attrs.update(id_)
ds.to_netcdf(fk)
sc = Scene(filenames=[os.fspath(fk)], reader=[reader])
sc[palettename] = xr.DataArray(
da.tile(da.arange(256), [3, 1]).T,
dims=("pal02_colors", "pal_RGB"))
fake_alti = da.linspace(rng[0], rng[1], 4, chunks=2, dtype=dtp).reshape(2, 2)
ancvars = [sc[palettename]]
if statusname is not None:
sc[statusname] = xr.DataArray(
da.zeros(shape=(2, 2), dtype="uint8"),
attrs={
"area": fake_area,
"_FillValue": 123},
dims=("y", "x"))
ancvars.append(sc[statusname])
sc[dvname] = xr.DataArray(
fake_alti,
dims=("y", "x"),
attrs={
"area": fake_area,
"scaled_FillValue": 123,
"ancillary_variables": ancvars,
"valid_range": rng})
def _fake_get_varname(info, info_type="file_key"):
return altname or dvname
with mock.patch("satpy.readers.nwcsaf_nc.NcNWCSAF._get_varname_in_file") as srnN_:
srnN_.side_effect = _fake_get_varname
sc.load([comp])
im = get_enhanced_image(sc[comp])
if flavour == "geo":
assert im.mode == "P"
np.testing.assert_array_equal(im.data.coords["bands"], ["P"])
if dtp == "float64":
np.testing.assert_allclose(
im.data.sel(bands="P"),
((fake_alti - rng[0]) * (255 / np.ptp(rng))).round())
else:
np.testing.assert_allclose(im.data.sel(bands="P"), fake_alti)
[docs]
class TestTCREnhancement:
"""Test the AHI enhancement functions."""
[docs]
def setup_method(self):
"""Create test data."""
data = da.arange(-100, 1000, 110).reshape(2, 5)
rgb_data = np.stack([data, data, data])
self.rgb = xr.DataArray(rgb_data, dims=("bands", "y", "x"),
coords={"bands": ["R", "G", "B"]},
attrs={"platform_name": "Himawari-8"})
[docs]
def test_jma_true_color_reproduction(self):
"""Test the jma_true_color_reproduction enhancement."""
from trollimage.xrimage import XRImage
from satpy.enhancements import jma_true_color_reproduction
expected = [[[-109.93, 10.993, 131.916, 252.839, 373.762],
[494.685, 615.608, 736.531, 857.454, 978.377]],
[[-97.73, 9.773, 117.276, 224.779, 332.282],
[439.785, 547.288, 654.791, 762.294, 869.797]],
[[-93.29, 9.329, 111.948, 214.567, 317.186],
[419.805, 522.424, 625.043, 727.662, 830.281]]]
img = XRImage(self.rgb)
jma_true_color_reproduction(img)
np.testing.assert_almost_equal(img.data.compute(), expected)
self.rgb.attrs["platform_name"] = None
img = XRImage(self.rgb)
with pytest.raises(ValueError, match="Missing platform name."):
jma_true_color_reproduction(img)
self.rgb.attrs["platform_name"] = "Fakesat"
img = XRImage(self.rgb)
with pytest.raises(KeyError, match="No conversion matrix found for platform Fakesat"):
jma_true_color_reproduction(img)