#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2014-2018 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""GOES HRIT format reader.
References:
LRIT/HRIT Mission Specific Implementation, February 2012
GVARRDL98.pdf
05057_SPE_MSG_LRIT_HRI
"""
import logging
from datetime import datetime, timedelta
import dask.array as da
import numpy as np
import xarray as xr
from satpy.readers._geos_area import get_area_definition, get_area_extent, get_geos_area_naming
from satpy.readers.eum_base import recarray2dict, time_cds_short
from satpy.readers.hrit_base import (
HRITFileHandler,
ancillary_text,
annotation_header,
base_hdr_map,
image_data_function,
)
[docs]
class CalibrationError(Exception):
"""Dummy error-class."""
logger = logging.getLogger('hrit_goes')
# Geometric constants [meters]
EQUATOR_RADIUS = 6378169.00
POLE_RADIUS = 6356583.80
ALTITUDE = 35785831.00
# goes implementation:
key_header = np.dtype([('key_number', 'u1'),
('seed', '>f8')])
segment_identification = np.dtype([('GP_SC_ID', '>i2'),
('spectral_channel_id', '>i1'),
('segment_sequence_number', '>u2'),
('planned_start_segment_number', '>u2'),
('planned_end_segment_number', '>u2'),
('data_field_representation', '>i1')])
image_segment_line_quality = np.dtype([('line_number_in_grid', '>i4'),
('line_mean_acquisition',
[('days', '>u2'),
('milliseconds', '>u4')]),
('line_validity', 'u1'),
('line_radiometric_quality', 'u1'),
('line_geometric_quality', 'u1')])
goms_variable_length_headers = {
image_segment_line_quality: 'image_segment_line_quality'}
goms_text_headers = {image_data_function: 'image_data_function',
annotation_header: 'annotation_header',
ancillary_text: 'ancillary_text'}
goes_hdr_map = base_hdr_map.copy()
goes_hdr_map.update({7: key_header,
128: segment_identification,
129: image_segment_line_quality
})
orbit_coef = np.dtype([('StartTime', time_cds_short),
('EndTime', time_cds_short),
('X', '>f8', (8, )),
('Y', '>f8', (8, )),
('Z', '>f8', (8, )),
('VX', '>f8', (8, )),
('VY', '>f8', (8, )),
('VZ', '>f8', (8, ))])
attitude_coef = np.dtype([('StartTime', time_cds_short),
('EndTime', time_cds_short),
('XofSpinAxis', '>f8', (8, )),
('YofSpinAxis', '>f8', (8, )),
('ZofSpinAxis', '>f8', (8, ))])
cuc_time = np.dtype([('coarse', 'u1', (4, )),
('fine', 'u1', (3, ))])
sgs_time = np.dtype([('century', 'u1'),
('year', 'u1'),
('doy1', 'u1'),
('doy_hours', 'u1'),
('hours_mins', 'u1'),
('mins_secs', 'u1'),
('secs_msecs', 'u1'),
('msecs', 'u1')])
[docs]
def make_sgs_time(sgs_time_array):
"""Make sgs time."""
year = ((sgs_time_array['century'] >> 4) * 1000 +
(sgs_time_array['century'] & 15) * 100 +
(sgs_time_array['year'] >> 4) * 10 +
(sgs_time_array['year'] & 15))
doy = ((sgs_time_array['doy1'] >> 4) * 100 +
(sgs_time_array['doy1'] & 15) * 10 +
(sgs_time_array['doy_hours'] >> 4))
hours = ((sgs_time_array['doy_hours'] & 15) * 10 +
(sgs_time_array['hours_mins'] >> 4))
mins = ((sgs_time_array['hours_mins'] & 15) * 10 +
(sgs_time_array['mins_secs'] >> 4))
secs = ((sgs_time_array['mins_secs'] & 15) * 10 +
(sgs_time_array['secs_msecs'] >> 4))
msecs = ((sgs_time_array['secs_msecs'] & 15) * 100 +
(sgs_time_array['msecs'] >> 4) * 10 +
(sgs_time_array['msecs'] & 15))
return (datetime(int(year), 1, 1) +
timedelta(days=int(doy - 1),
hours=int(hours),
minutes=int(mins),
seconds=int(secs),
milliseconds=int(msecs)))
satellite_status = np.dtype([("TagType", "<u4"),
("TagLength", "<u4"),
("SatelliteID", "<u8"),
("SatelliteName", "S256"),
("NominalLongitude", "<f8"),
("SatelliteCondition", "<u4"),
("TimeOffset", "<f8")])
image_acquisition = np.dtype([("TagType", "<u4"),
("TagLength", "<u4"),
("Status", "<u4"),
("StartDelay", "<i4"),
("Cel", "<f8")])
gvar_float = '>i4'
[docs]
def make_gvar_float(float_val):
"""Make gvar float."""
sign = 1
if float_val < 0:
float_val = -float_val
sign = -1
exp = (float_val >> 24) - 64
mant = float_val & ((1 << 24) - 1)
if mant == 0:
return 0.
res = sign * mant * 2.0**(-24 + exp * 4)
return res
prologue = np.dtype([
# common generic header
("CommonHeaderVersion", "u1"),
("Junk1", "u1", 3),
("NominalSGSProductTime", time_cds_short),
("SGSProductQuality", "u1"),
("SGSProductCompleteness", "u1"),
("SGSProductTimeliness", "u1"),
("SGSProcessingInstanceId", "u1"),
("BaseAlgorithmVersion", "S1", 16),
("ProductAlgorithmVersion", "S1", 16),
# product header
("ImageProductHeaderVersion", "u1"),
("Junk2", "u1", 3),
("ImageProductHeaderLength", ">u4"),
("ImageProductVersion", "u1"),
# first block-0
("SatelliteID", "u1"),
("SPSID", "u1"),
("IScan", "u1", 4),
("IDSub", "u1", 16),
("TCurr", sgs_time),
("TCHED", sgs_time),
("TCTRL", sgs_time),
("TLHED", sgs_time),
("TLTRL", sgs_time),
("TIPFS", sgs_time),
("TINFS", sgs_time),
("TISPC", sgs_time),
("TIECL", sgs_time),
("TIBBC", sgs_time),
("TISTR", sgs_time),
("TLRAN", sgs_time),
("TIIRT", sgs_time),
("TIVIT", sgs_time),
("TCLMT", sgs_time),
("TIONA", sgs_time),
("RelativeScanCount", '>u2'),
("AbsoluteScanCount", '>u2'),
("NorthernmostScanLine", '>u2'),
("WesternmostPixel", '>u2'),
("EasternmostPixel", '>u2'),
("NorthernmostFrameLine", '>u2'),
("SouthernmostFrameLine", '>u2'),
("0Pixel", '>u2'),
("0ScanLine", '>u2'),
("0Scan", '>u2'),
("SubSatScan", '>u2'),
("SubSatPixel", '>u2'),
("SubSatLatitude", gvar_float),
("SubSatLongitude", gvar_float),
("Junk4", "u1", 96), # move to "word" 295
("IMCIdentifier", "S4"),
("Zeros", "u1", 12),
("ReferenceLongitude", gvar_float),
("ReferenceDistance", gvar_float),
("ReferenceLatitude", gvar_float)
])
[docs]
class HRITGOESPrologueFileHandler(HRITFileHandler):
"""GOES HRIT format reader."""
def __init__(self, filename, filename_info, filetype_info):
"""Initialize the reader."""
super(HRITGOESPrologueFileHandler, self).__init__(filename, filename_info,
filetype_info,
(goes_hdr_map,
goms_variable_length_headers,
goms_text_headers))
self.prologue = {}
self.read_prologue()
[docs]
def read_prologue(self):
"""Read the prologue metadata."""
with open(self.filename, "rb") as fp_:
fp_.seek(self.mda['total_header_length'])
data = np.fromfile(fp_, dtype=prologue, count=1)
self.prologue.update(recarray2dict(data))
self.process_prologue()
[docs]
def process_prologue(self):
"""Reprocess prologue to correct types."""
for key in ['TCurr', 'TCHED', 'TCTRL', 'TLHED', 'TLTRL', 'TIPFS',
'TINFS', 'TISPC', 'TIECL', 'TIBBC', 'TISTR', 'TLRAN',
'TIIRT', 'TIVIT', 'TCLMT', 'TIONA']:
try:
self.prologue[key] = make_sgs_time(self.prologue[key])
except ValueError:
self.prologue.pop(key, None)
logger.debug("Invalid data for %s", key)
for key in ['SubSatLatitude', "SubSatLongitude", "ReferenceLongitude",
"ReferenceDistance", "ReferenceLatitude"]:
self.prologue[key] = make_gvar_float(self.prologue[key])
radiometric_processing = np.dtype([("TagType", "<u4"),
("TagLength", "<u4"),
("RPSummary",
[("Impulse", "<u4"),
("IsStrNoiseCorrection", "<u4"),
("IsOptic", "<u4"),
("IsBrightnessAligment", "<u4")]),
("OpticCorrection",
[("Degree", "<i4"),
("A", "<f8", (16, ))]),
("RPQuality",
[("EffDinRange", "<f8"),
("EathDarkening", "<f8"),
("Zone", "<f8"),
("Impulse", "<f8"),
("Group", "<f8"),
("DefectCount", "<u4"),
("DefectProcent", "<f8"),
("S_Noise_DT_Preflight", "<f8"),
("S_Noise_DT_Bort", "<f8"),
("S_Noise_DT_Video", "<f8"),
("S_Noise_DT_1_5", "<f8"),
("CalibrStability", "<f8"),
("TemnSKO", "<f8", (2, )),
("StructSKO", "<f8", (2, )),
("Struct_1_5", "<f8"),
("Zone_1_ 5", "<f8"),
("RadDif", "<f8")])])
geometric_processing = np.dtype([("TagType", "<u4"),
("TagLength", "<u4"),
("TGeomNormInfo",
[("IsExist", "<u4"),
("IsNorm", "<u4"),
("SubLon", "<f8"),
("TypeProjection", "<u4"),
("PixInfo", "<f8", (4, ))]),
("SatInfo",
[("TISO",
[("T0", "<f8"),
("dT", "<f8"),
("ASb", "<f8"),
("Evsk", "<f8", (3, 3, 4)),
("ARx", "<f8", (4, )),
("ARy", "<f8", (4, )),
("ARz", "<f8", (4, )),
("AVx", "<f8", (4, )),
("AVy", "<f8", (4, )),
("AVz", "<f8", (4, ))]),
("Type", "<i4")]),
("TimeProcessing", "<f8"),
("ApriorAccuracy", "<f8"),
("RelativeAccuracy", "<f8", (2, ))])
C1 = 1.19104273e-5
C2 = 1.43877523
SPACECRAFTS = {
# these are GP_SC_ID
18007: "GOES-7",
18008: "GOES-8",
18009: "GOES-9",
18010: "GOES-10",
18011: "GOES-11",
18012: "GOES-12",
18013: "GOES-13",
18014: "GOES-14",
18015: "GOES-15",
# these are in block-0
7: "GOES-7",
8: "GOES-8",
9: "GOES-9",
10: "GOES-10",
11: "GOES-11",
12: "GOES-12",
13: "GOES-13",
14: "GOES-14",
15: "GOES-15"}
SENSOR_NAME = 'goes_imager'
[docs]
class HRITGOESFileHandler(HRITFileHandler):
"""GOES HRIT format reader."""
def __init__(self, filename, filename_info, filetype_info,
prologue):
"""Initialize the reader."""
super(HRITGOESFileHandler, self).__init__(filename, filename_info,
filetype_info,
(goes_hdr_map,
goms_variable_length_headers,
goms_text_headers))
self.prologue = prologue.prologue
self.chid = self.mda['spectral_channel_id']
sublon = self.prologue['SubSatLongitude']
self.mda['projection_parameters']['SSP_longitude'] = sublon
satellite_id = self.prologue['SatelliteID']
self.platform_name = SPACECRAFTS[satellite_id]
[docs]
def get_dataset(self, key, info):
"""Get the data from the files."""
logger.debug("Getting raw data")
res = super(HRITGOESFileHandler, self).get_dataset(key, info)
self.mda['calibration_parameters'] = self._get_calibration_params()
res = self.calibrate(res, key['calibration'])
new_attrs = info.copy()
new_attrs.update(res.attrs)
res.attrs = new_attrs
res.attrs['platform_name'] = self.platform_name
res.attrs['sensor'] = SENSOR_NAME
res.attrs['orbital_parameters'] = {'projection_longitude': self.mda['projection_parameters']['SSP_longitude'],
'projection_latitude': 0.0,
'projection_altitude': ALTITUDE}
return res
[docs]
def _get_calibration_params(self):
"""Get the calibration parameters from the metadata."""
params = {}
idx_table = []
val_table = []
for elt in self.mda['image_data_function'].split(b'\r\n'):
try:
key, val = elt.split(b':=')
try:
idx_table.append(int(key))
val_table.append(float(val))
except ValueError:
params[key] = val
except ValueError:
pass
params['indices'] = np.array(idx_table)
params['values'] = np.array(val_table, dtype=np.float32)
return params
[docs]
def calibrate(self, data, calibration):
"""Calibrate the data."""
logger.debug("Calibration")
tic = datetime.now()
if calibration == 'counts':
return data
if calibration == 'reflectance':
res = self._calibrate(data)
elif calibration == 'brightness_temperature':
res = self._calibrate(data)
else:
raise NotImplementedError("Don't know how to calibrate to " +
str(calibration))
logger.debug("Calibration time " + str(datetime.now() - tic))
return res
[docs]
def _calibrate(self, data):
"""Calibrate *data*."""
idx = self.mda['calibration_parameters']['indices']
val = self.mda['calibration_parameters']['values']
data.data = da.where(data.data == 0, np.nan, data.data)
ddata = data.data.map_blocks(np.interp, idx, val, dtype=val.dtype)
res = xr.DataArray(ddata,
dims=data.dims, attrs=data.attrs,
coords=data.coords)
res = res.clip(min=0)
units = {b'percent': '%', b'degree Kelvin': 'K'}
unit = self.mda['calibration_parameters'][b'_UNIT']
res.attrs['units'] = units.get(unit, unit)
return res
[docs]
def get_area_def(self, dataset_id):
"""Get the area definition of the band."""
proj_dict = self._get_proj_dict(dataset_id)
area_extent = get_area_extent(proj_dict)
area = get_area_definition(proj_dict, area_extent)
self.area = area
return area
[docs]
def _get_proj_dict(self, dataset_id):
loff = np.float32(self.mda['loff'])
nlines = np.int32(self.mda['number_of_lines'])
loff = nlines - loff
name_dict = get_geos_area_naming({
'platform_name': self.platform_name,
'instrument_name': SENSOR_NAME,
# Partial scans are padded to full disk
'service_name': 'FD',
'service_desc': 'Full Disk',
'resolution': dataset_id['resolution']
})
return {
'a': EQUATOR_RADIUS,
'b': POLE_RADIUS,
'ssp_lon': float(self.prologue['SubSatLongitude']),
'h': ALTITUDE,
'proj': 'geos',
'units': 'm',
'a_name': name_dict['area_id'],
'a_desc': name_dict['description'],
'p_id': '',
'nlines': nlines,
'ncols': np.int32(self.mda['number_of_columns']),
'cfac': np.int32(self.mda['cfac']),
'lfac': np.int32(self.mda['lfac']),
'coff': np.float32(self.mda['coff']),
'loff': loff,
'scandir': 'N2S'
}