Source code for satpy.readers.goes_imager_hrit

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014-2018 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""GOES HRIT format reader.

References:
      LRIT/HRIT Mission Specific Implementation, February 2012
      GVARRDL98.pdf
      05057_SPE_MSG_LRIT_HRI

"""

import logging
from datetime import datetime, timedelta

import dask.array as da
import numpy as np
import xarray as xr

from satpy._compat import ArrayLike
from satpy.readers._geos_area import get_area_definition, get_area_extent, get_geos_area_naming
from satpy.readers.eum_base import recarray2dict, time_cds_short
from satpy.readers.hrit_base import (
    HRITFileHandler,
    ancillary_text,
    annotation_header,
    base_hdr_map,
    image_data_function,
)


[docs] class CalibrationError(Exception): """Dummy error-class."""
logger = logging.getLogger("hrit_goes") # Geometric constants [meters] EQUATOR_RADIUS = 6378169.00 POLE_RADIUS = 6356583.80 ALTITUDE = 35785831.00 # goes implementation: key_header = np.dtype([("key_number", "u1"), ("seed", ">f8")]) segment_identification = np.dtype([("GP_SC_ID", ">i2"), ("spectral_channel_id", ">i1"), ("segment_sequence_number", ">u2"), ("planned_start_segment_number", ">u2"), ("planned_end_segment_number", ">u2"), ("data_field_representation", ">i1")]) image_segment_line_quality = np.dtype([("line_number_in_grid", ">i4"), ("line_mean_acquisition", [("days", ">u2"), ("milliseconds", ">u4")]), ("line_validity", "u1"), ("line_radiometric_quality", "u1"), ("line_geometric_quality", "u1")]) goms_variable_length_headers = { image_segment_line_quality: "image_segment_line_quality"} goms_text_headers = {image_data_function: "image_data_function", annotation_header: "annotation_header", ancillary_text: "ancillary_text"} goes_hdr_map = base_hdr_map.copy() goes_hdr_map.update({7: key_header, 128: segment_identification, 129: image_segment_line_quality }) orbit_coef = np.dtype([("StartTime", time_cds_short), ("EndTime", time_cds_short), ("X", ">f8", (8, )), ("Y", ">f8", (8, )), ("Z", ">f8", (8, )), ("VX", ">f8", (8, )), ("VY", ">f8", (8, )), ("VZ", ">f8", (8, ))]) attitude_coef = np.dtype([("StartTime", time_cds_short), ("EndTime", time_cds_short), ("XofSpinAxis", ">f8", (8, )), ("YofSpinAxis", ">f8", (8, )), ("ZofSpinAxis", ">f8", (8, ))]) cuc_time = np.dtype([("coarse", "u1", (4, )), ("fine", "u1", (3, ))]) sgs_time = np.dtype([("century", "u1"), ("year", "u1"), ("doy1", "u1"), ("doy_hours", "u1"), ("hours_mins", "u1"), ("mins_secs", "u1"), ("secs_msecs", "u1"), ("msecs", "u1")])
[docs] def make_sgs_time(sgs_time_array: ArrayLike) -> datetime: """Make sgs time.""" epoch_year = _epoch_year_from_sgs_time(sgs_time_array) doy_offset = _epoch_doy_offset_from_sgs_time(sgs_time_array) return epoch_year + doy_offset
[docs] def _epoch_year_from_sgs_time(sgs_time_array: ArrayLike) -> datetime: century = sgs_time_array["century"].astype(np.int64) year = sgs_time_array["year"].astype(np.int64) year = ((century >> 4) * 1000 + (century & 15) * 100 + (year >> 4) * 10 + (year & 15)) return datetime(int(year), 1, 1)
[docs] def _epoch_doy_offset_from_sgs_time(sgs_time_array: ArrayLike) -> timedelta: doy1 = sgs_time_array["doy1"].astype(np.int64) doy_hours = sgs_time_array["doy_hours"].astype(np.int64) hours_mins = sgs_time_array["hours_mins"].astype(np.int64) mins_secs = sgs_time_array["mins_secs"].astype(np.int64) secs_msecs = sgs_time_array["secs_msecs"].astype(np.int64) msecs = sgs_time_array["msecs"].astype(np.int64) doy = ((doy1 >> 4) * 100 + (doy1 & 15) * 10 + (doy_hours >> 4)) hours = ((doy_hours & 15) * 10 + (hours_mins >> 4)) mins = ((hours_mins & 15) * 10 + (mins_secs >> 4)) secs = ((mins_secs & 15) * 10 + (secs_msecs >> 4)) msecs = ((secs_msecs & 15) * 100 + (msecs >> 4) * 10 + (msecs & 15)) return timedelta( days=int(doy - 1), hours=int(hours), minutes=int(mins), seconds=int(secs), milliseconds=int(msecs) )
satellite_status = np.dtype([("TagType", "<u4"), ("TagLength", "<u4"), ("SatelliteID", "<u8"), ("SatelliteName", "S256"), ("NominalLongitude", "<f8"), ("SatelliteCondition", "<u4"), ("TimeOffset", "<f8")]) image_acquisition = np.dtype([("TagType", "<u4"), ("TagLength", "<u4"), ("Status", "<u4"), ("StartDelay", "<i4"), ("Cel", "<f8")]) gvar_float = ">i4"
[docs] def make_gvar_float(float_val): """Make gvar float.""" sign = 1 if float_val < 0: float_val = -float_val sign = -1 exp = (float_val >> 24) - 64 mant = float_val & ((1 << 24) - 1) if mant == 0: return 0. res = sign * mant * 2.0**(-24 + exp * 4) return res
prologue = np.dtype([ # common generic header ("CommonHeaderVersion", "u1"), ("Junk1", "u1", 3), ("NominalSGSProductTime", time_cds_short), ("SGSProductQuality", "u1"), ("SGSProductCompleteness", "u1"), ("SGSProductTimeliness", "u1"), ("SGSProcessingInstanceId", "u1"), ("BaseAlgorithmVersion", "S1", 16), ("ProductAlgorithmVersion", "S1", 16), # product header ("ImageProductHeaderVersion", "u1"), ("Junk2", "u1", 3), ("ImageProductHeaderLength", ">u4"), ("ImageProductVersion", "u1"), # first block-0 ("SatelliteID", "u1"), ("SPSID", "u1"), ("IScan", "u1", 4), ("IDSub", "u1", 16), ("TCurr", sgs_time), ("TCHED", sgs_time), ("TCTRL", sgs_time), ("TLHED", sgs_time), ("TLTRL", sgs_time), ("TIPFS", sgs_time), ("TINFS", sgs_time), ("TISPC", sgs_time), ("TIECL", sgs_time), ("TIBBC", sgs_time), ("TISTR", sgs_time), ("TLRAN", sgs_time), ("TIIRT", sgs_time), ("TIVIT", sgs_time), ("TCLMT", sgs_time), ("TIONA", sgs_time), ("RelativeScanCount", ">u2"), ("AbsoluteScanCount", ">u2"), ("NorthernmostScanLine", ">u2"), ("WesternmostPixel", ">u2"), ("EasternmostPixel", ">u2"), ("NorthernmostFrameLine", ">u2"), ("SouthernmostFrameLine", ">u2"), ("0Pixel", ">u2"), ("0ScanLine", ">u2"), ("0Scan", ">u2"), ("SubSatScan", ">u2"), ("SubSatPixel", ">u2"), ("SubSatLatitude", gvar_float), ("SubSatLongitude", gvar_float), ("Junk4", "u1", 96), # move to "word" 295 ("IMCIdentifier", "S4"), ("Zeros", "u1", 12), ("ReferenceLongitude", gvar_float), ("ReferenceDistance", gvar_float), ("ReferenceLatitude", gvar_float) ])
[docs] class HRITGOESPrologueFileHandler(HRITFileHandler): """GOES HRIT format reader.""" def __init__(self, filename, filename_info, filetype_info): """Initialize the reader.""" super(HRITGOESPrologueFileHandler, self).__init__(filename, filename_info, filetype_info, (goes_hdr_map, goms_variable_length_headers, goms_text_headers)) self.prologue = {} self.read_prologue()
[docs] def read_prologue(self): """Read the prologue metadata.""" with open(self.filename, "rb") as fp_: fp_.seek(self.mda["total_header_length"]) data = np.fromfile(fp_, dtype=prologue, count=1) self.prologue.update(recarray2dict(data)) self.process_prologue()
[docs] def process_prologue(self): """Reprocess prologue to correct types.""" for key in ["TCurr", "TCHED", "TCTRL", "TLHED", "TLTRL", "TIPFS", "TINFS", "TISPC", "TIECL", "TIBBC", "TISTR", "TLRAN", "TIIRT", "TIVIT", "TCLMT", "TIONA"]: try: self.prologue[key] = make_sgs_time(self.prologue[key]) except ValueError: self.prologue.pop(key, None) logger.debug("Invalid data for %s", key) for key in ["SubSatLatitude", "SubSatLongitude", "ReferenceLongitude", "ReferenceDistance", "ReferenceLatitude"]: self.prologue[key] = make_gvar_float(self.prologue[key])
radiometric_processing = np.dtype([("TagType", "<u4"), ("TagLength", "<u4"), ("RPSummary", [("Impulse", "<u4"), ("IsStrNoiseCorrection", "<u4"), ("IsOptic", "<u4"), ("IsBrightnessAligment", "<u4")]), ("OpticCorrection", [("Degree", "<i4"), ("A", "<f8", (16, ))]), ("RPQuality", [("EffDinRange", "<f8"), ("EathDarkening", "<f8"), ("Zone", "<f8"), ("Impulse", "<f8"), ("Group", "<f8"), ("DefectCount", "<u4"), ("DefectProcent", "<f8"), ("S_Noise_DT_Preflight", "<f8"), ("S_Noise_DT_Bort", "<f8"), ("S_Noise_DT_Video", "<f8"), ("S_Noise_DT_1_5", "<f8"), ("CalibrStability", "<f8"), ("TemnSKO", "<f8", (2, )), ("StructSKO", "<f8", (2, )), ("Struct_1_5", "<f8"), ("Zone_1_ 5", "<f8"), ("RadDif", "<f8")])]) geometric_processing = np.dtype([("TagType", "<u4"), ("TagLength", "<u4"), ("TGeomNormInfo", [("IsExist", "<u4"), ("IsNorm", "<u4"), ("SubLon", "<f8"), ("TypeProjection", "<u4"), ("PixInfo", "<f8", (4, ))]), ("SatInfo", [("TISO", [("T0", "<f8"), ("dT", "<f8"), ("ASb", "<f8"), ("Evsk", "<f8", (3, 3, 4)), ("ARx", "<f8", (4, )), ("ARy", "<f8", (4, )), ("ARz", "<f8", (4, )), ("AVx", "<f8", (4, )), ("AVy", "<f8", (4, )), ("AVz", "<f8", (4, ))]), ("Type", "<i4")]), ("TimeProcessing", "<f8"), ("ApriorAccuracy", "<f8"), ("RelativeAccuracy", "<f8", (2, ))]) C1 = 1.19104273e-5 C2 = 1.43877523 SPACECRAFTS = { # these are GP_SC_ID 18007: "GOES-7", 18008: "GOES-8", 18009: "GOES-9", 18010: "GOES-10", 18011: "GOES-11", 18012: "GOES-12", 18013: "GOES-13", 18014: "GOES-14", 18015: "GOES-15", # these are in block-0 7: "GOES-7", 8: "GOES-8", 9: "GOES-9", 10: "GOES-10", 11: "GOES-11", 12: "GOES-12", 13: "GOES-13", 14: "GOES-14", 15: "GOES-15"} SENSOR_NAME = "goes_imager"
[docs] class HRITGOESFileHandler(HRITFileHandler): """GOES HRIT format reader.""" def __init__(self, filename, filename_info, filetype_info, prologue): """Initialize the reader.""" super(HRITGOESFileHandler, self).__init__(filename, filename_info, filetype_info, (goes_hdr_map, goms_variable_length_headers, goms_text_headers)) self.prologue = prologue.prologue self.chid = self.mda["spectral_channel_id"] sublon = self.prologue["SubSatLongitude"] self.mda["projection_parameters"]["SSP_longitude"] = sublon satellite_id = self.prologue["SatelliteID"] self.platform_name = SPACECRAFTS[satellite_id]
[docs] def get_dataset(self, key, info): """Get the data from the files.""" logger.debug("Getting raw data") res = super(HRITGOESFileHandler, self).get_dataset(key, info) self.mda["calibration_parameters"] = self._get_calibration_params() res = self.calibrate(res, key["calibration"]) new_attrs = info.copy() new_attrs.update(res.attrs) res.attrs = new_attrs res.attrs["platform_name"] = self.platform_name res.attrs["sensor"] = SENSOR_NAME res.attrs["orbital_parameters"] = {"projection_longitude": self.mda["projection_parameters"]["SSP_longitude"], "projection_latitude": 0.0, "projection_altitude": ALTITUDE} return res
[docs] def _get_calibration_params(self): """Get the calibration parameters from the metadata.""" params = {} idx_table = [] val_table = [] for elt in self.mda["image_data_function"].split(b"\r\n"): try: key, val = elt.split(b":=") try: idx_table.append(int(key)) val_table.append(float(val)) except ValueError: params[key] = val except ValueError: pass params["indices"] = np.array(idx_table) params["values"] = np.array(val_table, dtype=np.float32) return params
[docs] def calibrate(self, data, calibration): """Calibrate the data.""" logger.debug("Calibration") tic = datetime.now() if calibration == "counts": return data if calibration == "reflectance": res = self._calibrate(data) elif calibration == "brightness_temperature": res = self._calibrate(data) else: raise NotImplementedError("Don't know how to calibrate to " + str(calibration)) logger.debug("Calibration time " + str(datetime.now() - tic)) return res
[docs] def _calibrate(self, data): """Calibrate *data*.""" idx = self.mda["calibration_parameters"]["indices"] val = self.mda["calibration_parameters"]["values"] data.data = da.where(data.data == 0, np.nan, data.data) ddata = data.data.map_blocks(np.interp, idx, val, dtype=val.dtype) res = xr.DataArray(ddata, dims=data.dims, attrs=data.attrs, coords=data.coords) res = res.clip(min=0) units = {b"percent": "%", b"degree Kelvin": "K"} unit = self.mda["calibration_parameters"][b"_UNIT"] res.attrs["units"] = units.get(unit, unit) return res
[docs] def get_area_def(self, dataset_id): """Get the area definition of the band.""" proj_dict = self._get_proj_dict(dataset_id) area_extent = get_area_extent(proj_dict) area = get_area_definition(proj_dict, area_extent) self.area = area return area
[docs] def _get_proj_dict(self, dataset_id): loff = np.float32(self.mda["loff"]) nlines = np.int32(self.mda["number_of_lines"]) loff = nlines - loff name_dict = get_geos_area_naming({ "platform_name": self.platform_name, "instrument_name": SENSOR_NAME, # Partial scans are padded to full disk "service_name": "FD", "service_desc": "Full Disk", "resolution": dataset_id["resolution"] }) return { "a": EQUATOR_RADIUS, "b": POLE_RADIUS, "ssp_lon": float(self.prologue["SubSatLongitude"]), "h": ALTITUDE, "proj": "geos", "units": "m", "a_name": name_dict["area_id"], "a_desc": name_dict["description"], "p_id": "", "nlines": nlines, "ncols": np.int32(self.mda["number_of_columns"]), "cfac": np.int32(self.mda["cfac"]), "lfac": np.int32(self.mda["lfac"]), "coff": np.float32(self.mda["coff"]), "loff": loff, "scandir": "N2S" }