#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2025- Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""NWC SAF GEO v2021 HRW.
This module contains the :class:`NWCSAFGEOHRWFileHandler` file handler for the High Resolution Winds (HRW)
data produced with NWC SAF GEO.
There are 37 different quantities in the files, and predicted trajectories for each
observation. Reading of the trajectory data are not currently supported.
The data in the files are grouped separately for each original imaging channel, which there are up to
seven. The number of channels depends on the configuration of NWC SAF GEO software.
By default the reader treats each imaging channel separately, and thus lists
7 (channels) * 37 (variables) = 259 distinct datasets::
import pprint
from satpy import Scene
filename = "S_NWC_HRW_MSG3_MSG-N-BS_20250206T130000Z.nc"
scn = Scene(reader="nwcsaf-geo", filenames=[filename])
pprint.pprint(scn.available_dataset_names())
This prints all the available datasets. The truncated output of this is::
['wind_hrvis_air_pressure',
'wind_hrvis_air_pressure_correction',
'wind_hrvis_air_pressure_error',
'wind_hrvis_air_pressure_nwp_at_best_fit_level',
'wind_hrvis_air_temperature',
...
'wind_wv073_wind_speed',
'wind_wv073_wind_speed_difference_nwp_at_amv_level',
'wind_wv073_wind_speed_difference_nwp_at_best_fit_level',
'wind_wv073_wind_speed_nwp_at_amv_level',
'wind_wv073_wind_speed_nwp_at_best_fit_level']
The channel name is used as a prefix for the datasets.
It is also possible to merge all of these channel-by-channel datasets with a reader key-word argument::
scn = Scene(reader="nwcsaf-geo", filenames=[filename], reader_kwargs={"merge_channels": True})
pprint.pprint(scn.available_dataset_names())
Full list of the printed datasets is::
['air_pressure',
'air_pressure_correction',
'air_pressure_error',
'air_pressure_nwp_at_best_fit_level',
'air_temperature',
'cloud_type',
'correlation',
'correlation_test',
'height_assignment_method',
'latitude',
'latitude_increment',
'longitude',
'longitude_increment',
'number_of_winds',
'orographic_index',
'previous_wind_idx',
'quality_index_iwwg_value',
'quality_index_with_forecast',
'quality_index_without_forecast',
'quality_test',
'segment_x',
'segment_x_pix',
'segment_y',
'segment_y_pix',
'tracer_correlation_method',
'tracer_type',
'wind_from_direction',
'wind_from_direction_difference_nwp_at_amv_level',
'wind_from_direction_difference_nwp_at_best_fit_level',
'wind_from_direction_nwp_at_amv_level',
'wind_from_direction_nwp_at_best_fit_level',
'wind_idx',
'wind_speed',
'wind_speed_difference_nwp_at_amv_level',
'wind_speed_difference_nwp_at_best_fit_level',
'wind_speed_nwp_at_amv_level',
'wind_speed_nwp_at_best_fit_level']
"""
import datetime as dt
import logging
from contextlib import suppress
import dask.array as da
import h5py
import numpy as np
import xarray as xr
from satpy.readers.core.file_handlers import BaseFileHandler
from satpy.readers.nwcsaf_nc import PLATFORM_NAMES, SENSOR, read_nwcsaf_time
from satpy.utils import get_chunk_size_limit
logger = logging.getLogger(__name__)
CHUNK_SIZE = get_chunk_size_limit()
WIND_CHANNELS = [
"wind_hrvis",
"wind_ir108",
"wind_ir120",
"wind_vis06",
"wind_vis08",
"wind_wv062",
"wind_wv073",
]
# Source: NWC/CDOP3/GEO/AEMET/SW/DOF
DATASET_UNITS = {
"air_pressure": "Pa",
"air_pressure_correction": "Pa",
"air_pressure_error": "Pa",
"air_pressure_nwp_at_best_fit_level": "Pa",
"air_temperature": "K",
"cloud_type": "1",
"correlation": "%",
"correlation_test": "1",
"height_assignment_method": "1",
"latitude": "degree_north",
"latitude_increment": "degree_north",
"longitude": "degree_east",
"longitude_increment": "degree_east",
"number_of_winds": "1",
"orographic_index": "1",
"previous_wind_idx": "1",
"quality_index_iwwg_value": "%",
"quality_index_with_forecast": "%",
"quality_index_without_forecast": "%",
"quality_test": "1",
"segment_x": "m",
"segment_x_pix": "1",
"segment_y": "m",
"segment_y_pix": "1",
"tracer_correlation_method": "1",
"tracer_type": "1",
"wind_from_direction": "degree",
"wind_from_direction_difference_nwp_at_amv_level": "degree",
"wind_from_direction_difference_nwp_at_best_fit_level": "degree",
"wind_from_direction_nwp_at_amv_level": "degree",
"wind_from_direction_nwp_at_best_fit_level": "degree",
"wind_idx": "1",
"wind_speed": "m/s",
"wind_speed_difference_nwp_at_amv_level": "m/s",
"wind_speed_difference_nwp_at_best_fit_level": "m/s",
"wind_speed_nwp_at_amv_level": "m/s",
"wind_speed_nwp_at_best_fit_level": "m/s"
}
[docs]
class NWCSAFGEOHRWFileHandler(BaseFileHandler):
"""A file handler class for NWC SAF GEO HRW files."""
[docs]
def __init__(self, filename, filename_info, filetype_info, merge_channels=False):
"""Initialize the file handler."""
super().__init__(filename, filename_info, filetype_info)
self.h5f = h5py.File(self.filename, "r")
self.filename_info = filename_info
self.filetype_info = filetype_info
self.merge_channels = merge_channels
self.platform_name = PLATFORM_NAMES.get(self.h5f.attrs["satellite_identifier"].astype(str))
self.sensor = SENSOR.get(self.platform_name, "seviri")
self.lons = {}
self.lats = {}
# Imaging period, which is set after reading any data, and used to calculate end time
self.period = None
# The resolution is given in kilometers, convert to meters
self.resolution = 1000 * self.h5f.attrs["spatial_resolution"].item()
def __del__(self):
"""Close file handlers when we are done."""
with suppress(OSError):
self.h5f.close()
[docs]
def available_datasets(self, configured_datasets=None):
"""Form the names for the available datasets."""
for channel in WIND_CHANNELS:
prefix = self._get_channel_prefix(channel)
dset = self.h5f[channel]
for measurand in dset.dtype.fields.keys():
if measurand == "trajectory":
continue
ds_info = self._measurand_ds_info(prefix, measurand)
yield True, ds_info
if self.merge_channels:
break
[docs]
def _get_channel_prefix(self, channel):
if self.merge_channels:
return ""
return channel + "_"
[docs]
def _measurand_ds_info(self, prefix, measurand):
ds_info = {
"file_type": self.filetype_info["file_type"],
"resolution": self.resolution,
"name": prefix + measurand,
}
if measurand not in ("longitude", "latitude"):
ds_info["coordinates"] = (prefix + "longitude", prefix + "latitude")
if measurand == "longitude":
ds_info["standard_name"] = "longitude"
if measurand == "latitude":
ds_info["standard_name"] = "latitude"
return ds_info
[docs]
def get_dataset(self, key, info):
"""Load a dataset."""
logger.debug("Reading %s.", key["name"])
if self.merge_channels:
data = self._read_merged_dataset(key)
else:
data = self._read_dataset(key)
data.attrs.update(info)
return data
[docs]
def _read_merged_dataset(self, dataset_key):
"""Read a dataset merged from every channel."""
dataset_name = dataset_key["name"]
data = []
collect_coords = True
if "merged" in self.lons:
collect_coords = False
for channel in WIND_CHANNELS:
if collect_coords:
self._read_channel_coordinates(channel)
self._append_merged_coordinates(channel)
if self.period is None:
self.period = self.h5f[channel].attrs["time_period"].item()
try:
data.append(self.h5f[channel][dataset_name])
except ValueError:
logger.warning("Reading %s is not supported.", dataset_name)
units = DATASET_UNITS[dataset_name]
return self._create_xarray(
data,
dataset_name,
units,
"merged"
)
[docs]
def _append_merged_coordinates(self, channel):
if "merged" not in self.lons:
self.lons["merged"] = []
self.lats["merged"] = []
self.lons["merged"].append(self.lons[channel])
self.lats["merged"].append(self.lats[channel])
[docs]
def _create_xarray(self, data, dataset_name, units, channel):
lons = self.lons[channel]
lats = self.lats[channel]
prefix = channel + "_"
if channel == "merged":
data = np.concat(data)
lons = np.concat(lons)
lats = np.concat(lats)
prefix = ""
xr_data = xr.DataArray(da.from_array(data, chunks=CHUNK_SIZE),
name=dataset_name,
dims=["y"])
xr_data[prefix + "longitude"] = ("y", lons)
xr_data[prefix + "latitude"] = ("y", lats)
xr_data.attrs["units"] = units
return xr_data
[docs]
def _read_dataset(self, dataset_key):
"""Read a dataset."""
dataset_name = dataset_key["name"]
key_parts = dataset_name.split("_")
channel = "_".join(key_parts[:2])
self._read_channel_coordinates(channel)
if self.period is None:
self.period = self.h5f[channel].attrs["time_period"].item()
measurand = "_".join(key_parts[2:])
try:
data = self.h5f[channel][measurand]
except ValueError:
logger.warning("Reading %s is not supported.", dataset_name)
units = DATASET_UNITS[measurand]
return self._create_xarray(
data, dataset_name, units, channel)
[docs]
def _read_channel_coordinates(self, channel):
if channel not in self.lons:
self.lons[channel] = self.h5f[channel]["longitude"]
self.lats[channel] = self.h5f[channel]["latitude"]
@property
def start_time(self):
"""Get the start time."""
return read_nwcsaf_time(self.h5f.attrs["nominal_product_time"])
@property
def end_time(self):
"""Get the end time."""
if self.period is None:
return self.start_time
return self.start_time + dt.timedelta(minutes=self.period)