#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2018 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""The hrit ahi reader tests package."""
import unittest
from unittest import mock
import dask.array as da
import numpy as np
from xarray import DataArray
from satpy.tests.utils import make_dataid
[docs]
class TestHRITJMAFileHandler(unittest.TestCase):
"""Test the HRITJMAFileHandler."""
[docs]
@mock.patch('satpy.readers.hrit_jma.HRITFileHandler.__init__')
def _get_reader(self, mocked_init, mda, filename_info=None, filetype_info=None, reader_kwargs=None):
from satpy.readers.hrit_jma import HRITJMAFileHandler
if not filename_info:
filename_info = {}
if not filetype_info:
filetype_info = {}
if not reader_kwargs:
reader_kwargs = {}
HRITJMAFileHandler.filename = 'filename'
HRITJMAFileHandler.mda = mda
HRITJMAFileHandler._start_time = filename_info.get('start_time')
return HRITJMAFileHandler('filename', filename_info, filetype_info, **reader_kwargs)
[docs]
def _get_acq_time(self, nlines):
"""Get sample header entry for scanline acquisition times.
Lines: 1, 21, 41, 61, ..., nlines
Times: 1970-01-01 00:00 + (1, 21, 41, 61, ..., nlines) seconds
So the interpolated times are expected to be 1970-01-01 +
(1, 2, 3, 4, ..., nlines) seconds. Note that there will be some
floating point inaccuracies, because timestamps are stored
with only 6 decimals precision.
"""
mjd_1970 = 40587.0
lines_sparse = np.array(list(range(1, nlines, 20)) + [nlines])
times_sparse = mjd_1970 + lines_sparse / 24 / 3600
acq_time_s = ['LINE:={}\rTIME:={:.6f}\r'.format(line, time)
for line, time in zip(lines_sparse, times_sparse)]
acq_time_b = ''.join(acq_time_s).encode()
return acq_time_b
[docs]
def _get_mda(self, loff=5500.0, coff=5500.0, nlines=11000, ncols=11000,
segno=0, numseg=1, vis=True, platform='Himawari-8'):
"""Create metadata dict like HRITFileHandler would do it."""
if vis:
idf = b'$HALFTONE:=16\r_NAME:=VISIBLE\r_UNIT:=ALBEDO(%)\r' \
b'0:=-0.10\r1023:=100.00\r65535:=100.00\r'
else:
idf = b'$HALFTONE:=16\r_NAME:=INFRARED\r_UNIT:=KELVIN\r' \
b'0:=329.98\r1023:=130.02\r65535:=130.02\r'
proj_h8 = b'GEOS(140.70) '
proj_mtsat2 = b'GEOS(145.00) '
proj_name = proj_h8 if platform == 'Himawari-8' else proj_mtsat2
return {'image_segm_seq_no': segno,
'total_no_image_segm': numseg,
'projection_name': proj_name,
'projection_parameters': {
'a': 6378169.00,
'b': 6356583.80,
'h': 35785831.00,
},
'cfac': 10233128,
'lfac': 10233128,
'coff': coff,
'loff': loff,
'number_of_columns': ncols,
'number_of_lines': nlines,
'image_data_function': idf,
'image_observation_time': self._get_acq_time(nlines)}
[docs]
def test_init(self):
"""Test creating the file handler."""
from satpy.readers.hrit_jma import HIMAWARI8, UNKNOWN_AREA
# Test addition of extra metadata
mda = self._get_mda()
mda_expected = mda.copy()
mda_expected.update(
{'planned_end_segment_number': 1,
'planned_start_segment_number': 1,
'segment_sequence_number': 0,
'unit': 'ALBEDO(%)'})
mda_expected['projection_parameters']['SSP_longitude'] = 140.7
reader = self._get_reader(mda=mda)
self.assertEqual(reader.mda, mda_expected)
# Check projection name
self.assertEqual(reader.projection_name, 'GEOS(140.70)')
# Check calibration table
cal_expected = np.array([[0, -0.1],
[1023, 100],
[65535, 100]])
self.assertTrue(np.all(reader.calibration_table == cal_expected))
# Check if scanline timestamps are there (dedicated test below)
self.assertIsInstance(reader.acq_time, np.ndarray)
# Check platform
self.assertEqual(reader.platform, HIMAWARI8)
# Check is_segmented attribute
expected = {0: False, 1: True, 8: True}
for segno, is_segmented in expected.items():
mda = self._get_mda(segno=segno)
reader = self._get_reader(mda=mda)
self.assertEqual(reader.is_segmented, is_segmented)
# Check area IDs
expected = [
({'area': 1}, 1),
({'area': 1234}, UNKNOWN_AREA),
({}, UNKNOWN_AREA)
]
mda = self._get_mda()
for filename_info, area_id in expected:
reader = self._get_reader(mda=mda, filename_info=filename_info)
self.assertEqual(reader.area_id, area_id)
[docs]
def test_get_area_def(self):
"""Test getting an AreaDefinition."""
from satpy.readers.hrit_jma import AREA_NAMES, FULL_DISK, NORTH_HEMIS, SOUTH_HEMIS
cases = [
# Non-segmented, full disk
{'loff': 1375.0, 'coff': 1375.0,
'nlines': 2750, 'ncols': 2750,
'segno': 0, 'numseg': 1,
'area': FULL_DISK,
'extent': (-5498000.088960204, -5498000.088960204,
5502000.089024927, 5502000.089024927)},
# Non-segmented, northern hemisphere
{'loff': 1325.0, 'coff': 1375.0,
'nlines': 1375, 'ncols': 2750,
'segno': 0, 'numseg': 1,
'area': NORTH_HEMIS,
'extent': (-5498000.088960204, -198000.00320373234,
5502000.089024927, 5302000.085788833)},
# Non-segmented, southern hemisphere
{'loff': 50, 'coff': 1375.0,
'nlines': 1375, 'ncols': 2750,
'segno': 0, 'numseg': 1,
'area': SOUTH_HEMIS,
'extent': (-5498000.088960204, -5298000.085724112,
5502000.089024927, 202000.0032684542)},
# Segmented, segment #1
{'loff': 1375.0, 'coff': 1375.0,
'nlines': 275, 'ncols': 2750,
'segno': 1, 'numseg': 10,
'area': FULL_DISK,
'extent': (-5498000.088960204, 4402000.071226413,
5502000.089024927, 5502000.089024927)},
# Segmented, segment #7
{'loff': 1375.0, 'coff': 1375.0,
'nlines': 275, 'ncols': 2750,
'segno': 7, 'numseg': 10,
'area': FULL_DISK,
'extent': (-5498000.088960204, -2198000.035564665,
5502000.089024927, -1098000.0177661523)},
]
for case in cases:
mda = self._get_mda(loff=case['loff'], coff=case['coff'],
nlines=case['nlines'], ncols=case['ncols'],
segno=case['segno'], numseg=case['numseg'])
reader = self._get_reader(mda=mda,
filename_info={'area': case['area']})
area = reader.get_area_def('some_id')
self.assertTupleEqual(area.area_extent, case['extent'])
self.assertEqual(area.description, AREA_NAMES[case['area']]['long'])
[docs]
def test_calibrate(self):
"""Test calibration."""
# Generate test data
counts = np.linspace(0, 1200, 25).reshape(5, 5)
counts[-1, -1] = 65535
counts = DataArray(da.from_array(counts, chunks=5))
refl = np.array(
[[-0.1, 4.79247312, 9.68494624, 14.57741935, 19.46989247],
[24.36236559, 29.25483871, 34.14731183, 39.03978495, 43.93225806],
[48.82473118, 53.7172043, 58.60967742, 63.50215054, 68.39462366],
[73.28709677, 78.17956989, 83.07204301, 87.96451613, 92.85698925],
[97.74946237, 100., 100., 100., np.nan]]
)
bt = np.array(
[[329.98, 320.20678397, 310.43356794, 300.66035191, 290.88713587],
[281.11391984, 271.34070381, 261.56748778, 251.79427175, 242.02105572],
[232.24783969, 222.47462366, 212.70140762, 202.92819159, 193.15497556],
[183.38175953, 173.6085435, 163.83532747, 154.06211144, 144.28889541],
[134.51567937, 130.02, 130.02, 130.02, np.nan]]
)
# Choose an area near the subsatellite point to avoid masking
# of space pixels
mda = self._get_mda(nlines=5, ncols=5, loff=1375.0, coff=1375.0,
segno=0)
reader = self._get_reader(mda=mda)
# 1. Counts
res = reader.calibrate(data=counts, calibration='counts')
self.assertTrue(np.all(counts.values == res.values))
# 2. Reflectance
res = reader.calibrate(data=counts, calibration='reflectance')
np.testing.assert_allclose(refl, res.values) # also compares NaN
# 3. Brightness temperature
mda_bt = self._get_mda(nlines=5, ncols=5, loff=1375.0, coff=1375.0,
segno=0, vis=False)
reader_bt = self._get_reader(mda=mda_bt)
res = reader_bt.calibrate(data=counts,
calibration='brightness_temperature')
np.testing.assert_allclose(bt, res.values) # also compares NaN
[docs]
def test_mask_space(self):
"""Test masking of space pixels."""
mda = self._get_mda(loff=1375.0, coff=1375.0, nlines=275, ncols=1375,
segno=1, numseg=10)
reader = self._get_reader(mda=mda)
data = DataArray(da.ones((275, 1375), chunks=1024))
masked = reader._mask_space(data)
# First line of the segment should be space, in the middle of the
# last line there should be some valid pixels
np.testing.assert_allclose(masked.values[0, :], np.nan)
self.assertTrue(np.all(masked.values[-1, 588:788] == 1))
[docs]
@mock.patch('satpy.readers.hrit_jma.HRITFileHandler.get_dataset')
def test_get_dataset(self, base_get_dataset):
"""Test getting a dataset."""
from satpy.readers.hrit_jma import HIMAWARI8
mda = self._get_mda(loff=1375.0, coff=1375.0, nlines=275, ncols=1375,
segno=1, numseg=10)
reader = self._get_reader(mda=mda)
key = make_dataid(name="VIS", calibration="reflectance")
base_get_dataset.return_value = DataArray(da.ones((275, 1375),
chunks=1024),
dims=('y', 'x'))
# Check attributes
res = reader.get_dataset(key, {'units': '%', 'sensor': 'ahi'})
self.assertEqual(res.attrs['units'], '%')
self.assertEqual(res.attrs['sensor'], 'ahi')
self.assertEqual(res.attrs['platform_name'], HIMAWARI8)
self.assertDictEqual(res.attrs['orbital_parameters'], {'projection_longitude': 140.7,
'projection_latitude': 0.,
'projection_altitude': 35785831.0})
# Check if acquisition time is a coordinate
self.assertIn('acq_time', res.coords)
# Check called methods
with mock.patch.object(reader, '_mask_space') as mask_space:
with mock.patch.object(reader, 'calibrate') as calibrate:
reader.get_dataset(key, {'units': '%', 'sensor': 'ahi'})
mask_space.assert_called()
calibrate.assert_called()
with mock.patch('logging.Logger.error') as log_mock:
reader.get_dataset(key, {'units': '%', 'sensor': 'jami'})
log_mock.assert_called()
[docs]
def test_mjd2datetime64(self):
"""Test conversion from modified julian day to datetime64."""
from satpy.readers.hrit_jma import mjd2datetime64
self.assertEqual(mjd2datetime64(np.array([0])),
np.datetime64('1858-11-17', 'us'))
self.assertEqual(mjd2datetime64(np.array([40587.5])),
np.datetime64('1970-01-01 12:00', 'us'))
[docs]
def test_get_acq_time(self):
"""Test computation of scanline acquisition times."""
dt_line = np.arange(1, 11000+1).astype('timedelta64[s]')
acq_time_exp = np.datetime64('1970-01-01', 'us') + dt_line
for platform in ['Himawari-8', 'MTSAT-2']:
# Results are not exactly identical because timestamps are stored in
# the header with only 6 decimals precision (max diff here: 45 msec).
mda = self._get_mda(platform=platform)
reader = self._get_reader(mda=mda)
np.testing.assert_allclose(reader.acq_time.astype(np.int64),
acq_time_exp.astype(np.int64),
atol=45000)
[docs]
def test_start_time_from_filename(self):
"""Test that by default the datetime in the filename is returned."""
import datetime as dt
start_time = dt.datetime(2022, 1, 20, 12, 10)
for platform in ['Himawari-8', 'MTSAT-2']:
mda = self._get_mda(platform=platform)
reader = self._get_reader(
mda=mda,
filename_info={'start_time': start_time})
assert reader._start_time == start_time
[docs]
def test_start_time_from_aqc_time(self):
"""Test that by the datetime from the metadata returned when `use_acquisition_time_as_start_time=True`."""
import datetime as dt
start_time = dt.datetime(2022, 1, 20, 12, 10)
for platform in ['Himawari-8', 'MTSAT-2']:
mda = self._get_mda(platform=platform)
reader = self._get_reader(
mda=mda,
filename_info={'start_time': start_time},
reader_kwargs={'use_acquisition_time_as_start_time': True})
assert reader.start_time == reader.acq_time[0].astype(dt.datetime)