Source code for satpy.tests.enhancement_tests.test_enhancements

# Copyright (c) 2017-2023 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Unit testing the enhancements functions, e.g. cira_stretch."""

import contextlib
import os
from tempfile import NamedTemporaryFile
from unittest import mock

import dask.array as da
import numpy as np
import pytest
import xarray as xr

from satpy.enhancements import create_colormap, on_dask_array, on_separate_bands, using_map_blocks

# NOTE:
# The following fixtures are not defined in this file, but are used and injected by Pytest:
# - tmp_path


[docs] def run_and_check_enhancement(func, data, expected, **kwargs): """Perform basic checks that apply to multiple tests.""" from trollimage.xrimage import XRImage pre_attrs = data.attrs img = XRImage(data) func(img, **kwargs) assert isinstance(img.data.data, da.Array) old_keys = set(pre_attrs.keys()) # It is OK to have "enhancement_history" added new_keys = set(img.data.attrs.keys()) - {"enhancement_history"} assert old_keys == new_keys res_data_arr = img.data assert isinstance(res_data_arr, xr.DataArray) assert isinstance(res_data_arr.data, da.Array) res_data = res_data_arr.data.compute() # mimics what xrimage geotiff writing does assert not isinstance(res_data, da.Array) np.testing.assert_allclose(res_data, expected, atol=1.e-6, rtol=0)
[docs] def identical_decorator(func): """Decorate but do nothing.""" return func
[docs] class TestEnhancementStretch: """Class for testing enhancements in satpy.enhancements."""
[docs] def setup_method(self): """Create test data used by every test.""" data = np.arange(-210, 790, 100).reshape((2, 5)) * 0.95 data[0, 0] = np.nan # one bad value for testing crefl_data = np.arange(-210, 790, 100).reshape((2, 5)) * 0.95 crefl_data /= 5.605 crefl_data[0, 0] = np.nan # one bad value for testing crefl_data[0, 1] = 0. self.ch1 = xr.DataArray(da.from_array(data, chunks=2), dims=("y", "x"), attrs={"test": "test"}) self.ch2 = xr.DataArray(da.from_array(crefl_data, chunks=2), dims=("y", "x"), attrs={"test": "test"}) rgb_data = np.stack([data, data, data]) self.rgb = xr.DataArray(da.from_array(rgb_data, chunks=(3, 2, 2)), dims=("bands", "y", "x"), coords={"bands": ["R", "G", "B"]})
[docs] @pytest.mark.parametrize( ("decorator", "exp_call_cls"), [ (identical_decorator, xr.DataArray), (on_dask_array, da.Array), (using_map_blocks, np.ndarray), ], ) @pytest.mark.parametrize("input_data_name", ["ch1", "ch2", "rgb"]) def test_apply_enhancement(self, input_data_name, decorator, exp_call_cls): """Test the 'apply_enhancement' utility function.""" def _enh_func(img): def _calc_func(data): assert isinstance(data, exp_call_cls) return data decorated_func = decorator(_calc_func) return decorated_func(img.data) in_data = getattr(self, input_data_name) exp_data = in_data.values if "bands" not in in_data.coords: exp_data = exp_data[np.newaxis, :, :] run_and_check_enhancement(_enh_func, in_data, exp_data)
[docs] def test_cira_stretch(self): """Test applying the cira_stretch.""" from satpy.enhancements import cira_stretch expected = np.array([[ [np.nan, -7.04045974, -7.04045974, 0.79630132, 0.95947296], [1.05181359, 1.11651012, 1.16635571, 1.20691137, 1.24110186]]]) run_and_check_enhancement(cira_stretch, self.ch1, expected)
[docs] def test_reinhard(self): """Test the reinhard algorithm.""" from satpy.enhancements import reinhard_to_srgb expected = np.array([[[np.nan, 0., 0., 0.93333793, 1.29432402], [1.55428709, 1.76572249, 1.94738635, 2.10848544, 2.25432809]], [[np.nan, 0., 0., 0.93333793, 1.29432402], [1.55428709, 1.76572249, 1.94738635, 2.10848544, 2.25432809]], [[np.nan, 0., 0., 0.93333793, 1.29432402], [1.55428709, 1.76572249, 1.94738635, 2.10848544, 2.25432809]]]) run_and_check_enhancement(reinhard_to_srgb, self.rgb, expected)
[docs] def test_lookup(self): """Test the lookup enhancement function.""" from satpy.enhancements import lookup expected = np.array([[ [0., 0., 0., 0.333333, 0.705882], [1., 1., 1., 1., 1.]]]) lut = np.arange(256.) run_and_check_enhancement(lookup, self.ch1, expected, luts=lut) expected = np.array([[[0., 0., 0., 0.333333, 0.705882], [1., 1., 1., 1., 1.]], [[0., 0., 0., 0.333333, 0.705882], [1., 1., 1., 1., 1.]], [[0., 0., 0., 0.333333, 0.705882], [1., 1., 1., 1., 1.]]]) lut = np.arange(256.) lut = np.vstack((lut, lut, lut)).T run_and_check_enhancement(lookup, self.rgb, expected, luts=lut)
[docs] def test_colorize(self): """Test the colorize enhancement function.""" from trollimage.colormap import brbg from satpy.enhancements import colorize expected = np.array([ [[np.nan, 3.29411723e-01, 3.29411723e-01, 3.21825881e-08, 3.21825881e-08], [3.21825881e-08, 3.21825881e-08, 3.21825881e-08, 3.21825881e-08, 3.21825881e-08]], [[np.nan, 1.88235327e-01, 1.88235327e-01, 2.35294109e-01, 2.35294109e-01], [2.35294109e-01, 2.35294109e-01, 2.35294109e-01, 2.35294109e-01, 2.35294109e-01]], [[np.nan, 1.96078164e-02, 1.96078164e-02, 1.88235281e-01, 1.88235281e-01], [1.88235281e-01, 1.88235281e-01, 1.88235281e-01, 1.88235281e-01, 1.88235281e-01]]]) run_and_check_enhancement(colorize, self.ch1, expected, palettes=brbg)
[docs] def test_palettize(self): """Test the palettize enhancement function.""" from trollimage.colormap import brbg from satpy.enhancements import palettize expected = np.array([[[10, 0, 0, 10, 10], [10, 10, 10, 10, 10]]]) run_and_check_enhancement(palettize, self.ch1, expected, palettes=brbg)
[docs] def test_three_d_effect(self): """Test the three_d_effect enhancement function.""" from satpy.enhancements import three_d_effect expected = np.array([[ [np.nan, np.nan, -389.5, -294.5, 826.5], [np.nan, np.nan, 85.5, 180.5, 1301.5]]]) run_and_check_enhancement(three_d_effect, self.ch1, expected)
[docs] def test_piecewise_linear_stretch(self): """Test the piecewise_linear_stretch enhancement function.""" from satpy.enhancements import piecewise_linear_stretch expected = np.array([[ [np.nan, 0., 0., 0.44378, 0.631734], [0.737562, 0.825041, 0.912521, 1., 1.]]]) run_and_check_enhancement(piecewise_linear_stretch, self.ch2 / 100.0, expected, xp=[0., 25., 55., 100., 255.], fp=[0., 90., 140., 175., 255.], reference_scale_factor=255, )
[docs] def test_btemp_threshold(self): """Test applying the cira_stretch.""" from satpy.enhancements import btemp_threshold expected = np.array([[ [np.nan, 0.946207, 0.892695, 0.839184, 0.785672], [0.73216, 0.595869, 0.158745, -0.278379, -0.715503]]]) run_and_check_enhancement(btemp_threshold, self.ch1, expected, min_in=-200, max_in=500, threshold=350)
[docs] def test_merge_colormaps(self): """Test merging colormaps.""" from trollimage.colormap import Colormap from satpy.enhancements import _merge_colormaps as mcp from satpy.enhancements import create_colormap ret_map = mock.MagicMock() create_colormap_mock = mock.Mock(wraps=create_colormap) cmap1 = Colormap((1, (1., 1., 1.))) kwargs = {"palettes": cmap1} with mock.patch("satpy.enhancements.create_colormap", create_colormap_mock): res = mcp(kwargs) assert res is cmap1 create_colormap_mock.assert_not_called() create_colormap_mock.reset_mock() ret_map.reset_mock() cmap1 = {"colors": "blues", "min_value": 0, "max_value": 1} kwargs = {"palettes": [cmap1]} with mock.patch("satpy.enhancements.create_colormap", create_colormap_mock), \ mock.patch("trollimage.colormap.blues", ret_map): _ = mcp(kwargs) create_colormap_mock.assert_called_once() ret_map.reverse.assert_not_called() ret_map.set_range.assert_called_with(0, 1) create_colormap_mock.reset_mock() ret_map.reset_mock() cmap2 = {"colors": "blues", "min_value": 2, "max_value": 3, "reverse": True} kwargs = {"palettes": [cmap2]} with mock.patch("trollimage.colormap.blues", ret_map): _ = mcp(kwargs) ret_map.reverse.assert_called_once() ret_map.set_range.assert_called_with(2, 3) create_colormap_mock.reset_mock() ret_map.reset_mock() kwargs = {"palettes": [cmap1, cmap2]} with mock.patch("trollimage.colormap.blues", ret_map): _ = mcp(kwargs) ret_map.__add__.assert_called_once()
[docs] def tearDown(self): """Clean up."""
[docs] @contextlib.contextmanager def closed_named_temp_file(**kwargs): """Named temporary file context manager that closes the file after creation. This helps with Windows systems which can get upset with opening or deleting a file that is already open. """ try: with NamedTemporaryFile(delete=False, **kwargs) as tmp_cmap: yield tmp_cmap.name finally: os.remove(tmp_cmap.name)
[docs] def _write_cmap_to_file(cmap_filename, cmap_data): ext = os.path.splitext(cmap_filename)[1] if ext in (".npy",): np.save(cmap_filename, cmap_data) elif ext in (".npz",): np.savez(cmap_filename, cmap_data) else: np.savetxt(cmap_filename, cmap_data, delimiter=",")
[docs] def _generate_cmap_test_data(color_scale, colormap_mode): cmap_data = np.array([ [1, 0, 0], [1, 1, 0], [1, 1, 1], [0, 0, 1], ], dtype=np.float64) if len(colormap_mode) != 3: _cmap_data = cmap_data cmap_data = np.empty((cmap_data.shape[0], len(colormap_mode)), dtype=np.float64) if colormap_mode.startswith("V") or colormap_mode.endswith("A"): cmap_data[:, 0] = np.array([128, 130, 132, 134]) / 255.0 cmap_data[:, -3:] = _cmap_data if colormap_mode.startswith("V") and colormap_mode.endswith("A"): cmap_data[:, 1] = np.array([128, 130, 132, 134]) / 255.0 if color_scale is None or color_scale == 255: cmap_data = (cmap_data * 255).astype(np.uint8) return cmap_data
[docs] class TestColormapLoading: """Test utilities used with colormaps."""
[docs] @pytest.mark.parametrize("color_scale", [None, 1.0]) @pytest.mark.parametrize("colormap_mode", ["RGB", "VRGB", "VRGBA"]) @pytest.mark.parametrize("extra_kwargs", [ {}, {"min_value": 50, "max_value": 100}, ]) @pytest.mark.parametrize("filename_suffix", [".npy", ".npz", ".csv"]) def test_cmap_from_file(self, color_scale, colormap_mode, extra_kwargs, filename_suffix): """Test that colormaps can be loaded from a binary file.""" # create the colormap file on disk with closed_named_temp_file(suffix=filename_suffix) as cmap_filename: cmap_data = _generate_cmap_test_data(color_scale, colormap_mode) _write_cmap_to_file(cmap_filename, cmap_data) unset_first_value = 128.0 / 255.0 if colormap_mode.startswith("V") else 0.0 unset_last_value = 134.0 / 255.0 if colormap_mode.startswith("V") else 1.0 if (color_scale is None or color_scale == 255) and colormap_mode.startswith("V"): unset_first_value *= 255 unset_last_value *= 255 if "min_value" in extra_kwargs: unset_first_value = extra_kwargs["min_value"] unset_last_value = extra_kwargs["max_value"] first_color = [1.0, 0.0, 0.0] if colormap_mode == "VRGBA": first_color = [128.0 / 255.0] + first_color kwargs1 = {"filename": cmap_filename} kwargs1.update(extra_kwargs) if color_scale is not None: kwargs1["color_scale"] = color_scale cmap = create_colormap(kwargs1) assert cmap.colors.shape[0] == 4 np.testing.assert_equal(cmap.colors[0], first_color) assert cmap.values.shape[0] == 4 assert cmap.values[0] == unset_first_value assert cmap.values[-1] == unset_last_value
[docs] def test_cmap_vrgb_as_rgba(self): """Test that data created as VRGB still reads as RGBA.""" with closed_named_temp_file(suffix=".npy") as cmap_filename: cmap_data = _generate_cmap_test_data(None, "VRGB") np.save(cmap_filename, cmap_data) cmap = create_colormap({"filename": cmap_filename, "colormap_mode": "RGBA"}) assert cmap.colors.shape[0] == 4 assert cmap.colors.shape[1] == 4 # RGBA np.testing.assert_equal(cmap.colors[0], [128 / 255., 1.0, 0, 0]) assert cmap.values.shape[0] == 4 assert cmap.values[0] == 0 assert cmap.values[-1] == 1.0
[docs] @pytest.mark.parametrize( ("real_mode", "forced_mode"), [ ("VRGBA", "RGBA"), ("VRGBA", "VRGB"), ("RGBA", "RGB"), ] ) @pytest.mark.parametrize("filename_suffix", [".npy", ".csv"]) def test_cmap_bad_mode(self, real_mode, forced_mode, filename_suffix): """Test that reading colormaps with the wrong mode fails.""" with closed_named_temp_file(suffix=filename_suffix) as cmap_filename: cmap_data = _generate_cmap_test_data(None, real_mode) _write_cmap_to_file(cmap_filename, cmap_data) # Force colormap_mode VRGBA to RGBA and we should see an exception with pytest.raises(ValueError, match="Unexpected colormap shape for mode .*"): create_colormap({"filename": cmap_filename, "colormap_mode": forced_mode})
[docs] def test_cmap_from_file_bad_shape(self): """Test that unknown array shape causes an error.""" from satpy.enhancements import create_colormap # create the colormap file on disk with closed_named_temp_file(suffix=".npy") as cmap_filename: np.save(cmap_filename, np.array([ [0], [64], [128], [255], ])) with pytest.raises(ValueError, match="Unexpected colormap shape for mode 'None'"): create_colormap({"filename": cmap_filename})
[docs] def test_cmap_from_config_path(self, tmp_path): """Test loading a colormap relative to a config path.""" import satpy from satpy.enhancements import create_colormap cmap_dir = tmp_path / "colormaps" cmap_dir.mkdir() cmap_filename = cmap_dir / "my_colormap.npy" cmap_data = _generate_cmap_test_data(None, "RGBA") np.save(cmap_filename, cmap_data) with satpy.config.set(config_path=[tmp_path]): rel_cmap_filename = os.path.join("colormaps", "my_colormap.npy") cmap = create_colormap({"filename": rel_cmap_filename, "colormap_mode": "RGBA"}) assert cmap.colors.shape[0] == 4 assert cmap.colors.shape[1] == 4 # RGBA np.testing.assert_equal(cmap.colors[0], [128 / 255., 1.0, 0, 0]) assert cmap.values.shape[0] == 4 assert cmap.values[0] == 0 assert cmap.values[-1] == 1.0
[docs] def test_cmap_from_trollimage(self): """Test that colormaps in trollimage can be loaded.""" from satpy.enhancements import create_colormap cmap = create_colormap({"colors": "pubu"}) from trollimage.colormap import pubu np.testing.assert_equal(cmap.colors, pubu.colors) np.testing.assert_equal(cmap.values, pubu.values)
[docs] def test_cmap_no_colormap(self): """Test that being unable to create a colormap raises an error.""" from satpy.enhancements import create_colormap with pytest.raises(ValueError, match="Unknown colormap format: .*"): create_colormap({})
[docs] def test_cmap_list(self): """Test that colors can be a list/tuple.""" from satpy.enhancements import create_colormap colors = [ [0, 0, 1], [1, 0, 1], [0, 1, 1], [1, 1, 1], ] values = [2, 4, 6, 8] cmap = create_colormap({"colors": colors, "color_scale": 1}) assert cmap.colors.shape[0] == 4 np.testing.assert_equal(cmap.colors[0], [0.0, 0.0, 1.0]) assert cmap.values.shape[0] == 4 assert cmap.values[0] == 0 assert cmap.values[-1] == 1.0 cmap = create_colormap({"colors": colors, "color_scale": 1, "values": values}) assert cmap.colors.shape[0] == 4 np.testing.assert_equal(cmap.colors[0], [0.0, 0.0, 1.0]) assert cmap.values.shape[0] == 4 assert cmap.values[0] == 2 assert cmap.values[-1] == 8
[docs] def test_on_separate_bands(): """Test the `on_separate_bands` decorator.""" def func(array, index, gain=2): return xr.DataArray(np.ones(array.shape, dtype=array.dtype) * index * gain, coords=array.coords, dims=array.dims, attrs=array.attrs) separate_func = on_separate_bands(func) arr = xr.DataArray(np.zeros((3, 10, 10)), dims=["bands", "y", "x"], coords={"bands": ["R", "G", "B"]}) assert separate_func(arr).shape == arr.shape assert all(separate_func(arr, gain=1).values[:, 0, 0] == [0, 1, 2])
[docs] def test_using_map_blocks(): """Test the `using_map_blocks` decorator.""" def func(np_array, block_info=None): value = block_info[0]["chunk-location"][-1] return np.ones(np_array.shape) * value map_blocked_func = using_map_blocks(func) arr = xr.DataArray(da.zeros((3, 10, 10), dtype=int, chunks=5), dims=["bands", "y", "x"]) res = map_blocked_func(arr) assert res.shape == arr.shape assert res[0, 0, 0].compute() != res[0, 9, 9].compute()
[docs] def test_on_dask_array(): """Test the `on_dask_array` decorator.""" def func(dask_array): if not isinstance(dask_array, da.core.Array): pytest.fail("Array is not a dask array") return dask_array dask_func = on_dask_array(func) arr = xr.DataArray(da.zeros((3, 10, 10), dtype=int, chunks=5), dims=["bands", "y", "x"]) res = dask_func(arr) assert res.shape == arr.shape
[docs] @pytest.fixture() def fake_area(): """Return a fake 2×2 area.""" from pyresample.geometry import create_area_def return create_area_def("wingertsberg", 4087, area_extent=[-2_000, -2_000, 2_000, 2_000], shape=(2, 2))
_nwcsaf_geo_props = { "cma_geo": ("geo", "cma", None, "cma_pal", None, "cloudmask", "CMA", "uint8"), "cma_pps": ("pps", "cma", None, "cma_pal", None, "cloudmask", "CMA", "uint8"), "cma_extended_pps": ("pps", "cma_extended", None, "cma_extended_pal", None, "cloudmask_extended", "CMA", "uint8"), "cmaprob_pps": ("pps", "cmaprob", None, "cmaprob_pal", None, "cloudmask_probability", "CMAPROB", "uint8"), "ct_geo": ("geo", "ct", None, "ct_pal", None, "cloudtype", "CT", "uint8"), "ct_pps": ("pps", "ct", None, "ct_pal", None, "cloudtype", "CT", "uint8"), "ctth_alti_geo": ("geo", "ctth_alti", None, "ctth_alti_pal", None, "cloud_top_height", "CTTH", "float64"), "ctth_alti_pps": ("pps", "ctth_alti", None, "ctth_alti_pal", "ctth_status_flag", "cloud_top_height", "CTTH", "float64"), "ctth_pres_geo": ("geo", "ctth_pres", None, "ctth_pres_pal", None, "cloud_top_pressure", "CTTH", "float64"), "ctth_pres_pps": ("pps", "ctth_pres", None, "ctth_pres_pal", None, "cloud_top_pressure", "CTTH", "float64"), "ctth_tempe_geo": ("geo", "ctth_tempe", None, "ctth_tempe_pal", None, "cloud_top_temperature", "CTTH", "float64"), "ctth_tempe_pps": ("pps", "ctth_tempe", None, "ctth_tempe_pal", None, "cloud_top_temperature", "CTTH", "float64"), "cmic_phase_geo": ("geo", "cmic_phase", None, "cmic_phase_pal", None, "cloud_top_phase", "CMIC", "uint8"), "cmic_phase_pps": ("pps", "cmic_phase", None, "cmic_phase_pal", "cmic_status_flag", "cloud_top_phase", "CMIC", "uint8"), "cmic_reff_geo": ("geo", "cmic_reff", None, "cmic_reff_pal", None, "cloud_drop_effective_radius", "CMIC", "float64"), "cmic_reff_pps": ("pps", "cmic_reff", "cmic_cre", "cmic_cre_pal", "cmic_status_flag", "cloud_drop_effective_radius", "CMIC", "float64"), "cmic_cot_geo": ("geo", "cmic_cot", None, "cmic_cot_pal", None, "cloud_optical_thickness", "CMIC", "float64"), "cmic_cot_pps": ("pps", "cmic_cot", None, "cmic_cot_pal", None, "cloud_optical_thickness", "CMIC", "float64"), "cmic_cwp_pps": ("pps", "cmic_cwp", None, "cmic_cwp_pal", None, "cloud_water_path", "CMIC", "float64"), "cmic_lwp_geo": ("geo", "cmic_lwp", None, "cmic_lwp_pal", None, "cloud_liquid_water_path", "CMIC", "float64"), "cmic_lwp_pps": ("pps", "cmic_lwp", None, "cmic_lwp_pal", None, "liquid_water_path", "CMIC", "float64"), "cmic_iwp_geo": ("geo", "cmic_iwp", None, "cmic_iwp_pal", None, "cloud_ice_water_path", "CMIC", "float64"), "cmic_iwp_pps": ("pps", "cmic_iwp", None, "cmic_iwp_pal", None, "ice_water_path", "CMIC", "float64"), "pc": ("geo", "pc", None, "pc_pal", None, "precipitation_probability", "PC", "uint8"), "crr": ("geo", "crr", None, "crr_pal", None, "convective_rain_rate", "CRR", "uint8"), "crr_accum": ("geo", "crr_accum", None, "crr_pal", None, "convective_precipitation_hourly_accumulation", "CRR", "uint8"), "ishai_tpw": ("geo", "ishai_tpw", None, "ishai_tpw_pal", None, "total_precipitable_water", "iSHAI", "float64"), "ishai_shw": ("geo", "ishai_shw", None, "ishai_shw_pal", None, "showalter_index", "iSHAI", "float64"), "ishai_li": ("geo", "ishai_li", None, "ishai_li_pal", None, "lifted_index", "iSHAI", "float64"), "ci_prob30": ("geo", "ci_prob30", None, "ci_pal", None, "convection_initiation_prob30", "CI", "float64"), "ci_prob60": ("geo", "ci_prob60", None, "ci_pal", None, "convection_initiation_prob60", "CI", "float64"), "ci_prob90": ("geo", "ci_prob90", None, "ci_pal", None, "convection_initiation_prob90", "CI", "float64"), "asii_turb_trop_prob": ("geo", "asii_turb_trop_prob", None, "asii_turb_prob_pal", None, "asii_prob", "ASII-NG", "float64"), "MapCellCatType": ("geo", "MapCellCatType", None, "MapCellCatType_pal", None, "rdt_cell_type", "RDT-CW", "uint8"), }
[docs] @pytest.mark.parametrize( "data", ["cma_geo", "cma_pps", "cma_extended_pps", "cmaprob_pps", "ct_geo", "ct_pps", "ctth_alti_geo", "ctth_alti_pps", "ctth_pres_geo", "ctth_pres_pps", "ctth_tempe_geo", "ctth_tempe_pps", "cmic_phase_geo", "cmic_phase_pps", "cmic_reff_geo", "cmic_reff_pps", "cmic_cot_geo", "cmic_cot_pps", "cmic_cwp_pps", "cmic_lwp_geo", "cmic_lwp_pps", "cmic_iwp_geo", "cmic_iwp_pps", "pc", "crr", "crr_accum", "ishai_tpw", "ishai_shw", "ishai_li", "ci_prob30", "ci_prob60", "ci_prob90", "asii_turb_trop_prob", "MapCellCatType"] ) def test_nwcsaf_comps(fake_area, tmp_path, data): """Test loading NWCSAF composites.""" from satpy import Scene from satpy.writers import get_enhanced_image (flavour, dvname, altname, palettename, statusname, comp, filelabel, dtp) = _nwcsaf_geo_props[data] rng = (0, 100) if dtp == "uint8" else (-100, 1000) if flavour == "geo": fn = f"S_NWC_{filelabel:s}_MSG2_MSG-N-VISIR_20220124T094500Z.nc" reader = "nwcsaf-geo" id_ = {"satellite_identifier": "MSG4"} else: fn = f"S_NWC_{filelabel:s}_noaa20_00000_20230301T1200213Z_20230301T1201458Z.nc" reader = "nwcsaf-pps_nc" id_ = {"platform": "NOAA-20"} fk = tmp_path / fn # create a minimally fake netCDF file, otherwise satpy won't load the # composite ds = xr.Dataset( coords={"nx": [0], "ny": [0]}, attrs={ "source": "satpy unit test", "time_coverage_start": "0001-01-01T00:00:00Z", "time_coverage_end": "0001-01-01T01:00:00Z", } ) ds.attrs.update(id_) ds.to_netcdf(fk) sc = Scene(filenames=[os.fspath(fk)], reader=[reader]) sc[palettename] = xr.DataArray( da.tile(da.arange(256), [3, 1]).T, dims=("pal02_colors", "pal_RGB")) fake_alti = da.linspace(rng[0], rng[1], 4, chunks=2, dtype=dtp).reshape(2, 2) ancvars = [sc[palettename]] if statusname is not None: sc[statusname] = xr.DataArray( da.zeros(shape=(2, 2), dtype="uint8"), attrs={ "area": fake_area, "_FillValue": 123}, dims=("y", "x")) ancvars.append(sc[statusname]) sc[dvname] = xr.DataArray( fake_alti, dims=("y", "x"), attrs={ "area": fake_area, "scaled_FillValue": 123, "ancillary_variables": ancvars, "valid_range": rng}) def _fake_get_varname(info, info_type="file_key"): return altname or dvname with mock.patch("satpy.readers.nwcsaf_nc.NcNWCSAF._get_varname_in_file") as srnN_: srnN_.side_effect = _fake_get_varname sc.load([comp]) im = get_enhanced_image(sc[comp]) if flavour == "geo": assert im.mode == "P" np.testing.assert_array_equal(im.data.coords["bands"], ["P"]) if dtp == "float64": np.testing.assert_allclose( im.data.sel(bands="P"), ((fake_alti - rng[0]) * (255 / np.ptp(rng))).round()) else: np.testing.assert_allclose(im.data.sel(bands="P"), fake_alti)
[docs] class TestTCREnhancement: """Test the AHI enhancement functions."""
[docs] def setup_method(self): """Create test data.""" data = da.arange(-100, 1000, 110).reshape(2, 5) rgb_data = np.stack([data, data, data]) self.rgb = xr.DataArray(rgb_data, dims=("bands", "y", "x"), coords={"bands": ["R", "G", "B"]}, attrs={"platform_name": "Himawari-8"})
[docs] def test_jma_true_color_reproduction(self): """Test the jma_true_color_reproduction enhancement.""" from trollimage.xrimage import XRImage from satpy.enhancements import jma_true_color_reproduction expected = [[[-109.93, 10.993, 131.916, 252.839, 373.762], [494.685, 615.608, 736.531, 857.454, 978.377]], [[-97.73, 9.773, 117.276, 224.779, 332.282], [439.785, 547.288, 654.791, 762.294, 869.797]], [[-93.29, 9.329, 111.948, 214.567, 317.186], [419.805, 522.424, 625.043, 727.662, 830.281]]] img = XRImage(self.rgb) jma_true_color_reproduction(img) np.testing.assert_almost_equal(img.data.compute(), expected) self.rgb.attrs["platform_name"] = None img = XRImage(self.rgb) with pytest.raises(ValueError, match="Missing platform name."): jma_true_color_reproduction(img) self.rgb.attrs["platform_name"] = "Fakesat" img = XRImage(self.rgb) with pytest.raises(KeyError, match="No conversion matrix found for platform Fakesat"): jma_true_color_reproduction(img)