# Source code for satpy.tests.reader_tests.test_abi_l1b

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""The abi_l1b reader tests package."""
from __future__ import annotations

from datetime import datetime
from pathlib import Path
from typing import Any, Callable
from unittest import mock

import dask
import dask.array as da
import numpy as np
import numpy.typing as npt
import pytest
import xarray as xr
from pytest_lazyfixture import lazy_fixture

from satpy import DataQuery
from satpy.readers.abi_l1b import NC_ABI_L1B
from satpy.readers.yaml_reader import FileYAMLReader
from satpy.utils import ignore_pyproj_proj_warnings

# Radiance array shape (rows, cols) per resolution in meters.  The CONUS
# 500m grid is (3000, 5000); the 1000m and 2000m grids halve and quarter
# each dimension respectively.
RAD_SHAPE = {
    res: (3000 // factor, 5000 // factor)
    for res, factor in ((500, 1), (1000, 2), (2000, 4))
}


def _create_fake_rad_dataarray(
    rad: xr.DataArray | None = None,
    resolution: int = 2000,
) -> xr.DataArray:
    """Return a fake "Rad" DataArray for *resolution*, generating ramp data when *rad* is None.

    The generated data is an int16 ramp encoded the way the file format
    stores radiances: count = (physical + 1.0) / 0.5, i.e. inverse of the
    scale_factor/add_offset decoding applied by the reader.
    """
    num_rows, num_cols = RAD_SHAPE[resolution]
    if rad is None:
        encoded = (np.arange(num_rows * num_cols).reshape((num_rows, num_cols)) + 1.0) * 50.0
        encoded = (encoded + 1.0) / 0.5
        rad = xr.DataArray(
            da.from_array(encoded.astype(np.int16), chunks=226),
            dims=("y", "x"),
            attrs={
                "scale_factor": 0.5,
                "add_offset": -1.0,
                "_FillValue": 1002,
                "units": "W m-2 um-1 sr-1",
                "valid_range": (0, 4095),
            },
        )
    # Scalar coordinates present in real files; the reader is expected to
    # strip or convert these.
    for coord_name in ("t", "x_image", "y_image"):
        rad.coords[coord_name] = xr.DataArray(0.0)
    return rad
def _create_fake_rad_dataset(rad: xr.DataArray | None, resolution: int) -> xr.Dataset:
    """Assemble a fake ABI L1b-like Dataset around *rad*.

    Args:
        rad: Pre-built radiance DataArray, or None to have ramp data
            generated (callers such as ``_create_reader_for_data`` pass
            None, hence the Optional annotation).
        resolution: Pixel resolution in meters; must be a key of RAD_SHAPE.

    Returns:
        Dataset containing "Rad" plus the calibration coefficients,
        projection variable and coverage-time attributes the reader needs.
    """
    rad = _create_fake_rad_dataarray(rad=rad, resolution=resolution)
    x__ = xr.DataArray(
        range(rad.shape[1]),
        attrs={"scale_factor": 2.0, "add_offset": -1.0},
        dims=("x",),
    )
    y__ = xr.DataArray(
        range(rad.shape[0]),
        attrs={"scale_factor": -2.0, "add_offset": 1.0},
        dims=("y",),
    )
    # Unit sphere at height 1.0 keeps the resulting area extents tiny and
    # easy to assert on in _check_area.
    proj = xr.DataArray(
        np.int64(0),
        attrs={
            "semi_major_axis": 1.0,
            "semi_minor_axis": 1.0,
            "perspective_point_height": 1.0,
            "longitude_of_projection_origin": -90.0,
            "latitude_of_projection_origin": 0.0,
            "sweep_angle_axis": "x",
        },
    )
    fake_dataset = xr.Dataset(
        data_vars={
            "Rad": rad,
            "band_id": np.array(8),
            # 'x': x__,
            # 'y': y__,
            "x_image": xr.DataArray(0.0),
            "y_image": xr.DataArray(0.0),
            "goes_imager_projection": proj,
            "yaw_flip_flag": np.array([1]),
            "planck_fk1": np.array(13432.1),
            "planck_fk2": np.array(1497.61),
            "planck_bc1": np.array(0.09102),
            "planck_bc2": np.array(0.99971),
            "esun": np.array(2017),
            "nominal_satellite_subpoint_lat": np.array(0.0),
            "nominal_satellite_subpoint_lon": np.array(-89.5),
            "nominal_satellite_height": np.array(35786.02),
            "earth_sun_distance_anomaly_in_AU": np.array(0.99),
        },
        coords={
            "t": rad.coords["t"],
            "x": x__,
            "y": y__,
        },
        attrs={
            "time_coverage_start": "2017-09-20T17:30:40.8Z",
            "time_coverage_end": "2017-09-20T17:41:17.5Z",
        },
    )
    return fake_dataset
def generate_l1b_filename(chan_name: str) -> str:
    """Generate an ABI L1b CONUS (RadC) filename for the given channel name."""
    start, end, created = "s20161811540362", "e20161811545170", "c20161811545230"
    return f"OR_ABI-L1b-RadC-M4{chan_name}_G16_{start}_{end}_{created}_suffix.nc"
@pytest.fixture()
def c01_refl(tmp_path) -> xr.DataArray:
    """Provide the loaded C01 reflectance array (default calibration)."""
    with _apply_dask_chunk_size():
        abi_reader = _create_reader_for_data(tmp_path, "C01", None, 1000)
        return abi_reader.load(["C01"])["C01"]
@pytest.fixture()
def c01_rad(tmp_path) -> xr.DataArray:
    """Provide the loaded C01 radiance array."""
    with _apply_dask_chunk_size():
        abi_reader = _create_reader_for_data(tmp_path, "C01", None, 1000)
        rad_query = DataQuery(name="C01", calibration="radiance")
        return abi_reader.load([rad_query])["C01"]
@pytest.fixture()
def c01_rad_h5netcdf(tmp_path) -> xr.DataArray:
    """Provide C01 radiances as loaded through h5netcdf.

    Mimics h5netcdf-style attributes where ``_FillValue`` is an array
    rather than a scalar.
    """
    num_rows, num_cols = RAD_SHAPE[1000]
    encoded = (np.arange(num_rows * num_cols).reshape((num_rows, num_cols)) + 1.0) * 50.0
    encoded = (encoded + 1.0) / 0.5
    rad = xr.DataArray(
        da.from_array(encoded.astype(np.int16), chunks=226),
        dims=("y", "x"),
        attrs={
            "scale_factor": 0.5,
            "add_offset": -1.0,
            "_FillValue": np.array([1002]),  # array-valued, unlike the scalar default
            "units": "W m-2 um-1 sr-1",
            "valid_range": (0, 4095),
        },
    )
    with _apply_dask_chunk_size():
        abi_reader = _create_reader_for_data(tmp_path, "C01", rad, 1000)
        rad_query = DataQuery(name="C01", calibration="radiance")
        return abi_reader.load([rad_query])["C01"]
@pytest.fixture()
def c01_counts(tmp_path) -> xr.DataArray:
    """Provide the loaded C01 raw counts array."""
    with _apply_dask_chunk_size():
        abi_reader = _create_reader_for_data(tmp_path, "C01", None, 1000)
        counts_query = DataQuery(name="C01", calibration="counts")
        return abi_reader.load([counts_query])["C01"]
@pytest.fixture()
def c07_bt_creator(tmp_path) -> Callable:
    """Provide a callable that loads C07 brightness temperatures.

    The returned callable accepts ``clip_negative_radiances`` so a single
    fixture can exercise both clipping modes.
    """
    def _load_data_array(clip_negative_radiances: bool = False):
        fake_rad = _fake_c07_data()
        with _apply_dask_chunk_size():
            abi_reader = _create_reader_for_data(
                tmp_path,
                "C07",
                fake_rad,
                2000,
                {"clip_negative_radiances": clip_negative_radiances},
            )
            return abi_reader.load(["C07"])["C07"]

    return _load_data_array
def _fake_c07_data() -> xr.DataArray:
    """Create fake encoded C07 radiances including one below-minimum sample."""
    num_rows, num_cols = RAD_SHAPE[2000]
    phys = (np.arange(num_rows * num_cols).reshape((num_rows, num_cols)) + 1.0) * 50.0
    phys[0, 0] = -0.0001  # introduce below minimum expected radiance
    encoded = ((phys + 1.3) / 0.5).astype(np.int16)
    # Fill value equals the encoded value of sample index 9, so the tests
    # see a NaN at that position after calibration.
    fill = np.int16(np.floor(((9 + 1) * 50.0 + 1.3) / 0.5))
    return xr.DataArray(
        da.from_array(encoded, chunks=226),
        dims=("y", "x"),
        attrs={
            "scale_factor": 0.5,
            "add_offset": -1.3,
            "_FillValue": fill,
        },
    )
def _create_reader_for_data(
    tmp_path: Path,
    channel_name: str,
    rad: xr.DataArray | None,
    resolution: int,
    reader_kwargs: dict[str, Any] | None = None,
) -> FileYAMLReader:
    """Write a fake L1b file to *tmp_path* and return an abi_l1b reader for it."""
    from satpy.readers import load_readers

    output_path = tmp_path / generate_l1b_filename(channel_name)
    fake_ds = _create_fake_rad_dataset(rad=rad, resolution=resolution)
    # Chunk the on-disk variable the same way real ABI files are chunked.
    fake_ds.to_netcdf(
        output_path,
        encoding={"Rad": {"chunksizes": [226, 226]}},
    )
    return load_readers([str(output_path)], "abi_l1b", reader_kwargs=reader_kwargs)["abi_l1b"]
def _apply_dask_chunk_size():
    """Return a dask config context setting the array chunk size for these tests.

    The size is 226 (on-disk chunk size) times 8 (on-disk chunks for 500m
    data), squared for a 2D chunk, times 4 bytes per 32-bit float.
    """
    chunk_bytes = ((226 * 8) ** 2) * 4
    return dask.config.set({"array.chunk-size": chunk_bytes})
def _get_and_check_array(data_arr: xr.DataArray, exp_dtype: npt.DTypeLike) -> npt.NDArray:
    """Compute the dask data and validate wrapper types, chunking and dtype.

    Returns the computed numpy array for further value checks.
    """
    data_np = data_arr.data.compute()
    assert isinstance(data_arr, xr.DataArray)
    assert isinstance(data_arr.data, da.Array)
    assert isinstance(data_np, np.ndarray)
    # Infer resolution from the row count, then verify the chunk size
    # scales with resolution (8 chunks at 500m, fewer at coarser).
    res = 1000 if data_np.shape[0] == RAD_SHAPE[1000][0] else 2000
    exp_chunk = 226 * (8 / (res / 500))
    assert data_arr.chunks[0][0] == exp_chunk
    assert data_arr.chunks[1][0] == exp_chunk
    assert data_np.dtype == data_arr.dtype
    assert data_np.dtype == exp_dtype
    return data_np
def _check_area(data_arr: xr.DataArray) -> None:
    """Validate the AreaDefinition attached to *data_arr*."""
    from pyresample.geometry import AreaDefinition

    area_def = data_arr.attrs["area"]
    assert isinstance(area_def, AreaDefinition)

    with ignore_pyproj_proj_warnings():
        proj_dict = area_def.crs.to_dict()
    # pyproj may collapse equal semi-axes into a single sphere radius "R".
    if "R" in proj_dict:
        assert proj_dict["R"] == 1
    else:
        assert proj_dict["a"] == 1
        assert proj_dict["b"] == 1
    expected_proj = {
        "h": 1.0,
        "lon_0": -90.0,
        "proj": "geos",
        "sweep": "x",
        "units": "m",
    }
    for proj_key, proj_val in expected_proj.items():
        assert proj_dict[proj_key] == proj_val

    assert area_def.shape == data_arr.shape
    # Extents differ between the 1000m and 2000m fake grids.
    if area_def.shape[0] == RAD_SHAPE[1000][0]:
        exp_extent = (-2.0, -2998.0, 4998.0, 2.0)
    else:
        exp_extent = (-2.0, -1498.0, 2498.0, 2.0)
    assert area_def.area_extent == exp_extent
def _check_dims_and_coords(data_arr: xr.DataArray) -> None:
    """Verify spatial dims are present and time dims/coords were removed."""
    for spatial_dim in ("y", "x"):
        assert spatial_dim in data_arr.dims
    # The reader removes any time dimension information.
    for time_name in ("t", "time"):
        assert time_name not in data_arr.coords
        assert time_name not in data_arr.dims
@pytest.mark.parametrize(
    ("channel", "suffix"),
    [
        (f"C{num:02d}", suffix)
        for num in range(1, 17)
        for suffix in ("", "_test_suffix")
    ],
)
def test_file_patterns_match(channel, suffix):
    """Test that the configured file patterns work."""
    from satpy.readers import configs_for_reader, load_reader

    reader_configs = list(configs_for_reader("abi_l1b"))[0]
    reader = load_reader(reader_configs)
    base = (
        "OR_ABI-L1b-RadM1-M3{}_G16_s20182541300210_e20182541300267"
        "_c20182541300308{}.nc"
    )
    loadables = reader.select_files_from_pathnames([base.format(channel, suffix)])
    assert len(loadables) == 1
    if not suffix and channel in ("C01", "C02", "C03", "C05"):
        # These channels may also appear with a split-file fragment suffix.
        split_fn = base.format(channel, "-000000_0")
        loadables = reader.select_files_from_pathnames([split_fn])
        assert len(loadables) == 1
@pytest.mark.parametrize(
    "c01_data_arr",
    [lazy_fixture("c01_rad"), lazy_fixture("c01_rad_h5netcdf")],
)
class Test_NC_ABI_L1B:
    """Test the NC_ABI_L1B reader."""

    def test_get_dataset(self, c01_data_arr):
        """Test the get_dataset method."""
        expected_attrs = {
            "calibration": "radiance",
            "instrument_ID": None,
            "modifiers": (),
            "name": "C01",
            "observation_type": "Rad",
            "orbital_parameters": {
                "projection_altitude": 1.0,
                "projection_latitude": 0.0,
                "projection_longitude": -90.0,
                "satellite_nominal_altitude": 35786020.0,
                "satellite_nominal_latitude": 0.0,
                "satellite_nominal_longitude": -89.5,
                "yaw_flip": True,
            },
            "orbital_slot": None,
            "platform_name": "GOES-16",
            "platform_shortname": "G16",
            "production_site": None,
            "reader": "abi_l1b",
            "resolution": 1000,
            "scan_mode": "M4",
            "scene_abbr": "C",
            "scene_id": None,
            "sensor": "abi",
            "timeline_ID": None,
            "suffix": "suffix",
            "units": "W m-2 um-1 sr-1",
            "start_time": datetime(2017, 9, 20, 17, 30, 40, 800000),
            "end_time": datetime(2017, 9, 20, 17, 41, 17, 500000),
        }
        data_arr = c01_data_arr
        _get_and_check_array(data_arr, np.float32)
        _check_area(data_arr)
        _check_dims_and_coords(data_arr)
        # Every expected metadata key must be present with the exact value.
        for attr_key, attr_val in expected_attrs.items():
            assert data_arr.attrs[attr_key] == attr_val
@pytest.mark.parametrize("clip_negative_radiances", [False, True])
def test_ir_calibrate(c07_bt_creator, clip_negative_radiances):
    """Test IR calibration."""
    res = c07_bt_creator(clip_negative_radiances=clip_negative_radiances)
    # Sample [0, 0] holds a below-minimum radiance: clipped to the minimum
    # BT when clipping is on, NaN otherwise.
    first_bt = 134.68753 if clip_negative_radiances else np.nan
    expected = np.array([
        first_bt,
        304.97037,
        332.22778,
        354.6147,
        374.08688,
        391.58655,
        407.64786,
        422.60635,
        436.68802,
        np.nan,
    ])
    data_np = _get_and_check_array(res, np.float32)
    _check_area(res)
    _check_dims_and_coords(res)
    np.testing.assert_allclose(data_np[0, :10], expected, equal_nan=True, atol=1e-04)

    # encoding attributes from the file must be stripped from the result
    assert "scale_factor" not in res.attrs
    assert "_FillValue" not in res.attrs
    assert res.attrs["standard_name"] == "toa_brightness_temperature"
    assert res.attrs["long_name"] == "Brightness Temperature"
def test_vis_calibrate(c01_refl):
    """Test VIS calibration."""
    res = c01_refl
    expected = np.array([
        7.632808,
        15.265616,
        22.898426,
        30.531233,
        38.164043,
        45.796852,
        53.429657,
        61.062466,
        68.695274,
        np.nan,
    ])
    data_np = _get_and_check_array(res, np.float32)
    _check_area(res)
    _check_dims_and_coords(res)
    np.testing.assert_allclose(data_np[0, :10], expected, equal_nan=True)

    # encoding attributes from the file must be stripped from the result
    assert "scale_factor" not in res.attrs
    assert "_FillValue" not in res.attrs
    assert res.attrs["standard_name"] == "toa_bidirectional_reflectance"
    assert res.attrs["long_name"] == "Bidirectional Reflectance"
def test_raw_calibrate(c01_counts):
    """Test RAW calibration."""
    res = c01_counts

    # Raw counts should pass through unchanged as int16.
    _get_and_check_array(res, np.int16)
    _check_area(res)
    _check_dims_and_coords(res)

    # Typical file attributes must survive on the raw product.
    for attr_name in (
        "scale_factor",
        "add_offset",
        "_FillValue",
        "orbital_parameters",
        "platform_shortname",
        "scene_id",
    ):
        assert attr_name in res.attrs

    # And the identifying metadata must match expectations.
    assert res.attrs["standard_name"] == "counts"
    assert res.attrs["long_name"] == "Raw Counts"
@mock.patch("satpy.readers.abi_base.xr")
def test_open_dataset(_):  # noqa: PT019
    """Test opening a dataset."""
    fake_file_obj = mock.MagicMock()
    # Constructing the handler should open the provided file-like object.
    NC_ABI_L1B(fake_file_obj, {"platform_shortname": "g16"}, {})
    fake_file_obj.open.assert_called()