Source code for satpy.tests.scene_tests.test_data_access

# Copyright (c) 2010-2023 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Unit tests for data access methods and properties of the Scene class."""
import math

import numpy as np
import pytest
import xarray as xr
from dask import array as da

from satpy import Scene
from satpy.dataset.dataid import default_id_keys_config
from satpy.tests.utils import FAKE_FILEHANDLER_END, FAKE_FILEHANDLER_START, make_cid, make_dataid

# NOTE:
# The following fixtures are not defined in this file, but are used and injected by Pytest:
# - include_test_etc


[docs] @pytest.mark.usefixtures("include_test_etc") class TestDataAccessMethods: """Test the scene class."""
[docs] @pytest.mark.parametrize( ("reader", "filenames", "exp_sensors"), [ ("fake1", ["fake1_1.txt"], {"fake_sensor"}), (None, {"fake1": ["fake1_1.txt"], "fake2_1ds": ["fake2_1ds_1.txt"]}, {"fake_sensor", "fake_sensor2"}), ] ) def test_sensor_names_readers(self, reader, filenames, exp_sensors): """Test that Scene sensor_names handles different cases properly.""" scene = Scene(reader=reader, filenames=filenames) assert scene.start_time == FAKE_FILEHANDLER_START assert scene.end_time == FAKE_FILEHANDLER_END assert scene.sensor_names == exp_sensors
[docs] @pytest.mark.parametrize( ("include_reader", "added_sensor", "exp_sensors"), [ (False, "my_sensor", {"my_sensor"}), (True, "my_sensor", {"my_sensor", "fake_sensor"}), (False, {"my_sensor"}, {"my_sensor"}), (True, {"my_sensor"}, {"my_sensor", "fake_sensor"}), (False, {"my_sensor1", "my_sensor2"}, {"my_sensor1", "my_sensor2"}), (True, {"my_sensor1", "my_sensor2"}, {"my_sensor1", "my_sensor2", "fake_sensor"}), ] ) def test_sensor_names_added_datasets(self, include_reader, added_sensor, exp_sensors): """Test that Scene sensor_names handles contained sensors properly.""" if include_reader: scene = Scene(reader="fake1", filenames=["fake1_1.txt"]) else: scene = Scene() scene["my_ds"] = xr.DataArray([], attrs={"sensor": added_sensor}) assert scene.sensor_names == exp_sensors
[docs] def test_iter(self): """Test iteration over the scene.""" scene = Scene() scene["1"] = xr.DataArray(np.arange(5)) scene["2"] = xr.DataArray(np.arange(5)) scene["3"] = xr.DataArray(np.arange(5)) for x in scene: assert isinstance(x, xr.DataArray)
[docs] def test_iter_by_area_swath(self): """Test iterating by area on a swath.""" from pyresample.geometry import SwathDefinition scene = Scene() sd = SwathDefinition(lons=np.arange(5), lats=np.arange(5)) scene["1"] = xr.DataArray(np.arange(5), attrs={"area": sd}) scene["2"] = xr.DataArray(np.arange(5), attrs={"area": sd}) scene["3"] = xr.DataArray(np.arange(5)) for area_obj, ds_list in scene.iter_by_area(): ds_list_names = set(ds["name"] for ds in ds_list) if area_obj is sd: assert ds_list_names == {"1", "2"} else: assert area_obj is None assert ds_list_names == {"3"}
[docs] def test_bad_setitem(self): """Test setting an item wrongly.""" scene = Scene() with pytest.raises(ValueError, match="Key must be a DataID when value is not an xarray DataArray or dict"): scene.__setitem__("1", np.arange(5))
[docs] def test_setitem(self): """Test setting an item.""" from satpy.tests.utils import make_dataid scene = Scene() scene["1"] = ds1 = xr.DataArray(np.arange(5)) expected_id = make_cid(**ds1.attrs) assert set(scene._datasets.keys()) == {expected_id} assert set(scene._wishlist) == {expected_id} did = make_dataid(name="oranges") scene[did] = ds1 assert "oranges" in scene nparray = np.arange(5*5).reshape(5, 5) with pytest.raises(ValueError, match="Key must be a DataID when value is not an xarray DataArray or dict"): scene["apples"] = nparray assert "apples" not in scene did = make_dataid(name="apples") scene[did] = nparray assert "apples" in scene
[docs] def test_getitem(self): """Test __getitem__ with names only.""" scene = Scene() scene["1"] = ds1 = xr.DataArray(np.arange(5)) scene["2"] = ds2 = xr.DataArray(np.arange(5)) scene["3"] = ds3 = xr.DataArray(np.arange(5)) assert scene["1"] is ds1 assert scene["2"] is ds2 assert scene["3"] is ds3 pytest.raises(KeyError, scene.__getitem__, "4") assert scene.get("3") is ds3 assert scene.get("4") is None
[docs] def test_getitem_modifiers(self): """Test __getitem__ with names and modifiers.""" # Return least modified item scene = Scene() scene["1"] = ds1_m0 = xr.DataArray(np.arange(5)) scene[make_dataid(name="1", modifiers=("mod1",)) ] = xr.DataArray(np.arange(5)) assert scene["1"] is ds1_m0 assert len(list(scene.keys())) == 2 scene = Scene() scene["1"] = ds1_m0 = xr.DataArray(np.arange(5)) scene[make_dataid(name="1", modifiers=("mod1",)) ] = xr.DataArray(np.arange(5)) scene[make_dataid(name="1", modifiers=("mod1", "mod2")) ] = xr.DataArray(np.arange(5)) assert scene["1"] is ds1_m0 assert len(list(scene.keys())) == 3 scene = Scene() scene[make_dataid(name="1", modifiers=("mod1", "mod2")) ] = ds1_m2 = xr.DataArray(np.arange(5)) scene[make_dataid(name="1", modifiers=("mod1",)) ] = ds1_m1 = xr.DataArray(np.arange(5)) assert scene["1"] is ds1_m1 assert scene[make_dataid(name="1", modifiers=("mod1", "mod2"))] is ds1_m2 pytest.raises(KeyError, scene.__getitem__, make_dataid(name="1", modifiers=tuple())) assert len(list(scene.keys())) == 2
[docs] def test_getitem_slices(self): """Test __getitem__ with slices.""" from pyresample.geometry import AreaDefinition, SwathDefinition from pyresample.utils import proj4_str_to_dict scene1 = Scene() scene2 = Scene() proj_dict = proj4_str_to_dict("+proj=lcc +datum=WGS84 +ellps=WGS84 " "+lon_0=-95. +lat_0=25 +lat_1=25 " "+units=m +no_defs") area_def = AreaDefinition( "test", "test", "test", proj_dict, 200, 400, (-1000., -1500., 1000., 1500.), ) swath_def = SwathDefinition(lons=np.zeros((5, 10)), lats=np.zeros((5, 10))) scene1["1"] = scene2["1"] = xr.DataArray(np.zeros((5, 10))) scene1["2"] = scene2["2"] = xr.DataArray(np.zeros((5, 10)), dims=("y", "x")) scene1["3"] = xr.DataArray(np.zeros((5, 10)), dims=("y", "x"), attrs={"area": area_def}) anc_vars = [xr.DataArray(np.ones((5, 10)), attrs={"name": "anc_var", "area": area_def})] attrs = {"ancillary_variables": anc_vars, "area": area_def} scene1["3a"] = xr.DataArray(np.zeros((5, 10)), dims=("y", "x"), attrs=attrs) scene2["4"] = xr.DataArray(np.zeros((5, 10)), dims=("y", "x"), attrs={"area": swath_def}) anc_vars = [xr.DataArray(np.ones((5, 10)), attrs={"name": "anc_var", "area": swath_def})] attrs = {"ancillary_variables": anc_vars, "area": swath_def} scene2["4a"] = xr.DataArray(np.zeros((5, 10)), dims=("y", "x"), attrs=attrs) new_scn1 = scene1[2:5, 2:8] new_scn2 = scene2[2:5, 2:8] for new_scn in [new_scn1, new_scn2]: # datasets without an area don't get sliced assert new_scn["1"].shape == (5, 10) assert new_scn["2"].shape == (5, 10) assert new_scn1["3"].shape == (3, 6) assert "area" in new_scn1["3"].attrs assert new_scn1["3"].attrs["area"].shape == (3, 6) assert new_scn1["3a"].shape == (3, 6) a_var = new_scn1["3a"].attrs["ancillary_variables"][0] assert a_var.shape == (3, 6) assert new_scn2["4"].shape == (3, 6) assert "area" in new_scn2["4"].attrs assert new_scn2["4"].attrs["area"].shape == (3, 6) assert new_scn2["4a"].shape == (3, 6) a_var = new_scn2["4a"].attrs["ancillary_variables"][0] assert a_var.shape == (3, 6)
[docs] def test_contains(self): """Test contains.""" scene = Scene() scene["1"] = xr.DataArray(np.arange(5), attrs={"wavelength": (0.1, 0.2, 0.3), "_satpy_id_keys": default_id_keys_config}) assert "1" in scene assert 0.15 in scene assert "2" not in scene assert 0.31 not in scene scene = Scene() scene["blueberry"] = xr.DataArray(np.arange(5)) scene["blackberry"] = xr.DataArray(np.arange(5)) scene["strawberry"] = xr.DataArray(np.arange(5)) scene["raspberry"] = xr.DataArray(np.arange(5)) # deepcode ignore replace~keys~list~compare: This is on purpose assert make_cid(name="blueberry") in scene.keys() assert make_cid(name="blueberry") in scene assert "blueberry" in scene assert "blueberry" not in scene.keys()
[docs] def test_delitem(self): """Test deleting an item.""" scene = Scene() scene["1"] = xr.DataArray(np.arange(5), attrs={"wavelength": (0.1, 0.2, 0.3), "_satpy_id_keys": default_id_keys_config}) scene["2"] = xr.DataArray(np.arange(5), attrs={"wavelength": (0.4, 0.5, 0.6), "_satpy_id_keys": default_id_keys_config}) scene["3"] = xr.DataArray(np.arange(5), attrs={"wavelength": (0.7, 0.8, 0.9), "_satpy_id_keys": default_id_keys_config}) del scene["1"] del scene["3"] del scene[0.45] assert not scene._wishlist assert not list(scene._datasets.keys()) pytest.raises(KeyError, scene.__delitem__, 0.2)
[docs] def _create_coarest_finest_data_array(shape, area_def, attrs=None): data_arr = xr.DataArray( da.arange(math.prod(shape)).reshape(shape), attrs={ "area": area_def, }) if attrs: data_arr.attrs.update(attrs) return data_arr
[docs] def _create_coarsest_finest_area_def(shape, extents): from pyresample import AreaDefinition proj_str = "+proj=lcc +datum=WGS84 +ellps=WGS84 +lon_0=-95. +lat_0=25 +lat_1=25 +units=m +no_defs" area_def = AreaDefinition( "test", "test", "test", proj_str, shape[1], shape[0], extents, ) return area_def
[docs] def _create_coarsest_finest_swath_def(shape, extents, name_suffix): from pyresample import SwathDefinition if len(shape) == 1: lons_arr = da.linspace(extents[0], extents[2], shape[0], dtype=np.float32) lats_arr = da.linspace(extents[1], extents[3], shape[0], dtype=np.float32) else: lons_arr = da.repeat(da.linspace(extents[0], extents[2], shape[1], dtype=np.float32)[None, :], shape[0], axis=0) lats_arr = da.repeat(da.linspace(extents[1], extents[3], shape[0], dtype=np.float32)[:, None], shape[1], axis=1) lons_data_arr = xr.DataArray(lons_arr, attrs={"name": f"longitude{name_suffix}"}) lats_data_arr = xr.DataArray(lats_arr, attrs={"name": f"latitude1{name_suffix}"}) return SwathDefinition(lons_data_arr, lats_data_arr)
[docs] class TestFinestCoarsestArea: """Test the Scene logic for finding the finest and coarsest area."""
[docs] @pytest.mark.parametrize( ("coarse_area", "fine_area"), [ (_create_coarsest_finest_area_def((2, 5), (1000.0, 1500.0, -1000.0, -1500.0)), _create_coarsest_finest_area_def((4, 10), (1000.0, 1500.0, -1000.0, -1500.0))), (_create_coarsest_finest_area_def((2, 5), (-1000.0, -1500.0, 1000.0, 1500.0)), _create_coarsest_finest_area_def((4, 10), (-1000.0, -1500.0, 1000.0, 1500.0))), (_create_coarsest_finest_swath_def((2, 5), (1000.0, 1500.0, -1000.0, -1500.0), "1"), _create_coarsest_finest_swath_def((4, 10), (1000.0, 1500.0, -1000.0, -1500.0), "1")), (_create_coarsest_finest_swath_def((5,), (1000.0, 1500.0, -1000.0, -1500.0), "1"), _create_coarsest_finest_swath_def((10,), (1000.0, 1500.0, -1000.0, -1500.0), "1")), ] ) def test_coarsest_finest_area_different_shape(self, coarse_area, fine_area): """Test 'coarsest_area' and 'finest_area' methods for upright areas.""" ds1 = _create_coarest_finest_data_array(coarse_area.shape, coarse_area, {"wavelength": (0.1, 0.2, 0.3)}) ds2 = _create_coarest_finest_data_array(fine_area.shape, fine_area, {"wavelength": (0.4, 0.5, 0.6)}) ds3 = _create_coarest_finest_data_array(fine_area.shape, fine_area, {"wavelength": (0.7, 0.8, 0.9)}) scn = Scene() scn["1"] = ds1 scn["2"] = ds2 scn["3"] = ds3 assert scn.coarsest_area() is coarse_area assert scn.finest_area() is fine_area assert scn.coarsest_area(["2", "3"]) is fine_area
[docs] @pytest.mark.parametrize( ("area_def", "shifted_area"), [ (_create_coarsest_finest_area_def((2, 5), (-1000.0, -1500.0, 1000.0, 1500.0)), _create_coarsest_finest_area_def((2, 5), (-900.0, -1400.0, 1100.0, 1600.0))), (_create_coarsest_finest_swath_def((2, 5), (-1000.0, -1500.0, 1000.0, 1500.0), "1"), _create_coarsest_finest_swath_def((2, 5), (-900.0, -1400.0, 1100.0, 1600.0), "2")), ], ) def test_coarsest_finest_area_same_shape(self, area_def, shifted_area): """Test that two areas with the same shape are consistently returned. If two geometries (ex. two AreaDefinitions or two SwathDefinitions) have the same resolution (shape) but different coordinates, which one has the finer resolution would ultimately be determined by the semi-random ordering of the internal container of the Scene (a dict) if only pixel resolution was compared. This test makes sure that it is always the same object returned. """ ds1 = _create_coarest_finest_data_array(area_def.shape, area_def) ds2 = _create_coarest_finest_data_array(area_def.shape, shifted_area) scn = Scene() scn["ds1"] = ds1 scn["ds2"] = ds2 course_area1 = scn.coarsest_area() scn = Scene() scn["ds2"] = ds2 scn["ds1"] = ds1 coarse_area2 = scn.coarsest_area() # doesn't matter what order they were added, this should be the same area assert coarse_area2 is course_area1
[docs] @pytest.mark.usefixtures("include_test_etc") class TestComputePersist: """Test methods that compute the internal data in some way."""
[docs] def test_compute_pass_through(self): """Test pass through of xarray compute.""" import numpy as np scene = Scene(filenames=["fake1_1.txt"], reader="fake1") scene.load(["ds1"]) scene = scene.compute() assert isinstance(scene["ds1"].data, np.ndarray)
[docs] def test_persist_pass_through(self): """Test pass through of xarray persist.""" from dask.array.utils import assert_eq scene = Scene(filenames=["fake1_1.txt"], reader="fake1") scene.load(["ds1"]) scenep = scene.persist() assert_eq(scene["ds1"].data, scenep["ds1"].data) assert set(scenep["ds1"].data.dask).issubset(scene["ds1"].data.dask) assert len(scenep["ds1"].data.dask) == scenep["ds1"].data.npartitions
[docs] def test_chunk_pass_through(self): """Test pass through of xarray chunk.""" scene = Scene(filenames=["fake1_1.txt"], reader="fake1") scene.load(["ds1"]) scene = scene.chunk(chunks=2) assert scene["ds1"].data.chunksize == (2, 2)