Source code for satpy.readers.viirs_l1b

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2011-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Interface to VIIRS L1B format."""

import logging
from datetime import datetime

import numpy as np

from satpy.readers.netcdf_utils import NetCDF4FileHandler

LOG = logging.getLogger(__name__)


[docs] class VIIRSL1BFileHandler(NetCDF4FileHandler): """VIIRS L1B File Reader."""
[docs] def _parse_datetime(self, datestr): """Parse datetime.""" return datetime.strptime(datestr, "%Y-%m-%dT%H:%M:%S.000Z")
@property def start_orbit_number(self): """Get start orbit number.""" try: return int(self['/attr/orbit_number']) except KeyError: return int(self['/attr/OrbitNumber']) @property def end_orbit_number(self): """Get end orbit number.""" try: return int(self['/attr/orbit_number']) except KeyError: return int(self['/attr/OrbitNumber']) @property def platform_name(self): """Get platform name.""" try: res = self.get('/attr/platform', self.filename_info['platform_shortname']) except KeyError: res = 'Unknown' return { 'JPSS-1': 'NOAA-20', 'NP': 'Suomi-NPP', 'J1': 'NOAA-20', 'J2': 'NOAA-21', 'JPSS-2': 'NOAA-21', }.get(res, res) @property def sensor_name(self): """Get sensor name.""" return self['/attr/instrument'].lower()
[docs] def adjust_scaling_factors(self, factors, file_units, output_units): """Adjust scaling factors.""" if factors is None or factors[0] is None: factors = [1, 0] if file_units == output_units: LOG.debug("File units and output units are the same (%s)", file_units) return factors factors = np.array(factors) if file_units == "W cm-2 sr-1" and output_units == "W m-2 sr-1": LOG.debug("Adjusting scaling factors to convert '%s' to '%s'", file_units, output_units) factors[::2] = np.where(factors[::2] != -999, factors[::2] * 10000.0, -999) factors[1::2] = np.where(factors[1::2] != -999, factors[1::2] * 10000.0, -999) return factors elif file_units == "1" and output_units == "%": LOG.debug("Adjusting scaling factors to convert '%s' to '%s'", file_units, output_units) factors[::2] = np.where(factors[::2] != -999, factors[::2] * 100.0, -999) factors[1::2] = np.where(factors[1::2] != -999, factors[1::2] * 100.0, -999) return factors else: return factors
[docs] def get_shape(self, ds_id, ds_info): """Get shape.""" var_path = self._dataset_name_to_var_path(ds_id['name'], ds_info) return self.get(var_path + '/shape', 1)
@property def start_time(self): """Get start time.""" return self._parse_datetime(self['/attr/time_coverage_start']) @property def end_time(self): """Get end time.""" return self._parse_datetime(self['/attr/time_coverage_end'])
[docs] def _get_dataset_file_units(self, dataset_id, ds_info, var_path): file_units = ds_info.get('file_units') if file_units is None: file_units = self.get(var_path + '/attr/units') # they were almost completely CF compliant... if file_units == "none": file_units = "1" if dataset_id.get('calibration') == 'radiance' and ds_info['units'] == 'W m-2 um-1 sr-1': rad_units_path = var_path + '/attr/radiance_units' if rad_units_path in self: if file_units is None: file_units = self[var_path + '/attr/radiance_units'] if file_units == 'Watts/meter^2/steradian/micrometer': file_units = 'W m-2 um-1 sr-1' elif ds_info.get('units') == '%' and file_units is None: # v1.1 and above of level 1 processing removed 'units' attribute # for all reflectance channels file_units = "1" return file_units
[docs] def _get_dataset_valid_range(self, dataset_id, ds_info, var_path): if dataset_id.get('calibration') == 'radiance' and ds_info['units'] == 'W m-2 um-1 sr-1': rad_units_path = var_path + '/attr/radiance_units' if rad_units_path in self: # we are getting a reflectance band but we want the radiance values # special scaling parameters scale_factor = self[var_path + '/attr/radiance_scale_factor'] scale_offset = self[var_path + '/attr/radiance_add_offset'] else: # we are getting a btemp band but we want the radiance values # these are stored directly in the primary variable scale_factor = self[var_path + '/attr/scale_factor'] scale_offset = self[var_path + '/attr/add_offset'] valid_min = self[var_path + '/attr/valid_min'] valid_max = self[var_path + '/attr/valid_max'] elif ds_info.get('units') == '%': # normal reflectance valid_min = self[var_path + '/attr/valid_min'] valid_max = self[var_path + '/attr/valid_max'] scale_factor = self[var_path + '/attr/scale_factor'] scale_offset = self[var_path + '/attr/add_offset'] elif ds_info.get('units') == 'K': # normal brightness temperature # use a special LUT to get the actual values lut_var_path = ds_info.get('lut', var_path + '_brightness_temperature_lut') # we get the BT values from a look up table using the scaled radiance integers valid_min = self[lut_var_path + '/attr/valid_min'] valid_max = self[lut_var_path + '/attr/valid_max'] scale_factor = scale_offset = None else: valid_min = self.get(var_path + '/attr/valid_min') valid_max = self.get(var_path + '/attr/valid_max') scale_factor = self.get(var_path + '/attr/scale_factor') scale_offset = self.get(var_path + '/attr/add_offset') return valid_min, valid_max, scale_factor, scale_offset
[docs] def get_metadata(self, dataset_id, ds_info): """Get metadata.""" var_path = self._dataset_name_to_var_path(dataset_id['name'], ds_info) shape = self.get_shape(dataset_id, ds_info) file_units = self._get_dataset_file_units(dataset_id, ds_info, var_path) # Get extra metadata if self._is_scan_based_array(shape): rows_per_scan = int(shape[0] / self['/dimension/number_of_scans']) ds_info.setdefault('rows_per_scan', rows_per_scan) i = getattr(self[var_path], 'attrs', {}) i.update(ds_info) i.update(dataset_id.to_dict()) i.update({ "shape": shape, "units": ds_info.get("units", file_units), "file_units": file_units, "platform_name": self.platform_name, "sensor": self.sensor_name, "start_orbit": self.start_orbit_number, "end_orbit": self.end_orbit_number, }) i.update(dataset_id.to_dict()) return i
[docs] def _is_scan_based_array(self, shape): return '/dimension/number_of_scans' in self and isinstance(shape, tuple) and shape
[docs] def get_dataset(self, dataset_id, ds_info): """Get dataset.""" var_path = self._dataset_name_to_var_path(dataset_id['name'], ds_info) metadata = self.get_metadata(dataset_id, ds_info) valid_min, valid_max, scale_factor, scale_offset = self._get_dataset_valid_range(dataset_id, ds_info, var_path) if dataset_id.get('calibration') == 'radiance' and ds_info['units'] == 'W m-2 um-1 sr-1': data = self[var_path] elif ds_info.get('units') == '%': data = self[var_path] elif ds_info.get('units') == 'K': # normal brightness temperature # use a special LUT to get the actual values lut_var_path = ds_info.get('lut', var_path + '_brightness_temperature_lut') data = self[var_path] # we get the BT values from a look up table using the scaled radiance integers index_arr = data.data.astype(int) coords = data.coords data.data = self[lut_var_path].data[index_arr.ravel()].reshape(data.shape) data = data.assign_coords(**coords) else: data = self[var_path] data.attrs.update(metadata) if valid_min is not None and valid_max is not None: data = data.where((data >= valid_min) & (data <= valid_max)) if data.attrs.get('units') in ['%', 'K', '1', 'W m-2 um-1 sr-1'] and \ 'flag_meanings' in data.attrs: # flag meanings don't mean anything anymore for these variables # these aren't category products data.attrs.pop('flag_meanings', None) data.attrs.pop('flag_values', None) factors = (scale_factor, scale_offset) factors = self.adjust_scaling_factors(factors, metadata['file_units'], ds_info.get("units")) if factors[0] != 1 or factors[1] != 0: data *= factors[0] data += factors[1] # rename dimensions to correspond to satpy's 'y' and 'x' standard if 'number_of_lines' in data.dims: data = data.rename({'number_of_lines': 'y', 'number_of_pixels': 'x'}) return data
[docs] def available_datasets(self, configured_datasets=None): """Generate dataset info and their availablity. See :meth:`satpy.readers.file_handlers.BaseFileHandler.available_datasets` for details. """ for is_avail, ds_info in (configured_datasets or []): if is_avail is not None: # some other file handler said it has this dataset # we don't know any more information than the previous # file handler so let's yield early yield is_avail, ds_info continue ft_matches = self.file_type_matches(ds_info['file_type']) var_path = self._dataset_name_to_var_path(ds_info['name'], ds_info) is_in_file = var_path in self yield ft_matches and is_in_file, ds_info
[docs] @staticmethod def _dataset_name_to_var_path(dataset_name: str, ds_info: dict) -> str: return ds_info.get('file_key', 'observation_data/{}'.format(dataset_name))