#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2022-2023 Pytroll developers
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Module for testing the ATMS SDR HDF5 reader."""
import os
from datetime import datetime
from unittest import mock
import numpy as np
import pytest
from satpy._config import config_search_paths
from satpy.readers import load_reader
from satpy.readers.atms_sdr_hdf5 import ATMS_CHANNEL_NAMES
from satpy.readers.viirs_atms_sdr_base import DATASET_KEYS
from satpy.tests.reader_tests.test_hdf5_utils import FakeHDF5FileHandler
# Data type used for the synthetic band/geolocation arrays.
DEFAULT_FILE_DTYPE = np.uint16
# One scan line: 1 row by 96 fields of view.
DEFAULT_FILE_SHAPE = (1, 96)
# Mimicking one scan line of data
DEFAULT_FILE_DATA = np.arange(DEFAULT_FILE_SHAPE[0] * DEFAULT_FILE_SHAPE[1],
                              dtype=DEFAULT_FILE_DTYPE).reshape(DEFAULT_FILE_SHAPE)
# (scale, offset) pair applied when "factors" datasets are included.
DEFAULT_FILE_FACTORS = np.array([2.0, 1.0], dtype=np.float32)
class FakeHDF5_ATMS_SDR_FileHandler(FakeHDF5FileHandler):
    """Swap-in HDF5 File Handler mimicking ATMS SDR file content."""

    # Number of granules aggregated in the fake file and the number of
    # scans contained in each of those granules.
    _num_test_granules = 1
    _num_scans_per_gran = [12]
    # ATMS measures in 22 channels.
    _num_of_bands = 22

    def __init__(self, filename, filename_info, filetype_info, include_factors=True):
        """Create fake file handler.

        Args:
            filename: Name of the fake file being mimicked.
            filename_info: Information parsed from *filename* by the reader.
            filetype_info: File type information from the reader YAML.
            include_factors: If True, also create the
                ``BrightnessTemperatureFactors`` (scale/offset) dataset.
        """
        self.include_factors = include_factors
        super().__init__(filename, filename_info, filetype_info)

    @staticmethod
    def _add_basic_metadata_to_file_content(file_content, filename_info, num_grans):
        """Add file-level aggregate metadata attributes to *file_content*.

        The ``{prefix1}``/``{prefix2}`` placeholders in the keys are filled
        in later by :meth:`get_test_content`.
        """
        start_time = filename_info['start_time']
        # The end time parsed from the file name only carries the time of
        # day, so take the date components from the start time.
        end_time = filename_info['end_time'].replace(year=start_time.year,
                                                     month=start_time.month,
                                                     day=start_time.day)
        begin_date = start_time.strftime('%Y%m%d')
        begin_time = start_time.strftime('%H%M%S.%fZ')
        ending_date = end_time.strftime('%Y%m%d')
        ending_time = end_time.strftime('%H%M%S.%fZ')
        new_file_content = {
            "{prefix2}/attr/AggregateNumberGranules": num_grans,
            "{prefix2}/attr/AggregateBeginningDate": begin_date,
            "{prefix2}/attr/AggregateBeginningTime": begin_time,
            "{prefix2}/attr/AggregateEndingDate": ending_date,
            "{prefix2}/attr/AggregateEndingTime": ending_time,
            "{prefix2}/attr/G-Ring_Longitude": np.array([0.0, 0.1, 0.2, 0.3]),
            "{prefix2}/attr/G-Ring_Latitude": np.array([0.0, 0.1, 0.2, 0.3]),
            "{prefix2}/attr/AggregateBeginningOrbitNumber": "{0:d}".format(filename_info['orbit']),
            "{prefix2}/attr/AggregateEndingOrbitNumber": "{0:d}".format(filename_info['orbit']),
            "{prefix1}/attr/Instrument_Short_Name": "ATMS",
            "/attr/Platform_Short_Name": "J01",
        }
        file_content.update(new_file_content)

    def _add_granule_specific_info_to_file_content(self, file_content, dataset_group,
                                                   num_granules, num_scans_per_granule,
                                                   gran_group_prefix):
        """Add per-granule attributes: scan counts and G-Ring coordinates."""
        lons_lists = self._get_per_granule_lons()
        lats_lists = self._get_per_granule_lats()
        file_content["{prefix3}/NumberOfScans"] = np.array([1] * num_granules)
        for granule_idx in range(num_granules):
            prefix_gran = '{prefix}/{dataset_group}_Gran_{idx}'.format(prefix=gran_group_prefix,
                                                                       dataset_group=dataset_group,
                                                                       idx=granule_idx)
            num_scans = num_scans_per_granule[granule_idx]
            file_content[prefix_gran + '/attr/N_Number_Of_Scans'] = num_scans
            file_content[prefix_gran + '/attr/G-Ring_Longitude'] = lons_lists[granule_idx]
            file_content[prefix_gran + '/attr/G-Ring_Latitude'] = lats_lists[granule_idx]

    @staticmethod
    def _get_per_granule_lons():
        """Return fixed G-Ring longitudes for up to four granules."""
        return [
            np.array(
                [
                    50.51393, 49.566296, 48.865967, 18.96082, -4.0238385,
                    -7.05221, -10.405702, 14.638646
                ],
                dtype=np.float32),
            np.array(
                [
                    53.52594, 51.685738, 50.439102, 14.629087, -10.247547,
                    -13.951393, -18.256989, 8.36572
                ],
                dtype=np.float32),
            np.array(
                [
                    59.386833, 55.770416, 53.38952, 8.353765, -18.062435,
                    -22.608992, -27.867302, -1.3537619
                ],
                dtype=np.float32),
            np.array(
                [
                    72.50243, 64.17125, 59.15234, -1.3654504, -27.620953,
                    -33.091743, -39.28113, -17.749891
                ],
                dtype=np.float32)
        ]

    @staticmethod
    def _get_per_granule_lats():
        """Return fixed G-Ring latitudes for up to four granules."""
        return [
            np.array(
                [
                    67.969505, 65.545685, 63.103046, 61.853905, 55.169273,
                    57.062447, 58.86063, 66.495514
                ],
                dtype=np.float32),
            np.array(
                [
                    72.74879, 70.2493, 67.84738, 66.49691, 58.77254,
                    60.465942, 62.11525, 71.08249
                ],
                dtype=np.float32),
            np.array(
                [
                    77.393425, 74.977875, 72.62976, 71.083435, 62.036346,
                    63.465122, 64.78075, 75.36842
                ],
                dtype=np.float32),
            np.array(
                [
                    81.67615, 79.49934, 77.278656, 75.369415, 64.72178,
                    65.78417, 66.66166, 79.00025
                ],
                dtype=np.float32),
        ]

    def _add_data_info_to_file_content(self, file_content, filename, data_var_prefix, num_grans):
        """Add the ``BrightnessTemperature`` array (and optional factors)."""
        # ATMS SDR files always produce data with 12 scans per granule even if there are less? FIXME!
        total_rows = DEFAULT_FILE_SHAPE[0] * 12 * num_grans
        new_shape = (total_rows, DEFAULT_FILE_SHAPE[1], self._num_of_bands)
        key = data_var_prefix + "/" + 'BrightnessTemperature'
        # Tile the single default scan line over all rows...
        file_content[key] = np.repeat(DEFAULT_FILE_DATA.copy(), 12 * num_grans, axis=0)
        # ...and duplicate it over the band dimension.
        file_content[key] = np.repeat(file_content[key][:, :, np.newaxis], self._num_of_bands, axis=2)
        file_content[key + "/shape"] = new_shape
        if self.include_factors:
            # One (scale, offset) pair per granule, flattened to a 1-D array.
            file_content[key + "Factors"] = np.repeat(
                DEFAULT_FILE_FACTORS.copy()[None, :], num_grans, axis=0).ravel()

    @staticmethod
    def _add_geolocation_info_to_file_content(file_content, filename, data_var_prefix, num_grans):
        """Add longitude, latitude and angle datasets to *file_content*."""
        # ATMS SDR files always produce data with 12 scans per granule even if there are less? FIXME!
        total_rows = DEFAULT_FILE_SHAPE[0] * 12 * num_grans
        new_shape = (total_rows, DEFAULT_FILE_SHAPE[1])
        lon_data = np.linspace(15, 55, DEFAULT_FILE_SHAPE[1]).astype(DEFAULT_FILE_DTYPE)
        lat_data = np.linspace(55, 75, DEFAULT_FILE_SHAPE[1]).astype(DEFAULT_FILE_DTYPE)
        geo_vars = [("Latitude", lat_data), ("Longitude", lon_data)]
        # The angle datasets reuse the longitude ramp: the fake values only
        # need to be plausible, not accurate (close enough to SZA).
        angles = ['SolarZenithAngle',
                  'SolarAzimuthAngle',
                  'SatelliteZenithAngle',
                  'SatelliteAzimuthAngle']
        geo_vars.extend((angle, lon_data) for angle in angles)
        for var_name, data in geo_vars:
            k = data_var_prefix + "/" + var_name
            # Repeat the single line of data for every scan row.
            file_content[k] = np.repeat([data], total_rows, axis=0)
            file_content[k + "/shape"] = new_shape

    @staticmethod
    def _add_geo_ref(file_content, filename):
        """Add the name of the matching geolocation (GATMO) file."""
        geo_prefix = 'GATMO'
        # Replace the 5-character data prefix (e.g. "SATMS") with "GATMO".
        file_content['/attr/N_GEO_Ref'] = geo_prefix + filename[5:]

    @staticmethod
    def _convert_numpy_content_to_dataarray(final_content):
        """Convert numpy arrays in *final_content* to dask-backed DataArrays in place."""
        import dask.array as da
        from xarray import DataArray
        for key, val in final_content.items():
            if isinstance(val, np.ndarray):
                val = da.from_array(val, chunks=val.shape)
                if val.ndim > 2:
                    final_content[key] = DataArray(val, dims=('y', 'x', 'z'))
                elif val.ndim > 1:
                    final_content[key] = DataArray(val, dims=('y', 'x'))
                else:
                    final_content[key] = DataArray(val)

    def get_test_content(self, filename, filename_info, filetype_info):
        """Mimic reader input file content."""
        final_content = {}
        # NOTE: ``self.datasets`` is expected to be provided by the real base
        # class being faked -- not defined in this handler itself.
        for dataset in self.datasets:
            dataset_group = DATASET_KEYS[dataset]
            prefix1 = 'Data_Products/{dataset_group}'.format(dataset_group=dataset_group)
            prefix2 = '{prefix}/{dataset_group}_Aggr'.format(prefix=prefix1, dataset_group=dataset_group)
            prefix3 = 'All_Data/{dataset_group}_All'.format(dataset_group=dataset_group)
            file_content = {}
            self._add_basic_metadata_to_file_content(file_content, filename_info, self._num_test_granules)
            self._add_granule_specific_info_to_file_content(file_content, dataset_group,
                                                            self._num_test_granules, self._num_scans_per_gran,
                                                            prefix1)
            self._add_geo_ref(file_content, filename)
            # Resolve the "{prefix*}" placeholders used by the helpers above.
            for k, v in list(file_content.items()):
                file_content[k.format(prefix1=prefix1, prefix2=prefix2, prefix3=prefix3)] = v
            # SATMS/TATMS files carry the measurement data, GATMO files the geolocation.
            if filename[:5] in ['SATMS', 'TATMS']:
                self._add_data_info_to_file_content(file_content, filename, prefix3,
                                                    self._num_test_granules)
            elif filename[0] == 'G':
                self._add_geolocation_info_to_file_content(file_content, filename, prefix3,
                                                           self._num_test_granules)
            final_content.update(file_content)
        self._convert_numpy_content_to_dataarray(final_content)
        return final_content
class TestATMS_SDR_Reader:
    """Test ATMS SDR Reader."""

    yaml_file = "atms_sdr_hdf5.yaml"

    def _assert_bt_properties(self, data_arr, num_scans=1, with_area=True):
        """Check that *data_arr* looks like a calibrated brightness-temperature dataset."""
        assert np.issubdtype(data_arr.dtype, np.float32)
        assert data_arr.attrs['calibration'] == 'brightness_temperature'
        assert data_arr.attrs['units'] == 'K'
        assert data_arr.attrs['rows_per_scan'] == num_scans
        if with_area:
            assert 'area' in data_arr.attrs
            assert data_arr.attrs['area'] is not None
            assert data_arr.attrs['area'].shape == data_arr.shape
        else:
            assert 'area' not in data_arr.attrs

    def setup_method(self):
        """Wrap HDF5 file handler with our own fake handler."""
        from satpy.readers.viirs_atms_sdr_base import JPSS_SDR_FileHandler
        self.reader_configs = config_search_paths(os.path.join('readers', self.yaml_file))
        # http://stackoverflow.com/questions/12219967/how-to-mock-a-base-class-with-python-mock-library
        self.p = mock.patch.object(JPSS_SDR_FileHandler, '__bases__', (FakeHDF5_ATMS_SDR_FileHandler,))
        self.fake_handler = self.p.start()
        self.p.is_local = True

    def teardown_method(self):
        """Stop wrapping the HDF5 file handler."""
        self.p.stop()

    def test_init(self):
        """Test basic init with no extra parameters."""
        r = load_reader(self.reader_configs)
        loadables = r.select_files_from_pathnames([
            '/path/to/atms/sdr/data/SATMS_j01_d20221220_t0910240_e0921356_b26361_c20221220100456348770_cspp_dev.h5',
        ])
        assert len(loadables) == 1
        r.create_filehandlers(loadables)
        # make sure we have some files
        assert r.file_handlers

    def test_init_start_end_time(self):
        """Test basic init with start and end times around the start/end times of the provided file."""
        r = load_reader(self.reader_configs,
                        filter_parameters={
                            'start_time': datetime(2022, 12, 19),
                            'end_time': datetime(2022, 12, 21)
                        })
        loadables = r.select_files_from_pathnames([
            'SATMS_j01_d20221220_t0910240_e0921356_b26361_c20221220100456348770_cspp_dev.h5',
        ])
        assert len(loadables) == 1
        r.create_filehandlers(loadables)
        # make sure we have some files
        assert r.file_handlers

    @pytest.mark.parametrize("files, expected",
                             [(['SATMS_j01_d20221220_t0910240_e0921356_b26361_c20221220100456348770_cspp_dev.h5',
                                'GATMO_j01_d20221220_t0910240_e0921356_b26361_c20221220100456680030_cspp_dev.h5'],
                               True),
                              (['SATMS_j01_d20221220_t0910240_e0921356_b26361_c20221220100456348770_cspp_dev.h5', ],
                               False)]
                             )
    def test_load_all_bands(self, files, expected):
        """Load brightness temperatures for all 22 ATMS channels, with/without geolocation."""
        r = load_reader(self.reader_configs)
        loadables = r.select_files_from_pathnames(files)
        r.create_filehandlers(loadables)
        ds = r.load(ATMS_CHANNEL_NAMES)
        assert len(ds) == 22
        for d in ds.values():
            # Geolocation ("area") is only expected when a GATMO file was provided.
            self._assert_bt_properties(d, with_area=expected)