Source code for satpy.tests.reader_tests.conftest

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2021, 2024, 2025 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.

"""Setup and configuration for all reader tests."""

import datetime as dt
import os
from random import randrange

import numpy as np
import pytest
import xarray as xr
from trollsift import compose, parse
from xarray import DataTree

from satpy.readers.mwr_l1b import AWS_EPS_Sterna_MWR_L1BFile
from satpy.readers.mwr_l1c import AWS_MWR_L1CFile

DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

platform_name = "AWS1"
# W_XX-EUMETSAT-Darmstadt,SAT,AWS1-MWR-1B-RAD_C_EUMT_20241121085911_G_D_20241109234502_20241110004559_T_N____.nc
file_pattern = "W_{country:2s}-{organisation:s}-{location:s},SAT,{platform_name}-MWR-{processing_level}-RAD_C_{originator:4s}_{processing_time:%Y%m%d%H%M%S}_G_D_{start_time:%Y%m%d%H%M%S}_{end_time:%Y%m%d%H%M%S}_T_B____.nc"  # noqa


rng = np.random.default_rng()

[docs] def random_date(start, end): """Create a random datetime between two datetimes.""" delta = end - start int_delta = (delta.days * 24 * 60 * 60) + delta.seconds random_second = randrange(int_delta) return start + dt.timedelta(seconds=random_second)
[docs] @pytest.fixture(scope="module") def fake_mwr_data_array(): """Return a fake AWS/EPS-Sterna MWR l1b data array.""" fake_data_np = rng.integers(0, 700000, size=10*145*19).reshape((10, 145, 19)) fake_data_np[0, 0, 0] = -2147483648 fake_data_np[1, 0, 0] = 700000 + 10 fake_data_np[2, 0, 0] = -10 array_dims = ["n_scans", "n_fovs", "n_channels"] return xr.DataArray(fake_data_np, dims=array_dims)
[docs] def make_fake_angles(geo_size, geo_dims, shape): """Return fake sun-satellite angle array.""" maxval = 36000 dummy_array = (np.arange(0, geo_size) * maxval/geo_size).astype("int32") return xr.DataArray(dummy_array.reshape(shape), dims=geo_dims)
[docs] def make_fake_mwr_lonlats(geo_size, geo_dims, shape): """Return fake geolocation data arrays for all 4 MWR horns.""" maxval = 3600000 dummy_array = (np.arange(0, geo_size) * maxval/geo_size).astype("int32") fake_lon_data = xr.DataArray(dummy_array.reshape(shape), dims=geo_dims) maxval = 1800000 dummy_array = (np.arange(0, geo_size) * maxval/geo_size - maxval/2).astype("int32") fake_lat_data = xr.DataArray(dummy_array.reshape(shape), dims=geo_dims) return (fake_lon_data, fake_lat_data)
[docs] def make_fake_mwr_l1c_lonlats(geo_size, geo_dims): """Return fake level-1c geolocation data arrays.""" maxval = 3600000 dummy_array = (np.arange(0, geo_size) * maxval/geo_size).astype("int32") fake_lon_data = xr.DataArray(dummy_array.reshape((10, 145)), dims=geo_dims) maxval = 1800000 dummy_array = (np.arange(0, geo_size) * maxval/geo_size - maxval/2).astype("int32") fake_lat_data = xr.DataArray(dummy_array.reshape((10, 145)), dims=geo_dims) return (fake_lon_data, fake_lat_data)
[docs] def aws_eps_sterna_mwr_level1_file(fake_mwr_data_array, eps_sterna=True, l1b=True): """Create an AWS and EPS-Sterna MWR l1b file.""" if eps_sterna: n_feedhorns="n_feedhorns" prefix = "" longitude_attr = "longitude" latitude_attr = "latitude" else: n_feedhorns="n_geo_groups" prefix = "aws_" longitude_attr = "aws_lon" latitude_attr = "aws_lat" if l1b: geo_dims = ["n_scans", "n_fovs", n_feedhorns] geo_size = 10 * 145 * 4 shape = (10, 145, 4) else: geo_dims = ["n_scans", "n_fovs"] geo_size = 10 * 145 shape = (10, 145) ds = DataTree() start_time = dt.datetime(2024, 9, 1, 12, 0) ds.attrs["sensing_start_time_utc"] = start_time.strftime(DATETIME_FORMAT) end_time = dt.datetime(2024, 9, 1, 12, 15) ds.attrs["sensing_end_time_utc"] = end_time.strftime(DATETIME_FORMAT) ds.attrs["instrument"] = "MWR" ds.attrs["orbit_start"] = 9991 ds.attrs["orbit_end"] = 9992 dset_name = f"data/calibration/{prefix}toa_brightness_temperature" ds[dset_name] = fake_mwr_data_array ds[dset_name].attrs["scale_factor"] = 0.001 ds[dset_name].attrs["add_offset"] = 0.0 ds[dset_name].attrs["missing_value"] = -2147483648 ds[dset_name].attrs["valid_min"] = 0 ds[dset_name].attrs["valid_max"] = 700000 fake_lon_data, fake_lat_data = make_fake_mwr_lonlats(geo_size, geo_dims, shape) ds[f"data/navigation/{longitude_attr}"] = fake_lon_data ds[f"data/navigation/{longitude_attr}"].attrs["scale_factor"] = 1e-4 ds[f"data/navigation/{longitude_attr}"].attrs["add_offset"] = 0.0 ds[f"data/navigation/{latitude_attr}"] = fake_lat_data ds[f"data/navigation/{prefix}solar_azimuth_angle"] = make_fake_angles(geo_size, geo_dims, shape) ds[f"data/navigation/{prefix}solar_zenith_angle"] = make_fake_angles(geo_size, geo_dims, shape) ds[f"data/navigation/{prefix}satellite_azimuth_angle"] = make_fake_angles(geo_size, geo_dims, shape) ds[f"data/navigation/{prefix}satellite_zenith_angle"] = make_fake_angles(geo_size, geo_dims, shape) if l1b: ds["status/satellite/subsat_latitude_end"] = np.array(22.39) ds["status/satellite/subsat_longitude_start"] = np.array(304.79) ds["status/satellite/subsat_latitude_start"] = np.array(55.41) ds["status/satellite/subsat_longitude_end"] = np.array(296.79) return ds
[docs] def create_mwr_file(tmpdir, data_array, eps_sterna=False, l1b=True): """Create an AWS or EPS-Sterna MWR l1b (or level-1c) file.""" ds = aws_eps_sterna_mwr_level1_file(data_array, eps_sterna=eps_sterna, l1b=l1b) start_time = dt.datetime.fromisoformat(ds.attrs["sensing_start_time_utc"]) end_time = dt.datetime.fromisoformat(ds.attrs["sensing_end_time_utc"]) platform_name = "ST01" if eps_sterna else "AWS1" processing_level = "1B" if l1b else "1C" processing_time = random_date(dt.datetime(2024, 9, 1, 13), dt.datetime(2030, 6, 1)) filename = tmpdir / compose(file_pattern, dict(country="XX", organisation="EUMETSAT", location="Darmstadt", processing_level=processing_level, originator="EUMT", start_time=start_time, end_time=end_time, processing_time=processing_time, platform_name=platform_name)) ds.to_netcdf(filename) return filename
[docs] @pytest.fixture(scope="module") def eps_sterna_mwr_file(tmp_path_factory, fake_mwr_data_array): """Create an EPS-Sterna MWR l1b file.""" tmpdir = tmp_path_factory.mktemp("eps_sterna_mwr_l1b_tests") return create_mwr_file(tmpdir, fake_mwr_data_array, eps_sterna=True)
[docs] @pytest.fixture(scope="module") def aws_mwr_file(tmp_path_factory, fake_mwr_data_array): """Create an AWS MWR l1b file.""" tmpdir = tmp_path_factory.mktemp("aws_l1b_tests") return create_mwr_file(tmpdir, fake_mwr_data_array, eps_sterna=False)
[docs] @pytest.fixture(scope="module") def aws_mwr_l1c_file(tmp_path_factory, fake_mwr_data_array): """Create an AWS MWR l1c file.""" tmpdir = tmp_path_factory.mktemp("aws_l1c_tests") return create_mwr_file(tmpdir, fake_mwr_data_array, eps_sterna=False, l1b=False)
[docs] @pytest.fixture(scope="module") def eps_sterna_mwr_handler(eps_sterna_mwr_file): """Create an EPS-Sterna MWR filehandler.""" filename_info = parse(file_pattern, os.path.basename(eps_sterna_mwr_file)) filetype_info = dict() filetype_info["file_type"] = "eps_sterna_mwr_l1b" filetype_info["feed_horn_group_name"] = "n_feedhorns" return AWS_EPS_Sterna_MWR_L1BFile(eps_sterna_mwr_file, filename_info, filetype_info)
[docs] @pytest.fixture(scope="module") def aws_mwr_handler(aws_mwr_file): """Create an AWS MWR filehandler.""" filename_info = parse(file_pattern, os.path.basename(aws_mwr_file)) filetype_info = dict() filetype_info["file_type"] = "aws1_mwr_l1b" filetype_info["feed_horn_group_name"] = "n_geo_groups" return AWS_EPS_Sterna_MWR_L1BFile(aws_mwr_file, filename_info, filetype_info)
[docs] @pytest.fixture(scope="module") def aws_mwr_l1c_handler(aws_mwr_l1c_file): """Create an AWS MWR level-1c filehandler.""" filename_info = parse(file_pattern, os.path.basename(aws_mwr_l1c_file)) filetype_info = dict() filetype_info["file_type"] = "aws1_mwr_l1c" filetype_info["feed_horn_group_name"] = None return AWS_MWR_L1CFile(aws_mwr_l1c_file, filename_info, filetype_info)