Source code for satpy.tests.reader_tests.test_mirs

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2019 Satpy developers
#
# This file is part of Satpy.
#
# Satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# Satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Module for testing the satpy.readers.tropomi_l2 module."""

import os
from datetime import datetime
from unittest import mock

import numpy as np
import pytest
import xarray as xr

# Example input file names, one per file naming pattern/platform used by the
# parametrized tests in this module.
METOP_FILE = "IMG_SX.M2.D17037.S1601.E1607.B0000001.WE.HR.ORB.nc"
NPP_MIRS_L2_SWATH = "NPR-MIRS-IMG_v11r6_npp_s201702061601000_e201702061607000_c202012201658410.nc"
N20_MIRS_L2_SWATH = "NPR-MIRS-IMG_v11r4_n20_s201702061601000_e201702061607000_c202012201658410.nc"
OTHER_MIRS_L2_SWATH = "NPR-MIRS-IMG_v11r4_gpm_s201702061601000_e201702061607000_c202010080001310.nc"

# NOTE(review): N20_MIRS_L2_SWATH is not part of this list and the list is not
# referenced by the code visible in this module - confirm whether it is still needed.
EXAMPLE_FILES = [METOP_FILE, NPP_MIRS_L2_SWATH, OTHER_MIRS_L2_SWATH]

# Dimensions and dtype of the fake swath data built by the dataset helpers below.
N_CHANNEL = 22
N_FOV = 96
N_SCANLINE = 100
DEFAULT_FILE_DTYPE = np.float64
DEFAULT_2D_SHAPE = (N_SCANLINE, N_FOV)
# NOTE(review): DEFAULT_DATE is not referenced by the code visible in this module.
DEFAULT_DATE = datetime(2019, 6, 19, 13, 0)
# Flat (N_SCANLINE * N_FOV) coordinate ramps; the dataset helpers reshape them
# to DEFAULT_2D_SHAPE before use.
DEFAULT_LAT = np.linspace(23.09356, 36.42844, N_SCANLINE * N_FOV,
                          dtype=DEFAULT_FILE_DTYPE)
DEFAULT_LON = np.linspace(127.6879, 144.5284, N_SCANLINE * N_FOV,
                          dtype=DEFAULT_FILE_DTYPE)
# Per-channel values along the 'Channel' dimension; the literal lists are
# truncated to N_CHANNEL entries.
FREQ = xr.DataArray([23.8, 31.4, 50.3, 51.76, 52.8, 53.596, 54.4, 54.94, 55.5,
                     57.29, 57.29, 57.29, 57.29, 57.29, 57.29, 88.2, 165.5,
                     183.31, 183.31, 183.31, 183.31, 183.31][:N_CHANNEL],
                    dims='Channel',
                    attrs={'description': "Central Frequencies (GHz)"})
POLO = xr.DataArray([2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2, 3, 3, 3,
                     3, 3, 3][:N_CHANNEL], dims='Channel',
                    attrs={'description': "Polarizations"})

# Dataset names that test_available_datasets expects the reader to report.
DS_IDS = ['RR', 'longitude', 'latitude']
# Dataset names loaded by test_basic_load, and the units expected for each.
TEST_VARS = ['btemp_88v', 'btemp_165h',
             'btemp_23v', 'RR', 'Sfc_type']
DEFAULT_UNITS = {'btemp_88v': 'K', 'btemp_165h': 'K',
                 'btemp_23v': 'K', 'RR': 'mm/hr', 'Sfc_type': "1"}
# NOTE(review): PLATFORM and SENSOR are not referenced by the code visible in
# this module; presumably they map file name platform tokens to expected
# attribute values - confirm before relying on them.
PLATFORM = {"M2": "metop-a", "NPP": "npp", "GPM": "gpm"}
SENSOR = {"m2": "amsu-mhs", "npp": "atms", "gpm": "GPI"}

# Expected 'start_time'/'end_time' attributes of loaded data (see _check_attrs);
# they correspond to the 2017-02-06 16:01 - 16:07 span in the example file names.
START_TIME = datetime(2017, 2, 6, 16, 1, 0)
END_TIME = datetime(2017, 2, 6, 16, 7, 0)


def fake_coeff_from_fn(fn):
    """Create fake coefficient lines.

    Used in this module as the ``side_effect`` of the patched
    ``satpy.readers.mirs.read_atms_coeff_to_string`` so no coefficient
    file has to be read.

    For every channel ``1..N_CHANNEL`` the returned list contains, in order:

    * a bare newline,
    * a header line with the channel index, the number of entries in that
      channel's ``locations`` row and a random mean drawn from ``[261, 267)``,
    * a line listing that channel's ``locations`` row,
    * ``N_FOV`` lines holding the channel index, the field-of-view index,
      one coefficient per location (all ``1.0``), one mean per location
      (all ``0.0``) and a random error value drawn from ``[0, 4)``.

    Args:
        fn: Coefficient file name. Ignored; kept so the signature matches
            the function being replaced.

    Returns:
        list[str]: The fake file content, one string per line.
    """
    ameans = np.random.uniform(261, 267, N_CHANNEL)
    # One row per channel. NOTE(review): presumably the channel numbers that
    # take part in correcting each channel - confirm against the reader.
    locations = [
        [1, 2],
        [1, 2],
        [3, 4, 5],
        [3, 4, 5],
        [4, 5, 6],
        [5, 6, 7],
        [6, 7, 8],
        [7, 8],
        [9, 10, 11],
        [10, 11],
        [10, 11, 12],
        [11, 12, 13],
        [12, 13],
        [12, 13, 14],
        [14, 15],
        [1, 16],
        [17, 18],
        [18, 19],
        [18, 19, 20],
        [19, 20, 21],
        [20, 21, 22],
        [21, 22],
    ]

    coeff_str = []
    for nx in range(N_CHANNEL):
        idx = nx + 1  # channel numbers in the fake file are 1-based
        loc = locations[nx]
        nchx = len(loc)

        coeff_str.append('\n')
        coeff_str.append(' {} {} {}\n'.format(idx, nchx, ameans[nx]))
        coeff_str.append(' {}\n'.format(" ".join(str(x) for x in loc)))

        # The coefficients (all ones) and means (all zeros) only depend on the
        # channel, so their text is built once instead of once per FOV.
        str_coeff = ' '.join(str(x) for x in np.ones(nchx))
        str_means = ' '.join(str(x) for x in np.zeros(nchx))

        for fov in range(1, N_FOV + 1):
            error_val = np.random.uniform(0, 4)
            coeff_str.append(' {:>2} {:>2} {} {} {}\n'.format(
                idx, fov, str_coeff, str_means, error_val))

    return coeff_str
def _get_datasets_with_attributes(**kwargs):
    """Build the fake dataset whose variables carry full metadata.

    The 'BT', 'RR' and 'Sfc_type' variables include 'units', '_FillValue'
    and 'valid_range' attributes ('BT' and 'RR' also a 'scale_factor').
    'Freq', 'Latitude' and 'Longitude' are assigned as coordinates.

    Args:
        **kwargs: Accepted but not used.

    Returns:
        xarray.Dataset: The fake dataset.
    """
    swath_dims = ('Scanline', 'Field_of_view')

    bt_values = np.linspace(1830, 3930, N_SCANLINE * N_FOV * N_CHANNEL)
    bt = xr.DataArray(
        bt_values.reshape(N_SCANLINE, N_FOV, N_CHANNEL),
        attrs={
            'long_name': "Channel Temperature (K)",
            'units': "Kelvin",
            'coordinates': "Longitude Latitude Freq",
            'scale_factor': 0.01,
            '_FillValue': -999,
            'valid_range': [0, 50000],
        },
        dims=swath_dims + ('Channel',),
    )

    rr = xr.DataArray(
        np.random.randint(100, 500, size=(N_SCANLINE, N_FOV)),
        attrs={
            'long_name': "Rain Rate (mm/hr)",
            'units': "mm/hr",
            'coordinates': "Longitude Latitude",
            'scale_factor': 0.1,
            '_FillValue': -999,
            'valid_range': [0, 1000],
        },
        dims=swath_dims,
    )

    sfc_type = xr.DataArray(
        np.random.randint(0, 4, size=(N_SCANLINE, N_FOV)),
        attrs={
            'description': ("type of surface:0-ocean,"
                            "1-sea ice,2-land,3-snow"),
            'units': "1",
            'coordinates': "Longitude Latitude",
            '_FillValue': -999,
            'valid_range': [0, 3],
        },
        dims=swath_dims,
    )

    latitude = xr.DataArray(
        DEFAULT_LAT.reshape(DEFAULT_2D_SHAPE),
        attrs={'long_name': "Latitude of the view (-90,90)"},
        dims=swath_dims,
    )
    longitude = xr.DataArray(
        DEFAULT_LON.reshape(DEFAULT_2D_SHAPE),
        attrs={'long_name': "Longitude of the view (-180,180)"},
        dims=swath_dims,
    )

    fake_ds = xr.Dataset(
        {
            'Freq': FREQ,
            'Polo': POLO,
            'BT': bt,
            'RR': rr,
            'Sfc_type': sfc_type,
            'Latitude': latitude,
            'Longitude': longitude,
        },
        attrs={'missing_value': -999.},
    )
    return fake_ds.assign_coords(
        {"Freq": FREQ, "Latitude": latitude, "Longitude": longitude})
def _get_datasets_with_less_attributes():
    """Build the fake dataset whose variables carry only minimal metadata.

    Unlike ``_get_datasets_with_attributes``, the 'BT', 'RR' and 'Sfc_type'
    variables have no 'units', '_FillValue', 'valid_range' or 'coordinates'
    attributes; only a name/description and, for 'BT' and 'RR', a
    'scale_factor'. 'Freq', 'Latitude' and 'Longitude' are assigned as
    coordinates.

    Returns:
        xarray.Dataset: The fake dataset.
    """
    swath_dims = ('Scanline', 'Field_of_view')

    bt_values = np.linspace(1830, 3930, N_SCANLINE * N_FOV * N_CHANNEL)
    bt = xr.DataArray(
        bt_values.reshape(N_SCANLINE, N_FOV, N_CHANNEL),
        attrs={
            'long_name': "Channel Temperature (K)",
            'scale_factor': 0.01,
        },
        dims=swath_dims + ('Channel',),
    )

    rr = xr.DataArray(
        np.random.randint(100, 500, size=(N_SCANLINE, N_FOV)),
        attrs={
            'long_name': "Rain Rate (mm/hr)",
            'scale_factor': 0.1,
        },
        dims=swath_dims,
    )

    sfc_type = xr.DataArray(
        np.random.randint(0, 4, size=(N_SCANLINE, N_FOV)),
        attrs={
            'description': ("type of surface:0-ocean,"
                            "1-sea ice,2-land,3-snow"),
        },
        dims=swath_dims,
    )

    latitude = xr.DataArray(
        DEFAULT_LAT.reshape(DEFAULT_2D_SHAPE),
        attrs={'long_name': "Latitude of the view (-90,90)"},
        dims=swath_dims,
    )
    longitude = xr.DataArray(
        DEFAULT_LON.reshape(DEFAULT_2D_SHAPE),
        attrs={"long_name": "Longitude of the view (-180,180)"},
        dims=swath_dims,
    )

    fake_ds = xr.Dataset(
        {
            'Freq': FREQ,
            'Polo': POLO,
            'BT': bt,
            'RR': rr,
            'Sfc_type': sfc_type,
            'Longitude': longitude,
            'Latitude': latitude,
        },
        attrs={'missing_value': -999.},
    )
    return fake_ds.assign_coords(
        {"Freq": FREQ, "Latitude": latitude, "Longitude": longitude})
def fake_open_dataset(filename, **kwargs):
    """Return a fake Dataset in place of ``xarray.open_dataset``.

    Args:
        filename: Name of the file being "opened". ``METOP_FILE`` yields the
            dataset with reduced attributes; any other name yields the
            fully attributed dataset.
        **kwargs: Accepted but not used.

    Returns:
        xarray.Dataset: The fake dataset for ``filename``.
    """
    build_dataset = (_get_datasets_with_less_attributes
                     if filename == METOP_FILE
                     else _get_datasets_with_attributes)
    return build_dataset()
class TestMirsL2_NcReader:
    """Test the mirs reader against fake in-memory datasets."""

    yaml_file = "mirs.yaml"

    def setup_method(self):
        """Look up the reader configuration files for ``yaml_file``."""
        from satpy._config import config_search_paths
        config_path = os.path.join('readers', self.yaml_file)
        self.reader_configs = config_search_paths(config_path)

    @pytest.mark.parametrize(
        ("filenames", "expected_loadables"),
        [
            ([METOP_FILE], 1),
            ([NPP_MIRS_L2_SWATH], 1),
            ([OTHER_MIRS_L2_SWATH], 1),
        ]
    )
    def test_reader_creation(self, filenames, expected_loadables):
        """Check that file names are matched and file handlers get created."""
        from satpy.readers import load_reader
        with mock.patch('satpy.readers.mirs.xr.open_dataset',
                        side_effect=fake_open_dataset):
            reader = load_reader(self.reader_configs)
            matched_files = reader.select_files_from_pathnames(filenames)
            assert len(matched_files) == expected_loadables
            reader.create_filehandlers(matched_files)
            # at least one file handler must exist afterwards
            assert reader.file_handlers

    @pytest.mark.parametrize(
        ("filenames", "expected_datasets"),
        [
            ([METOP_FILE], DS_IDS),
            ([NPP_MIRS_L2_SWATH], DS_IDS),
            ([OTHER_MIRS_L2_SWATH], DS_IDS),
        ]
    )
    def test_available_datasets(self, filenames, expected_datasets):
        """Check that the expected dataset names are reported as available."""
        from satpy.readers import load_reader
        with mock.patch('satpy.readers.mirs.xr.open_dataset',
                        side_effect=fake_open_dataset):
            reader = load_reader(self.reader_configs)
            matched_files = reader.select_files_from_pathnames(filenames)
            reader.create_filehandlers(matched_files)
            available_names = list(reader.available_dataset_names)
            for expected_name in expected_datasets:
                assert expected_name in available_names

    @staticmethod
    def _check_area(data_arr):
        """Assert that the 'area' attribute is a SwathDefinition."""
        from pyresample.geometry import SwathDefinition
        assert isinstance(data_arr.attrs['area'], SwathDefinition)

    @staticmethod
    def _check_fill(data_arr):
        """Assert that '_FillValue' was removed and floats are float64."""
        assert '_FillValue' not in data_arr.attrs
        is_floating = np.issubdtype(data_arr.dtype, np.floating)
        if is_floating:
            # floating point results are expected as 64-bit floats
            assert data_arr.dtype.type == np.float64

    @staticmethod
    def _check_valid_range(data_arr, test_valid_range):
        """Assert that all data lies within ``test_valid_range``."""
        # 'valid_range' is removed from the attributes once it has been applied
        assert 'valid_range' not in data_arr.attrs
        lower, upper = test_valid_range[0], test_valid_range[1]
        assert data_arr.data.min() >= lower
        assert data_arr.data.max() <= upper

    @staticmethod
    def _check_fill_value(data_arr, test_fill_value):
        """Assert that no element still equals the file's fill value."""
        assert '_FillValue' not in data_arr.attrs
        fill_mask = data_arr.data == test_fill_value
        assert not fill_mask.any()

    @staticmethod
    def _check_attrs(data_arr, platform_name):
        """Assert platform and time attributes and that scaling was consumed."""
        loaded_attrs = data_arr.attrs
        assert 'scale_factor' not in loaded_attrs
        assert 'platform_name' in loaded_attrs
        assert loaded_attrs['platform_name'] == platform_name
        assert loaded_attrs['start_time'] == START_TIME
        assert loaded_attrs['end_time'] == END_TIME

    @pytest.mark.parametrize(
        ("filenames", "loadable_ids", "platform_name"),
        [
            ([METOP_FILE], TEST_VARS, "metop-a"),
            ([NPP_MIRS_L2_SWATH], TEST_VARS, "npp"),
            ([N20_MIRS_L2_SWATH], TEST_VARS, "noaa-20"),
            ([OTHER_MIRS_L2_SWATH], TEST_VARS, "gpm"),
        ]
    )
    @pytest.mark.parametrize('reader_kw', [{}, {'limb_correction': False}])
    def test_basic_load(self, filenames, loadable_ids,
                        platform_name, reader_kw):
        """Load the test variables and verify their data and attributes."""
        from satpy.readers import load_reader
        with mock.patch('satpy.readers.mirs.xr.open_dataset',
                        side_effect=fake_open_dataset):
            reader = load_reader(self.reader_configs)
            matched_files = reader.select_files_from_pathnames(filenames)
            reader.create_filehandlers(matched_files, fh_kwargs=reader_kw)

            # Replace the coefficient reader with the fake one and stub out
            # 'retrieve' for the duration of the load.
            with mock.patch('satpy.readers.mirs.read_atms_coeff_to_string',
                            side_effect=fake_coeff_from_fn) as coeff_reader:
                with mock.patch('satpy.readers.mirs.retrieve'):
                    loaded_data_arrs = reader.load(loadable_ids)
            assert len(loaded_data_arrs) == len(loadable_ids)

            # The fake file content is the reference for the checks below.
            test_data = fake_open_dataset(filenames[0])
            for data_arr in loaded_data_arrs.values():
                data_arr = data_arr.compute()
                var_name = data_arr.attrs["name"]
                if var_name not in ['latitude', 'longitude']:
                    self._check_area(data_arr)
                self._check_fill(data_arr)
                self._check_attrs(data_arr, platform_name)

                # All 'btemp_*' datasets are compared against the 'BT' variable.
                if "btemp" in var_name:
                    input_fake_data = test_data['BT']
                else:
                    input_fake_data = test_data[var_name]
                source_attrs = input_fake_data.attrs
                if "valid_range" in source_attrs:
                    self._check_valid_range(data_arr,
                                            source_attrs["valid_range"])
                if "_FillValue" in source_attrs:
                    self._check_fill_value(data_arr,
                                           source_attrs["_FillValue"])

                # Coefficients must be read only when limb correction is
                # enabled (the default) and the sensor is 'atms'.
                sensor = data_arr.attrs['sensor']
                if reader_kw.get('limb_correction', True) and sensor == 'atms':
                    coeff_reader.assert_called()
                else:
                    coeff_reader.assert_not_called()
                assert data_arr.attrs['units'] == DEFAULT_UNITS[var_name]