Source code for satpy.readers.smos_l2_wind

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2020 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.

"""SMOS L2 wind Reader.

Data can be found here after register:
https://www.smosstorm.org/Data2/SMOS-NRT-wind-Products-access
Format documentation at the same site after register:
SMOS_WIND_DS_PDD_20191107_signed.pdf
"""

import logging
from datetime import datetime

import numpy as np
from pyresample.geometry import AreaDefinition

from satpy.readers.netcdf_utils import NetCDF4FileHandler, netCDF4

logger = logging.getLogger(__name__)


[docs] class SMOSL2WINDFileHandler(NetCDF4FileHandler): """File handler for SMOS L2 wind netCDF files.""" @property def start_time(self): """Get start time.""" return datetime.strptime(self["/attr/time_coverage_start"], "%Y-%m-%dT%H:%M:%S Z") @property def end_time(self): """Get end time.""" return datetime.strptime(self["/attr/time_coverage_end"], "%Y-%m-%dT%H:%M:%S Z") @property def platform_shortname(self): """Get platform shortname.""" return self.filename_info["platform_shortname"] @property def platform_name(self): """Get platform.""" return self["/attr/platform"]
[docs] def get_metadata(self, data, ds_info): """Get metadata.""" metadata = {} metadata.update(data.attrs) metadata.update(ds_info) metadata.update({ "platform_shortname": self.platform_shortname, "platform_name": self.platform_name, "sensor": self["/attr/instrument"], "start_time": self.start_time, "end_time": self.end_time, "level": self["/attr/processing_level"], }) return metadata
[docs] def available_datasets(self, configured_datasets=None): """Automatically determine datasets provided by this file.""" handled_variables = set() # Iterate over dataset contents for var_name, val in self.file_content.items(): # Only evaluate variables if not isinstance(val, netCDF4.Variable): continue if (var_name in handled_variables): logger.debug("Already handled, skipping: %s", var_name) continue handled_variables.add(var_name) new_info = { "name": var_name, "file_type": self.filetype_info["file_type"], } yield True, new_info
[docs] def _mask_dataset(self, data): """Mask out fill values.""" try: fill = data.attrs["_FillValue"] data.attrs["_FillValue"] = np.nan return data.where(data != fill) except KeyError: return data
[docs] def _adjust_lon_coord(self, data): """Adjust lon coordinate to -180 .. 180 ( not 0 .. 360).""" data = data.assign_coords(lon=(((data.lon + 180) % 360) - 180)) return data.where(data < 180., data - 360.)
[docs] def _rename_coords(self, data): """Rename coords.""" rename_dict = {} if "lon" in data.dims: data = self._adjust_lon_coord(data) rename_dict["lon"] = "x" if "lat" in data.dims: rename_dict["lat"] = "y" # Rename the coordinates to x and y return data.rename(rename_dict)
[docs] def _remove_time_coordinate(self, data): """Remove time coordinate.""" # Remove dimension where size is 1, eg. time data = data.squeeze() # Remove if exists time as coordinate if "time" in data.coords: data = data.drop_vars("time") return data
[docs] def _roll_dataset_lon_coord(self, data): """Roll dataset along the lon coordinate.""" if "lon" in data.dims: data = data.roll(lon=720, roll_coords=True) return data
[docs] def get_dataset(self, ds_id, ds_info): """Get dataset.""" data = self[ds_id["name"]] data.attrs = self.get_metadata(data, ds_info) data = self._remove_time_coordinate(data) data = self._roll_dataset_lon_coord(data) data = self._rename_coords(data) data = self._mask_dataset(data) if len(data.dims) >= 2 and all([dim in data.dims for dim in ["x", "y"]]): # Remove the first and last row as these values extends beyond +-90 latitude # if the dataset contains the y dimmension. # As this is data over open sea these has no values. data = data.where((data.y > -90.0) & (data.y < 90.0), drop=True) elif len(data.dims) == 1 and "y" in data.dims: data = data.where((data.y > 0) & (data.y < len(data.y) - 1), drop=True) return data
[docs] def _create_area_extent(self, width, height): """Create area extent.""" # Creating a meshgrid, not needed actually, but makes it easy to find extremes _lon = self._adjust_lon_coord(self["lon"]) _lon = self._roll_dataset_lon_coord(_lon) latlon = np.meshgrid(_lon, self["lat"][1:self["lat/shape"][0] - 1]) lower_left_x = latlon[0][height - 1][0] - 0.125 lower_left_y = latlon[1][height - 1][0] + 0.125 upper_right_y = latlon[1][1][width - 1] - 0.125 upper_right_x = latlon[0][1][width - 1] + 0.125 return (lower_left_x, lower_left_y, upper_right_x, upper_right_y)
[docs] def get_area_def(self, dsid): """Define AreaDefintion.""" width = self["lon/shape"][0] height = self["lat/shape"][0] - 2 area_extent = self._create_area_extent(width, height) description = "SMOS L2 Wind Equirectangular Projection" area_id = "smos_eqc" proj_id = "equirectangular" proj_str = self["/attr/geospatial_bounds_vertical_crs"] area_def = AreaDefinition(area_id, description, proj_id, proj_str, width, height, area_extent, ) return area_def