#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2018 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""The hrit ahi reader tests package."""
import unittest
from unittest import mock
import dask.array as da
import numpy as np
from xarray import DataArray
from satpy.tests.utils import make_dataid
[docs]
class TestHRITJMAFileHandler(unittest.TestCase):
"""Test the HRITJMAFileHandler."""
[docs]
@mock.patch("satpy.readers.hrit_jma.HRITFileHandler.__init__")
def _get_reader(self, mocked_init, mda, filename_info=None, filetype_info=None, reader_kwargs=None):
from satpy.readers.hrit_jma import HRITJMAFileHandler
if not filename_info:
filename_info = {}
if not filetype_info:
filetype_info = {}
if not reader_kwargs:
reader_kwargs = {}
HRITJMAFileHandler.filename = "filename"
HRITJMAFileHandler.mda = mda
HRITJMAFileHandler._start_time = filename_info.get("start_time")
return HRITJMAFileHandler("filename", filename_info, filetype_info, **reader_kwargs)
[docs]
def _get_acq_time(self, nlines):
"""Get sample header entry for scanline acquisition times.
Lines: 1, 21, 41, 61, ..., nlines
Times: 1970-01-01 00:00 + (1, 21, 41, 61, ..., nlines) seconds
So the interpolated times are expected to be 1970-01-01 +
(1, 2, 3, 4, ..., nlines) seconds. Note that there will be some
floating point inaccuracies, because timestamps are stored
with only 6 decimals precision.
"""
mjd_1970 = 40587.0
lines_sparse = np.array(list(range(1, nlines, 20)) + [nlines])
times_sparse = mjd_1970 + lines_sparse / 24 / 3600
acq_time_s = ["LINE:={}\rTIME:={:.6f}\r".format(line, time)
for line, time in zip(lines_sparse, times_sparse)]
acq_time_b = "".join(acq_time_s).encode()
return acq_time_b
[docs]
def _get_mda(self, loff=5500.0, coff=5500.0, nlines=11000, ncols=11000,
segno=0, numseg=1, vis=True, platform="Himawari-8"):
"""Create metadata dict like HRITFileHandler would do it."""
if vis:
idf = b"$HALFTONE:=16\r_NAME:=VISIBLE\r_UNIT:=ALBEDO(%)\r" \
b"0:=-0.10\r1023:=100.00\r65535:=100.00\r"
else:
idf = b"$HALFTONE:=16\r_NAME:=INFRARED\r_UNIT:=KELVIN\r" \
b"0:=329.98\r1023:=130.02\r65535:=130.02\r"
proj_h8 = b"GEOS(140.70) "
proj_mtsat2 = b"GEOS(145.00) "
proj_name = proj_h8 if platform == "Himawari-8" else proj_mtsat2
return {"image_segm_seq_no": segno,
"total_no_image_segm": numseg,
"projection_name": proj_name,
"projection_parameters": {
"a": 6378169.00,
"b": 6356583.80,
"h": 35785831.00,
},
"cfac": 10233128,
"lfac": 10233128,
"coff": coff,
"loff": loff,
"number_of_columns": ncols,
"number_of_lines": nlines,
"image_data_function": idf,
"image_observation_time": self._get_acq_time(nlines)}
[docs]
def test_init(self):
"""Test creating the file handler."""
from satpy.readers.hrit_jma import HIMAWARI8, UNKNOWN_AREA
# Test addition of extra metadata
mda = self._get_mda()
mda_expected = mda.copy()
mda_expected.update(
{"planned_end_segment_number": 1,
"planned_start_segment_number": 1,
"segment_sequence_number": 0,
"unit": "ALBEDO(%)"})
mda_expected["projection_parameters"]["SSP_longitude"] = 140.7
reader = self._get_reader(mda=mda)
assert reader.mda == mda_expected
# Check projection name
assert reader.projection_name == "GEOS(140.70)"
# Check calibration table
cal_expected = np.array([[0, -0.1],
[1023, 100],
[65535, 100]])
assert np.all(reader.calibration_table == cal_expected)
# Check if scanline timestamps are there (dedicated test below)
assert isinstance(reader.acq_time, np.ndarray)
assert reader.acq_time.dtype == np.dtype("datetime64[ns]")
# Check platform
assert reader.platform == HIMAWARI8
# Check is_segmented attribute
expected = {0: False, 1: True, 8: True}
for segno, is_segmented in expected.items():
mda = self._get_mda(segno=segno)
reader = self._get_reader(mda=mda)
assert reader.is_segmented == is_segmented
# Check area IDs
expected = [
({"area": 1}, 1),
({"area": 1234}, UNKNOWN_AREA),
({}, UNKNOWN_AREA)
]
mda = self._get_mda()
for filename_info, area_id in expected:
reader = self._get_reader(mda=mda, filename_info=filename_info)
assert reader.area_id == area_id
[docs]
def test_get_area_def(self):
"""Test getting an AreaDefinition."""
from satpy.readers.hrit_jma import AREA_NAMES, FULL_DISK, NORTH_HEMIS, SOUTH_HEMIS
cases = [
# Non-segmented, full disk
{"loff": 1375.0, "coff": 1375.0,
"nlines": 2750, "ncols": 2750,
"segno": 0, "numseg": 1,
"area": FULL_DISK,
"extent": (-5498000.088960204, -5498000.088960204,
5502000.089024927, 5502000.089024927)},
# Non-segmented, northern hemisphere
{"loff": 1325.0, "coff": 1375.0,
"nlines": 1375, "ncols": 2750,
"segno": 0, "numseg": 1,
"area": NORTH_HEMIS,
"extent": (-5498000.088960204, -198000.00320373234,
5502000.089024927, 5302000.085788833)},
# Non-segmented, southern hemisphere
{"loff": 50, "coff": 1375.0,
"nlines": 1375, "ncols": 2750,
"segno": 0, "numseg": 1,
"area": SOUTH_HEMIS,
"extent": (-5498000.088960204, -5298000.085724112,
5502000.089024927, 202000.0032684542)},
# Segmented, segment #1
{"loff": 1375.0, "coff": 1375.0,
"nlines": 275, "ncols": 2750,
"segno": 1, "numseg": 10,
"area": FULL_DISK,
"extent": (-5498000.088960204, 4402000.071226413,
5502000.089024927, 5502000.089024927)},
# Segmented, segment #7
{"loff": 1375.0, "coff": 1375.0,
"nlines": 275, "ncols": 2750,
"segno": 7, "numseg": 10,
"area": FULL_DISK,
"extent": (-5498000.088960204, -2198000.035564665,
5502000.089024927, -1098000.0177661523)},
]
for case in cases:
mda = self._get_mda(loff=case["loff"], coff=case["coff"],
nlines=case["nlines"], ncols=case["ncols"],
segno=case["segno"], numseg=case["numseg"])
reader = self._get_reader(mda=mda,
filename_info={"area": case["area"]})
area = reader.get_area_def("some_id")
assert area.area_extent == case["extent"]
assert area.description == AREA_NAMES[case["area"]]["long"]
[docs]
def test_calibrate(self):
"""Test calibration."""
# Generate test data
counts = np.linspace(0, 1200, 25).reshape(5, 5)
counts[-1, -1] = 65535
counts = DataArray(da.from_array(counts, chunks=5))
refl = np.array(
[[-0.1, 4.79247312, 9.68494624, 14.57741935, 19.46989247],
[24.36236559, 29.25483871, 34.14731183, 39.03978495, 43.93225806],
[48.82473118, 53.7172043, 58.60967742, 63.50215054, 68.39462366],
[73.28709677, 78.17956989, 83.07204301, 87.96451613, 92.85698925],
[97.74946237, 100., 100., 100., np.nan]]
)
bt = np.array(
[[329.98, 320.20678397, 310.43356794, 300.66035191, 290.88713587],
[281.11391984, 271.34070381, 261.56748778, 251.79427175, 242.02105572],
[232.24783969, 222.47462366, 212.70140762, 202.92819159, 193.15497556],
[183.38175953, 173.6085435, 163.83532747, 154.06211144, 144.28889541],
[134.51567937, 130.02, 130.02, 130.02, np.nan]]
)
# Choose an area near the subsatellite point to avoid masking
# of space pixels
mda = self._get_mda(nlines=5, ncols=5, loff=1375.0, coff=1375.0,
segno=0)
reader = self._get_reader(mda=mda)
# 1. Counts
res = reader.calibrate(data=counts, calibration="counts")
assert np.all(counts.values == res.values)
# 2. Reflectance
res = reader.calibrate(data=counts, calibration="reflectance")
np.testing.assert_allclose(refl, res.values) # also compares NaN
# 3. Brightness temperature
mda_bt = self._get_mda(nlines=5, ncols=5, loff=1375.0, coff=1375.0,
segno=0, vis=False)
reader_bt = self._get_reader(mda=mda_bt)
res = reader_bt.calibrate(data=counts,
calibration="brightness_temperature")
np.testing.assert_allclose(bt, res.values) # also compares NaN
[docs]
def test_mask_space(self):
"""Test masking of space pixels."""
mda = self._get_mda(loff=1375.0, coff=1375.0, nlines=275, ncols=1375,
segno=1, numseg=10)
reader = self._get_reader(mda=mda)
data = DataArray(da.ones((275, 1375), chunks=1024))
masked = reader._mask_space(data)
# First line of the segment should be space, in the middle of the
# last line there should be some valid pixels
np.testing.assert_allclose(masked.values[0, :], np.nan)
assert np.all(masked.values[-1, 588:788] == 1)
[docs]
@mock.patch("satpy.readers.hrit_jma.HRITFileHandler.get_dataset")
def test_get_dataset(self, base_get_dataset):
"""Test getting a dataset."""
from satpy.readers.hrit_jma import HIMAWARI8
mda = self._get_mda(loff=1375.0, coff=1375.0, nlines=275, ncols=1375,
segno=1, numseg=10)
reader = self._get_reader(mda=mda)
key = make_dataid(name="VIS", calibration="reflectance")
base_get_dataset.return_value = DataArray(da.ones((275, 1375),
chunks=1024),
dims=("y", "x"))
# Check attributes
res = reader.get_dataset(key, {"units": "%", "sensor": "ahi"})
assert res.attrs["units"] == "%"
assert res.attrs["sensor"] == "ahi"
assert res.attrs["platform_name"] == HIMAWARI8
assert res.attrs["orbital_parameters"] == {"projection_longitude": 140.7,
"projection_latitude": 0.0,
"projection_altitude": 35785831.0}
# Check if acquisition time is a coordinate
assert "acq_time" in res.coords
# Check called methods
with mock.patch.object(reader, "_mask_space") as mask_space:
with mock.patch.object(reader, "calibrate") as calibrate:
reader.get_dataset(key, {"units": "%", "sensor": "ahi"})
mask_space.assert_called()
calibrate.assert_called()
with mock.patch("logging.Logger.error") as log_mock:
reader.get_dataset(key, {"units": "%", "sensor": "jami"})
log_mock.assert_called()
[docs]
def test_mjd2datetime64(self):
"""Test conversion from modified julian day to datetime64."""
from satpy.readers.hrit_jma import mjd2datetime64
assert mjd2datetime64(np.array([0])) == np.datetime64("1858-11-17", "ns")
assert mjd2datetime64(np.array([40587.5])) == np.datetime64("1970-01-01 12:00", "ns")
[docs]
def test_get_acq_time(self):
"""Test computation of scanline acquisition times."""
dt_line = np.arange(1, 11000+1).astype("timedelta64[s]")
acq_time_exp = np.datetime64("1970-01-01", "ns") + dt_line
for platform in ["Himawari-8", "MTSAT-2"]:
# Results are not exactly identical because timestamps are stored in
# the header with only 6 decimals precision (max diff here: 45 msec).
mda = self._get_mda(platform=platform)
reader = self._get_reader(mda=mda)
np.testing.assert_allclose(reader.acq_time.astype(np.int64),
acq_time_exp.astype(np.int64),
atol=45000000)
[docs]
def test_start_time_from_filename(self):
"""Test that by default the datetime in the filename is returned."""
import datetime as dt
start_time = dt.datetime(2022, 1, 20, 12, 10)
for platform in ["Himawari-8", "MTSAT-2"]:
mda = self._get_mda(platform=platform)
reader = self._get_reader(
mda=mda,
filename_info={"start_time": start_time})
assert reader._start_time == start_time
[docs]
def test_start_time_from_aqc_time(self):
"""Test that by the datetime from the metadata returned when `use_acquisition_time_as_start_time=True`."""
import datetime as dt
start_time = dt.datetime(2022, 1, 20, 12, 10)
for platform in ["Himawari-8", "MTSAT-2"]:
mda = self._get_mda(platform=platform)
reader = self._get_reader(
mda=mda,
filename_info={"start_time": start_time},
reader_kwargs={"use_acquisition_time_as_start_time": True})
assert reader.start_time == reader.acq_time[0].astype(dt.datetime)