Source code for satpy.readers.electrol_hrit

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.

"""HRIT format reader.

References:
    ELECTRO-L GROUND SEGMENT MSU-GS INSTRUMENT,
      LRIT/HRIT Mission Specific Implementation, February 2012

"""

import datetime as dt
import logging

import numpy as np
import xarray as xr

from satpy.readers._geos_area import get_area_definition, get_area_extent
from satpy.readers.hrit_base import (
    HRITFileHandler,
    ancillary_text,
    annotation_header,
    base_hdr_map,
    image_data_function,
    time_cds_short,
)

logger = logging.getLogger("hrit_electrol")


# goms implementation:
key_header = np.dtype([("key_number", "u1"),
                       ("seed", ">f8")])

segment_identification = np.dtype([("GP_SC_ID", ">i2"),
                                   ("spectral_channel_id", ">i1"),
                                   ("segment_sequence_number", ">u2"),
                                   ("planned_start_segment_number", ">u2"),
                                   ("planned_end_segment_number", ">u2"),
                                   ("data_field_representation", ">i1")])

image_segment_line_quality = np.dtype([("line_number_in_grid", ">i4"),
                                       ("line_mean_acquisition",
                                        [("days", ">u2"),
                                         ("milliseconds", ">u4")]),
                                       ("line_validity", "u1"),
                                       ("line_radiometric_quality", "u1"),
                                       ("line_geometric_quality", "u1")])

goms_variable_length_headers = {
    image_segment_line_quality: "image_segment_line_quality"}

goms_text_headers = {image_data_function: "image_data_function",
                     annotation_header: "annotation_header",
                     ancillary_text: "ancillary_text"}

goms_hdr_map = base_hdr_map.copy()
goms_hdr_map.update({7: key_header,
                     128: segment_identification,
                     129: image_segment_line_quality
                     })


orbit_coef = np.dtype([("StartTime", time_cds_short),
                       ("EndTime", time_cds_short),
                       ("X", ">f8", (8, )),
                       ("Y", ">f8", (8, )),
                       ("Z", ">f8", (8, )),
                       ("VX", ">f8", (8, )),
                       ("VY", ">f8", (8, )),
                       ("VZ", ">f8", (8, ))])

attitude_coef = np.dtype([("StartTime", time_cds_short),
                          ("EndTime", time_cds_short),
                          ("XofSpinAxis", ">f8", (8, )),
                          ("YofSpinAxis", ">f8", (8, )),
                          ("ZofSpinAxis", ">f8", (8, ))])

cuc_time = np.dtype([("coarse", "u1", (4, )),
                     ("fine", "u1", (3, ))])

time_cds_expanded = np.dtype([("days", ">u2"),
                              ("milliseconds", ">u4"),
                              ("microseconds", ">u2"),
                              ("nanoseconds", ">u2")])


satellite_status = np.dtype([("TagType", "<u4"),
                             ("TagLength", "<u4"),
                             ("SatelliteID", "<u8"),
                             ("SatelliteName", "S256"),
                             ("NominalLongitude", "<f8"),
                             ("SatelliteCondition", "<u4"),
                             ("TimeOffset", "<f8")])

image_acquisition = np.dtype([("TagType", "<u4"),
                              ("TagLength", "<u4"),
                              ("Status", "<u4"),
                              ("StartDelay", "<i4"),
                              ("Cel", "<f8")])


prologue = np.dtype([("SatelliteStatus", satellite_status),
                     ("ImageAcquisition", image_acquisition, (10, )),
                     ("ImageCalibration", "<i4", (10, 1024))])


[docs] def recarray2dict(arr): """Change record array to a dictionary.""" res = {} for dtuple in arr.dtype.descr: key = dtuple[0] ntype = dtuple[1] data = arr[key] if isinstance(ntype, list): res[key] = recarray2dict(data) else: res[key] = data return res
[docs] class HRITGOMSPrologueFileHandler(HRITFileHandler): """GOMS HRIT format reader.""" def __init__(self, filename, filename_info, filetype_info): """Initialize the reader.""" super(HRITGOMSPrologueFileHandler, self).__init__(filename, filename_info, filetype_info, (goms_hdr_map, goms_variable_length_headers, goms_text_headers)) self.prologue = {} self.read_prologue()
[docs] def read_prologue(self): """Read the prologue metadata.""" with open(self.filename, "rb") as fp_: fp_.seek(self.mda["total_header_length"]) data = np.fromfile(fp_, dtype=prologue, count=1)[0] self.prologue.update(recarray2dict(data)) self.process_prologue()
[docs] def process_prologue(self): """Reprocess prologue to correct types."""
radiometric_processing = np.dtype([("TagType", "<u4"), ("TagLength", "<u4"), ("RPSummary", [("Impulse", "<u4"), ("IsStrNoiseCorrection", "<u4"), ("IsOptic", "<u4"), ("IsBrightnessAligment", "<u4")]), ("OpticCorrection", [("Degree", "<i4"), ("A", "<f8", (16, ))]), ("RPQuality", [("EffDinRange", "<f8"), ("EathDarkening", "<f8"), ("Zone", "<f8"), ("Impulse", "<f8"), ("Group", "<f8"), ("DefectCount", "<u4"), ("DefectProcent", "<f8"), ("S_Noise_DT_Preflight", "<f8"), ("S_Noise_DT_Bort", "<f8"), ("S_Noise_DT_Video", "<f8"), ("S_Noise_DT_1_5", "<f8"), ("CalibrStability", "<f8"), ("TemnSKO", "<f8", (2, )), ("StructSKO", "<f8", (2, )), ("Struct_1_5", "<f8"), ("Zone_1_ 5", "<f8"), ("RadDif", "<f8")])]) geometric_processing = np.dtype([("TagType", "<u4"), ("TagLength", "<u4"), ("TGeomNormInfo", [("IsExist", "<u4"), ("IsNorm", "<u4"), ("SubLon", "<f8"), ("TypeProjection", "<u4"), ("PixInfo", "<f8", (4, ))]), ("SatInfo", [("TISO", [("T0", "<f8"), ("dT", "<f8"), ("ASb", "<f8"), ("Evsk", "<f8", (3, 3, 4)), ("ARx", "<f8", (4, )), ("ARy", "<f8", (4, )), ("ARz", "<f8", (4, )), ("AVx", "<f8", (4, )), ("AVy", "<f8", (4, )), ("AVz", "<f8", (4, ))]), ("Type", "<i4")]), ("TimeProcessing", "<f8"), ("ApriorAccuracy", "<f8"), ("RelativeAccuracy", "<f8", (2, ))]) epilogue = np.dtype([("RadiometricProcessing", radiometric_processing, (10, )), ("GeometricProcessing", geometric_processing, (10, ))]) # FIXME: Add rest of the epilogue
[docs] class HRITGOMSEpilogueFileHandler(HRITFileHandler): """GOMS HRIT format reader.""" def __init__(self, filename, filename_info, filetype_info): """Initialize the reader.""" super(HRITGOMSEpilogueFileHandler, self).__init__(filename, filename_info, filetype_info, (goms_hdr_map, goms_variable_length_headers, goms_text_headers)) self.epilogue = {} self.read_epilogue()
[docs] def read_epilogue(self): """Read the prologue metadata.""" with open(self.filename, "rb") as fp_: fp_.seek(self.mda["total_header_length"]) data = np.fromfile(fp_, dtype=epilogue, count=1)[0] self.epilogue.update(recarray2dict(data))
C1 = 1.19104273e-5 C2 = 1.43877523 # Defined in MSG Level 1.5 Image Data Format Description # https://www-cdn.eumetsat.int/files/2020-05/pdf_ten_05105_msg_img_data.pdf SPACECRAFTS = {19001: "Electro-L N1", 19002: "Electro-L N2", 19003: "Electro-L N3"}
[docs] class HRITGOMSFileHandler(HRITFileHandler): """GOMS HRIT format reader.""" def __init__(self, filename, filename_info, filetype_info, prologue, epilogue): """Initialize the reader.""" super(HRITGOMSFileHandler, self).__init__(filename, filename_info, filetype_info, (goms_hdr_map, goms_variable_length_headers, goms_text_headers)) self.prologue = prologue.prologue self.epilogue = epilogue.epilogue self.chid = self.mda["spectral_channel_id"] sublon = self.epilogue["GeometricProcessing"]["TGeomNormInfo"]["SubLon"] sublon = sublon[self.chid] self.mda["projection_parameters"]["SSP_longitude"] = np.rad2deg(sublon) self.mda["orbital_parameters"]["satellite_nominal_longitude"] = np.rad2deg( self.prologue["SatelliteStatus"]["NominalLongitude"]) satellite_id = self.prologue["SatelliteStatus"]["SatelliteID"] self.platform_name = SPACECRAFTS[satellite_id]
[docs] def get_dataset(self, key, info): """Get the data from the files.""" res = super(HRITGOMSFileHandler, self).get_dataset(key, info) res = self.calibrate(res, key["calibration"]) res.attrs["units"] = info["units"] res.attrs["standard_name"] = info["standard_name"] res.attrs["wavelength"] = info["wavelength"] res.attrs["platform_name"] = self.platform_name res.attrs["sensor"] = "msu-gs" res.attrs["orbital_parameters"] = { "satellite_nominal_longitude": self.mda["orbital_parameters"]["satellite_nominal_longitude"], "satellite_nominal_latitude": 0., "projection_longitude": self.mda["projection_parameters"]["SSP_longitude"], "projection_latitude": 0., "projection_altitude": 35785831.00 } return res
[docs] def calibrate(self, data, calibration): """Calibrate the data.""" tic = dt.datetime.now() if calibration == "counts": res = data elif calibration in ["radiance", "brightness_temperature"]: res = self._calibrate(data) else: raise NotImplementedError("Don't know how to calibrate to " + str(calibration)) res.attrs["standard_name"] = calibration res.attrs["calibration"] = calibration logger.debug("Calibration time " + str(dt.datetime.now() - tic)) return res
[docs] @staticmethod def _getitem(block, lut): return lut[block]
[docs] def _calibrate(self, data): """Visible/IR channel calibration.""" lut = self.prologue["ImageCalibration"][self.chid] if abs(lut).max() > 16777216: lut = lut.astype(np.float64) else: lut = lut.astype(np.float32) lut /= 1000 lut[0] = np.nan # Dask/XArray don't support indexing in 2D (yet). res = data.data.map_blocks(self._getitem, lut, dtype=lut.dtype) res = xr.DataArray(res, dims=data.dims, attrs=data.attrs, coords=data.coords) res = res.where(data > 0) return res
[docs] def get_area_def(self, dsid): """Get the area definition of the band.""" pdict = {} pdict["cfac"] = np.int32(self.mda["cfac"]) pdict["lfac"] = np.int32(self.mda["lfac"]) pdict["coff"] = np.float32(self.mda["coff"]) pdict["loff"] = np.float32(self.mda["loff"]) pdict["a"] = 6378169.00 pdict["b"] = 6356583.80 pdict["h"] = 35785831.00 pdict["scandir"] = "N2S" pdict["ssp_lon"] = self.mda["projection_parameters"]["SSP_longitude"] pdict["nlines"] = int(self.mda["number_of_lines"]) pdict["ncols"] = int(self.mda["number_of_columns"]) pdict["loff"] = pdict["nlines"] - pdict["loff"] pdict["a_name"] = "geosgoms" pdict["a_desc"] = "Electro-L/GOMS channel area" pdict["p_id"] = "goms" area_extent = get_area_extent(pdict) area = get_area_definition(pdict, area_extent) self.area = area return area