Source code for satpy.readers.core.config

#!/usr/bin/env python
# Copyright (c) 2015-2025 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Reader configuration."""
from __future__ import annotations

import logging
import os
import warnings

import yaml
from yaml.loader import BaseLoader, FullLoader, UnsafeLoader

from satpy._config import config_search_paths, get_entry_points_config_dirs, glob_config
from satpy.readers.core.yaml_reader import load_yaml_configs as load_yaml_reader_configs

LOG = logging.getLogger(__name__)

# Renamed readers that are still accepted: the old name is translated to the
# new one and a FutureWarning is emitted (old name -> new name).
PENDING_OLD_READER_NAMES: dict[str, str] = {
    "fci_l1c_fdhsi": "fci_l1c_nc",
    "viirs_l2_cloud_mask_nc": "viirs_edr",
}
# Renamed readers that are no longer accepted: requesting the old name raises
# a ValueError pointing at the new one (old name -> new name).
OLD_READER_NAMES: dict[str, str] = {"slstr_l2": "ghrsst_l2"}


def read_reader_config(config_files, loader=UnsafeLoader):
    """Load reader YAML files and return their ``reader`` section.

    Args:
        config_files: Iterable of reader configuration files. It is unpacked
            and passed as positional arguments to ``load_yaml_reader_configs``.
        loader: YAML loader class forwarded to ``load_yaml_reader_configs``.
            Defaults to ``UnsafeLoader``.

    Returns:
        The value stored under the ``"reader"`` key of the loaded
        configuration.

    Raises:
        KeyError: If the loaded configuration has no ``"reader"`` key.

    """
    # NOTE(review): the default ``UnsafeLoader`` can construct arbitrary Python
    # objects while parsing, so only trusted configuration files should be
    # read with the default loader.
    return load_yaml_reader_configs(*config_files, loader=loader)["reader"]
def configs_for_reader(reader=None):
    """Yield the configuration files found for each requested reader.

    Args:
        reader (Optional[str]): Reader name or YAML config filename (or a
            list/tuple of them) to restrict the search to. When ``None``,
            every reader config discovered by ``_get_configs`` is used.

    Yields:
        For each reader, the list of matching configuration files returned by
        ``config_search_paths``.

    Raises:
        ValueError: If no existing configuration file is found for a reader.

    """
    for config_path in _get_configs(reader):
        yaml_name = os.path.basename(config_path)
        search_dirs = get_entry_points_config_dirs("satpy.readers")
        found_configs = config_search_paths(
            os.path.join("readers", yaml_name),
            search_dirs=search_dirs,
            check_exists=True,
        )
        if not found_configs:
            # Either the requested reader does not exist, or satpy is
            # improperly configured and cannot find its own readers.
            missing_reader = os.path.splitext(yaml_name)[0]
            raise ValueError(f"No reader named: {missing_reader}")
        yield found_configs
def _get_configs(reader):
    """Return the reader config filenames to look up.

    Args:
        reader: ``None``, a single reader name / YAML filename, or a list or
            tuple of them.

    Returns:
        When ``reader`` is ``None``, a set of every ``readers/*.yaml`` file
        found by ``glob_config``. Otherwise a list with one ``<name>.yaml``
        entry per requested reader, after old names have been translated by
        ``get_valid_reader_names``.

    """
    if reader is None:
        search_dirs = get_entry_points_config_dirs("satpy.readers")
        pattern = os.path.join("readers", "*.yaml")
        return set(glob_config(pattern, search_dirs=search_dirs))

    requested = reader if isinstance(reader, (list, tuple)) else [reader]
    valid_names = get_valid_reader_names(requested)
    # Callers may pass either a bare reader name or a config filename;
    # normalise both to a ``.yaml`` filename.
    return [name if name.endswith(".yaml") else name + ".yaml" for name in valid_names]
def available_readers(
    as_dict: bool = False,
    yaml_loader: type[BaseLoader] | type[FullLoader] | type[UnsafeLoader] = yaml.loader.UnsafeLoader,
) -> list[str] | list[dict]:
    """List the readers whose configuration can currently be loaded.

    Reader configs that fail to load (``KeyError``, ``IOError`` or
    ``yaml.YAMLError``) are logged at debug level and skipped.

    Args:
        as_dict: Return the full reader metadata dictionaries instead of only
            the reader names. Default: False.
        yaml_loader: The yaml loader type used to read the reader configs.
            Default: ``yaml.loader.UnsafeLoader``.

    Returns:
        Sorted list of available reader names. If `as_dict` is `True` then a
        list of dictionaries including additional reader information is
        returned, sorted by each dictionary's ``"name"`` entry.

    """
    found = []
    for config_files in configs_for_reader():
        try:
            info = read_reader_config(config_files, loader=yaml_loader)
        except (KeyError, IOError, yaml.YAMLError):
            LOG.debug("Could not import reader config from: %s", config_files)
            LOG.debug("Error loading YAML", exc_info=True)
        else:
            found.append(info if as_dict else info["name"])

    if as_dict:
        return sorted(found, key=lambda info: info["name"])
    return sorted(found)
def get_valid_reader_names(reader):
    """Check for old reader names or readers pending deprecation.

    Args:
        reader: Iterable of reader names to check.

    Returns:
        list: The reader names in the same order, with every name found in
        ``PENDING_OLD_READER_NAMES`` replaced by its new name.

    Raises:
        ValueError: If a name is listed in ``OLD_READER_NAMES``; the message
            states the replacement name.

    Warns:
        FutureWarning: For every name listed in ``PENDING_OLD_READER_NAMES``.

    """
    new_readers = []
    for reader_name in reader:
        if reader_name in OLD_READER_NAMES:
            raise ValueError(
                "Reader name '{}' has been deprecated, "
                "use '{}' instead.".format(reader_name, OLD_READER_NAMES[reader_name]))

        if reader_name in PENDING_OLD_READER_NAMES:
            new_name = PENDING_OLD_READER_NAMES[reader_name]
            # The two literals are concatenated, so the first must end with a
            # space; otherwise the message reads "...removed soon.Please use...".
            warnings.warn(
                "Reader name '{}' is being deprecated and will be removed soon. "
                "Please use '{}' instead.".format(reader_name, new_name),
                FutureWarning,
                stacklevel=2
            )
            new_readers.append(new_name)
        else:
            new_readers.append(reader_name)

    return new_readers