Source code for satpy.tests.reader_tests.test_ahi_hrit

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2018 Satpy developers
# This file is part of satpy.
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <>.
"""The hrit ahi reader tests package."""

import unittest
from unittest import mock

import dask.array as da
import numpy as np
from xarray import DataArray

from satpy.tests.utils import make_dataid

[docs] class TestHRITJMAFileHandler(unittest.TestCase): """Test the HRITJMAFileHandler."""
[docs] @mock.patch("satpy.readers.hrit_jma.HRITFileHandler.__init__") def _get_reader(self, mocked_init, mda, filename_info=None, filetype_info=None, reader_kwargs=None): from satpy.readers.hrit_jma import HRITJMAFileHandler if not filename_info: filename_info = {} if not filetype_info: filetype_info = {} if not reader_kwargs: reader_kwargs = {} HRITJMAFileHandler.filename = "filename" HRITJMAFileHandler.mda = mda HRITJMAFileHandler._start_time = filename_info.get("start_time") return HRITJMAFileHandler("filename", filename_info, filetype_info, **reader_kwargs)
[docs] def _get_acq_time(self, nlines): """Get sample header entry for scanline acquisition times. Lines: 1, 21, 41, 61, ..., nlines Times: 1970-01-01 00:00 + (1, 21, 41, 61, ..., nlines) seconds So the interpolated times are expected to be 1970-01-01 + (1, 2, 3, 4, ..., nlines) seconds. Note that there will be some floating point inaccuracies, because timestamps are stored with only 6 decimals precision. """ mjd_1970 = 40587.0 lines_sparse = np.array(list(range(1, nlines, 20)) + [nlines]) times_sparse = mjd_1970 + lines_sparse / 24 / 3600 acq_time_s = ["LINE:={}\rTIME:={:.6f}\r".format(line, time) for line, time in zip(lines_sparse, times_sparse)] acq_time_b = "".join(acq_time_s).encode() return acq_time_b
[docs] def _get_mda(self, loff=5500.0, coff=5500.0, nlines=11000, ncols=11000, segno=0, numseg=1, vis=True, platform="Himawari-8"): """Create metadata dict like HRITFileHandler would do it.""" if vis: idf = b"$HALFTONE:=16\r_NAME:=VISIBLE\r_UNIT:=ALBEDO(%)\r" \ b"0:=-0.10\r1023:=100.00\r65535:=100.00\r" else: idf = b"$HALFTONE:=16\r_NAME:=INFRARED\r_UNIT:=KELVIN\r" \ b"0:=329.98\r1023:=130.02\r65535:=130.02\r" proj_h8 = b"GEOS(140.70) " proj_mtsat2 = b"GEOS(145.00) " proj_name = proj_h8 if platform == "Himawari-8" else proj_mtsat2 return {"image_segm_seq_no": segno, "total_no_image_segm": numseg, "projection_name": proj_name, "projection_parameters": { "a": 6378169.00, "b": 6356583.80, "h": 35785831.00, }, "cfac": 10233128, "lfac": 10233128, "coff": coff, "loff": loff, "number_of_columns": ncols, "number_of_lines": nlines, "image_data_function": idf, "image_observation_time": self._get_acq_time(nlines)}
[docs] def test_init(self): """Test creating the file handler.""" from satpy.readers.hrit_jma import HIMAWARI8, UNKNOWN_AREA # Test addition of extra metadata mda = self._get_mda() mda_expected = mda.copy() mda_expected.update( {"planned_end_segment_number": 1, "planned_start_segment_number": 1, "segment_sequence_number": 0, "unit": "ALBEDO(%)"}) mda_expected["projection_parameters"]["SSP_longitude"] = 140.7 reader = self._get_reader(mda=mda) assert reader.mda == mda_expected # Check projection name assert reader.projection_name == "GEOS(140.70)" # Check calibration table cal_expected = np.array([[0, -0.1], [1023, 100], [65535, 100]]) assert np.all(reader.calibration_table == cal_expected) # Check if scanline timestamps are there (dedicated test below) assert isinstance(reader.acq_time, np.ndarray) assert reader.acq_time.dtype == np.dtype("datetime64[ns]") # Check platform assert reader.platform == HIMAWARI8 # Check is_segmented attribute expected = {0: False, 1: True, 8: True} for segno, is_segmented in expected.items(): mda = self._get_mda(segno=segno) reader = self._get_reader(mda=mda) assert reader.is_segmented == is_segmented # Check area IDs expected = [ ({"area": 1}, 1), ({"area": 1234}, UNKNOWN_AREA), ({}, UNKNOWN_AREA) ] mda = self._get_mda() for filename_info, area_id in expected: reader = self._get_reader(mda=mda, filename_info=filename_info) assert reader.area_id == area_id
[docs] @mock.patch("satpy.readers.hrit_jma.HRITJMAFileHandler.__init__") def test_get_platform(self, mocked_init): """Test platform identification.""" from satpy.readers.hrit_jma import PLATFORMS, UNKNOWN_PLATFORM, HRITJMAFileHandler mocked_init.return_value = None reader = HRITJMAFileHandler() for proj_name, platform in PLATFORMS.items(): reader.projection_name = proj_name assert reader._get_platform() == platform with mock.patch("logging.Logger.error") as mocked_log: reader.projection_name = "invalid" assert reader._get_platform() == UNKNOWN_PLATFORM mocked_log.assert_called()
[docs] def test_get_area_def(self): """Test getting an AreaDefinition.""" from satpy.readers.hrit_jma import AREA_NAMES, FULL_DISK, NORTH_HEMIS, SOUTH_HEMIS cases = [ # Non-segmented, full disk {"loff": 1375.0, "coff": 1375.0, "nlines": 2750, "ncols": 2750, "segno": 0, "numseg": 1, "area": FULL_DISK, "extent": (-5498000.088960204, -5498000.088960204, 5502000.089024927, 5502000.089024927)}, # Non-segmented, northern hemisphere {"loff": 1325.0, "coff": 1375.0, "nlines": 1375, "ncols": 2750, "segno": 0, "numseg": 1, "area": NORTH_HEMIS, "extent": (-5498000.088960204, -198000.00320373234, 5502000.089024927, 5302000.085788833)}, # Non-segmented, southern hemisphere {"loff": 50, "coff": 1375.0, "nlines": 1375, "ncols": 2750, "segno": 0, "numseg": 1, "area": SOUTH_HEMIS, "extent": (-5498000.088960204, -5298000.085724112, 5502000.089024927, 202000.0032684542)}, # Segmented, segment #1 {"loff": 1375.0, "coff": 1375.0, "nlines": 275, "ncols": 2750, "segno": 1, "numseg": 10, "area": FULL_DISK, "extent": (-5498000.088960204, 4402000.071226413, 5502000.089024927, 5502000.089024927)}, # Segmented, segment #7 {"loff": 1375.0, "coff": 1375.0, "nlines": 275, "ncols": 2750, "segno": 7, "numseg": 10, "area": FULL_DISK, "extent": (-5498000.088960204, -2198000.035564665, 5502000.089024927, -1098000.0177661523)}, ] for case in cases: mda = self._get_mda(loff=case["loff"], coff=case["coff"], nlines=case["nlines"], ncols=case["ncols"], segno=case["segno"], numseg=case["numseg"]) reader = self._get_reader(mda=mda, filename_info={"area": case["area"]}) area = reader.get_area_def("some_id") assert area.area_extent == case["extent"] assert area.description == AREA_NAMES[case["area"]]["long"]
[docs] def test_calibrate(self): """Test calibration.""" # Generate test data counts = np.linspace(0, 1200, 25).reshape(5, 5) counts[-1, -1] = 65535 counts = DataArray(da.from_array(counts, chunks=5)) refl = np.array( [[-0.1, 4.79247312, 9.68494624, 14.57741935, 19.46989247], [24.36236559, 29.25483871, 34.14731183, 39.03978495, 43.93225806], [48.82473118, 53.7172043, 58.60967742, 63.50215054, 68.39462366], [73.28709677, 78.17956989, 83.07204301, 87.96451613, 92.85698925], [97.74946237, 100., 100., 100., np.nan]] ) bt = np.array( [[329.98, 320.20678397, 310.43356794, 300.66035191, 290.88713587], [281.11391984, 271.34070381, 261.56748778, 251.79427175, 242.02105572], [232.24783969, 222.47462366, 212.70140762, 202.92819159, 193.15497556], [183.38175953, 173.6085435, 163.83532747, 154.06211144, 144.28889541], [134.51567937, 130.02, 130.02, 130.02, np.nan]] ) # Choose an area near the subsatellite point to avoid masking # of space pixels mda = self._get_mda(nlines=5, ncols=5, loff=1375.0, coff=1375.0, segno=0) reader = self._get_reader(mda=mda) # 1. Counts res = reader.calibrate(data=counts, calibration="counts") assert np.all(counts.values == res.values) # 2. Reflectance res = reader.calibrate(data=counts, calibration="reflectance") np.testing.assert_allclose(refl, res.values) # also compares NaN # 3. Brightness temperature mda_bt = self._get_mda(nlines=5, ncols=5, loff=1375.0, coff=1375.0, segno=0, vis=False) reader_bt = self._get_reader(mda=mda_bt) res = reader_bt.calibrate(data=counts, calibration="brightness_temperature") np.testing.assert_allclose(bt, res.values) # also compares NaN
[docs] def test_mask_space(self): """Test masking of space pixels.""" mda = self._get_mda(loff=1375.0, coff=1375.0, nlines=275, ncols=1375, segno=1, numseg=10) reader = self._get_reader(mda=mda) data = DataArray(da.ones((275, 1375), chunks=1024)) masked = reader._mask_space(data) # First line of the segment should be space, in the middle of the # last line there should be some valid pixels np.testing.assert_allclose(masked.values[0, :], np.nan) assert np.all(masked.values[-1, 588:788] == 1)
[docs] @mock.patch("satpy.readers.hrit_jma.HRITFileHandler.get_dataset") def test_get_dataset(self, base_get_dataset): """Test getting a dataset.""" from satpy.readers.hrit_jma import HIMAWARI8 mda = self._get_mda(loff=1375.0, coff=1375.0, nlines=275, ncols=1375, segno=1, numseg=10) reader = self._get_reader(mda=mda) key = make_dataid(name="VIS", calibration="reflectance") base_get_dataset.return_value = DataArray(da.ones((275, 1375), chunks=1024), dims=("y", "x")) # Check attributes res = reader.get_dataset(key, {"units": "%", "sensor": "ahi"}) assert res.attrs["units"] == "%" assert res.attrs["sensor"] == "ahi" assert res.attrs["platform_name"] == HIMAWARI8 assert res.attrs["orbital_parameters"] == {"projection_longitude": 140.7, "projection_latitude": 0.0, "projection_altitude": 35785831.0} # Check if acquisition time is a coordinate assert "acq_time" in res.coords # Check called methods with mock.patch.object(reader, "_mask_space") as mask_space: with mock.patch.object(reader, "calibrate") as calibrate: reader.get_dataset(key, {"units": "%", "sensor": "ahi"}) mask_space.assert_called() calibrate.assert_called() with mock.patch("logging.Logger.error") as log_mock: reader.get_dataset(key, {"units": "%", "sensor": "jami"}) log_mock.assert_called()
[docs] def test_mjd2datetime64(self): """Test conversion from modified julian day to datetime64.""" from satpy.readers.hrit_jma import mjd2datetime64 assert mjd2datetime64(np.array([0])) == np.datetime64("1858-11-17", "ns") assert mjd2datetime64(np.array([40587.5])) == np.datetime64("1970-01-01 12:00", "ns")
[docs] def test_get_acq_time(self): """Test computation of scanline acquisition times.""" dt_line = np.arange(1, 11000+1).astype("timedelta64[s]") acq_time_exp = np.datetime64("1970-01-01", "ns") + dt_line for platform in ["Himawari-8", "MTSAT-2"]: # Results are not exactly identical because timestamps are stored in # the header with only 6 decimals precision (max diff here: 45 msec). mda = self._get_mda(platform=platform) reader = self._get_reader(mda=mda) np.testing.assert_allclose(reader.acq_time.astype(np.int64), acq_time_exp.astype(np.int64), atol=45000000)
[docs] def test_start_time_from_filename(self): """Test that by default the datetime in the filename is returned.""" import datetime as dt start_time = dt.datetime(2022, 1, 20, 12, 10) for platform in ["Himawari-8", "MTSAT-2"]: mda = self._get_mda(platform=platform) reader = self._get_reader( mda=mda, filename_info={"start_time": start_time}) assert reader._start_time == start_time
[docs] def test_start_time_from_aqc_time(self): """Test that by the datetime from the metadata returned when `use_acquisition_time_as_start_time=True`.""" import datetime as dt start_time = dt.datetime(2022, 1, 20, 12, 10) for platform in ["Himawari-8", "MTSAT-2"]: mda = self._get_mda(platform=platform) reader = self._get_reader( mda=mda, filename_info={"start_time": start_time}, reader_kwargs={"use_acquisition_time_as_start_time": True}) assert reader.start_time == reader.acq_time[0].astype(dt.datetime)