#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""HRIT format reader.
References:
ELECTRO-L GROUND SEGMENT MSU-GS INSTRUMENT,
LRIT/HRIT Mission Specific Implementation, February 2012
"""
import datetime as dt
import logging
import numpy as np
import xarray as xr
from satpy.readers._geos_area import get_area_definition, get_area_extent
from satpy.readers.hrit_base import (
HRITFileHandler,
ancillary_text,
annotation_header,
base_hdr_map,
image_data_function,
time_cds_short,
)
logger = logging.getLogger("hrit_electrol")
# goms implementation:
key_header = np.dtype([("key_number", "u1"),
("seed", ">f8")])
segment_identification = np.dtype([("GP_SC_ID", ">i2"),
("spectral_channel_id", ">i1"),
("segment_sequence_number", ">u2"),
("planned_start_segment_number", ">u2"),
("planned_end_segment_number", ">u2"),
("data_field_representation", ">i1")])
image_segment_line_quality = np.dtype([("line_number_in_grid", ">i4"),
("line_mean_acquisition",
[("days", ">u2"),
("milliseconds", ">u4")]),
("line_validity", "u1"),
("line_radiometric_quality", "u1"),
("line_geometric_quality", "u1")])
goms_variable_length_headers = {
image_segment_line_quality: "image_segment_line_quality"}
goms_text_headers = {image_data_function: "image_data_function",
annotation_header: "annotation_header",
ancillary_text: "ancillary_text"}
goms_hdr_map = base_hdr_map.copy()
goms_hdr_map.update({7: key_header,
128: segment_identification,
129: image_segment_line_quality
})
orbit_coef = np.dtype([("StartTime", time_cds_short),
("EndTime", time_cds_short),
("X", ">f8", (8, )),
("Y", ">f8", (8, )),
("Z", ">f8", (8, )),
("VX", ">f8", (8, )),
("VY", ">f8", (8, )),
("VZ", ">f8", (8, ))])
attitude_coef = np.dtype([("StartTime", time_cds_short),
("EndTime", time_cds_short),
("XofSpinAxis", ">f8", (8, )),
("YofSpinAxis", ">f8", (8, )),
("ZofSpinAxis", ">f8", (8, ))])
cuc_time = np.dtype([("coarse", "u1", (4, )),
("fine", "u1", (3, ))])
time_cds_expanded = np.dtype([("days", ">u2"),
("milliseconds", ">u4"),
("microseconds", ">u2"),
("nanoseconds", ">u2")])
satellite_status = np.dtype([("TagType", "<u4"),
("TagLength", "<u4"),
("SatelliteID", "<u8"),
("SatelliteName", "S256"),
("NominalLongitude", "<f8"),
("SatelliteCondition", "<u4"),
("TimeOffset", "<f8")])
image_acquisition = np.dtype([("TagType", "<u4"),
("TagLength", "<u4"),
("Status", "<u4"),
("StartDelay", "<i4"),
("Cel", "<f8")])
prologue = np.dtype([("SatelliteStatus", satellite_status),
("ImageAcquisition", image_acquisition, (10, )),
("ImageCalibration", "<i4", (10, 1024))])
[docs]
def recarray2dict(arr):
"""Change record array to a dictionary."""
res = {}
for dtuple in arr.dtype.descr:
key = dtuple[0]
ntype = dtuple[1]
data = arr[key]
if isinstance(ntype, list):
res[key] = recarray2dict(data)
else:
res[key] = data
return res
[docs]
class HRITGOMSPrologueFileHandler(HRITFileHandler):
"""GOMS HRIT format reader."""
def __init__(self, filename, filename_info, filetype_info):
"""Initialize the reader."""
super(HRITGOMSPrologueFileHandler, self).__init__(filename, filename_info,
filetype_info,
(goms_hdr_map,
goms_variable_length_headers,
goms_text_headers))
self.prologue = {}
self.read_prologue()
[docs]
def read_prologue(self):
"""Read the prologue metadata."""
with open(self.filename, "rb") as fp_:
fp_.seek(self.mda["total_header_length"])
data = np.fromfile(fp_, dtype=prologue, count=1)[0]
self.prologue.update(recarray2dict(data))
self.process_prologue()
[docs]
def process_prologue(self):
"""Reprocess prologue to correct types."""
radiometric_processing = np.dtype([("TagType", "<u4"),
("TagLength", "<u4"),
("RPSummary",
[("Impulse", "<u4"),
("IsStrNoiseCorrection", "<u4"),
("IsOptic", "<u4"),
("IsBrightnessAligment", "<u4")]),
("OpticCorrection",
[("Degree", "<i4"),
("A", "<f8", (16, ))]),
("RPQuality",
[("EffDinRange", "<f8"),
("EathDarkening", "<f8"),
("Zone", "<f8"),
("Impulse", "<f8"),
("Group", "<f8"),
("DefectCount", "<u4"),
("DefectProcent", "<f8"),
("S_Noise_DT_Preflight", "<f8"),
("S_Noise_DT_Bort", "<f8"),
("S_Noise_DT_Video", "<f8"),
("S_Noise_DT_1_5", "<f8"),
("CalibrStability", "<f8"),
("TemnSKO", "<f8", (2, )),
("StructSKO", "<f8", (2, )),
("Struct_1_5", "<f8"),
("Zone_1_ 5", "<f8"),
("RadDif", "<f8")])])
geometric_processing = np.dtype([("TagType", "<u4"),
("TagLength", "<u4"),
("TGeomNormInfo",
[("IsExist", "<u4"),
("IsNorm", "<u4"),
("SubLon", "<f8"),
("TypeProjection", "<u4"),
("PixInfo", "<f8", (4, ))]),
("SatInfo",
[("TISO",
[("T0", "<f8"),
("dT", "<f8"),
("ASb", "<f8"),
("Evsk", "<f8", (3, 3, 4)),
("ARx", "<f8", (4, )),
("ARy", "<f8", (4, )),
("ARz", "<f8", (4, )),
("AVx", "<f8", (4, )),
("AVy", "<f8", (4, )),
("AVz", "<f8", (4, ))]),
("Type", "<i4")]),
("TimeProcessing", "<f8"),
("ApriorAccuracy", "<f8"),
("RelativeAccuracy", "<f8", (2, ))])
epilogue = np.dtype([("RadiometricProcessing", radiometric_processing, (10, )),
("GeometricProcessing", geometric_processing, (10, ))])
# FIXME: Add rest of the epilogue
[docs]
class HRITGOMSEpilogueFileHandler(HRITFileHandler):
"""GOMS HRIT format reader."""
def __init__(self, filename, filename_info, filetype_info):
"""Initialize the reader."""
super(HRITGOMSEpilogueFileHandler, self).__init__(filename, filename_info,
filetype_info,
(goms_hdr_map,
goms_variable_length_headers,
goms_text_headers))
self.epilogue = {}
self.read_epilogue()
[docs]
def read_epilogue(self):
"""Read the prologue metadata."""
with open(self.filename, "rb") as fp_:
fp_.seek(self.mda["total_header_length"])
data = np.fromfile(fp_, dtype=epilogue, count=1)[0]
self.epilogue.update(recarray2dict(data))
C1 = 1.19104273e-5
C2 = 1.43877523
# Defined in MSG Level 1.5 Image Data Format Description
# https://www-cdn.eumetsat.int/files/2020-05/pdf_ten_05105_msg_img_data.pdf
SPACECRAFTS = {19001: "Electro-L N1",
19002: "Electro-L N2",
19003: "Electro-L N3"}
[docs]
class HRITGOMSFileHandler(HRITFileHandler):
"""GOMS HRIT format reader."""
def __init__(self, filename, filename_info, filetype_info,
prologue, epilogue):
"""Initialize the reader."""
super(HRITGOMSFileHandler, self).__init__(filename, filename_info,
filetype_info,
(goms_hdr_map,
goms_variable_length_headers,
goms_text_headers))
self.prologue = prologue.prologue
self.epilogue = epilogue.epilogue
self.chid = self.mda["spectral_channel_id"]
sublon = self.epilogue["GeometricProcessing"]["TGeomNormInfo"]["SubLon"]
sublon = sublon[self.chid]
self.mda["projection_parameters"]["SSP_longitude"] = np.rad2deg(sublon)
self.mda["orbital_parameters"]["satellite_nominal_longitude"] = np.rad2deg(
self.prologue["SatelliteStatus"]["NominalLongitude"])
satellite_id = self.prologue["SatelliteStatus"]["SatelliteID"]
self.platform_name = SPACECRAFTS[satellite_id]
[docs]
def get_dataset(self, key, info):
"""Get the data from the files."""
res = super(HRITGOMSFileHandler, self).get_dataset(key, info)
res = self.calibrate(res, key["calibration"])
res.attrs["units"] = info["units"]
res.attrs["standard_name"] = info["standard_name"]
res.attrs["wavelength"] = info["wavelength"]
res.attrs["platform_name"] = self.platform_name
res.attrs["sensor"] = "msu-gs"
res.attrs["orbital_parameters"] = {
"satellite_nominal_longitude": self.mda["orbital_parameters"]["satellite_nominal_longitude"],
"satellite_nominal_latitude": 0.,
"projection_longitude": self.mda["projection_parameters"]["SSP_longitude"],
"projection_latitude": 0.,
"projection_altitude": 35785831.00
}
return res
[docs]
def calibrate(self, data, calibration):
"""Calibrate the data."""
tic = dt.datetime.now()
if calibration == "counts":
res = data
elif calibration in ["radiance", "brightness_temperature"]:
res = self._calibrate(data)
else:
raise NotImplementedError("Don't know how to calibrate to " +
str(calibration))
res.attrs["standard_name"] = calibration
res.attrs["calibration"] = calibration
logger.debug("Calibration time " + str(dt.datetime.now() - tic))
return res
[docs]
@staticmethod
def _getitem(block, lut):
return lut[block]
[docs]
def _calibrate(self, data):
"""Visible/IR channel calibration."""
lut = self.prologue["ImageCalibration"][self.chid]
if abs(lut).max() > 16777216:
lut = lut.astype(np.float64)
else:
lut = lut.astype(np.float32)
lut /= 1000
lut[0] = np.nan
# Dask/XArray don't support indexing in 2D (yet).
res = data.data.map_blocks(self._getitem, lut, dtype=lut.dtype)
res = xr.DataArray(res, dims=data.dims,
attrs=data.attrs, coords=data.coords)
res = res.where(data > 0)
return res
[docs]
def get_area_def(self, dsid):
"""Get the area definition of the band."""
pdict = {}
pdict["cfac"] = np.int32(self.mda["cfac"])
pdict["lfac"] = np.int32(self.mda["lfac"])
pdict["coff"] = np.float32(self.mda["coff"])
pdict["loff"] = np.float32(self.mda["loff"])
pdict["a"] = 6378169.00
pdict["b"] = 6356583.80
pdict["h"] = 35785831.00
pdict["scandir"] = "N2S"
pdict["ssp_lon"] = self.mda["projection_parameters"]["SSP_longitude"]
pdict["nlines"] = int(self.mda["number_of_lines"])
pdict["ncols"] = int(self.mda["number_of_columns"])
pdict["loff"] = pdict["nlines"] - pdict["loff"]
pdict["a_name"] = "geosgoms"
pdict["a_desc"] = "Electro-L/GOMS channel area"
pdict["p_id"] = "goms"
area_extent = get_area_extent(pdict)
area = get_area_definition(pdict, area_extent)
self.area = area
return area