#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2010-2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""HRIT format reader for JMA data.
Introduction
------------
The JMA HRIT format is described in the `JMA HRIT - Mission Specific
Implementation`_. There are three readers for this format in Satpy:
- ``jami_hrit``: For data from the `JAMI` instrument on MTSAT-1R
- ``mtsat2-imager_hrit``: For data from the `Imager` instrument on MTSAT-2
- ``ahi_hrit``: For data from the `AHI` instrument on Himawari-8/9
Although the data format is identical, the instruments have different
characteristics, which is why there is a dedicated reader for each of them.
Sample data is available here:
- `JAMI/Imager sample data`_
- `AHI sample data`_
Example:
--------
Here is an example how to read Himwari-8 HRIT data with Satpy:
.. code-block:: python
from satpy import Scene
import glob
filenames = glob.glob('data/IMG_DK01B14_2018011109*')
scn = Scene(filenames=filenames, reader='ahi_hrit')
scn.load(['B14'])
print(scn['B14'])
Output:
.. code-block:: none
<xarray.DataArray (y: 5500, x: 5500)>
dask.array<concatenate, shape=(5500, 5500), dtype=float64, chunksize=(550, 4096), ...
Coordinates:
acq_time (y) datetime64[ns] 2018-01-11T09:00:20.995200 ... 2018-01-11T09:09:40.348800
crs object +proj=geos +lon_0=140.7 +h=35785831 +x_0=0 +y_0=0 +a=6378169 ...
* y (y) float64 5.5e+06 5.498e+06 5.496e+06 ... -5.496e+06 -5.498e+06
* x (x) float64 -5.498e+06 -5.496e+06 -5.494e+06 ... 5.498e+06 5.5e+06
Attributes:
orbital_parameters: {'projection_longitude': 140.7, 'projection_latitud...
standard_name: toa_brightness_temperature
level: None
wavelength: (11.0, 11.2, 11.4)
units: K
calibration: brightness_temperature
file_type: ['hrit_b14_seg', 'hrit_b14_fd']
modifiers: ()
polarization: None
sensor: ahi
name: B14
platform_name: Himawari-8
resolution: 4000
start_time: 2018-01-11 09:00:20.995200
end_time: 2018-01-11 09:09:40.348800
area: Area ID: FLDK, Description: Full Disk, Projection I...
ancillary_variables: []
JMA HRIT data contain the scanline acquisition time for only a subset of scanlines. Timestamps of
the remaining scanlines are computed using linear interpolation. This is what you'll find in the
``acq_time`` coordinate of the dataset.
Compression
-----------
Gzip-compressed MTSAT files can be decompressed on the fly using
:class:`~satpy.readers.FSFile`:
.. code-block:: python
import fsspec
from satpy import Scene
from satpy.readers import FSFile
filename = "/data/HRIT_MTSAT1_20090101_0630_DK01IR1.gz"
open_file = fsspec.open(filename, compression="gzip")
fs_file = FSFile(open_file)
scn = Scene([fs_file], reader="jami_hrit")
scn.load(["IR1"])
.. _JMA HRIT - Mission Specific Implementation: http://www.jma.go.jp/jma/jma-eng/satellite/introduction/4_2HRIT.pdf
.. _JAMI/Imager sample data: https://www.data.jma.go.jp/mscweb/en/operation/hrit_sample.html
.. _AHI sample data: https://www.data.jma.go.jp/mscweb/en/himawari89/space_segment/sample_hrit.html
"""
import datetime as dt
import logging
import numpy as np
import xarray as xr
import satpy.utils
from satpy.readers._geos_area import get_area_definition, get_area_extent
from satpy.readers.hrit_base import (
HRITFileHandler,
ancillary_text,
annotation_header,
base_hdr_map,
image_data_function,
)
from satpy.readers.utils import get_geostationary_mask
logger = logging.getLogger("hrit_jma")
# JMA implementation:
key_header = np.dtype([("key_number", "u4")])
segment_identification = np.dtype([("image_segm_seq_no", ">u1"),
("total_no_image_segm", ">u1"),
("line_no_image_segm", ">u2")])
encryption_key_message = np.dtype([("station_number", ">u2")])
image_compensation_information = np.dtype([("compensation", "|S1")])
image_observation_time = np.dtype([("times", "|S1")])
image_quality_information = np.dtype([("quality", "|S1")])
jma_variable_length_headers: dict = {}
jma_text_headers = {image_data_function: "image_data_function",
annotation_header: "annotation_header",
ancillary_text: "ancillary_text",
image_compensation_information: "image_compensation_information",
image_observation_time: "image_observation_time",
image_quality_information: "image_quality_information"}
jma_hdr_map = base_hdr_map.copy()
jma_hdr_map.update({7: key_header,
128: segment_identification,
129: encryption_key_message,
130: image_compensation_information,
131: image_observation_time,
132: image_quality_information
})
cuc_time = np.dtype([("coarse", "u1", (4, )),
("fine", "u1", (3, ))])
time_cds_expanded = np.dtype([("days", ">u2"),
("milliseconds", ">u4"),
("microseconds", ">u2"),
("nanoseconds", ">u2")])
FULL_DISK = 1
NORTH_HEMIS = 2
SOUTH_HEMIS = 3
UNKNOWN_AREA = -1
AREA_NAMES = {FULL_DISK: {"short": "FLDK", "long": "Full Disk"},
NORTH_HEMIS: {"short": "NH", "long": "Northern Hemisphere"},
SOUTH_HEMIS: {"short": "SH", "long": "Southern Hemisphere"},
UNKNOWN_AREA: {"short": "UNKNOWN", "long": "Unknown Area"}}
MTSAT1R = "MTSAT-1R"
MTSAT2 = "MTSAT-2"
HIMAWARI8 = "Himawari-8"
UNKNOWN_PLATFORM = "Unknown Platform"
PLATFORMS = {
"GEOS(140.00)": MTSAT1R,
"GEOS(140.25)": MTSAT1R,
"GEOS(140.70)": HIMAWARI8,
"GEOS(145.00)": MTSAT2,
}
SENSORS = {
MTSAT1R: "jami",
MTSAT2: "mtsat2_imager",
HIMAWARI8: "ahi"
}
[docs]
def mjd2datetime64(mjd):
"""Convert Modified Julian Day (MJD) to datetime64."""
epoch = np.datetime64("1858-11-17 00:00")
day2nsec = 24 * 3600 * 1E9
mjd_nsec = (mjd * day2nsec).astype(np.int64).astype("timedelta64[ns]")
return epoch + mjd_nsec
[docs]
class HRITJMAFileHandler(HRITFileHandler):
"""JMA HRIT format reader.
By default, the reader uses the start time parsed from the filename. To use exact time, computed
from the metadata, the user can define a keyword argument::
scene = Scene(filenames=filenames,
reader='ahi_hrit',
reader_kwargs={'use_acquisition_time_as_start_time': True})
As this time is different for every channel, time-dependent calculations like SZA correction
can be pretty slow when multiple channels are used.
The exact scanline times are always available as coordinates of an individual channels::
scene.load(["B03"])
print(scene["B03].coords["acq_time"].data)
would print something similar to::
array(['2021-12-08T06:00:20.131200000', '2021-12-08T06:00:20.191948000',
'2021-12-08T06:00:20.252695000', ...,
'2021-12-08T06:09:39.449390000', '2021-12-08T06:09:39.510295000',
'2021-12-08T06:09:39.571200000'], dtype='datetime64[ns]')
The first value represents the exact start time, and the last one the exact end time of the data
acquisition.
"""
def __init__(self, filename, filename_info, filetype_info, use_acquisition_time_as_start_time=False):
"""Initialize the reader."""
super(HRITJMAFileHandler, self).__init__(filename, filename_info,
filetype_info,
(jma_hdr_map,
jma_variable_length_headers,
jma_text_headers))
self._use_acquisition_time_as_start_time = use_acquisition_time_as_start_time
self.mda["segment_sequence_number"] = self.mda["image_segm_seq_no"]
self.mda["planned_end_segment_number"] = self.mda["total_no_image_segm"]
self.mda["planned_start_segment_number"] = 1
items = self.mda["image_data_function"].decode().split("\r")
if items[0].startswith("$HALFTONE"):
self.calibration_table = []
for item in items[1:]:
if item == "":
continue
key, value = item.split(":=")
if key.startswith("_UNIT"):
self.mda["unit"] = item.split(":=")[1]
elif key.startswith("_NAME"):
pass
elif key.isdigit():
key = int(key)
value = float(value)
self.calibration_table.append((key, value))
self.calibration_table = np.array(self.calibration_table)
self.projection_name = self.mda["projection_name"].decode().strip()
sublon = float(self.projection_name.split("(")[1][:-1])
self.mda["projection_parameters"]["SSP_longitude"] = sublon
self.platform = self._get_platform()
self.is_segmented = self.mda["segment_sequence_number"] > 0
self.area_id = filename_info.get("area", UNKNOWN_AREA)
if self.area_id not in AREA_NAMES:
self.area_id = UNKNOWN_AREA
self.area = self._get_area_def()
self.acq_time = self._get_acq_time()
[docs]
def _get_line_offset(self):
"""Get line offset for the current segment.
Read line offset from the file and adapt it to the current segment
or half disk scan so that
y(l) ~ l - loff
because this is what get_geostationary_area_extent() expects.
"""
# Get line offset from the file
nlines = int(self.mda["number_of_lines"])
loff = np.float32(self.mda["loff"])
# Adapt it to the current segment
if self.is_segmented:
# loff in the file specifies the offset of the full disk image
# centre (1375/2750 for VIS/IR)
segment_number = self.mda["segment_sequence_number"] - 1
loff -= (self.mda["total_no_image_segm"] - segment_number - 1) * nlines
elif self.area_id in (NORTH_HEMIS, SOUTH_HEMIS):
# loff in the file specifies the start line of the half disk image
# in the full disk image
loff = nlines - loff
elif self.area_id == UNKNOWN_AREA:
logger.error("Cannot compute line offset for unknown area")
return loff
[docs]
def _get_area_def(self):
"""Get the area definition of the band."""
pdict = {
"cfac": np.int32(self.mda["cfac"]),
"lfac": np.int32(self.mda["lfac"]),
"coff": np.float32(self.mda["coff"]),
"loff": self._get_line_offset(),
"ncols": int(self.mda["number_of_columns"]),
"nlines": int(self.mda["number_of_lines"]),
"scandir": "N2S",
"a": float(self.mda["projection_parameters"]["a"]),
"b": float(self.mda["projection_parameters"]["b"]),
"h": float(self.mda["projection_parameters"]["h"]),
"ssp_lon": float(self.mda["projection_parameters"]["SSP_longitude"]),
"a_name": AREA_NAMES[self.area_id]["short"],
"a_desc": AREA_NAMES[self.area_id]["long"],
"p_id": "geosmsg"
}
area_extent = get_area_extent(pdict)
return get_area_definition(pdict, area_extent)
[docs]
def get_area_def(self, dsid):
"""Get the area definition of the band."""
return self.area
[docs]
def get_dataset(self, key, info):
"""Get the dataset designated by *key*."""
res = super(HRITJMAFileHandler, self).get_dataset(key, info)
# Filenames of segmented data is identical for MTSAT-1R, MTSAT-2
# and Himawari-8/9. Make sure we have the correct reader for the data
# at hand.
self._check_sensor_platform_consistency(info["sensor"])
# Calibrate and mask space pixels
res = self._mask_space(self.calibrate(res, key["calibration"]))
# Add scanline acquisition time
res.coords["acq_time"] = ("y", self.acq_time)
res.coords["acq_time"].attrs["long_name"] = "Scanline acquisition time"
# Update attributes
res.attrs.update(info)
res.attrs["platform_name"] = self.platform
res.attrs["orbital_parameters"] = {
"projection_longitude": float(self.mda["projection_parameters"]["SSP_longitude"]),
"projection_latitude": 0.,
"projection_altitude": float(self.mda["projection_parameters"]["h"])}
return res
[docs]
def _mask_space(self, data):
"""Mask space pixels."""
geomask = get_geostationary_mask(area=self.area)
return data.where(geomask)
[docs]
def _get_acq_time(self):
r"""Get the acquisition times from the file.
Acquisition times for a subset of scanlines are stored in the header
as follows:
b'LINE:=1\rTIME:=54365.022558\rLINE:=21\rTIME:=54365.022664\r...'
Missing timestamps in between are computed using linear interpolation.
"""
buf_b = np.frombuffer(self.mda["image_observation_time"],
dtype=image_observation_time)
# Replace \r by \n before encoding, otherwise encoding will drop all
# elements except the last one
buf_s = b"".join(buf_b["times"]).replace(b"\r", b"\n").decode()
# Split into key:=value pairs; then extract line number and timestamp
splits = buf_s.strip().split("\n")
lines_sparse = [int(s.split(":=")[1]) for s in splits[0::2]]
times_sparse = [float(s.split(":=")[1]) for s in splits[1::2]]
if self.platform == HIMAWARI8:
# Only a couple of timestamps in the header, and only the first
# and last are usable (duplicates inbetween).
lines_sparse = [lines_sparse[0], lines_sparse[-1]]
times_sparse = [times_sparse[0], times_sparse[-1]]
# Compute missing timestamps using linear interpolation.
lines = np.arange(lines_sparse[0], lines_sparse[-1]+1)
times = np.interp(lines, lines_sparse, times_sparse)
# Convert to np.datetime64
times64 = mjd2datetime64(times)
return times64
[docs]
@staticmethod
def _interp(arr, cal):
return np.interp(arr.ravel(), cal[:, 0], cal[:, 1]).reshape(arr.shape)
[docs]
def calibrate(self, data, calibration):
"""Calibrate the data."""
tic = dt.datetime.now()
if calibration == "counts":
return data
if calibration == "radiance":
raise NotImplementedError("Can't calibrate to radiance.")
cal = self.calibration_table
res = data.data.map_blocks(self._interp, cal, dtype=cal[:, 0].dtype)
res = xr.DataArray(res,
dims=data.dims, attrs=data.attrs,
coords=data.coords)
res = res.where(data < 65535)
logger.debug("Calibration time " + str(dt.datetime.now() - tic))
return res
@property
def start_time(self):
"""Get start time of the scan."""
if self._use_acquisition_time_as_start_time:
return satpy.utils.datetime64_to_pydatetime(self.acq_time[0])
return self._start_time
@property
def end_time(self):
"""Get end time of the scan."""
return satpy.utils.datetime64_to_pydatetime(self.acq_time[-1])