Source code for satpy.readers.fci_l2_nc

# Copyright (c) 2019-2023 Satpy developers
#
# satpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# satpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with satpy.  If not, see <http://www.gnu.org/licenses/>.

"""Reader for the FCI L2 products in NetCDF4 format."""

import logging
from contextlib import suppress

import numpy as np
import xarray as xr
from pyresample import geometry

from satpy.readers._geos_area import get_geos_area_naming, make_ext
from satpy.readers.eum_base import get_service_mode
from satpy.readers.file_handlers import BaseFileHandler
from satpy.resample import get_area_def
from satpy.utils import get_legacy_chunk_size

logger = logging.getLogger(__name__)

CHUNK_SIZE = get_legacy_chunk_size()

SSP_DEFAULT = 0.0


[docs] class FciL2CommonFunctions(object): """Shared operations for file handlers.""" @property def spacecraft_name(self): """Return spacecraft name.""" return self.nc.attrs['platform'] @property def sensor_name(self): """Return instrument name.""" return self.nc.attrs['data_source'] @property def ssp_lon(self): """Return longitude at subsatellite point.""" try: return float(self.nc['mtg_geos_projection'].attrs['longitude_of_projection_origin']) except (KeyError, AttributeError): logger.warning(f"ssp_lon could not be obtained from file content, using default value " f"of {SSP_DEFAULT} degrees east instead") return SSP_DEFAULT
[docs] def _get_global_attributes(self): """Create a dictionary of global attributes to be added to all datasets. Returns: dict: A dictionary of global attributes. filename: name of the product file spacecraft_name: name of the spacecraft ssp_lon: longitude of subsatellite point sensor: name of sensor platform_name: name of the platform """ attributes = { 'filename': self.filename, 'spacecraft_name': self.spacecraft_name, 'ssp_lon': self.ssp_lon, 'sensor': self.sensor_name, 'platform_name': self.spacecraft_name, } return attributes
[docs] def _set_attributes(self, variable, dataset_info, segmented=False): """Set dataset attributes.""" if segmented: xdim, ydim = "number_of_FoR_cols", "number_of_FoR_rows" else: xdim, ydim = "number_of_columns", "number_of_rows" if dataset_info['file_key'] not in ['product_quality', 'product_completeness', 'product_timeliness']: variable = variable.rename({ydim: 'y', xdim: 'x'}) variable.attrs.setdefault('units', None) variable.attrs.update(dataset_info) variable.attrs.update(self._get_global_attributes()) return variable
[docs] def _slice_dataset(self, variable, dataset_info, dimensions): """Slice data if dimension layers have been provided in yaml-file.""" slice_dict = {dim: dataset_info[dim_id] for (dim, dim_id) in dimensions.items() if dim_id in dataset_info.keys() and dim in variable.dims} for dim, dim_ind in slice_dict.items(): logger.debug(f"Extracting {dimensions[dim]}-index {dim_ind} from dimension {dim!r}.") variable = variable.sel(slice_dict) return variable
[docs] @staticmethod def _mask_data(variable, fill_value): """Set fill_values, as defined in yaml-file, to NaN. Set data points in variable to NaN if they are equal to fill_value or any of the values in fill_value if fill_value is a list. """ if not isinstance(fill_value, list): fill_value = [fill_value] for val in fill_value: variable = variable.where(variable != val).astype('float32') return variable
def __del__(self): """Close the NetCDF file that may still be open.""" with suppress(AttributeError, OSError): self.nc.close()
[docs] class FciL2NCFileHandler(FciL2CommonFunctions, BaseFileHandler): """Reader class for FCI L2 products in NetCDF4 format.""" def __init__(self, filename, filename_info, filetype_info, with_area_definition=True): """Open the NetCDF file with xarray and prepare for dataset reading.""" super().__init__(filename, filename_info, filetype_info) # Use xarray's default netcdf4 engine to open the fileq self.nc = xr.open_dataset( self.filename, decode_cf=True, mask_and_scale=True, chunks={ 'number_of_columns': CHUNK_SIZE, 'number_of_rows': CHUNK_SIZE } ) if with_area_definition is False: logger.info("Setting `with_area_defintion=False` has no effect on pixel-based products.") # Read metadata which are common to all datasets self.nlines = self.nc['y'].size self.ncols = self.nc['x'].size self._projection = self.nc['mtg_geos_projection'] self.multi_dims = {'maximum_number_of_layers': 'layer', 'number_of_vis_channels': 'vis_channel_id'}
[docs] def get_area_def(self, key): """Return the area definition.""" try: return self._area_def except AttributeError: raise NotImplementedError
[docs] def get_dataset(self, dataset_id, dataset_info): """Get dataset using the file_key in dataset_info.""" var_key = dataset_info['file_key'] par_name = dataset_info['name'] logger.debug('Reading in file to get dataset with key %s.', var_key) try: variable = self.nc[var_key] except KeyError: logger.warning("Could not find key %s in NetCDF file, no valid Dataset created", var_key) return None # Compute the area definition if var_key not in ['product_quality', 'product_completeness', 'product_timeliness']: self._area_def = self._compute_area_def(dataset_id) if any(dim_id in dataset_info.keys() for dim_id in self.multi_dims.values()): variable = self._slice_dataset(variable, dataset_info, self.multi_dims) if par_name == 'retrieved_cloud_optical_thickness': variable = self.get_total_cot(variable) if dataset_info['file_type'] == 'nc_fci_test_clm': variable = self._decode_clm_test_data(variable, dataset_info) if 'fill_value' in dataset_info: variable = self._mask_data(variable, dataset_info['fill_value']) variable = self._set_attributes(variable, dataset_info) return variable
[docs] @staticmethod def _decode_clm_test_data(variable, dataset_info): if dataset_info['file_key'] != 'cloud_mask_cmrt6_test_result': variable = variable.astype('uint32') variable.values = (variable.values >> dataset_info['extract_byte'] << 31 >> 31).astype('int8') return variable
[docs] def _compute_area_def(self, dataset_id): """Compute the area definition. Returns: AreaDefinition: A pyresample AreaDefinition object containing the area definition. """ area_extent = self._get_area_extent() area_naming, proj_dict = self._get_proj_area(dataset_id) area_def = geometry.AreaDefinition( area_naming['area_id'], area_naming['description'], "", proj_dict, self.ncols, self.nlines, area_extent) return area_def
[docs] def _get_area_extent(self): """Calculate area extent of dataset.""" # Load and convert x/y coordinates to degrees as required by the make_ext function x = self.nc['x'] y = self.nc['y'] x_deg = np.degrees(x) y_deg = np.degrees(y) # Select the extreme points and calcualte area extent (not: these refer to pixel center) ll_x, ur_x = -x_deg.values[0], -x_deg.values[-1] ll_y, ur_y = y_deg.values[-1], y_deg.values[0] h = float(self._projection.attrs['perspective_point_height']) area_extent_pixel_center = make_ext(ll_x, ur_x, ll_y, ur_y, h) # Shift area extent by half a pixel to get the area extent w.r.t. the dataset/pixel corners scale_factor = (x[1:]-x[0:-1]).values.mean() res = abs(scale_factor) * h area_extent = tuple(i + res/2 if i > 0 else i - res/2 for i in area_extent_pixel_center) return area_extent
[docs] def _get_proj_area(self, dataset_id): """Extract projection and area information.""" # Read the projection data from the mtg_geos_projection variable a = float(self._projection.attrs['semi_major_axis']) h = float(self._projection.attrs['perspective_point_height']) # Some L2PF test data files have a typo in the keyname for the inverse flattening parameter. Use a default value # as fallback until all L2PF test files are correctly formatted. rf = float(self._projection.attrs.get('inverse_flattening', 298.257223563)) res = dataset_id["resolution"] area_naming_input_dict = {'platform_name': 'mtg', 'instrument_name': 'fci', 'resolution': res, } area_naming = get_geos_area_naming({**area_naming_input_dict, **get_service_mode('fci', self.ssp_lon)}) proj_dict = {'a': a, 'lon_0': self.ssp_lon, 'h': h, "rf": rf, 'proj': 'geos', 'units': 'm', "sweep": 'y'} return area_naming, proj_dict
[docs] @staticmethod def get_total_cot(variable): """Sum the cloud optical thickness from the two OCA layers. The optical thickness has to be transformed to linear space before adding the values from the two layers. The combined/total optical thickness is then transformed back to logarithmic space. """ attrs = variable.attrs variable = 10 ** variable variable = variable.fillna(0.) variable = variable.sum(dim='maximum_number_of_layers', keep_attrs=True) variable = variable.where(variable != 0., np.nan) variable = np.log10(variable) variable.attrs = attrs return variable
[docs] class FciL2NCSegmentFileHandler(FciL2CommonFunctions, BaseFileHandler): """Reader class for FCI L2 Segmented products in NetCDF4 format.""" def __init__(self, filename, filename_info, filetype_info, with_area_definition=False): """Open the NetCDF file with xarray and prepare for dataset reading.""" super().__init__(filename, filename_info, filetype_info) # Use xarray's default netcdf4 engine to open the file self.nc = xr.open_dataset( self.filename, decode_cf=True, mask_and_scale=True, chunks={ 'number_of_FoR_cols': CHUNK_SIZE, 'number_of_FoR_rows': CHUNK_SIZE } ) # Read metadata which are common to all datasets self.nlines = self.nc['number_of_FoR_rows'].size self.ncols = self.nc['number_of_FoR_cols'].size self.with_adef = with_area_definition self.multi_dims = { 'number_of_categories': 'category_id', 'number_of_channels': 'channel_id', 'number_of_vis_channels': 'vis_channel_id', 'number_of_ir_channels': 'ir_channel_id', 'number_test': 'test_id', }
[docs] def get_area_def(self, key): """Return the area definition.""" try: return self._area_def except AttributeError: raise NotImplementedError
[docs] def get_dataset(self, dataset_id, dataset_info): """Get dataset using the file_key in dataset_info.""" var_key = dataset_info['file_key'] logger.debug('Reading in file to get dataset with key %s.', var_key) try: variable = self.nc[var_key] except KeyError: logger.warning("Could not find key %s in NetCDF file, no valid Dataset created", var_key) return None if any(dim_id in dataset_info.keys() for dim_id in self.multi_dims.values()): variable = self._slice_dataset(variable, dataset_info, self.multi_dims) if self.with_adef and var_key not in ['longitude', 'latitude', 'product_quality', 'product_completeness', 'product_timeliness']: self._area_def = self._construct_area_def(dataset_id) # coordinates are not relevant when returning data with an AreaDefinition if 'coordinates' in dataset_info.keys(): del dataset_info['coordinates'] if 'fill_value' in dataset_info: variable = self._mask_data(variable, dataset_info['fill_value']) variable = self._set_attributes(variable, dataset_info, segmented=True) return variable
[docs] def _construct_area_def(self, dataset_id): """Construct the area definition. Returns: AreaDefinition: A pyresample AreaDefinition object containing the area definition. """ res = dataset_id["resolution"] area_naming_input_dict = {'platform_name': 'mtg', 'instrument_name': 'fci', 'resolution': res, } area_naming = get_geos_area_naming({**area_naming_input_dict, **get_service_mode('fci', self.ssp_lon)}) # Construct area definition from standardized area definition. stand_area_def = get_area_def(area_naming['area_id']) if (stand_area_def.x_size != self.ncols) | (stand_area_def.y_size != self.nlines): raise NotImplementedError('Unrecognised AreaDefinition.') mod_area_extent = self._modify_area_extent(stand_area_def.area_extent) area_def = geometry.AreaDefinition( stand_area_def.area_id, stand_area_def.description, "", stand_area_def.proj_dict, stand_area_def.x_size, stand_area_def.y_size, mod_area_extent) return area_def
[docs] @staticmethod def _modify_area_extent(stand_area_extent): """Modify area extent to macth satellite projection. Area extent has to be modified since the L2 products are stored with the south-east in the upper-right corner (as opposed to north-east in the standardized area definitions). """ ll_x, ll_y, ur_x, ur_y = stand_area_extent ll_y *= -1. ur_y *= -1. area_extent = tuple([ll_x, ll_y, ur_x, ur_y]) return area_extent