Source code for satpy.readers.li_l2_nc

# Copyright (c) 2022 Satpy developers
#
# satpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# satpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with satpy.  If not, see <http://www.gnu.org/licenses/>.

"""MTG Lighting Imager (LI) L2 unified reader.

This reader supports reading all the products from the LI L2
processing level:

  * L2-LE
  * L2-LGR
  * L2-AFA
  * L2-LEF
  * L2-LFL
  * L2-AF
  * L2-AFR

"""

import logging

import dask.array as da
import numpy as np
import xarray as xr

from satpy.readers.li_base_nc import LINCFileHandler
from satpy.resample import get_area_def
from satpy.utils import get_legacy_chunk_size

logger = logging.getLogger(__name__)
LI_GRID_SHAPE = (5568, 5568)
CHUNK_SIZE = get_legacy_chunk_size()


[docs] class LIL2NCFileHandler(LINCFileHandler): """Implementation class for the unified LI L2 satpy reader.""" def __init__(self, filename, filename_info, filetype_info, with_area_definition=False): """Initialize LIL2NCFileHandler.""" super(LIL2NCFileHandler, self).__init__(filename, filename_info, filetype_info) if with_area_definition and not self.prod_in_accumulation_grid: logger.debug(f"The current product {filetype_info['file_desc']['product_type']} " f"is not an accumulated product so it will not be regridded.") self.with_area_def = False else: self.with_area_def = with_area_definition
[docs] def get_dataset(self, dataset_id, ds_info=None): """Get the dataset and apply gridding if requested.""" data_array = super().get_dataset(dataset_id, ds_info) # variable_patterns are compiled to regex patterns # hence search variable name from swath_coordinate var_with_swath_coord = self.is_var_with_swath_coord(dataset_id) if var_with_swath_coord and self.with_area_def: data_array = self.get_array_on_fci_grid(data_array) return data_array
[docs] def get_area_def(self, dsid): """Compute area definition for a dataset, only supported for accumulated products.""" var_with_swath_coord = self.is_var_with_swath_coord(dsid) if var_with_swath_coord and self.with_area_def: return get_area_def("mtg_fci_fdss_2km") raise NotImplementedError("Area definition is not supported for accumulated products.")
[docs] def is_var_with_swath_coord(self, dsid): """Check if the variable corresponding to this dataset is listed as variable with swath coordinates.""" # since the patterns are compiled to regex we use the search() method below to find matches with_swath_coords = any([p.search(dsid["name"]) is not None for p in self.swath_coordinates["patterns"]]) return with_swath_coords
[docs] def get_array_on_fci_grid(self, data_array: xr.DataArray): """Obtain the accumulated products as a (sparse) 2-d array. The array has the shape of the FCI 2 km grid (5568x5568px), and will have an AreaDefinition attached. """ # Integer values without the application of scale_factor and add_offset # hence no projection/index calculation. # Note that x and y have origin in the south-west corner of the image # and start with index 1. rows = self.get_measured_variable("y") cols = self.get_measured_variable("x") attrs = data_array.attrs rows, cols = da.compute(rows, cols) # origin is in the south-west corner, so we flip the rows (applying # offset of 1 implicitly) # And we manually offset the columns by 1 too: rows = (LI_GRID_SHAPE[0] - rows.astype(int)) cols = cols.astype(int) - 1 # Create an empyt 1-D array for the results flattened_result = np.nan * da.zeros((LI_GRID_SHAPE[0] * LI_GRID_SHAPE[1]), dtype=data_array.dtype) # Insert the data. Dask doesn't support this for more than one dimension at a time, so ... flattened_result[rows * LI_GRID_SHAPE[0] + cols] = data_array # ... reshape to final 2D grid data_2d = da.reshape(flattened_result, LI_GRID_SHAPE) xarr = xr.DataArray(da.asarray(data_2d, CHUNK_SIZE), dims=("y", "x")) xarr.attrs = attrs return xarr