
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""ACSPO SST Reader.

See the following page for more information:

https://podaac.jpl.nasa.gov/dataset/VIIRS_NPP-OSPO-L2P-v2.3

"""
import logging
from datetime import datetime

import numpy as np

from satpy.readers.netcdf_utils import NetCDF4FileHandler

LOG = logging.getLogger(__name__)


ROWS_PER_SCAN = {
    'modis': 10,
    'viirs': 16,
    'avhrr': None,
}
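# Note: ``get_metadata`` below falls back to 0 rows per scan when the sensor
# maps to ``None`` here or is missing from this table entirely.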


class ACSPOFileHandler(NetCDF4FileHandler):
    """ACSPO L2P SST File Reader."""

    @property
    def platform_name(self):
        """Get satellite name for this file's data."""
        res = self['/attr/platform']
        if isinstance(res, np.ndarray):
            return str(res.astype(str))
        return res

    @property
    def sensor_name(self):
        """Get instrument name for this file's data."""
        res = self['/attr/sensor']
        if isinstance(res, np.ndarray):
            res = str(res.astype(str))
        return res.lower()
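
    # For a typical ACSPO VIIRS granule the global attributes hold values such
    # as ``platform = 'NPP'`` and ``sensor = 'VIIRS'`` (the exact strings are
    # file dependent); ``sensor_name`` lower-cases the value so it matches the
    # keys of ``ROWS_PER_SCAN`` above.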

    def get_shape(self, ds_id, ds_info):
        """Get numpy array shape for the specified dataset.

        Args:
            ds_id (DataID): ID of dataset that will be loaded
            ds_info (dict): Dictionary of dataset information from config file

        Returns:
            tuple: (rows, cols)

        """
        var_path = ds_info.get('file_key', '{}'.format(ds_id['name']))
        if var_path + '/shape' not in self:
            # loading a scalar value
            shape = 1
        else:
            shape = self[var_path + '/shape']
            if len(shape) == 3:
                if shape[0] != 1:
                    raise ValueError("Not sure how to load 3D Dataset with more than 1 time")
                shape = shape[1:]
        return shape
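
    # e.g. a 3D variable stored with shape ``(1, 1354, 2030)`` (hypothetical
    # sizes) is reported as ``(1354, 2030)``; a scalar value is reported as ``1``.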

    @staticmethod
    def _parse_datetime(datestr):
        return datetime.strptime(datestr, "%Y%m%dT%H%M%SZ")
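
    # Worked example with a hypothetical timestamp string:
    #     >>> ACSPOFileHandler._parse_datetime("20170401T120000Z")
    #     datetime.datetime(2017, 4, 1, 12, 0)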

    @property
    def start_time(self):
        """Get first observation time of data."""
        return self._parse_datetime(self['/attr/time_coverage_start'])

    @property
    def end_time(self):
        """Get final observation time of data."""
        return self._parse_datetime(self['/attr/time_coverage_end'])

    def get_metadata(self, dataset_id, ds_info):
        """Collect various metadata about the specified dataset."""
        var_path = ds_info.get('file_key', '{}'.format(dataset_id['name']))
        shape = self.get_shape(dataset_id, ds_info)
        units = self[var_path + '/attr/units']
        info = getattr(self[var_path], 'attrs', {})
        standard_name = self[var_path + '/attr/standard_name']
        resolution = float(self['/attr/spatial_resolution'].split(' ')[0])
        rows_per_scan = ROWS_PER_SCAN.get(self.sensor_name) or 0
        info.update(dataset_id.to_dict())
        info.update({
            'shape': shape,
            'units': units,
            'platform_name': self.platform_name,
            'sensor': self.sensor_name,
            'standard_name': standard_name,
            'resolution': resolution,
            'rows_per_scan': rows_per_scan,
            'long_name': self.get(var_path + '/attr/long_name'),
            'comment': self.get(var_path + '/attr/comment'),
        })
        return info
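
    # ``resolution`` above is parsed from the free-form ``spatial_resolution``
    # global attribute; e.g. a (hypothetical) value of ``'0.75 km at nadir'``
    # yields ``float('0.75 km at nadir'.split(' ')[0]) == 0.75``.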

    def get_dataset(self, dataset_id, ds_info):
        """Load data array and metadata from file on disk."""
        var_path = ds_info.get('file_key', '{}'.format(dataset_id['name']))
        metadata = self.get_metadata(dataset_id, ds_info)
        shape = metadata['shape']
        file_shape = self[var_path + '/shape']
        metadata['shape'] = shape
        valid_min = self[var_path + '/attr/valid_min']
        valid_max = self[var_path + '/attr/valid_max']
        # no need to check fill value since we are using valid min/max
        scale_factor = self.get(var_path + '/attr/scale_factor')
        add_offset = self.get(var_path + '/attr/add_offset')
        data = self[var_path]
        data = data.rename({"ni": "x", "nj": "y"})
        if isinstance(file_shape, tuple) and len(file_shape) == 3:
            # can only read 3D arrays with size 1 in the first dimension
            data = data[0]
        data = data.where((data >= valid_min) & (data <= valid_max))
        if scale_factor is not None:
            data = data * scale_factor + add_offset

        if ds_info.get('cloud_clear', False):
            # clear-sky if bit 15-16 are 00
            clear_sky_mask = (self['l2p_flags'][0] & 0b1100000000000000) != 0
            clear_sky_mask = clear_sky_mask.rename({"ni": "x", "nj": "y"})
            data = data.where(~clear_sky_mask)

        data.attrs.update(metadata)
        # Remove these attributes since they are no longer valid and can cause invalid value filling.
        data.attrs.pop('_FillValue', None)
        data.attrs.pop('valid_max', None)
        data.attrs.pop('valid_min', None)
        return data
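

# A minimal sketch (not part of the reader) of the ``l2p_flags`` masking used
# in ``get_dataset``: pixels with either of the two highest bits of the 16-bit
# flag word set are treated as not clear-sky and masked out. The flag values
# below are hypothetical.
#
#     >>> flags = np.array([0x0000, 0x4000, 0x8000, 0xC000], dtype=np.uint16)
#     >>> (flags & 0b1100000000000000) != 0
#     array([False,  True,  True,  True])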