#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2009-2021 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Reading and calibrating hrpt avhrr data.
Todo:
- AMSU
- Compare output with AAPP
Reading:
http://www.ncdc.noaa.gov/oa/pod-guide/ncdc/docs/klm/html/c4/sec4-1.htm#t413-1
Calibration:
http://www.ncdc.noaa.gov/oa/pod-guide/ncdc/docs/klm/html/c7/sec7-1.htm
"""
import datetime as dt
import logging
import dask.array as da
import numpy as np
import xarray as xr
from geotiepoints import SatelliteInterpolator
from pyorbital.geoloc import compute_pixels, get_lonlatalt
from pyorbital.geoloc_instrument_definitions import avhrr
from pyorbital.orbital import Orbital
from satpy._compat import cached_property
from satpy.readers.aapp_l1b import get_avhrr_lac_chunks
from satpy.readers.file_handlers import BaseFileHandler
logger = logging.getLogger(__name__)
AVHRR_CHANNEL_NAMES = ("1", "2", "3a", "3b", "4", "5")
dtype = np.dtype([("frame_sync", ">u2", (6, )),
("id", [("id", ">u2"),
("spare", ">u2")]),
("timecode", ">u2", (4, )),
("telemetry", [("ramp_calibration", ">u2", (5, )),
("PRT", ">u2", (3, )),
("ch3_patch_temp", ">u2"),
("spare", ">u2"), ]),
("back_scan", ">u2", (10, 3)),
("space_data", ">u2", (10, 5)),
("sync", ">u2"),
("TIP_data", ">u2", (520, )),
("spare", ">u2", (127, )),
("image_data", ">u2", (2048, 5)),
("aux_sync", ">u2", (100, ))])
[docs]
def time_seconds(tc_array, year):
"""Return the time object from the timecodes."""
tc_array = np.array(tc_array, copy=True)
word = tc_array[:, 0]
day = word >> 1
word = tc_array[:, 1].astype(np.uint64)
msecs = ((127) & word) * 1024
word = tc_array[:, 2]
msecs += word & 1023
msecs *= 1024
word = tc_array[:, 3]
msecs += word & 1023
return (np.datetime64(
str(year) + "-01-01T00:00:00", "s") +
msecs[:].astype("timedelta64[ms]") +
(day - 1)[:].astype("timedelta64[D]"))
[docs]
def bfield(array, bit):
"""Return the bit array."""
return (array & 2**(9 - bit + 1)).astype(bool)
spacecrafts = {7: "NOAA 15", 3: "NOAA 16", 13: "NOAA 18", 15: "NOAA 19"}
[docs]
def geo_interpolate(lons32km, lats32km):
"""Interpolate geo data."""
cols32km = np.arange(0, 2048, 32)
cols1km = np.arange(2048)
lines = lons32km.shape[0]
rows32km = np.arange(lines)
rows1km = np.arange(lines)
along_track_order = 1
cross_track_order = 3
satint = SatelliteInterpolator(
(lons32km, lats32km), (rows32km, cols32km), (rows1km, cols1km),
along_track_order, cross_track_order)
lons, lats = satint.interpolate()
return lons, lats
[docs]
def _get_channel_index(key):
"""Get the avhrr channel index."""
avhrr_channel_index = {"1": 0,
"2": 1,
"3a": 2,
"3b": 2,
"4": 3,
"5": 4}
index = avhrr_channel_index[key["name"]]
return index
[docs]
class HRPTFile(BaseFileHandler):
"""Reader for HRPT Minor Frame, 10 bits data expanded to 16 bits."""
def __init__(self, filename, filename_info, filetype_info):
"""Init the file handler."""
super(HRPTFile, self).__init__(filename, filename_info, filetype_info)
self.channels = {i: None for i in AVHRR_CHANNEL_NAMES}
self.units = {i: "counts" for i in AVHRR_CHANNEL_NAMES}
self.year = filename_info.get("start_time", dt.datetime.utcnow()).year
@cached_property
def times(self):
"""Get the timestamps for each line."""
return time_seconds(self._data["timecode"], self.year)
@cached_property
def _chunks(self):
"""Get the best chunks for this data."""
return get_avhrr_lac_chunks((self._data.shape[0], 2048), float)
@cached_property
def _data(self):
"""Get the data."""
return self.read()
[docs]
def read(self):
"""Read the file."""
with open(self.filename, "rb") as fp_:
data = np.memmap(fp_, dtype=dtype, mode="r")
if np.all(np.median(data["frame_sync"], axis=0) > 1024):
data = self._data.newbyteorder()
return data
@cached_property
def platform_name(self):
"""Get the platform name."""
return spacecrafts[np.median((self._data["id"]["id"] >> 3) & 15)]
[docs]
def get_dataset(self, key, info):
"""Get the dataset."""
attrs = info.copy()
attrs["platform_name"] = self.platform_name
if key["name"] in ["latitude", "longitude"]:
data = self._get_navigation_data(key)
else:
data = self._get_channel_data(key)
result = xr.DataArray(data, dims=["y", "x"], attrs=attrs)
mask = self._get_ch3_mask_or_true(key)
return result.where(mask)
[docs]
def _get_channel_data(self, key):
"""Get channel data."""
data = da.from_array(self._data["image_data"][:, :, _get_channel_index(key)], chunks=self._chunks)
if key["calibration"] != "counts":
if key["name"] in ["1", "2", "3a"]:
data = self.calibrate_solar_channel(data, key)
if key["name"] in ["3b", "4", "5"]:
data = self.calibrate_thermal_channel(data, key)
return data
[docs]
def _get_navigation_data(self, key):
"""Get navigation data."""
lons, lats = self.lons_lats
if key["name"] == "latitude":
data = da.from_array(lats, chunks=self._chunks)
else:
data = da.from_array(lons, chunks=self._chunks)
return data
[docs]
def _get_ch3_mask_or_true(self, key):
mask = True
if key["name"] == "3a":
mask = np.tile(np.logical_not(self._is3b), (2048, 1)).T
elif key["name"] == "3b":
mask = np.tile(self._is3b, (2048, 1)).T
return mask
@cached_property
def _is3b(self):
return bfield(self._data["id"]["id"], 10) == 0
[docs]
def calibrate_thermal_channel(self, data, key):
"""Calibrate a thermal channel."""
from pygac.calibration import calibrate_thermal
line_numbers = (
np.round((self.times - self.times[-1]) /
np.timedelta64(166666667, "ns"))).astype(int)
line_numbers -= line_numbers[0]
prt, ict, space = self.telemetry
index = _get_channel_index(key)
data = calibrate_thermal(data, prt, ict[:, index - 2],
space[:, index], line_numbers,
index + 1, self.calibrator)
return data
[docs]
def calibrate_solar_channel(self, data, key):
"""Calibrate a solar channel."""
from pygac.calibration import calibrate_solar
julian_days = ((np.datetime64(self.start_time)
- np.datetime64(str(self.year) + "-01-01T00:00:00"))
/ np.timedelta64(1, "D"))
data = calibrate_solar(data, _get_channel_index(key), self.year, julian_days,
self.calibrator)
return data
@cached_property
def calibrator(self):
"""Create a calibrator for the data."""
from pygac.calibration import Calibrator
pg_spacecraft = "".join(self.platform_name.split()).lower()
return Calibrator(pg_spacecraft)
@cached_property
def telemetry(self):
"""Get the telemetry."""
# This isn't converted to dask arrays as it does not work with pygac
prt = np.mean(self._data["telemetry"]["PRT"], axis=1)
ict = np.mean(self._data["back_scan"], axis=1)
space = np.mean(self._data["space_data"][:, :], axis=1)
return prt, ict, space
@cached_property
def lons_lats(self):
"""Get the lons and lats."""
scanline_nb = len(self.times)
scan_points = np.arange(0, 2048, 32)
lons, lats = self._get_avhrr_tiepoints(scan_points, scanline_nb)
lons, lats = geo_interpolate(
lons.reshape((scanline_nb, -1)), lats.reshape((scanline_nb, -1)))
return lons, lats
[docs]
def _get_avhrr_tiepoints(self, scan_points, scanline_nb):
sgeom = avhrr(scanline_nb, scan_points, apply_offset=False)
# no attitude error
rpy = [0, 0, 0]
s_times = sgeom.times(self.times[:, np.newaxis])
orb = Orbital(self.platform_name)
pixels_pos = compute_pixels(orb, sgeom, s_times, rpy)
lons, lats, alts = get_lonlatalt(pixels_pos, s_times)
return lons, lats
@property
def start_time(self):
"""Get the start time."""
return time_seconds(self._data["timecode"][0, np.newaxis, :],
self.year).astype(dt.datetime)[0]
@property
def end_time(self):
"""Get the end time."""
return time_seconds(self._data["timecode"][-1, np.newaxis, :],
self.year).astype(dt.datetime)[0]