Source code for satpy.readers.aapp_l1b

"""Reader for aapp level 1b data.

Options for loading:

 - pre_launch_coeffs (False): use pre-launch coefficients if True, operational
   otherwise (if available).
import functools
import logging
from datetime import datetime, timedelta

import dask.array as da
import numpy as np
import xarray as xr
from dask import delayed

from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import get_chunk_size_limit

CHANNEL_DTYPE = np.float64

[docs] def get_avhrr_lac_chunks(shape, dtype): """Get chunks from a given shape adapted for full-resolution AVHRR data.""" limit = get_chunk_size_limit(dtype) return da.core.normalize_chunks(("auto", 2048), shape=shape, limit=limit, dtype=dtype)
[docs] def get_aapp_chunks(shape): """Get chunks from a given shape adapted for AAPP data.""" return get_avhrr_lac_chunks(shape, dtype=CHANNEL_DTYPE)
logger = logging.getLogger(__name__) AVHRR_CHANNEL_NAMES = ["1", "2", "3a", "3b", "4", "5"] AVHRR_ANGLE_NAMES = ["sensor_zenith_angle", "solar_zenith_angle", "sun_sensor_azimuth_difference_angle"] AVHRR_PLATFORM_IDS2NAMES = {4: "NOAA-15", 2: "NOAA-16", 6: "NOAA-17", 7: "NOAA-18", 8: "NOAA-19", 11: "Metop-B", 12: "Metop-A", 13: "Metop-C", 14: "Metop simulator"}
[docs] def create_xarray(arr): """Create an `xarray.DataArray`.""" res = xr.DataArray(arr, dims=["y", "x"]) return res
[docs] class AAPPL1BaseFileHandler(BaseFileHandler): """A base file handler for the AAPP level-1 formats.""" def __init__(self, filename, filename_info, filetype_info): """Initialize AAPP level-1 file handler object.""" super().__init__(filename, filename_info, filetype_info) self.channels = None self.units = None self.sensor = "unknown" self._data = None self._header = None self.area = None self._channel_names = [] self._angle_names = []
[docs] def _set_filedata_layout(self): """Set the file data type/layout.""" self._header_offset = 0 self._scan_type = np.dtype([("siteid", "<i2")]) self._header_type = np.dtype([("siteid", "<i2")])
@property def start_time(self): """Get the time of the first observation.""" return datetime(self._data["scnlinyr"][0], 1, 1) + timedelta( days=int(self._data["scnlindy"][0]) - 1, milliseconds=int(self._data["scnlintime"][0])) @property def end_time(self): """Get the time of the final observation.""" return datetime(self._data["scnlinyr"][-1], 1, 1) + timedelta( days=int(self._data["scnlindy"][-1]) - 1, milliseconds=int(self._data["scnlintime"][-1]))
[docs] def _update_dataset_attributes(self, dataset, key, info): dataset.attrs.update({"platform_name": self.platform_name, "sensor": self.sensor}) dataset.attrs.update(key.to_dict()) for meta_key in ("standard_name", "units"): if meta_key in info: dataset.attrs.setdefault(meta_key, info[meta_key])
[docs] def _get_platform_name(self, platform_names_lookup): """Get the platform name from the file header.""" self.platform_name = platform_names_lookup.get(self._header["satid"][0], None) if self.platform_name is None: raise ValueError("Unsupported platform ID: %d" % self.header["satid"])
[docs] def read(self): """Read the data.""" tic = header = np.memmap(self.filename, dtype=self._header_type, mode="r", shape=(1, )) data = np.memmap(self.filename, dtype=self._scan_type, offset=self._header_offset, mode="r") logger.debug("Reading time %s", str( - tic)) self._header = header self._data = data
[docs] def _calibrate_active_channel_data(self, key): """Calibrate active channel data only.""" raise NotImplementedError("This should be implemented in the sub class")
[docs] def get_dataset(self, key, info): """Get a dataset from the file.""" if key["name"] in self._channel_names: dataset = self._calibrate_active_channel_data(key) if dataset is None: return None elif key["name"] in ["longitude", "latitude"]: dataset = self.navigate(key["name"]) dataset.attrs = info elif key["name"] in self._angle_names: dataset = self.get_angles(key["name"]) else: raise ValueError("Not a supported dataset: %s", key["name"]) self._update_dataset_attributes(dataset, key, info) return dataset
[docs] class AVHRRAAPPL1BFile(AAPPL1BaseFileHandler): """Reader for AVHRR L1B files created from the AAPP software.""" def __init__(self, filename, filename_info, filetype_info): """Initialize object information by reading the input file.""" super(AVHRRAAPPL1BFile, self).__init__(filename, filename_info, filetype_info) self.channels = {i: None for i in AVHRR_CHANNEL_NAMES} self.units = {i: "counts" for i in AVHRR_CHANNEL_NAMES} self._is3b = None self._is3a = None self._channel_names = AVHRR_CHANNEL_NAMES self._angle_names = AVHRR_ANGLE_NAMES self._set_filedata_layout() self.active_channels = self._get_active_channels() self._get_platform_name(AVHRR_PLATFORM_IDS2NAMES) self.sensor = "avhrr-3" self._get_all_interpolated_angles = functools.lru_cache(maxsize=10)( self._get_all_interpolated_angles_uncached ) self._get_all_interpolated_coordinates = functools.lru_cache(maxsize=10)( self._get_all_interpolated_coordinates_uncached )
[docs] def _set_filedata_layout(self): """Set the file data type/layout.""" self._header_offset = 22016 self._scan_type = _SCANTYPE self._header_type = _HEADERTYPE
[docs] def _get_active_channels(self): status = self._get_channel_binary_status_from_header() return self._convert_binary_channel_status_to_activation_dict(status)
[docs] def _calibrate_active_channel_data(self, key): """Calibrate active channel data only.""" if self.active_channels[key["name"]]: return self.calibrate(key) return None
[docs] def _get_channel_binary_status_from_header(self): status = self._header["inststat1"].item() change_line = self._header["statchrecnb"] if change_line > 0: status |= self._header["inststat2"].item() return status
[docs] @staticmethod def _convert_binary_channel_status_to_activation_dict(status): bits_channels = ((13, "1"), (12, "2"), (11, "3a"), (10, "3b"), (9, "4"), (8, "5")) activated = dict() for bit, channel_name in bits_channels: activated[channel_name] = bool(status >> bit & 1) return activated
[docs] def available_datasets(self, configured_datasets=None): """Get the available datasets.""" for _, mda in configured_datasets: if mda["name"] in self._channel_names: yield self.active_channels[mda["name"]], mda else: yield True, mda
[docs] def get_angles(self, angle_id): """Get sun-satellite viewing angles.""" sunz, satz, azidiff = self._get_all_interpolated_angles() name_to_variable = dict(zip(self._angle_names, (satz, sunz, azidiff))) return create_xarray(name_to_variable[angle_id])
[docs] def _get_all_interpolated_angles_uncached(self): sunz40km, satz40km, azidiff40km = self._get_tiepoint_angles_in_degrees() return self._interpolate_arrays(sunz40km, satz40km, azidiff40km)
[docs] def _get_tiepoint_angles_in_degrees(self): sunz40km = self._data["ang"][:, :, 0] * 1e-2 satz40km = self._data["ang"][:, :, 1] * 1e-2 azidiff40km = self._data["ang"][:, :, 2] * 1e-2 return sunz40km, satz40km, azidiff40km
[docs] def _interpolate_arrays(self, *input_arrays, geolocation=False): lines = input_arrays[0].shape[0] try: interpolator = self._create_40km_interpolator(lines, *input_arrays, geolocation=geolocation) except ImportError: logger.warning("Could not interpolate, python-geotiepoints missing.") output_arrays = input_arrays else: output_delayed = delayed(interpolator.interpolate, nout=3)() output_arrays = [da.from_delayed(out_array, (lines, 2048), in_array.dtype) for in_array, out_array in zip(input_arrays, output_delayed)] return output_arrays
[docs] @staticmethod def _create_40km_interpolator(lines, *arrays_40km, geolocation=False): if geolocation: # Slower but accurate at datum line from geotiepoints.geointerpolator import GeoInterpolator as Interpolator else: from geotiepoints.interpolator import Interpolator cols40km = np.arange(24, 2048, 40) cols1km = np.arange(2048) rows40km = np.arange(lines) rows1km = np.arange(lines) along_track_order = 1 cross_track_order = 3 satint = Interpolator( arrays_40km, (rows40km, cols40km), (rows1km, cols1km), along_track_order, cross_track_order) return satint
[docs] def navigate(self, coordinate_id): """Get the longitudes and latitudes of the scene.""" lons, lats = self._get_all_interpolated_coordinates() if coordinate_id == "longitude": return create_xarray(lons) if coordinate_id == "latitude": return create_xarray(lats) raise KeyError("Coordinate {} unknown.".format(coordinate_id))
[docs] def _get_all_interpolated_coordinates_uncached(self): lons40km, lats40km = self._get_coordinates_in_degrees() return self._interpolate_arrays(lons40km, lats40km, geolocation=True)
[docs] def _get_coordinates_in_degrees(self): lons40km = self._data["pos"][:, :, 1] * 1e-4 lats40km = self._data["pos"][:, :, 0] * 1e-4 return lons40km, lats40km
[docs] def calibrate(self, dataset_id, pre_launch_coeffs=False, calib_coeffs=None): """Calibrate the data.""" if calib_coeffs is None: calib_coeffs = {} units = {"reflectance": "%", "brightness_temperature": "K", "counts": "", "radiance": "W*m-2*sr-1*cm ?"} if dataset_id["name"] in ("3a", "3b") and self._is3b is None: # Is it 3a or 3b: line_chunks = get_aapp_chunks((self._data.shape[0], 2048))[0] self._is3a = da.bitwise_and(da.from_array(self._data["scnlinbit"], chunks=line_chunks), 3) == 0 self._is3b = da.bitwise_and(da.from_array(self._data["scnlinbit"], chunks=line_chunks), 3) == 1 try: vis_idx = ["1", "2", "3a"].index(dataset_id["name"]) ir_idx = None except ValueError: vis_idx = None ir_idx = ["3b", "4", "5"].index(dataset_id["name"]) mask = True if vis_idx is not None: coeffs = calib_coeffs.get("ch" + dataset_id["name"]) if dataset_id["name"] == "3a": mask = self._is3a[:, None] ds = create_xarray( _vis_calibrate(self._data, vis_idx, dataset_id["calibration"], pre_launch_coeffs, coeffs, mask=mask)) else: if dataset_id["name"] == "3b": mask = self._is3b[:, None] ds = create_xarray( _ir_calibrate(self._header, self._data, ir_idx, dataset_id["calibration"], mask=mask)) ds.attrs["units"] = units[dataset_id["calibration"]] ds.attrs.update(dataset_id._asdict()) return ds
# AAPP 1b header _HEADERTYPE = np.dtype([("siteid", "S3"), ("blank", "S1"), ("l1bversnb", "<i2"), ("l1bversyr", "<i2"), ("l1bversdy", "<i2"), ("reclg", "<i2"), ("blksz", "<i2"), ("hdrcnt", "<i2"), ("filler0", "S6"), ("dataname", "S42"), ("prblkid", "S8"), ("satid", "<i2"), ("instid", "<i2"), ("datatype", "<i2"), ("tipsrc", "<i2"), ("startdatajd", "<i4"), ("startdatayr", "<i2"), ("startdatady", "<i2"), ("startdatatime", "<i4"), ("enddatajd", "<i4"), ("enddatayr", "<i2"), ("enddatady", "<i2"), ("enddatatime", "<i4"), ("cpidsyr", "<i2"), ("cpidsdy", "<i2"), ("filler1", "S8"), # data set quality indicators ("inststat1", "<i4"), ("filler2", "S2"), ("statchrecnb", "<i2"), ("inststat2", "<i4"), ("scnlin", "<i2"), ("callocscnlin", "<i2"), ("misscnlin", "<i2"), ("datagaps", "<i2"), ("okdatafr", "<i2"), ("pacsparityerr", "<i2"), ("auxsyncerrsum", "<i2"), ("timeseqerr", "<i2"), ("timeseqerrcode", "<i2"), ("socclockupind", "<i2"), ("locerrind", "<i2"), ("locerrcode", "<i2"), ("pacsstatfield", "<i2"), ("pacsdatasrc", "<i2"), ("filler3", "S4"), ("spare1", "S8"), ("spare2", "S8"), ("filler4", "S10"), # Calibration ("racalind", "<i2"), ("solarcalyr", "<i2"), ("solarcaldy", "<i2"), ("pcalalgind", "<i2"), ("pcalalgopt", "<i2"), ("scalalgind", "<i2"), ("scalalgopt", "<i2"), ("irttcoef", "<i2", (4, 6)), ("filler5", "<i4", (2, )), # radiance to temperature conversion ("albcnv", "<i4", (2, 3)), ("radtempcnv", "<i4", (3, 3)), ("filler6", "<i4", (3, )), # Navigation ("modelid", "S8"), ("nadloctol", "<i2"), ("locbit", "<i2"), ("filler7", "S2"), ("rollerr", "<i2"), ("pitcherr", "<i2"), ("yawerr", "<i2"), ("epoyr", "<i2"), ("epody", "<i2"), ("epotime", "<i4"), ("smaxis", "<i4"), ("eccen", "<i4"), ("incli", "<i4"), ("argper", "<i4"), ("rascnod", "<i4"), ("manom", "<i4"), ("xpos", "<i4"), ("ypos", "<i4"), ("zpos", "<i4"), ("xvel", "<i4"), ("yvel", "<i4"), ("zvel", "<i4"), ("earthsun", "<i4"), ("filler8", "S16"), # analog telemetry conversion ("pchtemp", "<i2", (5, )), ("reserved1", "<i2"), ("pchtempext", "<i2", (5, )), ("reserved2", "<i2"), ("pchpow", "<i2", (5, )), ("reserved3", "<i2"), ("rdtemp", "<i2", (5, )), ("reserved4", "<i2"), ("bbtemp1", "<i2", (5, )), ("reserved5", "<i2"), ("bbtemp2", "<i2", (5, )), ("reserved6", "<i2"), ("bbtemp3", "<i2", (5, )), ("reserved7", "<i2"), ("bbtemp4", "<i2", (5, )), ("reserved8", "<i2"), ("eleccur", "<i2", (5, )), ("reserved9", "<i2"), ("motorcur", "<i2", (5, )), ("reserved10", "<i2"), ("earthpos", "<i2", (5, )), ("reserved11", "<i2"), ("electemp", "<i2", (5, )), ("reserved12", "<i2"), ("chtemp", "<i2", (5, )), ("reserved13", "<i2"), ("bptemp", "<i2", (5, )), ("reserved14", "<i2"), ("mhtemp", "<i2", (5, )), ("reserved15", "<i2"), ("adcontemp", "<i2", (5, )), ("reserved16", "<i2"), ("d4bvolt", "<i2", (5, )), ("reserved17", "<i2"), ("d5bvolt", "<i2", (5, )), ("reserved18", "<i2"), ("bbtempchn3b", "<i2", (5, )), ("reserved19", "<i2"), ("bbtempchn4", "<i2", (5, )), ("reserved20", "<i2"), ("bbtempchn5", "<i2", (5, )), ("reserved21", "<i2"), ("refvolt", "<i2", (5, )), ("reserved22", "<i2"), ]) # AAPP 1b scanline _SCANTYPE = np.dtype([("scnlin", "<i2"), ("scnlinyr", "<i2"), ("scnlindy", "<i2"), ("clockdrift", "<i2"), ("scnlintime", "<i4"), ("scnlinbit", "<i2"), ("filler0", "S10"), ("qualind", "<i4"), ("scnlinqual", "<i4"), ("calqual", "<i2", (3, )), ("cbiterr", "<i2"), ("filler1", "S8"), # Calibration ("calvis", "<i4", (3, 3, 5)), ("calir", "<i4", (3, 2, 3)), ("filler2", "<i4", (3, )), # Navigation ("navstat", "<i4"), ("attangtime", "<i4"), ("rollang", "<i2"), ("pitchang", "<i2"), ("yawang", "<i2"), ("scalti", "<i2"), ("ang", "<i2", (51, 3)), ("filler3", "<i2", (3, )), ("pos", "<i4", (51, 2)), ("filler4", "<i4", (2, )), ("telem", "<i2", (103, )), ("filler5", "<i2"), ("hrpt", "<i2", (2048, 5)), ("filler6", "<i4", (2, )), # tip minor frame header ("tipmfhd", "<i2", (7, 5)), # cpu telemetry ("cputel", "S6", (2, 5)), ("filler7", "<i2", (67, )), ])
[docs] def _vis_calibrate(data, chn, calib_type, pre_launch_coeffs=False, calib_coeffs=None, mask=True): """Calibrate visible channel data. *calib_type* in count, reflectance, radiance. """ # Calibration count to albedo, the calibration is performed separately for # two value ranges. if calib_type not in ["counts", "radiance", "reflectance"]: raise ValueError("Calibration " + calib_type + " unknown!") channel_data = data["hrpt"][:, :, chn] chunks = get_aapp_chunks(channel_data.shape) line_chunks = chunks[0] channel = da.from_array(channel_data, chunks=chunks) mask &= channel != 0 if calib_type == "counts": return channel channel = channel.astype(CHANNEL_DTYPE) if calib_type == "radiance":"Radiances are not yet supported for " + "the VIS/NIR channels!") if pre_launch_coeffs: coeff_idx = 2 else: # check that coeffs are valid if np.all(data["calvis"][:, chn, 0, 4] == 0): "No valid operational coefficients, fall back to pre-launch") coeff_idx = 2 else: coeff_idx = 0 intersection = da.from_array(data["calvis"][:, chn, coeff_idx, 4], chunks=line_chunks) if calib_coeffs is not None:"Updating from external calibration coefficients.") slope1 = da.from_array(calib_coeffs[0], chunks=line_chunks) intercept1 = da.from_array(calib_coeffs[1], chunks=line_chunks) slope2 = da.from_array(calib_coeffs[2], chunks=line_chunks) intercept2 = da.from_array(calib_coeffs[3], chunks=line_chunks) else: slope1 = da.from_array(data["calvis"][:, chn, coeff_idx, 0], chunks=line_chunks) * 1e-10 intercept1 = da.from_array(data["calvis"][:, chn, coeff_idx, 1], chunks=line_chunks) * 1e-7 slope2 = da.from_array(data["calvis"][:, chn, coeff_idx, 2], chunks=line_chunks) * 1e-10 intercept2 = da.from_array(data["calvis"][:, chn, coeff_idx, 3], chunks=line_chunks) * 1e-7 # In the level 1b file, the visible coefficients are stored as 4-byte integers. Scaling factors then convert # them to real numbers which are applied to the measured counts. The coefficient is different depending on # whether the counts are less than or greater than the high-gain/low-gain transition value (nominally 500). # The slope for visible channels should always be positive (reflectance increases with count). With the # pre-launch coefficients the channel 2, 3a slope is always positive but with the operational coefs the stored # number in the high-reflectance regime overflows the maximum 2147483647, i.e. it is negative when # interpreted as a signed integer. So you have to modify it. Also chanel 1 is treated the same way in AAPP. slope2 = da.where(slope2 < 0, slope2 + 0.4294967296, slope2) channel = da.where(channel <= intersection[:, None], channel * slope1[:, None] + intercept1[:, None], channel * slope2[:, None] + intercept2[:, None]) channel = channel.clip(min=0) return da.where(mask, channel, np.nan)
[docs] def _ir_calibrate(header, data, irchn, calib_type, mask=True): """Calibrate for IR bands. *calib_type* in brightness_temperature, radiance, count """ channel_data = data["hrpt"][:, :, irchn + 2] chunks = get_aapp_chunks(channel_data.shape) line_chunks = chunks[0] count = da.from_array(channel_data, chunks=chunks) if calib_type == 0: return count # Mask unnaturally low values mask &= count != 0 count = count.astype(CHANNEL_DTYPE) k1_ = da.from_array(data["calir"][:, irchn, 0, 0], chunks=line_chunks) / 1.0e9 k2_ = da.from_array(data["calir"][:, irchn, 0, 1], chunks=line_chunks) / 1.0e6 k3_ = da.from_array(data["calir"][:, irchn, 0, 2], chunks=line_chunks) / 1.0e6 # Count to radiance conversion: rad = k1_[:, None] * count * count + k2_[:, None] * count + k3_[:, None] # Suspicious lines mask &= ((k1_ != 0) | (k2_ != 0) | (k3_ != 0))[:, None] if calib_type == 2: mask &= rad > 0.0 return da.where(mask, rad, np.nan) # Central wavenumber: cwnum = header["radtempcnv"][0, irchn, 0] if irchn == 0: cwnum = cwnum / 1.0e2 else: cwnum = cwnum / 1.0e3 bandcor_2 = header["radtempcnv"][0, irchn, 1] / 1e5 bandcor_3 = header["radtempcnv"][0, irchn, 2] / 1e6 ir_const_1 = 1.1910659e-5 ir_const_2 = 1.438833 t_planck = (ir_const_2 * cwnum) / \ np.log(1 + ir_const_1 * cwnum * cwnum * cwnum / rad) # Band corrections applied to t_planck to get correct # brightness temperature for channel: if bandcor_2 < 0: # Post AAPP-v4 tb_ = bandcor_2 + bandcor_3 * t_planck else: # AAPP 1 to 4 tb_ = (t_planck - bandcor_2) / bandcor_3 # Mask unnaturally low values return da.where(mask, tb_, np.nan)