#!/usr/bin/python
# Copyright (c) 2015 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Test generic writer functions."""
from __future__ import annotations
import datetime as dt
import os
import pathlib
import shutil
import warnings
from unittest import mock
import dask.array as da
import numpy as np
import pytest
import xarray as xr
from trollimage.colormap import greys
from satpy.writers import ImageWriter
[docs]
class TestWritersModule:
"""Test the writers module."""
[docs]
def test_to_image_1d(self):
"""Conversion to image."""
from satpy.writers import to_image
p = xr.DataArray(np.arange(25), dims=["y"])
with pytest.raises(ValueError, match="Need at least a 2D array to make an image."):
to_image(p)
[docs]
@mock.patch("satpy.writers.XRImage")
def test_to_image_2d(self, mock_geoimage):
"""Conversion to image."""
from satpy.writers import to_image
data = np.arange(25).reshape((5, 5))
p = xr.DataArray(data, attrs=dict(mode="L", fill_value=0,
palette=[0, 1, 2, 3, 4, 5]),
dims=["y", "x"])
to_image(p)
np.testing.assert_array_equal(
data, mock_geoimage.call_args[0][0].values)
mock_geoimage.reset_mock()
[docs]
@mock.patch("satpy.writers.XRImage")
def test_to_image_3d(self, mock_geoimage):
"""Conversion to image."""
from satpy.writers import to_image
data = np.arange(75).reshape((3, 5, 5))
p = xr.DataArray(data, dims=["bands", "y", "x"])
p["bands"] = ["R", "G", "B"]
to_image(p)
np.testing.assert_array_equal(data[0], mock_geoimage.call_args[0][0][0])
np.testing.assert_array_equal(data[1], mock_geoimage.call_args[0][0][1])
np.testing.assert_array_equal(data[2], mock_geoimage.call_args[0][0][2])
[docs]
@mock.patch("satpy.writers.get_enhanced_image")
def test_show(self, mock_get_image):
"""Check showing."""
from satpy.writers import show
data = np.arange(25).reshape((5, 5))
p = xr.DataArray(data, dims=["y", "x"])
show(p)
assert mock_get_image.return_value.show.called
[docs]
class TestEnhancer:
"""Test basic `Enhancer` functionality with builtin configs."""
[docs]
def test_basic_init_no_args(self):
"""Test Enhancer init with no arguments passed."""
from satpy.writers import Enhancer
e = Enhancer()
assert e.enhancement_tree is not None
[docs]
def test_basic_init_no_enh(self):
"""Test Enhancer init requesting no enhancements."""
from satpy.writers import Enhancer
e = Enhancer(enhancement_config_file=False)
assert e.enhancement_tree is None
[docs]
def test_basic_init_provided_enh(self):
"""Test Enhancer init with string enhancement configs."""
from satpy.writers import Enhancer
e = Enhancer(enhancement_config_file=["""enhancements:
enh1:
standard_name: toa_bidirectional_reflectance
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: linear}
"""])
assert e.enhancement_tree is not None
[docs]
def test_init_nonexistent_enh_file(self):
"""Test Enhancer init with a nonexistent enhancement configuration file."""
from satpy.writers import Enhancer
with pytest.raises(ValueError, match="YAML file doesn't exist or string is not YAML dict:.*"):
Enhancer(enhancement_config_file="is_not_a_valid_filename_?.yaml")
[docs]
class _CustomImageWriter(ImageWriter):
def __init__(self, **kwargs):
super().__init__(name="test", config_files=[], **kwargs)
self.img = None
[docs]
def save_image(self, img, **kwargs):
self.img = img
[docs]
class _BaseCustomEnhancementConfigTests:
TEST_CONFIGS: dict[str, str] = {}
[docs]
@pytest.fixture(scope="class", autouse=True)
def test_configs_path(self, tmp_path_factory):
"""Create test enhancement configuration files in a temporary directory.
The root temporary directory is changed to and returned.
"""
prev_cwd = pathlib.Path.cwd()
tmp_path = tmp_path_factory.mktemp("config")
os.chdir(tmp_path)
for fn, content in self.TEST_CONFIGS.items():
config_rel_dir = os.path.dirname(fn)
if config_rel_dir:
os.makedirs(config_rel_dir, exist_ok=True)
with open(fn, "w") as f:
f.write(content)
try:
yield tmp_path
finally:
os.chdir(prev_cwd)
[docs]
class TestComplexSensorEnhancerConfigs(_BaseCustomEnhancementConfigTests):
"""Test enhancement configs that use or expect multiple sensors."""
ENH_FN = "test_sensor1.yaml"
ENH_FN2 = "test_sensor2.yaml"
TEST_CONFIGS = {
ENH_FN: """
enhancements:
test1_sensor1_specific:
name: test1
sensor: test_sensor1
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: crude, min_stretch: 0, max_stretch: 200}
""",
ENH_FN2: """
enhancements:
default:
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: crude, min_stretch: 0, max_stretch: 100}
test1_sensor2_specific:
name: test1
sensor: test_sensor2
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: crude, min_stretch: 0, max_stretch: 50}
exact_multisensor_comp:
name: my_comp
sensor: [test_sensor1, test_sensor2]
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: crude, min_stretch: 0, max_stretch: 20}
""",
}
[docs]
def test_multisensor_choice(self, test_configs_path):
"""Test that a DataArray with two sensors works."""
from xarray import DataArray
from satpy.writers import Enhancer, get_enhanced_image
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs={
"name": "test1",
"sensor": {"test_sensor2", "test_sensor1"},
"mode": "L"
},
dims=["y", "x"])
e = Enhancer()
assert e.enhancement_tree is not None
img = get_enhanced_image(ds, enhance=e)
# make sure that both sensor configs were loaded
assert (set(pathlib.Path(config) for config in e.sensor_enhancement_configs) ==
{test_configs_path / self.ENH_FN,
test_configs_path / self.ENH_FN2})
# test_sensor1 config should have been used because it is
# alphabetically first
np.testing.assert_allclose(img.data.values[0], ds.data / 200.0)
[docs]
def test_multisensor_exact(self, test_configs_path):
"""Test that a DataArray with two sensors can match exactly."""
from xarray import DataArray
from satpy.writers import Enhancer, get_enhanced_image
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs={
"name": "my_comp",
"sensor": {"test_sensor2", "test_sensor1"},
"mode": "L"
},
dims=["y", "x"])
e = Enhancer()
assert e.enhancement_tree is not None
img = get_enhanced_image(ds, enhance=e)
# make sure that both sensor configs were loaded
assert (set(pathlib.Path(config) for config in e.sensor_enhancement_configs) ==
{test_configs_path / self.ENH_FN,
test_configs_path / self.ENH_FN2})
# test_sensor1 config should have been used because it is
# alphabetically first
np.testing.assert_allclose(img.data.values[0], ds.data / 20.0)
[docs]
def test_enhance_bad_query_value(self):
"""Test Enhancer doesn't fail when query includes bad values."""
from xarray import DataArray
from satpy.writers import Enhancer, get_enhanced_image
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs=dict(name=["I", "am", "invalid"], sensor="test_sensor2", mode="L"),
dims=["y", "x"])
e = Enhancer()
assert e.enhancement_tree is not None
with pytest.raises(KeyError, match="No .* found for None"):
get_enhanced_image(ds, enhance=e)
[docs]
class TestEnhancerUserConfigs(_BaseCustomEnhancementConfigTests):
"""Test `Enhancer` functionality when user's custom configurations are present."""
ENH_FN = "test_sensor.yaml"
ENH_ENH_FN = os.path.join("enhancements", ENH_FN)
ENH_FN2 = "test_sensor2.yaml"
ENH_ENH_FN2 = os.path.join("enhancements", ENH_FN2)
ENH_FN3 = "test_empty.yaml"
TEST_CONFIGS = {
ENH_FN: """
enhancements:
test1_default:
name: test1
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: linear, cutoffs: [0., 0.]}
""",
ENH_ENH_FN: """
enhancements:
test1_kelvin:
name: test1
units: kelvin
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: crude, min_stretch: 0, max_stretch: 20}
""",
ENH_FN2: """
""",
ENH_ENH_FN2: """
""",
ENH_FN3: """""",
}
[docs]
def test_enhance_empty_config(self, test_configs_path):
"""Test Enhancer doesn't fail with empty enhancement file."""
from xarray import DataArray
from satpy.writers import Enhancer, get_enhanced_image
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs=dict(sensor="test_empty", mode="L"),
dims=["y", "x"])
e = Enhancer()
assert e.enhancement_tree is not None
get_enhanced_image(ds, enhance=e)
assert (set(pathlib.Path(config) for config in e.sensor_enhancement_configs) ==
{test_configs_path / self.ENH_FN3})
[docs]
def test_enhance_with_sensor_no_entry(self, test_configs_path):
"""Test enhancing an image that has no configuration sections."""
from xarray import DataArray
from satpy.writers import Enhancer, get_enhanced_image
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs=dict(sensor="test_sensor2", mode="L"),
dims=["y", "x"])
e = Enhancer()
assert e.enhancement_tree is not None
get_enhanced_image(ds, enhance=e)
assert (set(pathlib.Path(config) for config in e.sensor_enhancement_configs) ==
{test_configs_path / self.ENH_FN2,
test_configs_path / self.ENH_ENH_FN2})
[docs]
def test_no_enhance(self):
"""Test turning off enhancements."""
from xarray import DataArray
from satpy.writers import get_enhanced_image
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs=dict(name="test1", sensor="test_sensor", mode="L"),
dims=["y", "x"])
img = get_enhanced_image(ds, enhance=False)
np.testing.assert_allclose(img.data.data.compute().squeeze(), ds.data)
[docs]
def test_writer_no_enhance(self):
"""Test turning off enhancements with writer."""
from xarray import DataArray
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs=dict(name="test1", sensor="test_sensor", mode="L"),
dims=["y", "x"])
writer = _CustomImageWriter(enhance=False)
writer.save_datasets((ds,), compute=False)
img = writer.img
np.testing.assert_allclose(img.data.data.compute().squeeze(), ds.data)
[docs]
def test_writer_custom_enhance(self):
"""Test using custom enhancements with writer."""
from xarray import DataArray
from satpy.writers import Enhancer
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs=dict(name="test1", sensor="test_sensor", mode="L"),
dims=["y", "x"])
enhance = Enhancer()
writer = _CustomImageWriter(enhance=enhance)
writer.save_datasets((ds,), compute=False)
img = writer.img
np.testing.assert_almost_equal(img.data.isel(bands=0).max().values, 1.)
[docs]
def test_enhance_with_sensor_entry(self, test_configs_path):
"""Test enhancing an image with a configuration section."""
from xarray import DataArray
from satpy.writers import Enhancer, get_enhanced_image
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs=dict(name="test1", sensor="test_sensor", mode="L"),
dims=["y", "x"])
e = Enhancer()
assert e.enhancement_tree is not None
img = get_enhanced_image(ds, enhance=e)
assert (set(pathlib.Path(config) for config in e.sensor_enhancement_configs) ==
{test_configs_path / self.ENH_FN,
test_configs_path / self.ENH_ENH_FN})
np.testing.assert_almost_equal(img.data.isel(bands=0).max().values,
1.)
ds = DataArray(da.arange(1, 11., chunks=5).reshape((2, 5)),
attrs=dict(name="test1", sensor="test_sensor", mode="L"),
dims=["y", "x"])
e = Enhancer()
assert e.enhancement_tree is not None
img = get_enhanced_image(ds, enhance=e)
assert (set(pathlib.Path(config) for config in e.sensor_enhancement_configs) ==
{test_configs_path / self.ENH_FN,
test_configs_path / self.ENH_ENH_FN})
np.testing.assert_almost_equal(img.data.isel(bands=0).max().values, 1.)
[docs]
def test_enhance_with_sensor_entry2(self, test_configs_path):
"""Test enhancing an image with a more detailed configuration section."""
from xarray import DataArray
from satpy.writers import Enhancer, get_enhanced_image
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs=dict(name="test1", units="kelvin",
sensor="test_sensor", mode="L"),
dims=["y", "x"])
e = Enhancer()
assert e.enhancement_tree is not None
img = get_enhanced_image(ds, enhance=e)
assert (set(pathlib.Path(config) for config in e.sensor_enhancement_configs) ==
{test_configs_path / self.ENH_FN,
test_configs_path / self.ENH_ENH_FN})
np.testing.assert_almost_equal(img.data.isel(bands=0).max().values, 0.5)
[docs]
class TestReaderEnhancerConfigs(_BaseCustomEnhancementConfigTests):
"""Test enhancement configs that use reader name."""
ENH_FN = "test_sensor1.yaml"
# NOTE: The sections are ordered in a special way so that if 'reader' key
# isn't provided that we'll get the section we didn't want and all tests
# will fail. Otherwise the correct sections get chosen just by the order
# of how they are added to the decision tree.
TEST_CONFIGS = {
ENH_FN: """
enhancements:
default_reader2:
reader: reader2
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: crude, min_stretch: 0, max_stretch: 75}
default:
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: crude, min_stretch: 0, max_stretch: 100}
test1_reader2_specific:
name: test1
reader: reader2
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: crude, min_stretch: 0, max_stretch: 50}
test1_reader1_specific:
name: test1
reader: reader1
operations:
- name: stretch
method: !!python/name:satpy.enhancements.stretch
kwargs: {stretch: crude, min_stretch: 0, max_stretch: 200}
""",
}
[docs]
def _get_test_data_array(self):
from xarray import DataArray
ds = DataArray(np.arange(1, 11.).reshape((2, 5)),
attrs={
"name": "test1",
"sensor": "test_sensor1",
"mode": "L",
},
dims=["y", "x"])
return ds
[docs]
def _get_enhanced_image(self, data_arr, test_configs_path):
from satpy.writers import Enhancer, get_enhanced_image
e = Enhancer()
assert e.enhancement_tree is not None
img = get_enhanced_image(data_arr, enhance=e)
# make sure that both configs were loaded
assert (set(pathlib.Path(config) for config in e.sensor_enhancement_configs) ==
{test_configs_path / self.ENH_FN})
return img
[docs]
def test_no_reader(self, test_configs_path):
"""Test that a DataArray with no 'reader' metadata works."""
data_arr = self._get_test_data_array()
img = self._get_enhanced_image(data_arr, test_configs_path)
# no reader available, should use default no specified reader
np.testing.assert_allclose(img.data.values[0], data_arr.data / 100.0)
[docs]
def test_no_matching_reader(self, test_configs_path):
"""Test that a DataArray with no matching 'reader' works."""
data_arr = self._get_test_data_array()
data_arr.attrs["reader"] = "reader3"
img = self._get_enhanced_image(data_arr, test_configs_path)
# no reader available, should use default no specified reader
np.testing.assert_allclose(img.data.values[0], data_arr.data / 100.0)
[docs]
def test_only_reader_matches(self, test_configs_path):
"""Test that a DataArray with only a matching 'reader' works."""
data_arr = self._get_test_data_array()
data_arr.attrs["reader"] = "reader2"
data_arr.attrs["name"] = "not_configured"
img = self._get_enhanced_image(data_arr, test_configs_path)
# no reader available, should use default no specified reader
np.testing.assert_allclose(img.data.values[0], data_arr.data / 75.0)
[docs]
def test_reader_and_name_match(self, test_configs_path):
"""Test that a DataArray with a matching 'reader' and 'name' works."""
data_arr = self._get_test_data_array()
data_arr.attrs["reader"] = "reader2"
img = self._get_enhanced_image(data_arr, test_configs_path)
# no reader available, should use default no specified reader
np.testing.assert_allclose(img.data.values[0], data_arr.data / 50.0)
[docs]
class TestYAMLFiles:
"""Test and analyze the writer configuration files."""
[docs]
def test_filename_matches_writer_name(self):
"""Test that every writer filename matches the name in the YAML."""
import yaml
class IgnoreLoader(yaml.SafeLoader):
def _ignore_all_tags(self, tag_suffix, node):
return tag_suffix + " " + node.value
IgnoreLoader.add_multi_constructor("", IgnoreLoader._ignore_all_tags)
from satpy._config import glob_config
from satpy.writers import read_writer_config
for writer_config in glob_config("writers/*.yaml"):
writer_fn = os.path.basename(writer_config)
writer_fn_name = os.path.splitext(writer_fn)[0]
writer_info = read_writer_config([writer_config],
loader=IgnoreLoader)
assert writer_fn_name == writer_info["name"]
[docs]
def test_available_writers(self):
"""Test the 'available_writers' function."""
from satpy import available_writers
writer_names = available_writers()
assert len(writer_names) > 0
assert isinstance(writer_names[0], str)
assert "geotiff" in writer_names
writer_infos = available_writers(as_dict=True)
assert len(writer_names) == len(writer_infos)
assert isinstance(writer_infos[0], dict)
for writer_info in writer_infos:
assert "name" in writer_info
[docs]
class TestComputeWriterResults:
"""Test compute_writer_results()."""
[docs]
def setup_method(self):
"""Create temporary directory to save files to and a mock scene."""
import tempfile
from pyresample.geometry import AreaDefinition
from satpy.scene import Scene
adef = AreaDefinition(
"test", "test", "test", "EPSG:4326",
100, 200, (-180., -90., 180., 90.),
)
ds1 = xr.DataArray(
da.zeros((100, 200), chunks=50),
dims=("y", "x"),
attrs={"name": "test",
"start_time": dt.datetime(2018, 1, 1, 0, 0, 0),
"area": adef}
)
self.scn = Scene()
self.scn["test"] = ds1
# Temp dir
self.base_dir = tempfile.mkdtemp()
[docs]
def teardown_method(self):
"""Remove the temporary directory created for a test."""
try:
shutil.rmtree(self.base_dir, ignore_errors=True)
except OSError:
pass
[docs]
def test_empty(self):
"""Test empty result list."""
from satpy.writers import compute_writer_results
compute_writer_results([])
[docs]
def test_simple_image(self):
"""Test writing to PNG file."""
from satpy.writers import compute_writer_results
fname = os.path.join(self.base_dir, "simple_image.png")
res = self.scn.save_datasets(filename=fname,
datasets=["test"],
writer="simple_image",
compute=False)
compute_writer_results([res])
assert os.path.isfile(fname)
[docs]
def test_geotiff(self):
"""Test writing to mitiff file."""
from satpy.writers import compute_writer_results
fname = os.path.join(self.base_dir, "geotiff.tif")
res = self.scn.save_datasets(filename=fname,
datasets=["test"],
writer="geotiff", compute=False)
compute_writer_results([res])
assert os.path.isfile(fname)
[docs]
def test_multiple_geotiff(self):
"""Test writing to mitiff file."""
from satpy.writers import compute_writer_results
fname1 = os.path.join(self.base_dir, "geotiff1.tif")
res1 = self.scn.save_datasets(filename=fname1,
datasets=["test"],
writer="geotiff", compute=False)
fname2 = os.path.join(self.base_dir, "geotiff2.tif")
res2 = self.scn.save_datasets(filename=fname2,
datasets=["test"],
writer="geotiff", compute=False)
compute_writer_results([res1, res2])
assert os.path.isfile(fname1)
assert os.path.isfile(fname2)
[docs]
def test_multiple_simple(self):
"""Test writing to geotiff files."""
from satpy.writers import compute_writer_results
fname1 = os.path.join(self.base_dir, "simple_image1.png")
res1 = self.scn.save_datasets(filename=fname1,
datasets=["test"],
writer="simple_image", compute=False)
fname2 = os.path.join(self.base_dir, "simple_image2.png")
res2 = self.scn.save_datasets(filename=fname2,
datasets=["test"],
writer="simple_image", compute=False)
compute_writer_results([res1, res2])
assert os.path.isfile(fname1)
assert os.path.isfile(fname2)
[docs]
def test_mixed(self):
"""Test writing to multiple mixed-type files."""
from satpy.writers import compute_writer_results
fname1 = os.path.join(self.base_dir, "simple_image3.png")
res1 = self.scn.save_datasets(filename=fname1,
datasets=["test"],
writer="simple_image", compute=False)
fname2 = os.path.join(self.base_dir, "geotiff3.tif")
res2 = self.scn.save_datasets(filename=fname2,
datasets=["test"],
writer="geotiff", compute=False)
res3 = []
compute_writer_results([res1, res2, res3])
assert os.path.isfile(fname1)
assert os.path.isfile(fname2)
[docs]
class TestBaseWriter:
"""Test the base writer class."""
[docs]
def setup_method(self):
"""Set up tests."""
import tempfile
from pyresample.geometry import AreaDefinition
from satpy.scene import Scene
adef = AreaDefinition(
"test", "test", "test", "EPSG:4326",
100, 200, (-180., -90., 180., 90.),
)
ds1 = xr.DataArray(
da.zeros((100, 200), chunks=50),
dims=("y", "x"),
attrs={
"name": "test",
"start_time": dt.datetime(2018, 1, 1, 0, 0, 0),
"sensor": "fake_sensor",
"area": adef,
}
)
ds2 = ds1.copy()
ds2.attrs["sensor"] = {"fake_sensor1", "fake_sensor2"}
self.scn = Scene()
self.scn["test"] = ds1
self.scn["test2"] = ds2
# Temp dir
self.base_dir = tempfile.mkdtemp()
[docs]
def teardown_method(self):
"""Remove the temporary directory created for a test."""
try:
shutil.rmtree(self.base_dir, ignore_errors=True)
except OSError:
pass
[docs]
def test_save_dataset_static_filename(self):
"""Test saving a dataset with a static filename specified."""
self.scn.save_datasets(base_dir=self.base_dir, filename="geotiff.tif")
assert os.path.isfile(os.path.join(self.base_dir, "geotiff.tif"))
[docs]
@pytest.mark.parametrize(
("fmt_fn", "exp_fns"),
[
("geotiff_{name}_{start_time:%Y%m%d_%H%M%S}.tif",
["geotiff_test_20180101_000000.tif", "geotiff_test2_20180101_000000.tif"]),
("geotiff_{name}_{sensor}.tif",
["geotiff_test_fake_sensor.tif", "geotiff_test2_fake_sensor1-fake_sensor2.tif"]),
]
)
def test_save_dataset_dynamic_filename(self, fmt_fn, exp_fns):
"""Test saving a dataset with a format filename specified."""
self.scn.save_datasets(base_dir=self.base_dir, filename=fmt_fn)
for exp_fn in exp_fns:
exp_path = os.path.join(self.base_dir, exp_fn)
assert os.path.isfile(exp_path)
[docs]
def test_save_dataset_dynamic_filename_with_dir(self):
"""Test saving a dataset with a format filename that includes a directory."""
fmt_fn = os.path.join("{start_time:%Y%m%d}", "geotiff_{name}_{start_time:%Y%m%d_%H%M%S}.tif")
exp_fn = os.path.join("20180101", "geotiff_test_20180101_000000.tif")
self.scn.save_datasets(base_dir=self.base_dir, filename=fmt_fn)
assert os.path.isfile(os.path.join(self.base_dir, exp_fn))
# change the filename pattern but keep the same directory
fmt_fn2 = os.path.join("{start_time:%Y%m%d}", "geotiff_{name}_{start_time:%Y%m%d_%H}.tif")
exp_fn2 = os.path.join("20180101", "geotiff_test_20180101_00.tif")
self.scn.save_datasets(base_dir=self.base_dir, filename=fmt_fn2)
assert os.path.isfile(os.path.join(self.base_dir, exp_fn2))
# the original file should still exist
assert os.path.isfile(os.path.join(self.base_dir, exp_fn))
[docs]
class TestOverlays:
"""Tests for add_overlay and add_decorate functions."""
[docs]
def setup_method(self):
"""Create test data and mock pycoast/pydecorate."""
from pyresample.geometry import AreaDefinition
from trollimage.xrimage import XRImage
proj_dict = {"proj": "lcc", "datum": "WGS84", "ellps": "WGS84",
"lon_0": -95., "lat_0": 25, "lat_1": 25,
"units": "m", "no_defs": True}
self.area_def = AreaDefinition(
"test", "test", "test", proj_dict,
200, 400, (-1000., -1500., 1000., 1500.),
)
self.orig_rgb_img = XRImage(
xr.DataArray(da.arange(75., chunks=10).reshape(3, 5, 5) / 75.,
dims=("bands", "y", "x"),
coords={"bands": ["R", "G", "B"]},
attrs={"name": "test_ds", "area": self.area_def})
)
self.orig_l_img = XRImage(
xr.DataArray(da.arange(25., chunks=10).reshape(5, 5) / 75.,
dims=("y", "x"),
attrs={"name": "test_ds", "area": self.area_def})
)
self.decorate = {
"decorate": [
{"logo": {"logo_path": "", "height": 143, "bg": "white", "bg_opacity": 255}},
{"text": {
"txt": "TEST",
"align": {"top_bottom": "bottom", "left_right": "right"},
"font": "",
"font_size": 22,
"height": 30,
"bg": "black",
"bg_opacity": 255,
"line": "white"}},
{"scale": {
"colormap": greys,
"extend": False,
"width": 1670, "height": 110,
"tick_marks": 5, "minor_tick_marks": 1,
"cursor": [0, 0], "bg": "white",
"title": "TEST TITLE OF SCALE",
"fontsize": 110, "align": "cc"
}}
]
}
import_mock = mock.MagicMock()
modules = {"pycoast": import_mock.pycoast,
"pydecorate": import_mock.pydecorate}
self.module_patcher = mock.patch.dict("sys.modules", modules)
self.module_patcher.start()
[docs]
def teardown_method(self):
"""Turn off pycoast/pydecorate mocking."""
self.module_patcher.stop()
[docs]
def test_add_overlay_basic_rgb(self):
"""Test basic add_overlay usage with RGB data."""
from pycoast import ContourWriterAGG
from satpy.writers import _burn_overlay, add_overlay
coast_dir = "/path/to/coast/data"
with mock.patch.object(self.orig_rgb_img, "apply_pil") as apply_pil:
apply_pil.return_value = self.orig_rgb_img
new_img = add_overlay(self.orig_rgb_img, self.area_def, coast_dir, fill_value=0)
assert self.orig_rgb_img.mode == new_img.mode
new_img = add_overlay(self.orig_rgb_img, self.area_def, coast_dir)
assert self.orig_rgb_img.mode + "A" == new_img.mode
with mock.patch.object(self.orig_rgb_img, "convert") as convert:
convert.return_value = self.orig_rgb_img
overlays = {"coasts": {"outline": "red"}}
new_img = add_overlay(self.orig_rgb_img, self.area_def, coast_dir,
overlays=overlays, fill_value=0)
pil_args = None
pil_kwargs = {"fill_value": 0}
fun_args = (self.orig_rgb_img.data.area, ContourWriterAGG.return_value, overlays)
fun_kwargs = None
apply_pil.assert_called_with(_burn_overlay, self.orig_rgb_img.mode,
pil_args, pil_kwargs, fun_args, fun_kwargs)
ContourWriterAGG.assert_called_with(coast_dir)
# test legacy call
grid = {"minor_is_tick": True}
color = "red"
expected_overlays = {"coasts": {"outline": color, "width": 0.5, "level": 1},
"borders": {"outline": color, "width": 0.5, "level": 1},
"grid": grid}
with warnings.catch_warnings(record=True) as wns:
warnings.simplefilter("always")
new_img = add_overlay(self.orig_rgb_img, self.area_def, coast_dir,
color=color, grid=grid, fill_value=0)
assert len(wns) == 1
assert issubclass(wns[0].category, DeprecationWarning)
assert "deprecated" in str(wns[0].message)
pil_args = None
pil_kwargs = {"fill_value": 0}
fun_args = (self.orig_rgb_img.data.area, ContourWriterAGG.return_value, expected_overlays)
fun_kwargs = None
apply_pil.assert_called_with(_burn_overlay, self.orig_rgb_img.mode,
pil_args, pil_kwargs, fun_args, fun_kwargs)
ContourWriterAGG.assert_called_with(coast_dir)
[docs]
def test_add_overlay_basic_l(self):
"""Test basic add_overlay usage with L data."""
from satpy.writers import add_overlay
new_img = add_overlay(self.orig_l_img, self.area_def, "", fill_value=0)
assert "RGB" == new_img.mode
new_img = add_overlay(self.orig_l_img, self.area_def, "")
assert "RGBA" == new_img.mode
[docs]
def test_add_decorate_basic_rgb(self):
"""Test basic add_decorate usage with RGB data."""
from satpy.writers import add_decorate
new_img = add_decorate(self.orig_rgb_img, **self.decorate)
assert "RGBA" == new_img.mode
[docs]
def test_add_decorate_basic_l(self):
"""Test basic add_decorate usage with L data."""
from satpy.writers import add_decorate
new_img = add_decorate(self.orig_l_img, **self.decorate)
assert "RGBA" == new_img.mode
[docs]
def test_group_results_by_output_file(tmp_path):
"""Test grouping results by output file.
Add a test for grouping the results from save_datasets(..., compute=False)
by output file. This is useful if for some reason we want to treat each
output file as a seperate computation (that can still be computed together
later).
"""
from pyresample import create_area_def
from satpy.tests.utils import make_fake_scene
from satpy.writers import group_results_by_output_file
x = 10
fake_area = create_area_def("sargasso", 4326, resolution=1, width=x, height=x, center=(0, 0))
fake_scene = make_fake_scene(
{"dragon_top_height": (dat := xr.DataArray(
dims=("y", "x"),
data=da.arange(x*x).reshape((x, x)))),
"penguin_bottom_height": dat,
"kraken_depth": dat},
daskify=True,
area=fake_area,
common_attrs={"start_time": dt.datetime(2022, 11, 16, 13, 27)})
# NB: even if compute=False, ``save_datasets`` creates (empty) files
(sources, targets) = fake_scene.save_datasets(
filename=os.fspath(tmp_path / "test-{name}.tif"),
writer="ninjogeotiff",
compress="NONE",
fill_value=0,
compute=False,
ChannelID="x",
DataType="x",
PhysicUnit="K",
PhysicValue="Temperature",
SatelliteNameID="x")
grouped = group_results_by_output_file(sources, targets)
assert len(grouped) == 3
assert len({x.rfile.path for x in grouped[0][1]}) == 1
for x in grouped:
assert len(x[0]) == len(x[1])
assert sources[:5] == grouped[0][0]
assert targets[:5] == grouped[0][1]
assert sources[10:] == grouped[2][0]
assert targets[10:] == grouped[2][1]