Source code for satpy.readers.hsaf_grib

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2019.
# This file is part of satpy.
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <>.

"""A reader for files produced by the Hydrology SAF.

Currently this reader depends on the `pygrib` python package. The `eccodes`
package from ECMWF is preferred, but does not support python 3 at the time
of writing.


import datetime as dt
import logging

import dask.array as da
import numpy as np
import pygrib
import xarray as xr
from pyresample import geometry

from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import get_legacy_chunk_size

LOG = logging.getLogger(__name__)

CHUNK_SIZE = get_legacy_chunk_size()

    "none": "1",

[docs] class HSAFFileHandler(BaseFileHandler): """File handler for HSAF grib files.""" def __init__(self, filename, filename_info, filetype_info): """Init the file handler.""" super(HSAFFileHandler, self).__init__(filename, filename_info, filetype_info) self._msg_datasets = {} self._start_time = None self._end_time = None try: with as grib_file: first_msg = grib_file.message(1) analysis_time = self._get_datetime(first_msg) self._analysis_time = analysis_time self.metadata = self.get_metadata(first_msg) except (RuntimeError, KeyError): raise IOError("Unknown GRIB file format: {}".format(self.filename))
[docs] @staticmethod def _get_datetime(msg): dtstr = str(msg["dataDate"]) + str(msg["dataTime"]).zfill(4) return dt.datetime.strptime(dtstr, "%Y%m%d%H%M")
@property def analysis_time(self): """Get validity time of this file.""" return self._analysis_time
[docs] def get_metadata(self, msg): """Get the metadata.""" try: center_description = msg["centreDescription"] except (RuntimeError, KeyError): center_description = None ds_info = { "filename": self.filename, "shortName": msg["shortName"], "long_name": msg["name"], "units": msg["units"], "centreDescription": center_description, "data_time": self._analysis_time, "nx": msg["Nx"], "ny": msg["Ny"], "projparams": msg.projparams } return ds_info
[docs] def get_area_def(self, dsid): """Get area definition for message.""" msg = self._get_message(1) try: return self._get_area_def(msg) except (RuntimeError, KeyError): raise RuntimeError("Unknown GRIB projection information")
[docs] def _get_area_def(self, msg): """Get the area definition of the datasets in the file.""" proj_param = msg.projparams.copy() Rx = 2 * np.arcsin(1. / msg["NrInRadiusOfEarth"]) / msg["dx"] Ry = 2 * np.arcsin(1. / msg["NrInRadiusOfEarth"]) / msg["dy"] x_0 = - msg["XpInGridLengths"] x_1 = msg["Nx"] - msg["XpInGridLengths"] y_0 = (msg["Ny"] - msg["YpInGridLengths"]) * -1 y_1 = msg["YpInGridLengths"] min_x = (x_0 * Rx) * proj_param["h"] max_x = (x_1 * Rx) * proj_param["h"] min_y = (y_0 * Ry) * proj_param["h"] max_y = (y_1 * Ry) * proj_param["h"] area_extent = (min_x, min_y, max_x, max_y) area = geometry.AreaDefinition("hsaf_region", "A region from H-SAF", "geos", proj_param, msg["Nx"], msg["Ny"], area_extent) return area
[docs] def _get_message(self, idx): with as grib_file: msg = grib_file.message(idx) return msg
[docs] def get_dataset(self, ds_id, ds_info): """Read a GRIB message into an xarray DataArray.""" if (ds_id["name"] not in self.filename): raise IOError("File does not contain {} data".format(ds_id["name"])) msg = self._get_message(1) ds_info = self.get_metadata(msg) ds_info["end_time"] = ds_info["data_time"] if (ds_id["name"] == "h05" or ds_id["name"] == "h05B"): flen = len(self.filename) timedelt = self.filename[flen-10:flen-8] ds_info["start_time"] = (ds_info["end_time"] - dt.timedelta(hours=int(timedelt))) else: ds_info["start_time"] = ds_info["end_time"] fill = msg["missingValue"] data = msg.values.astype(np.float32) if msg.valid_key("jScansPositively") and msg["jScansPositively"] == 1: data = data[::-1] if isinstance(data, data = data.filled(np.nan) data = da.from_array(data, chunks=CHUNK_SIZE) else: data[data == fill] = np.nan data = da.from_array(data, chunks=CHUNK_SIZE) return xr.DataArray(data, attrs=ds_info, dims=("y", "x"))