#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2019 Satpy developers
#
# This file is part of Satpy.
#
# Satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# Satpy. If not, see <http://www.gnu.org/licenses/>.
"""Module for testing the satpy.readers.mirs module."""
from __future__ import annotations
import os
from datetime import datetime
from unittest import mock
import numpy as np
import pytest
import xarray as xr
from satpy._config import config_search_paths
from satpy.dataset import DataID
from satpy.readers import load_reader
from satpy.readers.yaml_reader import FileYAMLReader
METOP_FILE = "IMG_SX.M2.D17037.S1601.E1607.B0000001.WE.HR.ORB.nc"
NPP_MIRS_L2_SWATH = "NPR-MIRS-IMG_v11r6_npp_s201702061601000_e201702061607000_c202012201658410.nc"
N20_MIRS_L2_SWATH = "NPR-MIRS-IMG_v11r4_n20_s201702061601000_e201702061607000_c202012201658410.nc"
N21_MIRS_L2_SWATH = "NPR-MIRS-IMG_v11r4_n21_s201702061601000_e201702061607000_c202012201658410.nc"
OTHER_MIRS_L2_SWATH = "NPR-MIRS-IMG_v11r4_gpm_s201702061601000_e201702061607000_c202010080001310.nc"
EXAMPLE_FILES = [METOP_FILE, NPP_MIRS_L2_SWATH, OTHER_MIRS_L2_SWATH]
N_CHANNEL = 22
N_FOV = 96
N_SCANLINE = 100
DEFAULT_FILE_DTYPE = np.float32
DEFAULT_2D_SHAPE = (N_SCANLINE, N_FOV)
DEFAULT_DATE = datetime(2019, 6, 19, 13, 0)
DEFAULT_LAT = np.linspace(23.09356, 36.42844, N_SCANLINE * N_FOV,
dtype=DEFAULT_FILE_DTYPE)
DEFAULT_LON = np.linspace(127.6879, 144.5284, N_SCANLINE * N_FOV,
dtype=DEFAULT_FILE_DTYPE)
FREQ = xr.DataArray(
np.array([23.8, 31.4, 50.3, 51.76, 52.8, 53.596, 54.4, 54.94, 55.5,
57.29, 57.29, 57.29, 57.29, 57.29, 57.29, 88.2, 165.5,
183.31, 183.31, 183.31, 183.31, 183.31][:N_CHANNEL], dtype=np.float32),
dims="Channel",
attrs={"description": "Central Frequencies (GHz)"},
)
POLO = xr.DataArray(
np.array([2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2, 3, 3, 3, 3, 3, 3][:N_CHANNEL], dtype=np.int16),
dims="Channel",
attrs={"description": "Polarizations"},
)
DS_IDS = ["RR", "longitude", "latitude"]
TEST_VARS = ["btemp_88v", "btemp_165h",
"btemp_23v", "RR", "Sfc_type"]
DEFAULT_UNITS = {"btemp_88v": "K", "btemp_165h": "K",
"btemp_23v": "K", "RR": "mm/hr", "Sfc_type": "1"}
PLATFORM = {"M2": "metop-a", "NPP": "npp", "GPM": "gpm"}
SENSOR = {"m2": "amsu-mhs", "npp": "atms", "gpm": "GPI"}
START_TIME = datetime(2017, 2, 6, 16, 1, 0)
END_TIME = datetime(2017, 2, 6, 16, 7, 0)
[docs]
def fake_coeff_from_fn(fn):
"""Create Fake Coefficients."""
ameans = np.random.uniform(261, 267, N_CHANNEL)
locations = [
[1, 2],
[1, 2],
[3, 4, 5],
[3, 4, 5],
[4, 5, 6],
[5, 6, 7],
[6, 7, 8],
[7, 8],
[9, 10, 11],
[10, 11],
[10, 11, 12],
[11, 12, 13],
[12, 13],
[12, 13, 14],
[14, 15],
[1, 16],
[17, 18],
[18, 19],
[18, 19, 20],
[19, 20, 21],
[20, 21, 22],
[21, 22],
]
all_nchx = [len(loc) for loc in locations]
coeff_str = []
for idx in range(1, N_CHANNEL + 1):
nx = idx - 1
coeff_str.append("\n")
next_line = " {} {} {}\n".format(idx, all_nchx[nx], ameans[nx])
coeff_str.append(next_line)
next_line = " {}\n".format(" ".join([str(x) for x in locations[idx - 1]]))
coeff_str.append(next_line)
for fov in range(1, N_FOV+1):
random_coeff = np.ones(all_nchx[nx])
str_coeff = " ".join([str(x) for x in random_coeff])
random_means = np.zeros(all_nchx[nx])
str_means = " ".join([str(x) for x in random_means])
error_val = np.random.uniform(0, 4)
coeffs_line = " {:>2} {:>2} {} {} {}\n".format(idx, fov,
str_coeff,
str_means,
error_val)
coeff_str.append(coeffs_line)
return coeff_str
[docs]
def _get_datasets_with_attributes(**kwargs):
"""Represent files with two resolution of variables in them (ex. OCEAN)."""
bt = xr.DataArray(np.linspace(1830, 3930, N_SCANLINE * N_FOV * N_CHANNEL, dtype=np.int16).
reshape(N_SCANLINE, N_FOV, N_CHANNEL),
attrs={"long_name": "Channel Temperature (K)",
"units": "Kelvin",
"coordinates": "Longitude Latitude Freq",
"scale_factor": 0.01,
"_FillValue": -999,
"valid_range": [0, 50000]},
dims=("Scanline", "Field_of_view", "Channel"))
rr = xr.DataArray(np.random.randint(100, 500, size=(N_SCANLINE, N_FOV), dtype=np.int16),
attrs={"long_name": "Rain Rate (mm/hr)",
"units": "mm/hr",
"coordinates": "Longitude Latitude",
"scale_factor": 0.1,
"_FillValue": -999,
"valid_range": [0, 1000]},
dims=("Scanline", "Field_of_view"))
sfc_type = xr.DataArray(np.random.randint(0, 4, size=(N_SCANLINE, N_FOV), dtype=np.int16),
attrs={"description": "type of surface:0-ocean," +
"1-sea ice,2-land,3-snow",
"units": "1",
"coordinates": "Longitude Latitude",
"_FillValue": -999,
"valid_range": [0, 3]
},
dims=("Scanline", "Field_of_view"))
latitude = xr.DataArray(DEFAULT_LAT.reshape(DEFAULT_2D_SHAPE),
attrs={"long_name":
"Latitude of the view (-90,90)"},
dims=("Scanline", "Field_of_view"))
longitude = xr.DataArray(DEFAULT_LON.reshape(DEFAULT_2D_SHAPE),
attrs={"long_name":
"Longitude of the view (-180,180)"},
dims=("Scanline", "Field_of_view"))
ds_vars = {
"Freq": FREQ,
"Polo": POLO,
"BT": bt,
"RR": rr,
"Sfc_type": sfc_type,
"Latitude": latitude,
"Longitude": longitude
}
attrs = {"missing_value": -999}
ds = xr.Dataset(ds_vars, attrs=attrs)
ds = ds.assign_coords({"Freq": FREQ, "Latitude": latitude, "Longitude": longitude})
return ds
[docs]
def _get_datasets_with_less_attributes():
"""Represent files with two resolution of variables in them (ex. OCEAN)."""
bt = xr.DataArray(np.linspace(1830, 3930, N_SCANLINE * N_FOV * N_CHANNEL, dtype=np.int16).
reshape(N_SCANLINE, N_FOV, N_CHANNEL),
attrs={"long_name": "Channel Temperature (K)",
"scale_factor": 0.01},
dims=("Scanline", "Field_of_view", "Channel"))
rr = xr.DataArray(np.random.randint(100, 500, size=(N_SCANLINE, N_FOV), dtype=np.int16),
attrs={"long_name": "Rain Rate (mm/hr)",
"scale_factor": 0.1},
dims=("Scanline", "Field_of_view"))
sfc_type = xr.DataArray(np.random.randint(0, 4, size=(N_SCANLINE, N_FOV), dtype=np.int16),
attrs={"description": "type of surface:0-ocean," +
"1-sea ice,2-land,3-snow"},
dims=("Scanline", "Field_of_view"))
latitude = xr.DataArray(DEFAULT_LAT.reshape(DEFAULT_2D_SHAPE),
attrs={"long_name":
"Latitude of the view (-90,90)"},
dims=("Scanline", "Field_of_view"))
longitude = xr.DataArray(DEFAULT_LON.reshape(DEFAULT_2D_SHAPE),
attrs={"long_name":
"Longitude of the view (-180,180)"},
dims=("Scanline", "Field_of_view"))
ds_vars = {
"Freq": FREQ,
"Polo": POLO,
"BT": bt,
"RR": rr,
"Sfc_type": sfc_type,
"Longitude": longitude,
"Latitude": latitude
}
attrs = {"missing_value": -999.}
ds = xr.Dataset(ds_vars, attrs=attrs)
ds = ds.assign_coords({"Freq": FREQ, "Latitude": latitude, "Longitude": longitude})
return ds
[docs]
def fake_open_dataset(filename, **kwargs):
"""Create a Dataset similar to reading an actual file with xarray.open_dataset."""
if filename == METOP_FILE:
return _get_datasets_with_less_attributes()
return _get_datasets_with_attributes()
[docs]
@pytest.mark.parametrize(
("filenames", "expected_datasets"),
[
([METOP_FILE], DS_IDS),
([NPP_MIRS_L2_SWATH], DS_IDS),
([OTHER_MIRS_L2_SWATH], DS_IDS),
]
)
def test_available_datasets(filenames, expected_datasets):
"""Test that variables are dynamically discovered."""
r = _create_fake_reader(filenames, {})
avails = list(r.available_dataset_names)
for var_name in expected_datasets:
assert var_name in avails
[docs]
@pytest.mark.parametrize(
("filenames", "loadable_ids", "platform_name"),
[
([METOP_FILE], TEST_VARS, "metop-a"),
([NPP_MIRS_L2_SWATH], TEST_VARS, "npp"),
([N20_MIRS_L2_SWATH], TEST_VARS, "noaa-20"),
([N21_MIRS_L2_SWATH], TEST_VARS, "noaa-21"),
([OTHER_MIRS_L2_SWATH], TEST_VARS, "gpm"),
]
)
@pytest.mark.parametrize("reader_kw", [{}, {"limb_correction": False}])
def test_basic_load(filenames, loadable_ids, platform_name, reader_kw):
"""Test that variables are loaded properly."""
r = _create_fake_reader(filenames, reader_kw)
test_data = fake_open_dataset(filenames[0])
exp_limb_corr = reader_kw.get("limb_correction", True) and platform_name in ("npp", "noaa-20", "noaa-21")
loaded_data_arrs = _load_and_check_limb_correction_variables(r, loadable_ids, platform_name, exp_limb_corr)
for _data_id, data_arr_dask in loaded_data_arrs.items():
data_arr = data_arr_dask.compute()
assert data_arr.dtype == data_arr_dask.dtype
if np.issubdtype(data_arr.dtype, np.floating):
# we started with float32, it should stay that way
# NOTE: Sfc_type does not have enough metadata to dynamically force integer type
# even though it is a mask/category product
assert data_arr.dtype.type == np.float32
_check_metadata(data_arr, test_data, platform_name)
[docs]
def _create_fake_reader(
filenames: list[str],
reader_kwargs: dict,
exp_loadable_files: int | None = None
) -> FileYAMLReader:
exp_loadable_files = exp_loadable_files if exp_loadable_files is not None else len(filenames)
reader_configs = config_search_paths(os.path.join("readers", "mirs.yaml"))
with mock.patch("satpy.readers.mirs.xr.open_dataset") as od:
od.side_effect = fake_open_dataset
r = load_reader(reader_configs)
loadables = r.select_files_from_pathnames(filenames)
r.create_filehandlers(loadables, fh_kwargs=reader_kwargs)
assert isinstance(r, FileYAMLReader)
assert len(loadables) == exp_loadable_files
assert r.file_handlers
return r
[docs]
def _load_and_check_limb_correction_variables(
reader: FileYAMLReader,
loadable_ids: list[str],
platform_name: str,
exp_limb_corr: bool
) -> dict[DataID, xr.DataArray]:
with mock.patch("satpy.readers.mirs.read_atms_coeff_to_string") as \
fd, mock.patch("satpy.readers.mirs.retrieve") as rtv:
fd.side_effect = fake_coeff_from_fn
loaded_data_arrs = reader.load(loadable_ids)
if exp_limb_corr:
fd.assert_called()
suffix = f"noaa{platform_name[-2:]}" if platform_name.startswith("noaa") else "snpp"
assert rtv.call_count == 2 * len([var_name for var_name in loadable_ids if "btemp" in var_name])
for calls_args in rtv.call_args_list:
assert calls_args[0][0].endswith(f"_{suffix}.txt")
else:
fd.assert_not_called()
rtv.assert_not_called()
assert len(loaded_data_arrs) == len(loadable_ids)
return loaded_data_arrs
[docs]
def _check_area(data_arr):
from pyresample.geometry import SwathDefinition
area = data_arr.attrs["area"]
assert isinstance(area, SwathDefinition)
[docs]
def _check_valid_range(data_arr, test_valid_range):
# valid_range is popped out of data_arr.attrs when it is applied
assert "valid_range" not in data_arr.attrs
assert data_arr.data.min() >= test_valid_range[0]
assert data_arr.data.max() <= test_valid_range[1]
[docs]
def _check_fill_value(data_arr, test_fill_value):
assert "_FillValue" not in data_arr.attrs
assert not (data_arr.data == test_fill_value).any()
[docs]
def _check_attrs(data_arr, platform_name):
attrs = data_arr.attrs
assert "scale_factor" not in attrs
assert "platform_name" in attrs
assert attrs["platform_name"] == platform_name
assert attrs["start_time"] == START_TIME
assert attrs["end_time"] == END_TIME