Source code for satpy.readers.hrpt

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2009-2021 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Reading and calibrating hrpt avhrr data.

Todo:
- AMSU
- Compare output with AAPP

Reading:
http://www.ncdc.noaa.gov/oa/pod-guide/ncdc/docs/klm/html/c4/sec4-1.htm#t413-1

Calibration:
http://www.ncdc.noaa.gov/oa/pod-guide/ncdc/docs/klm/html/c7/sec7-1.htm

"""

import logging
from datetime import datetime

import dask.array as da
import numpy as np
import xarray as xr
from geotiepoints import SatelliteInterpolator
from pyorbital.geoloc import compute_pixels, get_lonlatalt
from pyorbital.geoloc_instrument_definitions import avhrr
from pyorbital.orbital import Orbital

from satpy._compat import cached_property
from satpy.readers.aapp_l1b import get_avhrr_lac_chunks
from satpy.readers.file_handlers import BaseFileHandler

logger = logging.getLogger(__name__)

AVHRR_CHANNEL_NAMES = ("1", "2", "3a", "3b", "4", "5")

dtype = np.dtype([("frame_sync", ">u2", (6, )),
                  ("id", [("id", ">u2"),
                          ("spare", ">u2")]),
                  ("timecode", ">u2", (4, )),
                  ("telemetry", [("ramp_calibration", ">u2", (5, )),
                                 ("PRT", ">u2", (3, )),
                                 ("ch3_patch_temp", ">u2"),
                                 ("spare", ">u2"), ]),
                  ("back_scan", ">u2", (10, 3)),
                  ("space_data", ">u2", (10, 5)),
                  ("sync", ">u2"),
                  ("TIP_data", ">u2", (520, )),
                  ("spare", ">u2", (127, )),
                  ("image_data", ">u2", (2048, 5)),
                  ("aux_sync", ">u2", (100, ))])


[docs] def time_seconds(tc_array, year): """Return the time object from the timecodes.""" tc_array = np.array(tc_array, copy=True) word = tc_array[:, 0] day = word >> 1 word = tc_array[:, 1].astype(np.uint64) msecs = ((127) & word) * 1024 word = tc_array[:, 2] msecs += word & 1023 msecs *= 1024 word = tc_array[:, 3] msecs += word & 1023 return (np.datetime64( str(year) + "-01-01T00:00:00", "s") + msecs[:].astype("timedelta64[ms]") + (day - 1)[:].astype("timedelta64[D]"))
[docs] def bfield(array, bit): """Return the bit array.""" return (array & 2**(9 - bit + 1)).astype(bool)
spacecrafts = {7: "NOAA 15", 3: "NOAA 16", 13: "NOAA 18", 15: "NOAA 19"}
[docs] def geo_interpolate(lons32km, lats32km): """Interpolate geo data.""" cols32km = np.arange(0, 2048, 32) cols1km = np.arange(2048) lines = lons32km.shape[0] rows32km = np.arange(lines) rows1km = np.arange(lines) along_track_order = 1 cross_track_order = 3 satint = SatelliteInterpolator( (lons32km, lats32km), (rows32km, cols32km), (rows1km, cols1km), along_track_order, cross_track_order) lons, lats = satint.interpolate() return lons, lats
[docs] def _get_channel_index(key): """Get the avhrr channel index.""" avhrr_channel_index = {"1": 0, "2": 1, "3a": 2, "3b": 2, "4": 3, "5": 4} index = avhrr_channel_index[key["name"]] return index
[docs] class HRPTFile(BaseFileHandler): """Reader for HRPT Minor Frame, 10 bits data expanded to 16 bits.""" def __init__(self, filename, filename_info, filetype_info): """Init the file handler.""" super(HRPTFile, self).__init__(filename, filename_info, filetype_info) self.channels = {i: None for i in AVHRR_CHANNEL_NAMES} self.units = {i: "counts" for i in AVHRR_CHANNEL_NAMES} self.year = filename_info.get("start_time", datetime.utcnow()).year @cached_property def times(self): """Get the timestamps for each line.""" return time_seconds(self._data["timecode"], self.year) @cached_property def _chunks(self): """Get the best chunks for this data.""" return get_avhrr_lac_chunks((self._data.shape[0], 2048), float) @cached_property def _data(self): """Get the data.""" return self.read()
[docs] def read(self): """Read the file.""" with open(self.filename, "rb") as fp_: data = np.memmap(fp_, dtype=dtype, mode="r") if np.all(np.median(data["frame_sync"], axis=0) > 1024): data = self._data.newbyteorder() return data
@cached_property def platform_name(self): """Get the platform name.""" return spacecrafts[np.median((self._data["id"]["id"] >> 3) & 15)]
[docs] def get_dataset(self, key, info): """Get the dataset.""" attrs = info.copy() attrs["platform_name"] = self.platform_name if key["name"] in ["latitude", "longitude"]: data = self._get_navigation_data(key) else: data = self._get_channel_data(key) result = xr.DataArray(data, dims=["y", "x"], attrs=attrs) mask = self._get_ch3_mask_or_true(key) return result.where(mask)
[docs] def _get_channel_data(self, key): """Get channel data.""" data = da.from_array(self._data["image_data"][:, :, _get_channel_index(key)], chunks=self._chunks) if key["calibration"] != "counts": if key["name"] in ["1", "2", "3a"]: data = self.calibrate_solar_channel(data, key) if key["name"] in ["3b", "4", "5"]: data = self.calibrate_thermal_channel(data, key) return data
[docs] def _get_navigation_data(self, key): """Get navigation data.""" lons, lats = self.lons_lats if key["name"] == "latitude": data = da.from_array(lats, chunks=self._chunks) else: data = da.from_array(lons, chunks=self._chunks) return data
[docs] def _get_ch3_mask_or_true(self, key): mask = True if key["name"] == "3a": mask = np.tile(np.logical_not(self._is3b), (2048, 1)).T elif key["name"] == "3b": mask = np.tile(self._is3b, (2048, 1)).T return mask
@cached_property def _is3b(self): return bfield(self._data["id"]["id"], 10) == 0
[docs] def calibrate_thermal_channel(self, data, key): """Calibrate a thermal channel.""" from pygac.calibration import calibrate_thermal line_numbers = ( np.round((self.times - self.times[-1]) / np.timedelta64(166666667, "ns"))).astype(int) line_numbers -= line_numbers[0] prt, ict, space = self.telemetry index = _get_channel_index(key) data = calibrate_thermal(data, prt, ict[:, index - 2], space[:, index], line_numbers, index + 1, self.calibrator) return data
[docs] def calibrate_solar_channel(self, data, key): """Calibrate a solar channel.""" from pygac.calibration import calibrate_solar julian_days = ((np.datetime64(self.start_time) - np.datetime64(str(self.year) + "-01-01T00:00:00")) / np.timedelta64(1, "D")) data = calibrate_solar(data, _get_channel_index(key), self.year, julian_days, self.calibrator) return data
@cached_property def calibrator(self): """Create a calibrator for the data.""" from pygac.calibration import Calibrator pg_spacecraft = "".join(self.platform_name.split()).lower() return Calibrator(pg_spacecraft) @cached_property def telemetry(self): """Get the telemetry.""" # This isn't converted to dask arrays as it does not work with pygac prt = np.mean(self._data["telemetry"]["PRT"], axis=1) ict = np.mean(self._data["back_scan"], axis=1) space = np.mean(self._data["space_data"][:, :], axis=1) return prt, ict, space @cached_property def lons_lats(self): """Get the lons and lats.""" scanline_nb = len(self.times) scan_points = np.arange(0, 2048, 32) lons, lats = self._get_avhrr_tiepoints(scan_points, scanline_nb) lons, lats = geo_interpolate( lons.reshape((scanline_nb, -1)), lats.reshape((scanline_nb, -1))) return lons, lats
[docs] def _get_avhrr_tiepoints(self, scan_points, scanline_nb): sgeom = avhrr(scanline_nb, scan_points, apply_offset=False) # no attitude error rpy = [0, 0, 0] s_times = sgeom.times(self.times[:, np.newaxis]) orb = Orbital(self.platform_name) pixels_pos = compute_pixels(orb, sgeom, s_times, rpy) lons, lats, alts = get_lonlatalt(pixels_pos, s_times) return lons, lats
@property def start_time(self): """Get the start time.""" return time_seconds(self._data["timecode"][0, np.newaxis, :], self.year).astype(datetime)[0] @property def end_time(self): """Get the end time.""" return time_seconds(self._data["timecode"][-1, np.newaxis, :], self.year).astype(datetime)[0]