#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2011-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Interface to VIIRS L1B format."""
import logging
from datetime import datetime
import numpy as np
from satpy.readers.netcdf_utils import NetCDF4FileHandler
LOG = logging.getLogger(__name__)
[docs]
class VIIRSL1BFileHandler(NetCDF4FileHandler):
"""VIIRS L1B File Reader."""
[docs]
def _parse_datetime(self, datestr):
"""Parse datetime."""
return datetime.strptime(datestr, "%Y-%m-%dT%H:%M:%S.000Z")
@property
def start_orbit_number(self):
"""Get start orbit number."""
try:
return int(self['/attr/orbit_number'])
except KeyError:
return int(self['/attr/OrbitNumber'])
@property
def end_orbit_number(self):
"""Get end orbit number."""
try:
return int(self['/attr/orbit_number'])
except KeyError:
return int(self['/attr/OrbitNumber'])
@property
def platform_name(self):
"""Get platform name."""
try:
res = self.get('/attr/platform',
self.filename_info['platform_shortname'])
except KeyError:
res = 'Unknown'
return {
'JPSS-1': 'NOAA-20',
'NP': 'Suomi-NPP',
'J1': 'NOAA-20',
'J2': 'NOAA-21',
'JPSS-2': 'NOAA-21',
}.get(res, res)
@property
def sensor_name(self):
"""Get sensor name."""
return self['/attr/instrument'].lower()
[docs]
def adjust_scaling_factors(self, factors, file_units, output_units):
"""Adjust scaling factors."""
if factors is None or factors[0] is None:
factors = [1, 0]
if file_units == output_units:
LOG.debug("File units and output units are the same (%s)", file_units)
return factors
factors = np.array(factors)
if file_units == "W cm-2 sr-1" and output_units == "W m-2 sr-1":
LOG.debug("Adjusting scaling factors to convert '%s' to '%s'", file_units, output_units)
factors[::2] = np.where(factors[::2] != -999, factors[::2] * 10000.0, -999)
factors[1::2] = np.where(factors[1::2] != -999, factors[1::2] * 10000.0, -999)
return factors
elif file_units == "1" and output_units == "%":
LOG.debug("Adjusting scaling factors to convert '%s' to '%s'", file_units, output_units)
factors[::2] = np.where(factors[::2] != -999, factors[::2] * 100.0, -999)
factors[1::2] = np.where(factors[1::2] != -999, factors[1::2] * 100.0, -999)
return factors
else:
return factors
[docs]
def get_shape(self, ds_id, ds_info):
"""Get shape."""
var_path = self._dataset_name_to_var_path(ds_id['name'], ds_info)
return self.get(var_path + '/shape', 1)
@property
def start_time(self):
"""Get start time."""
return self._parse_datetime(self['/attr/time_coverage_start'])
@property
def end_time(self):
"""Get end time."""
return self._parse_datetime(self['/attr/time_coverage_end'])
[docs]
def _get_dataset_file_units(self, dataset_id, ds_info, var_path):
file_units = ds_info.get('file_units')
if file_units is None:
file_units = self.get(var_path + '/attr/units')
# they were almost completely CF compliant...
if file_units == "none":
file_units = "1"
if dataset_id.get('calibration') == 'radiance' and ds_info['units'] == 'W m-2 um-1 sr-1':
rad_units_path = var_path + '/attr/radiance_units'
if rad_units_path in self:
if file_units is None:
file_units = self[var_path + '/attr/radiance_units']
if file_units == 'Watts/meter^2/steradian/micrometer':
file_units = 'W m-2 um-1 sr-1'
elif ds_info.get('units') == '%' and file_units is None:
# v1.1 and above of level 1 processing removed 'units' attribute
# for all reflectance channels
file_units = "1"
return file_units
[docs]
def _get_dataset_valid_range(self, dataset_id, ds_info, var_path):
if dataset_id.get('calibration') == 'radiance' and ds_info['units'] == 'W m-2 um-1 sr-1':
rad_units_path = var_path + '/attr/radiance_units'
if rad_units_path in self:
# we are getting a reflectance band but we want the radiance values
# special scaling parameters
scale_factor = self[var_path + '/attr/radiance_scale_factor']
scale_offset = self[var_path + '/attr/radiance_add_offset']
else:
# we are getting a btemp band but we want the radiance values
# these are stored directly in the primary variable
scale_factor = self[var_path + '/attr/scale_factor']
scale_offset = self[var_path + '/attr/add_offset']
valid_min = self[var_path + '/attr/valid_min']
valid_max = self[var_path + '/attr/valid_max']
elif ds_info.get('units') == '%':
# normal reflectance
valid_min = self[var_path + '/attr/valid_min']
valid_max = self[var_path + '/attr/valid_max']
scale_factor = self[var_path + '/attr/scale_factor']
scale_offset = self[var_path + '/attr/add_offset']
elif ds_info.get('units') == 'K':
# normal brightness temperature
# use a special LUT to get the actual values
lut_var_path = ds_info.get('lut', var_path + '_brightness_temperature_lut')
# we get the BT values from a look up table using the scaled radiance integers
valid_min = self[lut_var_path + '/attr/valid_min']
valid_max = self[lut_var_path + '/attr/valid_max']
scale_factor = scale_offset = None
else:
valid_min = self.get(var_path + '/attr/valid_min')
valid_max = self.get(var_path + '/attr/valid_max')
scale_factor = self.get(var_path + '/attr/scale_factor')
scale_offset = self.get(var_path + '/attr/add_offset')
return valid_min, valid_max, scale_factor, scale_offset
[docs]
def _is_scan_based_array(self, shape):
return '/dimension/number_of_scans' in self and isinstance(shape, tuple) and shape
[docs]
def get_dataset(self, dataset_id, ds_info):
"""Get dataset."""
var_path = self._dataset_name_to_var_path(dataset_id['name'], ds_info)
metadata = self.get_metadata(dataset_id, ds_info)
valid_min, valid_max, scale_factor, scale_offset = self._get_dataset_valid_range(dataset_id, ds_info, var_path)
if dataset_id.get('calibration') == 'radiance' and ds_info['units'] == 'W m-2 um-1 sr-1':
data = self[var_path]
elif ds_info.get('units') == '%':
data = self[var_path]
elif ds_info.get('units') == 'K':
# normal brightness temperature
# use a special LUT to get the actual values
lut_var_path = ds_info.get('lut', var_path + '_brightness_temperature_lut')
data = self[var_path]
# we get the BT values from a look up table using the scaled radiance integers
index_arr = data.data.astype(int)
coords = data.coords
data.data = self[lut_var_path].data[index_arr.ravel()].reshape(data.shape)
data = data.assign_coords(**coords)
else:
data = self[var_path]
data.attrs.update(metadata)
if valid_min is not None and valid_max is not None:
data = data.where((data >= valid_min) & (data <= valid_max))
if data.attrs.get('units') in ['%', 'K', '1', 'W m-2 um-1 sr-1'] and \
'flag_meanings' in data.attrs:
# flag meanings don't mean anything anymore for these variables
# these aren't category products
data.attrs.pop('flag_meanings', None)
data.attrs.pop('flag_values', None)
factors = (scale_factor, scale_offset)
factors = self.adjust_scaling_factors(factors, metadata['file_units'], ds_info.get("units"))
if factors[0] != 1 or factors[1] != 0:
data *= factors[0]
data += factors[1]
# rename dimensions to correspond to satpy's 'y' and 'x' standard
if 'number_of_lines' in data.dims:
data = data.rename({'number_of_lines': 'y', 'number_of_pixels': 'x'})
return data
[docs]
def available_datasets(self, configured_datasets=None):
"""Generate dataset info and their availablity.
See
:meth:`satpy.readers.file_handlers.BaseFileHandler.available_datasets`
for details.
"""
for is_avail, ds_info in (configured_datasets or []):
if is_avail is not None:
# some other file handler said it has this dataset
# we don't know any more information than the previous
# file handler so let's yield early
yield is_avail, ds_info
continue
ft_matches = self.file_type_matches(ds_info['file_type'])
var_path = self._dataset_name_to_var_path(ds_info['name'], ds_info)
is_in_file = var_path in self
yield ft_matches and is_in_file, ds_info
[docs]
@staticmethod
def _dataset_name_to_var_path(dataset_name: str, ds_info: dict) -> str:
return ds_info.get('file_key', 'observation_data/{}'.format(dataset_name))