#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2012-2021 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Reader for aapp level 1b data.
Options for loading:
- pre_launch_coeffs (False): use pre-launch coefficients if True, operational
otherwise (if available).
https://nwp-saf.eumetsat.int/site/download/documentation/aapp/NWPSAF-MF-UD-003_Formats_v8.0.pdf
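
A minimal usage sketch (the reader name "avhrr_l1b_aapp" is assumed here, and
the file name is only a placeholder)::

    from satpy import Scene

    scn = Scene(filenames=["hrpt_example.l1b"], reader="avhrr_l1b_aapp")
    scn.load(["4", "solar_zenith_angle"])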
"""
import datetime as dt
import functools
import logging
import dask.array as da
import numpy as np
import xarray as xr
from dask import delayed
from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import get_chunk_size_limit
CHANNEL_DTYPE = np.float32
def get_avhrr_lac_chunks(shape, dtype):
"""Get chunks from a given shape adapted for full-resolution AVHRR data."""
limit = get_chunk_size_limit(dtype)
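    # AVHRR LAC/HRPT scan lines are 2048 pixels wide, so chunk whole scan
    # lines across track and let dask choose the along-track chunk size.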
return da.core.normalize_chunks(("auto", 2048), shape=shape, limit=limit, dtype=dtype)
def get_aapp_chunks(shape):
"""Get chunks from a given shape adapted for AAPP data."""
return get_avhrr_lac_chunks(shape, dtype=CHANNEL_DTYPE)
logger = logging.getLogger(__name__)
AVHRR_CHANNEL_NAMES = ["1", "2", "3a", "3b", "4", "5"]
AVHRR_ANGLE_NAMES = ["sensor_zenith_angle",
"solar_zenith_angle",
"sun_sensor_azimuth_difference_angle"]
AVHRR_PLATFORM_IDS2NAMES = {4: "NOAA-15",
2: "NOAA-16",
6: "NOAA-17",
7: "NOAA-18",
8: "NOAA-19",
11: "Metop-B",
12: "Metop-A",
13: "Metop-C",
14: "Metop simulator"}
def create_xarray(arr):
"""Create an `xarray.DataArray`."""
res = xr.DataArray(arr, dims=["y", "x"])
return res
class AAPPL1BaseFileHandler(BaseFileHandler):
"""A base file handler for the AAPP level-1 formats."""
def __init__(self, filename, filename_info, filetype_info):
"""Initialize AAPP level-1 file handler object."""
super().__init__(filename, filename_info, filetype_info)
self.channels = None
self.units = None
self.sensor = "unknown"
self._data = None
self._header = None
self.area = None
self._channel_names = []
self._angle_names = []
def _set_filedata_layout(self):
"""Set the file data type/layout."""
self._header_offset = 0
self._scan_type = np.dtype([("siteid", "<i2")])
self._header_type = np.dtype([("siteid", "<i2")])
@property
def start_time(self):
"""Get the time of the first observation."""
return dt.datetime(self._data["scnlinyr"][0], 1, 1) + dt.timedelta(
days=int(self._data["scnlindy"][0]) - 1,
milliseconds=int(self._data["scnlintime"][0]))
@property
def end_time(self):
"""Get the time of the final observation."""
return dt.datetime(self._data["scnlinyr"][-1], 1, 1) + dt.timedelta(
days=int(self._data["scnlindy"][-1]) - 1,
milliseconds=int(self._data["scnlintime"][-1]))
def _update_dataset_attributes(self, dataset, key, info):
dataset.attrs.update({"platform_name": self.platform_name,
"sensor": self.sensor})
dataset.attrs.update(key.to_dict())
for meta_key in ("standard_name", "units"):
if meta_key in info:
dataset.attrs.setdefault(meta_key, info[meta_key])
def read(self):
"""Read the data."""
tic = dt.datetime.now()
header = np.memmap(self.filename, dtype=self._header_type, mode="r", shape=(1, ))
data = np.memmap(self.filename, dtype=self._scan_type, offset=self._header_offset, mode="r")
logger.debug("Reading time %s", str(dt.datetime.now() - tic))
self._header = header
self._data = data
def _calibrate_active_channel_data(self, key):
"""Calibrate active channel data only."""
        raise NotImplementedError("This should be implemented in the subclass")
def get_dataset(self, key, info):
"""Get a dataset from the file."""
if key["name"] in self._channel_names:
dataset = self._calibrate_active_channel_data(key)
if dataset is None:
return None
elif key["name"] in ["longitude", "latitude"]:
dataset = self.navigate(key["name"])
dataset.attrs = info
elif key["name"] in self._angle_names:
dataset = self.get_angles(key["name"])
else:
raise ValueError("Not a supported dataset: %s", key["name"])
self._update_dataset_attributes(dataset, key, info)
return dataset
class AVHRRAAPPL1BFile(AAPPL1BaseFileHandler):
"""Reader for AVHRR L1B files created from the AAPP software."""
def __init__(self, filename, filename_info, filetype_info):
"""Initialize object information by reading the input file."""
        super().__init__(filename, filename_info, filetype_info)
self.channels = {i: None for i in AVHRR_CHANNEL_NAMES}
self.units = {i: "counts" for i in AVHRR_CHANNEL_NAMES}
self._is3b = None
self._is3a = None
self._channel_names = AVHRR_CHANNEL_NAMES
self._angle_names = AVHRR_ANGLE_NAMES
self._set_filedata_layout()
self.read()
self.active_channels = self._get_active_channels()
self._get_platform_name(AVHRR_PLATFORM_IDS2NAMES)
self.sensor = "avhrr-3"
self._get_all_interpolated_angles = functools.lru_cache(maxsize=10)(
self._get_all_interpolated_angles_uncached
)
self._get_all_interpolated_coordinates = functools.lru_cache(maxsize=10)(
self._get_all_interpolated_coordinates_uncached
)
def _set_filedata_layout(self):
"""Set the file data type/layout."""
self._header_offset = 22016
self._scan_type = _SCANTYPE
self._header_type = _HEADERTYPE
def _get_active_channels(self):
status = self._get_channel_binary_status_from_header()
return self._convert_binary_channel_status_to_activation_dict(status)
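    def _get_channel_binary_status_from_header(self):
        """Read the channel activation bit field from the level-1b header.

        A minimal sketch of the helper referenced above, assuming the
        _HEADERTYPE fields defined below: "inststat1" holds the channel
        status bits, and if the status changed during the pass
        ("statchrecnb" > 0) the second status word "inststat2" is OR-ed in
        so that a channel active in either part of the pass is reported.
        """
        status = self._header["inststat1"].item()
        if self._header["statchrecnb"].item() > 0:
            status |= self._header["inststat2"].item()
        return status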
def _calibrate_active_channel_data(self, key):
"""Calibrate active channel data only."""
if self.active_channels[key["name"]]:
return self.calibrate(key)
return None
@staticmethod
def _convert_binary_channel_status_to_activation_dict(status):
bits_channels = ((13, "1"),
(12, "2"),
(11, "3a"),
(10, "3b"),
(9, "4"),
(8, "5"))
activated = dict()
for bit, channel_name in bits_channels:
activated[channel_name] = bool(status >> bit & 1)
return activated
def available_datasets(self, configured_datasets=None):
"""Get the available datasets."""
for _, mda in configured_datasets:
if mda["name"] in self._channel_names:
yield self.active_channels[mda["name"]], mda
else:
yield True, mda
def get_angles(self, angle_id):
"""Get sun-satellite viewing angles."""
sunz, satz, azidiff = self._get_all_interpolated_angles()
name_to_variable = dict(zip(self._angle_names, (satz, sunz, azidiff)))
return create_xarray(name_to_variable[angle_id])
def _get_all_interpolated_angles_uncached(self):
sunz40km, satz40km, azidiff40km = self._get_tiepoint_angles_in_degrees()
return self._interpolate_arrays(sunz40km, satz40km, azidiff40km)
def _get_tiepoint_angles_in_degrees(self):
angles = self._data["ang"].astype(np.float32)
sunz40km = angles[:, :, 0] * 1e-2
satz40km = angles[:, :, 1] * 1e-2
azidiff40km = angles[:, :, 2] * 1e-2
return sunz40km, satz40km, azidiff40km
def _interpolate_arrays(self, *input_arrays, geolocation=False):
lines = input_arrays[0].shape[0]
try:
interpolator = self._create_40km_interpolator(lines, *input_arrays, geolocation=geolocation)
except ImportError:
logger.warning("Could not interpolate, python-geotiepoints missing.")
output_arrays = input_arrays
else:
output_delayed = delayed(interpolator.interpolate, nout=3)()
output_arrays = [da.from_delayed(out_array, (lines, 2048), in_array.dtype)
for in_array, out_array in zip(input_arrays, output_delayed)]
return output_arrays
@staticmethod
def _create_40km_interpolator(lines, *arrays_40km, geolocation=False):
if geolocation:
            # Slower, but accurate across the date line
from geotiepoints.geointerpolator import GeoInterpolator as Interpolator
else:
from geotiepoints.interpolator import Interpolator
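        # Tie points are given every 40th pixel across track starting at
        # column 24 (51 points per scan line, matching the "pos" and "ang"
        # fields of _SCANTYPE) and on every scan line along track.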
cols40km = np.arange(24, 2048, 40)
cols1km = np.arange(2048)
rows40km = np.arange(lines)
rows1km = np.arange(lines)
along_track_order = 1
cross_track_order = 3
satint = Interpolator(
arrays_40km, (rows40km, cols40km),
(rows1km, cols1km), along_track_order, cross_track_order)
return satint
def navigate(self, coordinate_id):
"""Get the longitudes and latitudes of the scene."""
lons, lats = self._get_all_interpolated_coordinates()
if coordinate_id == "longitude":
return create_xarray(lons)
if coordinate_id == "latitude":
return create_xarray(lats)
raise KeyError("Coordinate {} unknown.".format(coordinate_id))
def _get_all_interpolated_coordinates_uncached(self):
lons40km, lats40km = self._get_coordinates_in_degrees()
return self._interpolate_arrays(lons40km, lats40km, geolocation=True)
def _get_coordinates_in_degrees(self):
position_data = self._data["pos"].astype(np.float32)
lons40km = position_data[:, :, 1] * 1e-4
lats40km = position_data[:, :, 0] * 1e-4
return lons40km, lats40km
def calibrate(self,
dataset_id,
pre_launch_coeffs=False,
calib_coeffs=None):
"""Calibrate the data."""
if calib_coeffs is None:
calib_coeffs = {}
units = {"reflectance": "%",
"brightness_temperature": "K",
"counts": "",
"radiance": "W*m-2*sr-1*cm ?"}
if dataset_id["name"] in ("3a", "3b") and self._is3b is None:
# Is it 3a or 3b:
line_chunks = get_aapp_chunks((self._data.shape[0], 2048))[0]
self._is3a = da.bitwise_and(da.from_array(self._data["scnlinbit"],
chunks=line_chunks), 3) == 0
self._is3b = da.bitwise_and(da.from_array(self._data["scnlinbit"],
chunks=line_chunks), 3) == 1
try:
vis_idx = ["1", "2", "3a"].index(dataset_id["name"])
ir_idx = None
except ValueError:
vis_idx = None
ir_idx = ["3b", "4", "5"].index(dataset_id["name"])
mask = True
if vis_idx is not None:
coeffs = calib_coeffs.get("ch" + dataset_id["name"])
if dataset_id["name"] == "3a":
mask = self._is3a[:, None]
ds = create_xarray(
_vis_calibrate(self._data,
vis_idx,
dataset_id["calibration"],
pre_launch_coeffs,
coeffs,
mask=mask))
else:
if dataset_id["name"] == "3b":
mask = self._is3b[:, None]
ds = create_xarray(
_ir_calibrate(self._header,
self._data,
ir_idx,
dataset_id["calibration"],
mask=mask))
ds.attrs["units"] = units[dataset_id["calibration"]]
ds.attrs.update(dataset_id._asdict())
return ds
# AAPP 1b header
_HEADERTYPE = np.dtype([("siteid", "S3"),
("blank", "S1"),
("l1bversnb", "<i2"),
("l1bversyr", "<i2"),
("l1bversdy", "<i2"),
("reclg", "<i2"),
("blksz", "<i2"),
("hdrcnt", "<i2"),
("filler0", "S6"),
("dataname", "S42"),
("prblkid", "S8"),
("satid", "<i2"),
("instid", "<i2"),
("datatype", "<i2"),
("tipsrc", "<i2"),
("startdatajd", "<i4"),
("startdatayr", "<i2"),
("startdatady", "<i2"),
("startdatatime", "<i4"),
("enddatajd", "<i4"),
("enddatayr", "<i2"),
("enddatady", "<i2"),
("enddatatime", "<i4"),
("cpidsyr", "<i2"),
("cpidsdy", "<i2"),
("filler1", "S8"),
# data set quality indicators
("inststat1", "<i4"),
("filler2", "S2"),
("statchrecnb", "<i2"),
("inststat2", "<i4"),
("scnlin", "<i2"),
("callocscnlin", "<i2"),
("misscnlin", "<i2"),
("datagaps", "<i2"),
("okdatafr", "<i2"),
("pacsparityerr", "<i2"),
("auxsyncerrsum", "<i2"),
("timeseqerr", "<i2"),
("timeseqerrcode", "<i2"),
("socclockupind", "<i2"),
("locerrind", "<i2"),
("locerrcode", "<i2"),
("pacsstatfield", "<i2"),
("pacsdatasrc", "<i2"),
("filler3", "S4"),
("spare1", "S8"),
("spare2", "S8"),
("filler4", "S10"),
# Calibration
("racalind", "<i2"),
("solarcalyr", "<i2"),
("solarcaldy", "<i2"),
("pcalalgind", "<i2"),
("pcalalgopt", "<i2"),
("scalalgind", "<i2"),
("scalalgopt", "<i2"),
("irttcoef", "<i2", (4, 6)),
("filler5", "<i4", (2, )),
# radiance to temperature conversion
("albcnv", "<i4", (2, 3)),
("radtempcnv", "<i4", (3, 3)),
("filler6", "<i4", (3, )),
# Navigation
("modelid", "S8"),
("nadloctol", "<i2"),
("locbit", "<i2"),
("filler7", "S2"),
("rollerr", "<i2"),
("pitcherr", "<i2"),
("yawerr", "<i2"),
("epoyr", "<i2"),
("epody", "<i2"),
("epotime", "<i4"),
("smaxis", "<i4"),
("eccen", "<i4"),
("incli", "<i4"),
("argper", "<i4"),
("rascnod", "<i4"),
("manom", "<i4"),
("xpos", "<i4"),
("ypos", "<i4"),
("zpos", "<i4"),
("xvel", "<i4"),
("yvel", "<i4"),
("zvel", "<i4"),
("earthsun", "<i4"),
("filler8", "S16"),
# analog telemetry conversion
("pchtemp", "<i2", (5, )),
("reserved1", "<i2"),
("pchtempext", "<i2", (5, )),
("reserved2", "<i2"),
("pchpow", "<i2", (5, )),
("reserved3", "<i2"),
("rdtemp", "<i2", (5, )),
("reserved4", "<i2"),
("bbtemp1", "<i2", (5, )),
("reserved5", "<i2"),
("bbtemp2", "<i2", (5, )),
("reserved6", "<i2"),
("bbtemp3", "<i2", (5, )),
("reserved7", "<i2"),
("bbtemp4", "<i2", (5, )),
("reserved8", "<i2"),
("eleccur", "<i2", (5, )),
("reserved9", "<i2"),
("motorcur", "<i2", (5, )),
("reserved10", "<i2"),
("earthpos", "<i2", (5, )),
("reserved11", "<i2"),
("electemp", "<i2", (5, )),
("reserved12", "<i2"),
("chtemp", "<i2", (5, )),
("reserved13", "<i2"),
("bptemp", "<i2", (5, )),
("reserved14", "<i2"),
("mhtemp", "<i2", (5, )),
("reserved15", "<i2"),
("adcontemp", "<i2", (5, )),
("reserved16", "<i2"),
("d4bvolt", "<i2", (5, )),
("reserved17", "<i2"),
("d5bvolt", "<i2", (5, )),
("reserved18", "<i2"),
("bbtempchn3b", "<i2", (5, )),
("reserved19", "<i2"),
("bbtempchn4", "<i2", (5, )),
("reserved20", "<i2"),
("bbtempchn5", "<i2", (5, )),
("reserved21", "<i2"),
("refvolt", "<i2", (5, )),
("reserved22", "<i2"), ])
# AAPP 1b scanline
_SCANTYPE = np.dtype([("scnlin", "<i2"),
("scnlinyr", "<i2"),
("scnlindy", "<i2"),
("clockdrift", "<i2"),
("scnlintime", "<i4"),
("scnlinbit", "<i2"),
("filler0", "S10"),
("qualind", "<i4"),
("scnlinqual", "<i4"),
("calqual", "<i2", (3, )),
("cbiterr", "<i2"),
("filler1", "S8"),
# Calibration
("calvis", "<i4", (3, 3, 5)),
("calir", "<i4", (3, 2, 3)),
("filler2", "<i4", (3, )),
# Navigation
("navstat", "<i4"),
("attangtime", "<i4"),
("rollang", "<i2"),
("pitchang", "<i2"),
("yawang", "<i2"),
("scalti", "<i2"),
("ang", "<i2", (51, 3)),
("filler3", "<i2", (3, )),
("pos", "<i4", (51, 2)),
("filler4", "<i4", (2, )),
("telem", "<i2", (103, )),
("filler5", "<i2"),
("hrpt", "<i2", (2048, 5)),
("filler6", "<i4", (2, )),
# tip minor frame header
("tipmfhd", "<i2", (7, 5)),
# cpu telemetry
("cputel", "S6", (2, 5)),
("filler7", "<i2", (67, )), ])
def _vis_calibrate(data,
chn,
calib_type,
pre_launch_coeffs=False,
calib_coeffs=None,
mask=True):
"""Calibrate visible channel data.
*calib_type* in count, reflectance, radiance.
"""
    # Calibration from counts to albedo; the calibration is performed
    # separately for two count ranges (dual gain).
if calib_type not in ["counts", "radiance", "reflectance"]:
raise ValueError("Calibration " + calib_type + " unknown!")
channel_data = data["hrpt"][:, :, chn]
chunks = get_aapp_chunks(channel_data.shape)
line_chunks = chunks[0]
channel = da.from_array(channel_data, chunks=chunks)
mask &= channel != 0
if calib_type == "counts":
return channel
channel = channel.astype(CHANNEL_DTYPE)
if calib_type == "radiance":
logger.info("Radiances are not yet supported for " +
"the VIS/NIR channels!")
if pre_launch_coeffs:
coeff_idx = 2
else:
# check that coeffs are valid
if np.all(data["calvis"][:, chn, 0, 4] == 0):
logger.info(
"No valid operational coefficients, fall back to pre-launch")
coeff_idx = 2
else:
coeff_idx = 0
intersection = da.from_array(data["calvis"][:, chn, coeff_idx, 4],
chunks=line_chunks)
if calib_coeffs is not None:
logger.info("Updating from external calibration coefficients.")
slope1 = da.from_array(calib_coeffs[0], chunks=line_chunks)
intercept1 = da.from_array(calib_coeffs[1], chunks=line_chunks)
slope2 = da.from_array(calib_coeffs[2], chunks=line_chunks)
intercept2 = da.from_array(calib_coeffs[3], chunks=line_chunks)
else:
calvis = data["calvis"]
slope1 = da.from_array(calvis[:, chn, coeff_idx, 0], chunks=line_chunks).astype(np.float32) * 1e-10
intercept1 = da.from_array(calvis[:, chn, coeff_idx, 1], chunks=line_chunks).astype(np.float32) * 1e-7
slope2 = da.from_array(calvis[:, chn, coeff_idx, 2], chunks=line_chunks).astype(np.float32) * 1e-10
intercept2 = da.from_array(calvis[:, chn, coeff_idx, 3], chunks=line_chunks).astype(np.float32) * 1e-7
    # In the level 1b file, the visible calibration coefficients are stored as
    # 4-byte integers. Scaling factors then convert them to real numbers which
    # are applied to the measured counts. Which coefficient pair is used
    # depends on whether the counts are below or above the high-gain/low-gain
    # transition value (nominally 500). The slope for visible channels should
    # always be positive (reflectance increases with count). With the
    # pre-launch coefficients the channel 2 and 3a slopes are always positive,
    # but with the operational coefficients the stored number in the
    # high-reflectance regime can exceed the int32 maximum of 2147483647, i.e.
    # it is negative when interpreted as a signed integer and has to be
    # corrected by adding 2**32 * 1e-10 = 0.4294967296 after scaling.
    # Channel 1 is treated the same way in AAPP.
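    # Worked example (illustrative numbers only): a stored slope of
    # 2200000000, intended to mean 2200000000 * 1e-10 = 0.22, reads back from
    # the signed int32 field as 2200000000 - 2**32 = -2094967296; after
    # scaling this is -0.2094967296, and adding 0.4294967296 restores 0.22.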
slope2 = da.where(slope2 < 0, slope2 + 0.4294967296, slope2)
channel = da.where(channel <= intersection[:, None],
channel * slope1[:, None] + intercept1[:, None],
channel * slope2[:, None] + intercept2[:, None])
channel = channel.clip(min=0)
return da.where(mask, channel, np.nan)
def _ir_calibrate(header, data, irchn, calib_type, mask=True):
"""Calibrate for IR bands.
*calib_type* in brightness_temperature, radiance, count
"""
channel_data = data["hrpt"][:, :, irchn + 2]
chunks = get_aapp_chunks(channel_data.shape)
line_chunks = chunks[0]
count = da.from_array(channel_data, chunks=chunks)
    if calib_type == "counts":
        return count
# Mask unnaturally low values
mask &= count != 0
count = count.astype(CHANNEL_DTYPE)
calir = data["calir"]
k1_ = da.from_array(calir[:, irchn, 0, 0], chunks=line_chunks).astype(np.float32) * 1.0e-9
k2_ = da.from_array(calir[:, irchn, 0, 1], chunks=line_chunks).astype(np.float32) * 1.0e-6
k3_ = da.from_array(calir[:, irchn, 0, 2], chunks=line_chunks).astype(np.float32) * 1.0e-6
# Count to radiance conversion:
rad = k1_[:, None] * count * count + k2_[:, None] * count + k3_[:, None]
# Suspicious lines
mask &= ((k1_ != 0) | (k2_ != 0) | (k3_ != 0))[:, None]
    if calib_type == "radiance":
mask &= rad > 0.0
return da.where(mask, rad, np.nan)
radtempcnv = header["radtempcnv"].astype(np.float32)
# Central wavenumber:
cwnum = radtempcnv[0, irchn, 0]
if irchn == 0:
cwnum = cwnum / 1.0e2
else:
cwnum = cwnum / 1.0e3
bandcor_2 = radtempcnv[0, irchn, 1] / 1e5
bandcor_3 = radtempcnv[0, irchn, 2] / 1e6
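    # ir_const_1 and ir_const_2 are the first and second Planck radiation
    # constants (c1, c2) in units consistent with wavenumbers in cm-1 and
    # radiance per cm-1.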
ir_const_1 = 1.1910659e-5
ir_const_2 = 1.438833
t_planck = (ir_const_2 * cwnum) / \
np.log(1 + ir_const_1 * cwnum * cwnum * cwnum / rad)
# Band corrections applied to t_planck to get correct
# brightness temperature for channel:
if bandcor_2 < 0: # Post AAPP-v4
tb_ = bandcor_2 + bandcor_3 * t_planck
else: # AAPP 1 to 4
tb_ = (t_planck - bandcor_2) / bandcor_3
    # Apply the accumulated mask and return brightness temperatures
return da.where(mask, tb_, np.nan)