#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2018 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the AWIPS Tiled writer."""
import datetime as dt
import logging
import os
import shutil
from glob import glob
import dask
import dask.array as da
import numpy as np
import pytest
import xarray as xr
from pyproj import CRS
from satpy.resample import update_resampled_coords
# Fixed scene time bounds shared by every test in this module; END_TIME is
# 20 minutes after START_TIME so start/end formatting can be told apart.
START_TIME = dt.datetime(2018, 1, 1, 12, 0, 0)
END_TIME = START_TIME + dt.timedelta(minutes=20)
# NOTE:
# The following fixtures are not defined in this file, but are used and injected by Pytest:
# - tmp_path
# - caplog
[docs]
def _check_production_location(ds):
if "production_site" in ds.attrs:
prod_loc_name = "production_site"
elif "production_location" in ds.attrs:
prod_loc_name = "producton_location"
else:
return
if prod_loc_name in ds.attrs:
assert len(ds.attrs[prod_loc_name]) == 31
def check_required_properties(unmasked_ds, masked_ds):
    """Check various aspects of coordinates and attributes for correctness."""
    # Coordinate checks compare the raw (packed) dataset against the
    # mask-and-scale decoded one; attribute checks only need the raw dataset.
    for coordinate_check in (_check_scaled_x_coordinate_variable,
                             _check_scaled_y_coordinate_variable):
        coordinate_check(unmasked_ds, masked_ds)
    _check_required_common_attributes(unmasked_ds)
def _check_required_common_attributes(ds):
    """Check common properties of the created AWIPS tiles for validity."""
    # Tiling metadata that every AWIPS tile file must carry globally.
    tiling_attr_names = (
        "tile_row_offset", "tile_column_offset",
        "product_tile_height", "product_tile_width",
        "number_product_tiles",
        "product_rows", "product_columns",
    )
    for attr_name in tiling_attr_names:
        assert attr_name in ds.attrs
    _check_production_location(ds)
    for data_arr in ds.data_vars.values():
        if data_arr.ndim == 0:
            # scalar variables are grid mappings (projection definitions)
            assert "grid_mapping_name" in data_arr.attrs
            continue
        # every gridded variable must be compressed and georeferenced
        assert data_arr.encoding.get("zlib", False)
        assert "grid_mapping" in data_arr.attrs
        assert data_arr.attrs["grid_mapping"] in ds
        assert "units" in data_arr.attrs
        if data_arr.name == "DQF":
            continue
        # non-DQF variables are packed as unsigned 16-bit integers
        assert data_arr.dtype == np.int16
        assert data_arr.attrs["_Unsigned"] == "true"
[docs]
def _check_scaled_x_coordinate_variable(ds, masked_ds):
assert "x" in ds.coords
x_coord = ds.coords["x"]
np.testing.assert_equal(np.diff(x_coord), 1)
x_attrs = x_coord.attrs
assert x_attrs.get("standard_name") == "projection_x_coordinate"
assert x_attrs.get("units") == "meters"
assert "scale_factor" in x_attrs
assert x_attrs["scale_factor"] > 0
assert "add_offset" in x_attrs
unscaled_x = masked_ds.coords["x"].values
assert (np.diff(unscaled_x) > 0).all()
[docs]
def _check_scaled_y_coordinate_variable(ds, masked_ds):
assert "y" in ds.coords
y_coord = ds.coords["y"]
np.testing.assert_equal(np.diff(y_coord), 1)
y_attrs = y_coord.attrs
assert y_attrs.get("standard_name") == "projection_y_coordinate"
assert y_attrs.get("units") == "meters"
assert "scale_factor" in y_attrs
assert y_attrs["scale_factor"] < 0
assert "add_offset" in y_attrs
unscaled_y = masked_ds.coords["y"].values
assert (np.diff(unscaled_y) < 0).all()
def _get_test_area(shape=(200, 100), crs=None, extents=None):
    """Build a small AreaDefinition for tests.

    ``shape`` is (rows, columns); ``crs`` defaults to a CONUS-style LCC
    projection and ``extents`` to a small area around the projection origin.
    """
    from pyresample.geometry import AreaDefinition
    if crs is None:
        crs = CRS("+proj=lcc +datum=WGS84 +ellps=WGS84 +lon_0=-95. +lat_0=25 +lat_1=25 +units=m +no_defs")
    if extents is None:
        extents = (-1000., -1500., 1000., 1500.)
    num_rows, num_cols = shape
    return AreaDefinition(
        "test",
        "test",
        "test",
        crs,
        num_cols,
        num_rows,
        extents,
    )
def _get_test_data(shape=(200, 100), chunks=50):
    """Return a chunked dask array of float32 values increasing linearly from 0 to 1."""
    num_elements = shape[0] * shape[1]
    values = np.linspace(0., 1., num_elements, dtype=np.float32)
    return da.from_array(values.reshape(shape), chunks=chunks)
def _get_test_lcc_data(dask_arr, area_def, extra_attrs=None):
    """Wrap *dask_arr* in a DataArray carrying satpy-style metadata for *area_def*."""
    attrs = {
        "name": "test_ds",
        "platform_name": "PLAT",
        "sensor": "SENSOR",
        "units": "1",
        "standard_name": "toa_bidirectional_reflectance",
        "area": area_def,
        "start_time": START_TIME,
        "end_time": END_TIME,
    }
    if extra_attrs:
        attrs.update(extra_attrs)
    # 2D arrays are (y, x); anything else is assumed to carry a leading bands dim
    dims = ("y", "x") if dask_arr.ndim == 2 else ("bands", "y", "x")
    data_arr = xr.DataArray(dask_arr, dims=dims, attrs=attrs)
    return update_resampled_coords(data_arr, data_arr, area_def)
class TestAWIPSTiledWriter:
    """Test basic functionality of AWIPS Tiled writer."""

    def test_init(self, tmp_path):
        """Test basic init method of writer."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        AWIPSTiledWriter(base_dir=str(tmp_path))

    @pytest.mark.parametrize("use_save_dataset",
                             [(False,), (True,)])
    @pytest.mark.parametrize(
        ("extra_attrs", "expected_filename"),
        [
            ({}, "TESTS_AII_PLAT_SENSOR_test_ds_TEST_T001_20180101_1200.nc"),
            ({"sensor": "viirs", "name": "I01"}, "TESTS_AII_PLAT_viirs_I01_TEST_T001_20180101_1200.nc"),
        ]
    )
    def test_basic_numbered_1_tile(self, extra_attrs, expected_filename, use_save_dataset, caplog, tmp_path):
        """Test creating a single numbered tile."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        data = _get_test_data()
        area_def = _get_test_area()
        input_data_arr = _get_test_lcc_data(data, area_def, extra_attrs)
        with caplog.at_level(logging.DEBUG):
            w = AWIPSTiledWriter(base_dir=str(tmp_path), compress=True)
            if use_save_dataset:
                w.save_dataset(input_data_arr, sector_id="TEST", source_name="TESTS")
            else:
                w.save_datasets([input_data_arr], sector_id="TEST", source_name="TESTS")
        # the writer must not log filename-formatting problems
        assert "no routine matching" not in caplog.text
        assert "Can't format string" not in caplog.text
        all_files = glob(os.path.join(str(tmp_path), "TESTS_AII*.nc"))
        assert len(all_files) == 1
        assert os.path.basename(all_files[0]) == expected_filename
        for fn in all_files:
            unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
            output_ds = xr.open_dataset(fn, mask_and_scale=True)
            check_required_properties(unmasked_ds, output_ds)
            # round-trip must match within the quantization (scale factor) error
            scale_factor = output_ds["data"].encoding["scale_factor"]
            np.testing.assert_allclose(input_data_arr.values, output_ds["data"].data,
                                       atol=scale_factor * 0.75)

    def test_units_length_warning(self, tmp_path):
        """Test long 'units' warnings are raised."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        data = _get_test_data()
        area_def = _get_test_area()
        input_data_arr = _get_test_lcc_data(data, area_def)
        input_data_arr.attrs["units"] = "this is a really long units string"
        w = AWIPSTiledWriter(base_dir=str(tmp_path), compress=True)
        with pytest.warns(UserWarning, match=r".*this is a really long units string.*too long.*"):
            w.save_dataset(input_data_arr, sector_id="TEST", source_name="TESTS")

    @pytest.mark.parametrize(
        ("tile_count", "tile_size"),
        [
            ((3, 3), None),
            (None, (67, 34)),
            (None, None),
        ]
    )
    def test_basic_numbered_tiles(self, tile_count, tile_size, tmp_path):
        """Test creating a multiple numbered tiles."""
        from satpy.tests.utils import CustomScheduler
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        data = _get_test_data()
        area_def = _get_test_area()
        input_data_arr = _get_test_lcc_data(data, area_def)
        w = AWIPSTiledWriter(base_dir=str(tmp_path), compress=True)
        save_kwargs = dict(
            sector_id="TEST",
            source_name="TESTS",
            tile_count=tile_count,
            tile_size=tile_size,
            extra_global_attrs={"my_global": "TEST"}
        )
        # with neither tile_count nor tile_size the writer cannot size tiles
        should_error = tile_count is None and tile_size is None
        if should_error:
            with dask.config.set(scheduler=CustomScheduler(0)), \
                    pytest.raises(ValueError, match=r"Either.*tile_count.*"):
                w.save_datasets([input_data_arr], **save_kwargs)
        else:
            with dask.config.set(scheduler=CustomScheduler(1 * 2)):  # precompute=*2
                w.save_datasets([input_data_arr], **save_kwargs)
        all_files = glob(os.path.join(str(tmp_path), "TESTS_AII*.nc"))
        expected_num_files = 0 if should_error else 9
        assert len(all_files) == expected_num_files
        for fn in all_files:
            unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
            masked_ds = xr.open_dataset(fn, mask_and_scale=True)
            check_required_properties(unmasked_ds, masked_ds)
            assert unmasked_ds.attrs["my_global"] == "TEST"
            assert unmasked_ds.attrs["sector_id"] == "TEST"
            assert "physical_element" in unmasked_ds.attrs
            stime = input_data_arr.attrs["start_time"]
            assert unmasked_ds.attrs["start_date_time"] == stime.strftime("%Y-%m-%dT%H:%M:%S")

    def test_basic_lettered_tiles(self, tmp_path):
        """Test creating a lettered grid."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        w = AWIPSTiledWriter(base_dir=str(tmp_path), compress=True)
        data = _get_test_data(shape=(2000, 1000), chunks=500)
        area_def = _get_test_area(shape=(2000, 1000),
                                  extents=(-1000000., -1500000., 1000000., 1500000.))
        ds = _get_test_lcc_data(data, area_def)
        # tile_count should be ignored since we specified lettered_grid
        w.save_datasets([ds], sector_id="LCC", source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
        all_files = glob(os.path.join(str(tmp_path), "TESTS_AII*.nc"))
        assert len(all_files) == 16
        for fn in all_files:
            unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
            masked_ds = xr.open_dataset(fn, mask_and_scale=True)
            check_required_properties(unmasked_ds, masked_ds)
            assert masked_ds.attrs["start_date_time"] == START_TIME.strftime("%Y-%m-%dT%H:%M:%S")

    def test_basic_lettered_tiles_diff_projection(self, tmp_path):
        """Test creating a lettered grid from data with differing projection."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        w = AWIPSTiledWriter(base_dir=str(tmp_path), compress=True)
        # data projection (lat_0=45) differs from the lettered sector's
        crs = CRS("+proj=lcc +datum=WGS84 +ellps=WGS84 +lon_0=-95. +lat_0=45 +lat_1=45 +units=m +no_defs")
        data = _get_test_data(shape=(2000, 1000), chunks=500)
        area_def = _get_test_area(shape=(2000, 1000), crs=crs,
                                  extents=(-1000000., -1500000., 1000000., 1500000.))
        ds = _get_test_lcc_data(data, area_def)
        # tile_count should be ignored since we specified lettered_grid
        w.save_datasets([ds], sector_id="LCC", source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
        all_files = sorted(glob(os.path.join(str(tmp_path), "TESTS_AII*.nc")))
        assert len(all_files) == 24
        assert "TC02" in all_files[0]  # the first tile should be TC02
        for fn in all_files:
            unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
            masked_ds = xr.open_dataset(fn, mask_and_scale=True)
            check_required_properties(unmasked_ds, masked_ds)
            assert masked_ds.attrs["start_date_time"] == START_TIME.strftime("%Y-%m-%dT%H:%M:%S")

    def test_lettered_tiles_update_existing(self, tmp_path):
        """Test updating lettered tiles with additional data."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        first_base_dir = os.path.join(str(tmp_path), "first")
        w = AWIPSTiledWriter(base_dir=first_base_dir, compress=True)
        shape = (2000, 1000)
        data = np.linspace(0., 1., shape[0] * shape[1], dtype=np.float32).reshape(shape)
        # pixels to be filled in later
        data[:, -200:] = np.nan
        data = da.from_array(data, chunks=500)
        area_def = _get_test_area(shape=(2000, 1000),
                                  extents=(-1000000., -1500000., 1000000., 1500000.))
        ds = _get_test_lcc_data(data, area_def)
        # tile_count should be ignored since we specified lettered_grid
        w.save_datasets([ds], sector_id="LCC", source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
        all_files = sorted(glob(os.path.join(first_base_dir, "TESTS_AII*.nc")))
        assert len(all_files) == 16
        first_files = []
        second_base_dir = os.path.join(str(tmp_path), "second")
        os.makedirs(second_base_dir)
        for fn in all_files:
            new_fn = fn.replace(first_base_dir, second_base_dir)
            shutil.copy(fn, new_fn)
            first_files.append(new_fn)
        # Second writing/updating
        # Area is about 100 pixels to the right
        area_def2 = _get_test_area(shape=(2000, 1000),
                                   extents=(-800000., -1500000., 1200000., 1500000.))
        data2 = np.linspace(0., 1., 2000000, dtype=np.float32).reshape((2000, 1000))
        # a gap at the beginning where old values remain
        data2[:, :200] = np.nan
        # a gap at the end where old values remain
        data2[:, -400:-300] = np.nan
        data2 = da.from_array(data2, chunks=500)
        ds2 = _get_test_lcc_data(data2, area_def2)
        w = AWIPSTiledWriter(base_dir=second_base_dir, compress=True)
        # HACK: The _copy_to_existing function hangs when opening the output
        # file multiple times...sometimes. If we limit dask to one worker
        # it seems to work fine.
        with dask.config.set(num_workers=1):
            w.save_datasets([ds2], sector_id="LCC", source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
        all_files = glob(os.path.join(second_base_dir, "TESTS_AII*.nc"))
        # 16 original tiles + 4 new tiles
        assert len(all_files) == 20
        # these tiles should be the right-most edge of the first image
        first_right_edge_files = [x for x in first_files if "P02" in x or "P04" in x or "V02" in x or "V04" in x]
        for new_file in first_right_edge_files:
            orig_file = new_file.replace(second_base_dir, first_base_dir)
            orig_nc = xr.open_dataset(orig_file)
            orig_data = orig_nc["data"].values
            if not np.isnan(orig_data).any():
                # we only care about the tiles that had NaNs originally
                continue
            new_nc = xr.open_dataset(new_file)
            new_data = new_nc["data"].values
            # there should be at least some areas of the file
            # that old data was present and hasn't been replaced
            np.testing.assert_allclose(orig_data[:, :20], new_data[:, :20])
            # it isn't exactly 200 because the tiles aren't aligned with the
            # data (the left-most tile doesn't have data until some columns
            # in), but it should be at least that many columns
            assert np.isnan(orig_data[:, 200:]).all()
            assert not np.isnan(new_data[:, 200:]).all()

    def test_lettered_tiles_sector_ref(self, tmp_path):
        """Test creating a lettered grid using the sector as reference."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        w = AWIPSTiledWriter(base_dir=str(tmp_path), compress=True)
        data = _get_test_data(shape=(2000, 1000), chunks=500)
        area_def = _get_test_area(shape=(2000, 1000),
                                  extents=(-1000000., -1500000., 1000000., 1500000.))
        ds = _get_test_lcc_data(data, area_def)
        w.save_datasets([ds], sector_id="LCC", source_name="TESTS",
                        lettered_grid=True, use_sector_reference=True,
                        use_end_time=True)
        all_files = glob(os.path.join(str(tmp_path), "TESTS_AII*.nc"))
        assert len(all_files) == 16
        for fn in all_files:
            unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
            masked_ds = xr.open_dataset(fn, mask_and_scale=True)
            check_required_properties(unmasked_ds, masked_ds)
            # use_end_time=True: files are stamped with the scene end time
            expected_start = (START_TIME + dt.timedelta(minutes=20)).strftime("%Y-%m-%dT%H:%M:%S")
            assert masked_ds.attrs["start_date_time"] == expected_start

    def test_lettered_tiles_no_fit(self, tmp_path):
        """Test creating a lettered grid with no data overlapping the grid."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        w = AWIPSTiledWriter(base_dir=str(tmp_path), compress=True)
        data = _get_test_data(shape=(2000, 1000), chunks=500)
        area_def = _get_test_area(shape=(2000, 1000),
                                  extents=(4000000., 5000000., 5000000., 6000000.))
        ds = _get_test_lcc_data(data, area_def)
        w.save_datasets([ds], sector_id="LCC", source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
        # No files created
        all_files = glob(os.path.join(str(tmp_path), "TESTS_AII*.nc"))
        assert not all_files

    def test_lettered_tiles_no_valid_data(self, tmp_path):
        """Test creating a lettered grid with no valid data."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        w = AWIPSTiledWriter(base_dir=str(tmp_path), compress=True)
        data = da.full((2000, 1000), np.nan, chunks=500, dtype=np.float32)
        area_def = _get_test_area(shape=(2000, 1000),
                                  extents=(-1000000., -1500000., 1000000., 1500000.))
        ds = _get_test_lcc_data(data, area_def)
        w.save_datasets([ds], sector_id="LCC", source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
        # No files created - all NaNs should result in no tiles being created
        all_files = glob(os.path.join(str(tmp_path), "TESTS_AII*.nc"))
        assert not all_files

    def test_lettered_tiles_bad_filename(self, tmp_path):
        """Test creating a lettered grid with a bad filename."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        # "{Bad Key}" is not a known filename formatting key
        w = AWIPSTiledWriter(base_dir=str(tmp_path), compress=True, filename="{Bad Key}.nc")
        data = _get_test_data(shape=(2000, 1000), chunks=500)
        area_def = _get_test_area(shape=(2000, 1000),
                                  extents=(-1000000., -1500000., 1000000., 1500000.))
        ds = _get_test_lcc_data(data, area_def)
        with pytest.raises(KeyError):
            w.save_datasets([ds],
                            sector_id="LCC",
                            source_name="TESTS",
                            tile_count=(3, 3),
                            lettered_grid=True)

    def test_basic_numbered_tiles_rgb(self, tmp_path):
        """Test creating a multiple numbered tiles with RGB."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        w = AWIPSTiledWriter(base_dir=str(tmp_path), compress=True)
        data = da.from_array(np.linspace(0., 1., 60000, dtype=np.float32).reshape((3, 200, 100)), chunks=50)
        area_def = _get_test_area()
        ds = _get_test_lcc_data(data, area_def)
        ds = ds.rename(dict((old, new) for old, new in zip(ds.dims, ["bands", "y", "x"])))
        ds.coords["bands"] = ["R", "G", "B"]
        w.save_datasets([ds], sector_id="TEST", source_name="TESTS", tile_count=(3, 3))
        # each band is written as its own set of 9 tiles
        chan_files = glob(os.path.join(str(tmp_path), "TESTS_AII*test_ds_R*.nc"))
        all_files = chan_files[:]
        assert len(chan_files) == 9
        chan_files = glob(os.path.join(str(tmp_path), "TESTS_AII*test_ds_G*.nc"))
        all_files.extend(chan_files)
        assert len(chan_files) == 9
        chan_files = glob(os.path.join(str(tmp_path), "TESTS_AII*test_ds_B*.nc"))
        assert len(chan_files) == 9
        all_files.extend(chan_files)
        for fn in all_files:
            unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
            masked_ds = xr.open_dataset(fn, mask_and_scale=True)
            check_required_properties(unmasked_ds, masked_ds)

    @pytest.mark.parametrize(
        "sector",
        ["C",
         "F"]
    )
    @pytest.mark.parametrize(
        "extra_kwargs",
        [
            {},
            {"environment_prefix": "AA"},
            {"environment_prefix": "BB", "filename": "{environment_prefix}_{name}_GLM_T{tile_number:04d}.nc"},
        ]
    )
    def test_multivar_numbered_tiles_glm(self, sector, extra_kwargs, tmp_path):
        """Test creating a tiles with multiple variables."""
        from satpy.writers.awips_tiled import AWIPSTiledWriter
        # over-long organization name triggers the production-location warning below
        os.environ["ORGANIZATION"] = "1" * 50
        w = AWIPSTiledWriter(base_dir=tmp_path, compress=True)
        data = _get_test_data()
        area_def = _get_test_area()
        ds1 = _get_test_lcc_data(data, area_def)
        ds1.attrs.update(
            dict(
                name="total_energy",
                platform_name="GOES-17",
                sensor="SENSOR",
                units="1",
                scan_mode="M3",
                scene_abbr=sector,
                platform_shortname="G17"
            )
        )
        ds2 = ds1.copy()
        ds2.attrs.update({
            "name": "flash_extent_density",
        })
        ds3 = ds1.copy()
        ds3.attrs.update({
            "name": "average_flash_area",
        })
        dqf = ds1.copy()
        dqf = (dqf * 255).astype(np.uint8)
        dqf.attrs = ds1.attrs.copy()
        dqf.attrs.update({
            "name": "DQF",
            "_FillValue": 1,
        })
        with pytest.warns(UserWarning, match="Production location attribute "):
            w.save_datasets([ds1, ds2, ds3, dqf], sector_id="TEST", source_name="TESTS",
                            tile_count=(3, 3), template="glm_l2_rad{}".format(sector.lower()),
                            **extra_kwargs)
        fn_glob = self._get_glm_glob_filename(extra_kwargs)
        all_files = glob(os.path.join(str(tmp_path), fn_glob))
        assert len(all_files) == 9
        for fn in all_files:
            unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
            masked_ds = xr.open_dataset(fn, mask_and_scale=True)
            check_required_properties(unmasked_ds, masked_ds)
            # CONUS ("C") templates include fractional seconds; full disk does not
            if sector == "C":
                assert masked_ds.attrs["time_coverage_end"] == END_TIME.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            else:  # 'F'
                assert masked_ds.attrs["time_coverage_end"] == END_TIME.strftime("%Y-%m-%dT%H:%M:%SZ")

    @staticmethod
    def _get_glm_glob_filename(extra_kwargs):
        """Return the glob pattern matching GLM files written with *extra_kwargs*."""
        if "filename" in extra_kwargs:
            return "BB*_GLM*.nc"
        elif "environment_prefix" in extra_kwargs:
            return "AA*_GLM*.nc"
        return "DR*_GLM*.nc"