Source code for satpy.tests.enhancement_tests.test_enhancements

# Copyright (c) 2017-2023 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Unit testing the enhancements functions, e.g. cira_stretch."""

import contextlib
import os
from tempfile import NamedTemporaryFile
from unittest import mock

import dask.array as da
import numpy as np
import pytest
import xarray as xr

from satpy.enhancements import create_colormap, on_dask_array, on_separate_bands, using_map_blocks

# NOTE:
# The following fixtures are not defined in this file, but are used and injected by Pytest:
# - tmp_path


[docs] def run_and_check_enhancement(func, data, expected, **kwargs): """Perform basic checks that apply to multiple tests.""" from trollimage.xrimage import XRImage pre_attrs = data.attrs img = XRImage(data) func(img, **kwargs) assert isinstance(img.data.data, da.Array) old_keys = set(pre_attrs.keys()) # It is OK to have "enhancement_history" added new_keys = set(img.data.attrs.keys()) - {"enhancement_history"} assert old_keys == new_keys res_data_arr = img.data assert isinstance(res_data_arr, xr.DataArray) assert isinstance(res_data_arr.data, da.Array) res_data = res_data_arr.data.compute() # mimics what xrimage geotiff writing does assert not isinstance(res_data, da.Array) np.testing.assert_allclose(res_data, expected, atol=1.e-6, rtol=0)
[docs] def identical_decorator(func): """Decorate but do nothing.""" return func
[docs] class TestEnhancementStretch: """Class for testing enhancements in satpy.enhancements."""
[docs] def setup_method(self): """Create test data used by every test.""" data = np.arange(-210, 790, 100).reshape((2, 5)) * 0.95 data[0, 0] = np.nan # one bad value for testing crefl_data = np.arange(-210, 790, 100).reshape((2, 5)) * 0.95 crefl_data /= 5.605 crefl_data[0, 0] = np.nan # one bad value for testing crefl_data[0, 1] = 0. self.ch1 = xr.DataArray(da.from_array(data, chunks=2), dims=('y', 'x'), attrs={'test': 'test'}) self.ch2 = xr.DataArray(da.from_array(crefl_data, chunks=2), dims=('y', 'x'), attrs={'test': 'test'}) rgb_data = np.stack([data, data, data]) self.rgb = xr.DataArray(da.from_array(rgb_data, chunks=(3, 2, 2)), dims=('bands', 'y', 'x'), coords={'bands': ['R', 'G', 'B']})
[docs] @pytest.mark.parametrize( ("decorator", "exp_call_cls"), [ (identical_decorator, xr.DataArray), (on_dask_array, da.Array), (using_map_blocks, np.ndarray), ], ) @pytest.mark.parametrize("input_data_name", ["ch1", "ch2", "rgb"]) def test_apply_enhancement(self, input_data_name, decorator, exp_call_cls): """Test the 'apply_enhancement' utility function.""" def _enh_func(img): def _calc_func(data): assert isinstance(data, exp_call_cls) return data decorated_func = decorator(_calc_func) return decorated_func(img.data) in_data = getattr(self, input_data_name) exp_data = in_data.values if "bands" not in in_data.coords: exp_data = exp_data[np.newaxis, :, :] run_and_check_enhancement(_enh_func, in_data, exp_data)
[docs] def test_cira_stretch(self): """Test applying the cira_stretch.""" from satpy.enhancements import cira_stretch expected = np.array([[ [np.nan, -7.04045974, -7.04045974, 0.79630132, 0.95947296], [1.05181359, 1.11651012, 1.16635571, 1.20691137, 1.24110186]]]) run_and_check_enhancement(cira_stretch, self.ch1, expected)
[docs] def test_reinhard(self): """Test the reinhard algorithm.""" from satpy.enhancements import reinhard_to_srgb expected = np.array([[[np.nan, 0., 0., 0.93333793, 1.29432402], [1.55428709, 1.76572249, 1.94738635, 2.10848544, 2.25432809]], [[np.nan, 0., 0., 0.93333793, 1.29432402], [1.55428709, 1.76572249, 1.94738635, 2.10848544, 2.25432809]], [[np.nan, 0., 0., 0.93333793, 1.29432402], [1.55428709, 1.76572249, 1.94738635, 2.10848544, 2.25432809]]]) run_and_check_enhancement(reinhard_to_srgb, self.rgb, expected)
[docs] def test_lookup(self): """Test the lookup enhancement function.""" from satpy.enhancements import lookup expected = np.array([[ [0., 0., 0., 0.333333, 0.705882], [1., 1., 1., 1., 1.]]]) lut = np.arange(256.) run_and_check_enhancement(lookup, self.ch1, expected, luts=lut) expected = np.array([[[0., 0., 0., 0.333333, 0.705882], [1., 1., 1., 1., 1.]], [[0., 0., 0., 0.333333, 0.705882], [1., 1., 1., 1., 1.]], [[0., 0., 0., 0.333333, 0.705882], [1., 1., 1., 1., 1.]]]) lut = np.arange(256.) lut = np.vstack((lut, lut, lut)).T run_and_check_enhancement(lookup, self.rgb, expected, luts=lut)
[docs] def test_colorize(self): """Test the colorize enhancement function.""" from trollimage.colormap import brbg from satpy.enhancements import colorize expected = np.array([ [[np.nan, 3.29411723e-01, 3.29411723e-01, 3.21825881e-08, 3.21825881e-08], [3.21825881e-08, 3.21825881e-08, 3.21825881e-08, 3.21825881e-08, 3.21825881e-08]], [[np.nan, 1.88235327e-01, 1.88235327e-01, 2.35294109e-01, 2.35294109e-01], [2.35294109e-01, 2.35294109e-01, 2.35294109e-01, 2.35294109e-01, 2.35294109e-01]], [[np.nan, 1.96078164e-02, 1.96078164e-02, 1.88235281e-01, 1.88235281e-01], [1.88235281e-01, 1.88235281e-01, 1.88235281e-01, 1.88235281e-01, 1.88235281e-01]]]) run_and_check_enhancement(colorize, self.ch1, expected, palettes=brbg)
[docs] def test_palettize(self): """Test the palettize enhancement function.""" from trollimage.colormap import brbg from satpy.enhancements import palettize expected = np.array([[[10, 0, 0, 10, 10], [10, 10, 10, 10, 10]]]) run_and_check_enhancement(palettize, self.ch1, expected, palettes=brbg)
[docs] def test_three_d_effect(self): """Test the three_d_effect enhancement function.""" from satpy.enhancements import three_d_effect expected = np.array([[ [np.nan, np.nan, -389.5, -294.5, 826.5], [np.nan, np.nan, 85.5, 180.5, 1301.5]]]) run_and_check_enhancement(three_d_effect, self.ch1, expected)
[docs] def test_piecewise_linear_stretch(self): """Test the piecewise_linear_stretch enhancement function.""" from satpy.enhancements import piecewise_linear_stretch expected = np.array([[ [np.nan, 0., 0., 0.44378, 0.631734], [0.737562, 0.825041, 0.912521, 1., 1.]]]) run_and_check_enhancement(piecewise_linear_stretch, self.ch2 / 100.0, expected, xp=[0., 25., 55., 100., 255.], fp=[0., 90., 140., 175., 255.], reference_scale_factor=255, )
[docs] def test_btemp_threshold(self): """Test applying the cira_stretch.""" from satpy.enhancements import btemp_threshold expected = np.array([[ [np.nan, 0.946207, 0.892695, 0.839184, 0.785672], [0.73216, 0.595869, 0.158745, -0.278379, -0.715503]]]) run_and_check_enhancement(btemp_threshold, self.ch1, expected, min_in=-200, max_in=500, threshold=350)
[docs] def test_merge_colormaps(self): """Test merging colormaps.""" from trollimage.colormap import Colormap from satpy.enhancements import _merge_colormaps as mcp from satpy.enhancements import create_colormap ret_map = mock.MagicMock() create_colormap_mock = mock.Mock(wraps=create_colormap) cmap1 = Colormap((1, (1., 1., 1.))) kwargs = {'palettes': cmap1} with mock.patch('satpy.enhancements.create_colormap', create_colormap_mock): res = mcp(kwargs) assert res is cmap1 create_colormap_mock.assert_not_called() create_colormap_mock.reset_mock() ret_map.reset_mock() cmap1 = {'colors': 'blues', 'min_value': 0, 'max_value': 1} kwargs = {'palettes': [cmap1]} with mock.patch('satpy.enhancements.create_colormap', create_colormap_mock), \ mock.patch('trollimage.colormap.blues', ret_map): _ = mcp(kwargs) create_colormap_mock.assert_called_once() ret_map.reverse.assert_not_called() ret_map.set_range.assert_called_with(0, 1) create_colormap_mock.reset_mock() ret_map.reset_mock() cmap2 = {'colors': 'blues', 'min_value': 2, 'max_value': 3, 'reverse': True} kwargs = {'palettes': [cmap2]} with mock.patch('trollimage.colormap.blues', ret_map): _ = mcp(kwargs) ret_map.reverse.assert_called_once() ret_map.set_range.assert_called_with(2, 3) create_colormap_mock.reset_mock() ret_map.reset_mock() kwargs = {'palettes': [cmap1, cmap2]} with mock.patch('trollimage.colormap.blues', ret_map): _ = mcp(kwargs) ret_map.__add__.assert_called_once()
[docs] def tearDown(self): """Clean up."""
[docs] @contextlib.contextmanager def closed_named_temp_file(**kwargs): """Named temporary file context manager that closes the file after creation. This helps with Windows systems which can get upset with opening or deleting a file that is already open. """ try: with NamedTemporaryFile(delete=False, **kwargs) as tmp_cmap: yield tmp_cmap.name finally: os.remove(tmp_cmap.name)
[docs] def _write_cmap_to_file(cmap_filename, cmap_data): ext = os.path.splitext(cmap_filename)[1] if ext in (".npy",): np.save(cmap_filename, cmap_data) elif ext in (".npz",): np.savez(cmap_filename, cmap_data) else: np.savetxt(cmap_filename, cmap_data, delimiter=",")
[docs] def _generate_cmap_test_data(color_scale, colormap_mode): cmap_data = np.array([ [1, 0, 0], [1, 1, 0], [1, 1, 1], [0, 0, 1], ], dtype=np.float64) if len(colormap_mode) != 3: _cmap_data = cmap_data cmap_data = np.empty((cmap_data.shape[0], len(colormap_mode)), dtype=np.float64) if colormap_mode.startswith("V") or colormap_mode.endswith("A"): cmap_data[:, 0] = np.array([128, 130, 132, 134]) / 255.0 cmap_data[:, -3:] = _cmap_data if colormap_mode.startswith("V") and colormap_mode.endswith("A"): cmap_data[:, 1] = np.array([128, 130, 132, 134]) / 255.0 if color_scale is None or color_scale == 255: cmap_data = (cmap_data * 255).astype(np.uint8) return cmap_data
[docs] class TestColormapLoading: """Test utilities used with colormaps."""
[docs] @pytest.mark.parametrize("color_scale", [None, 1.0]) @pytest.mark.parametrize("colormap_mode", ["RGB", "VRGB", "VRGBA"]) @pytest.mark.parametrize("extra_kwargs", [ {}, {"min_value": 50, "max_value": 100}, ]) @pytest.mark.parametrize("filename_suffix", [".npy", ".npz", ".csv"]) def test_cmap_from_file(self, color_scale, colormap_mode, extra_kwargs, filename_suffix): """Test that colormaps can be loaded from a binary file.""" # create the colormap file on disk with closed_named_temp_file(suffix=filename_suffix) as cmap_filename: cmap_data = _generate_cmap_test_data(color_scale, colormap_mode) _write_cmap_to_file(cmap_filename, cmap_data) unset_first_value = 128.0 / 255.0 if colormap_mode.startswith("V") else 0.0 unset_last_value = 134.0 / 255.0 if colormap_mode.startswith("V") else 1.0 if (color_scale is None or color_scale == 255) and colormap_mode.startswith("V"): unset_first_value *= 255 unset_last_value *= 255 if "min_value" in extra_kwargs: unset_first_value = extra_kwargs["min_value"] unset_last_value = extra_kwargs["max_value"] first_color = [1.0, 0.0, 0.0] if colormap_mode == "VRGBA": first_color = [128.0 / 255.0] + first_color kwargs1 = {"filename": cmap_filename} kwargs1.update(extra_kwargs) if color_scale is not None: kwargs1["color_scale"] = color_scale cmap = create_colormap(kwargs1) assert cmap.colors.shape[0] == 4 np.testing.assert_equal(cmap.colors[0], first_color) assert cmap.values.shape[0] == 4 assert cmap.values[0] == unset_first_value assert cmap.values[-1] == unset_last_value
[docs] def test_cmap_vrgb_as_rgba(self): """Test that data created as VRGB still reads as RGBA.""" with closed_named_temp_file(suffix=".npy") as cmap_filename: cmap_data = _generate_cmap_test_data(None, "VRGB") np.save(cmap_filename, cmap_data) cmap = create_colormap({'filename': cmap_filename, 'colormap_mode': "RGBA"}) assert cmap.colors.shape[0] == 4 assert cmap.colors.shape[1] == 4 # RGBA np.testing.assert_equal(cmap.colors[0], [128 / 255., 1.0, 0, 0]) assert cmap.values.shape[0] == 4 assert cmap.values[0] == 0 assert cmap.values[-1] == 1.0
[docs] @pytest.mark.parametrize( ("real_mode", "forced_mode"), [ ("VRGBA", "RGBA"), ("VRGBA", "VRGB"), ("RGBA", "RGB"), ] ) @pytest.mark.parametrize("filename_suffix", [".npy", ".csv"]) def test_cmap_bad_mode(self, real_mode, forced_mode, filename_suffix): """Test that reading colormaps with the wrong mode fails.""" with closed_named_temp_file(suffix=filename_suffix) as cmap_filename: cmap_data = _generate_cmap_test_data(None, real_mode) _write_cmap_to_file(cmap_filename, cmap_data) # Force colormap_mode VRGBA to RGBA and we should see an exception with pytest.raises(ValueError): create_colormap({'filename': cmap_filename, 'colormap_mode': forced_mode})
[docs] def test_cmap_from_file_bad_shape(self): """Test that unknown array shape causes an error.""" from satpy.enhancements import create_colormap # create the colormap file on disk with closed_named_temp_file(suffix='.npy') as cmap_filename: np.save(cmap_filename, np.array([ [0], [64], [128], [255], ])) with pytest.raises(ValueError): create_colormap({'filename': cmap_filename})
[docs] def test_cmap_from_config_path(self, tmp_path): """Test loading a colormap relative to a config path.""" import satpy from satpy.enhancements import create_colormap cmap_dir = tmp_path / "colormaps" cmap_dir.mkdir() cmap_filename = cmap_dir / "my_colormap.npy" cmap_data = _generate_cmap_test_data(None, "RGBA") np.save(cmap_filename, cmap_data) with satpy.config.set(config_path=[tmp_path]): rel_cmap_filename = os.path.join("colormaps", "my_colormap.npy") cmap = create_colormap({'filename': rel_cmap_filename, 'colormap_mode': "RGBA"}) assert cmap.colors.shape[0] == 4 assert cmap.colors.shape[1] == 4 # RGBA np.testing.assert_equal(cmap.colors[0], [128 / 255., 1.0, 0, 0]) assert cmap.values.shape[0] == 4 assert cmap.values[0] == 0 assert cmap.values[-1] == 1.0
[docs] def test_cmap_from_trollimage(self): """Test that colormaps in trollimage can be loaded.""" from satpy.enhancements import create_colormap cmap = create_colormap({'colors': 'pubu'}) from trollimage.colormap import pubu np.testing.assert_equal(cmap.colors, pubu.colors) np.testing.assert_equal(cmap.values, pubu.values)
[docs] def test_cmap_no_colormap(self): """Test that being unable to create a colormap raises an error.""" from satpy.enhancements import create_colormap with pytest.raises(ValueError): create_colormap({})
[docs] def test_cmap_list(self): """Test that colors can be a list/tuple.""" from satpy.enhancements import create_colormap colors = [ [0, 0, 1], [1, 0, 1], [0, 1, 1], [1, 1, 1], ] values = [2, 4, 6, 8] cmap = create_colormap({'colors': colors, 'color_scale': 1}) assert cmap.colors.shape[0] == 4 np.testing.assert_equal(cmap.colors[0], [0.0, 0.0, 1.0]) assert cmap.values.shape[0] == 4 assert cmap.values[0] == 0 assert cmap.values[-1] == 1.0 cmap = create_colormap({'colors': colors, 'color_scale': 1, 'values': values}) assert cmap.colors.shape[0] == 4 np.testing.assert_equal(cmap.colors[0], [0.0, 0.0, 1.0]) assert cmap.values.shape[0] == 4 assert cmap.values[0] == 2 assert cmap.values[-1] == 8
[docs] def test_on_separate_bands(): """Test the `on_separate_bands` decorator.""" def func(array, index, gain=2): return xr.DataArray(np.ones(array.shape, dtype=array.dtype) * index * gain, coords=array.coords, dims=array.dims, attrs=array.attrs) separate_func = on_separate_bands(func) arr = xr.DataArray(np.zeros((3, 10, 10)), dims=['bands', 'y', 'x'], coords={"bands": ["R", "G", "B"]}) assert separate_func(arr).shape == arr.shape assert all(separate_func(arr, gain=1).values[:, 0, 0] == [0, 1, 2])
[docs] def test_using_map_blocks(): """Test the `using_map_blocks` decorator.""" def func(np_array, block_info=None): value = block_info[0]['chunk-location'][-1] return np.ones(np_array.shape) * value map_blocked_func = using_map_blocks(func) arr = xr.DataArray(da.zeros((3, 10, 10), dtype=int, chunks=5), dims=['bands', 'y', 'x']) res = map_blocked_func(arr) assert res.shape == arr.shape assert res[0, 0, 0].compute() != res[0, 9, 9].compute()
[docs] def test_on_dask_array(): """Test the `on_dask_array` decorator.""" def func(dask_array): if not isinstance(dask_array, da.core.Array): pytest.fail("Array is not a dask array") return dask_array dask_func = on_dask_array(func) arr = xr.DataArray(da.zeros((3, 10, 10), dtype=int, chunks=5), dims=['bands', 'y', 'x']) res = dask_func(arr) assert res.shape == arr.shape
[docs] @pytest.fixture def fake_area(): """Return a fake 2×2 area.""" from pyresample.geometry import create_area_def return create_area_def("wingertsberg", 4087, area_extent=[-2_000, -2_000, 2_000, 2_000], shape=(2, 2))
_nwcsaf_geo_props = { 'cma_geo': ("geo", "cma", None, 'cma_pal', None, 'cloudmask', 'CMA', "uint8"), 'cma_pps': ("pps", "cma", None, 'cma_pal', None, 'cloudmask', 'CMA', "uint8"), 'cma_extended_pps': ("pps", "cma_extended", None, 'cma_extended_pal', None, 'cloudmask_extended', 'CMA', "uint8"), 'cmaprob_pps': ("pps", "cmaprob", None, 'cmaprob_pal', None, 'cloudmask_probability', 'CMAPROB', "uint8"), 'ct_geo': ("geo", "ct", None, 'ct_pal', None, 'cloudtype', 'CT', "uint8"), 'ct_pps': ("pps", "ct", None, 'ct_pal', None, 'cloudtype', 'CT', "uint8"), 'ctth_alti_geo': ("geo", "ctth_alti", None, 'ctth_alti_pal', None, 'cloud_top_height', 'CTTH', "float64"), 'ctth_alti_pps': ("pps", "ctth_alti", None, 'ctth_alti_pal', "ctth_status_flag", 'cloud_top_height', 'CTTH', "float64"), 'ctth_pres_geo': ("geo", "ctth_pres", None, 'ctth_pres_pal', None, 'cloud_top_pressure', 'CTTH', "float64"), 'ctth_pres_pps': ("pps", "ctth_pres", None, 'ctth_pres_pal', None, 'cloud_top_pressure', 'CTTH', "float64"), 'ctth_tempe_geo': ("geo", "ctth_tempe", None, 'ctth_tempe_pal', None, 'cloud_top_temperature', 'CTTH', "float64"), 'ctth_tempe_pps': ("pps", "ctth_tempe", None, 'ctth_tempe_pal', None, 'cloud_top_temperature', 'CTTH', "float64"), 'cmic_phase_geo': ("geo", "cmic_phase", None, 'cmic_phase_pal', None, 'cloud_top_phase', 'CMIC', "uint8"), 'cmic_phase_pps': ("pps", "cmic_phase", None, 'cmic_phase_pal', "cmic_status_flag", 'cloud_top_phase', 'CMIC', "uint8"), 'cmic_reff_geo': ("geo", "cmic_reff", None, 'cmic_reff_pal', None, 'cloud_drop_effective_radius', 'CMIC', "float64"), 'cmic_reff_pps': ("pps", "cmic_reff", "cmic_cre", 'cmic_cre_pal', "cmic_status_flag", 'cloud_drop_effective_radius', 'CMIC', "float64"), 'cmic_cot_geo': ("geo", "cmic_cot", None, 'cmic_cot_pal', None, 'cloud_optical_thickness', 'CMIC', "float64"), 'cmic_cot_pps': ("pps", "cmic_cot", None, 'cmic_cot_pal', None, 'cloud_optical_thickness', 'CMIC', "float64"), 'cmic_cwp_pps': ("pps", "cmic_cwp", None, 'cmic_cwp_pal', None, 'cloud_water_path', 'CMIC', "float64"), 'cmic_lwp_geo': ("geo", "cmic_lwp", None, 'cmic_lwp_pal', None, 'cloud_liquid_water_path', 'CMIC', "float64"), 'cmic_lwp_pps': ("pps", "cmic_lwp", None, 'cmic_lwp_pal', None, 'liquid_water_path', 'CMIC', "float64"), 'cmic_iwp_geo': ("geo", "cmic_iwp", None, 'cmic_iwp_pal', None, 'cloud_ice_water_path', 'CMIC', "float64"), 'cmic_iwp_pps': ("pps", "cmic_iwp", None, 'cmic_iwp_pal', None, 'ice_water_path', 'CMIC', "float64"), 'pc': ("geo", "pc", None, 'pc_pal', None, 'precipitation_probability', 'PC', "uint8"), 'crr': ("geo", "crr", None, 'crr_pal', None, 'convective_rain_rate', 'CRR', "uint8"), 'crr_accum': ("geo", "crr_accum", None, 'crr_pal', None, 'convective_precipitation_hourly_accumulation', 'CRR', "uint8"), 'ishai_tpw': ("geo", "ishai_tpw", None, 'ishai_tpw_pal', None, 'total_precipitable_water', 'iSHAI', "float64"), 'ishai_shw': ("geo", "ishai_shw", None, 'ishai_shw_pal', None, 'showalter_index', 'iSHAI', "float64"), 'ishai_li': ("geo", "ishai_li", None, 'ishai_li_pal', None, 'lifted_index', 'iSHAI', "float64"), 'ci_prob30': ("geo", "ci_prob30", None, 'ci_pal', None, 'convection_initiation_prob30', 'CI', "float64"), 'ci_prob60': ("geo", "ci_prob60", None, 'ci_pal', None, 'convection_initiation_prob60', 'CI', "float64"), 'ci_prob90': ("geo", "ci_prob90", None, 'ci_pal', None, 'convection_initiation_prob90', 'CI', "float64"), 'asii_turb_trop_prob': ("geo", "asii_turb_trop_prob", None, 'asii_turb_prob_pal', None, 'asii_prob', 'ASII-NG', "float64"), 'MapCellCatType': ("geo", "MapCellCatType", None, 'MapCellCatType_pal', None, 'rdt_cell_type', 'RDT-CW', "uint8"), }
[docs] @pytest.mark.parametrize( "data", ['cma_geo', 'cma_pps', 'cma_extended_pps', 'cmaprob_pps', 'ct_geo', 'ct_pps', 'ctth_alti_geo', 'ctth_alti_pps', 'ctth_pres_geo', 'ctth_pres_pps', 'ctth_tempe_geo', 'ctth_tempe_pps', 'cmic_phase_geo', 'cmic_phase_pps', 'cmic_reff_geo', 'cmic_reff_pps', 'cmic_cot_geo', 'cmic_cot_pps', 'cmic_cwp_pps', 'cmic_lwp_geo', 'cmic_lwp_pps', 'cmic_iwp_geo', 'cmic_iwp_pps', 'pc', 'crr', 'crr_accum', 'ishai_tpw', 'ishai_shw', 'ishai_li', 'ci_prob30', 'ci_prob60', 'ci_prob90', 'asii_turb_trop_prob', 'MapCellCatType'] ) def test_nwcsaf_comps(fake_area, tmp_path, data): """Test loading NWCSAF composites.""" from satpy.writers import get_enhanced_image from ... import Scene (flavour, dvname, altname, palettename, statusname, comp, filelabel, dtp) = _nwcsaf_geo_props[data] rng = (0, 100) if dtp == "uint8" else (-100, 1000) if flavour == "geo": fn = f"S_NWC_{filelabel:s}_MSG2_MSG-N-VISIR_20220124T094500Z.nc" reader = "nwcsaf-geo" id_ = {"satellite_identifier": "MSG4"} else: fn = f"S_NWC_{filelabel:s}_noaa20_00000_20230301T1200213Z_20230301T1201458Z.nc" reader = "nwcsaf-pps_nc" id_ = {"platform": "NOAA-20"} fk = tmp_path / fn # create a minimally fake netCDF file, otherwise satpy won't load the # composite ds = xr.Dataset( coords={"nx": [0], "ny": [0]}, attrs={ "source": "satpy unit test", "time_coverage_start": "0001-01-01T00:00:00Z", "time_coverage_end": "0001-01-01T01:00:00Z", } ) ds.attrs.update(id_) ds.to_netcdf(fk) sc = Scene(filenames=[os.fspath(fk)], reader=[reader]) sc[palettename] = xr.DataArray( da.tile(da.arange(256), [3, 1]).T, dims=("pal02_colors", "pal_RGB")) fake_alti = da.linspace(rng[0], rng[1], 4, chunks=2, dtype=dtp).reshape(2, 2) ancvars = [sc[palettename]] if statusname is not None: sc[statusname] = xr.DataArray( da.zeros(shape=(2, 2), dtype="uint8"), attrs={ "area": fake_area, "_FillValue": 123}, dims=("y", "x")) ancvars.append(sc[statusname]) sc[dvname] = xr.DataArray( fake_alti, dims=("y", "x"), attrs={ "area": fake_area, "scaled_FillValue": 123, "ancillary_variables": ancvars, "valid_range": rng}) def _fake_get_varname(info, info_type="file_key"): return altname or dvname with mock.patch("satpy.readers.nwcsaf_nc.NcNWCSAF._get_varname_in_file") as srnN_: srnN_.side_effect = _fake_get_varname sc.load([comp]) im = get_enhanced_image(sc[comp]) if flavour == "geo": assert im.mode == "P" np.testing.assert_array_equal(im.data.coords["bands"], ["P"]) if dtp == "float64": np.testing.assert_allclose( im.data.sel(bands="P"), ((fake_alti - rng[0]) * (255 / np.ptp(rng))).round()) else: np.testing.assert_allclose(im.data.sel(bands="P"), fake_alti)
[docs] class TestTCREnhancement: """Test the AHI enhancement functions."""
[docs] def setup_method(self): """Create test data.""" data = da.arange(-100, 1000, 110).reshape(2, 5) rgb_data = np.stack([data, data, data]) self.rgb = xr.DataArray(rgb_data, dims=('bands', 'y', 'x'), coords={'bands': ['R', 'G', 'B']}, attrs={'platform_name': 'Himawari-8'})
[docs] def test_jma_true_color_reproduction(self): """Test the jma_true_color_reproduction enhancement.""" from trollimage.xrimage import XRImage from satpy.enhancements import jma_true_color_reproduction expected = [[[-109.93, 10.993, 131.916, 252.839, 373.762], [494.685, 615.608, 736.531, 857.454, 978.377]], [[-97.73, 9.773, 117.276, 224.779, 332.282], [439.785, 547.288, 654.791, 762.294, 869.797]], [[-93.29, 9.329, 111.948, 214.567, 317.186], [419.805, 522.424, 625.043, 727.662, 830.281]]] img = XRImage(self.rgb) jma_true_color_reproduction(img) np.testing.assert_almost_equal(img.data.compute(), expected) self.rgb.attrs['platform_name'] = None img = XRImage(self.rgb) with pytest.raises(ValueError, match="Missing platform name."): jma_true_color_reproduction(img) self.rgb.attrs['platform_name'] = 'Fakesat' img = XRImage(self.rgb) with pytest.raises(KeyError, match="No conversion matrix found for platform Fakesat"): jma_true_color_reproduction(img)