# -*- coding: utf-8 -*-
# Copyright (c) 2017 - 2022 Satpy developers
#
# This file is part of Satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Reader for the GHRSST level-2 formatted data."""
import datetime as dt
import os
import tarfile
from contextlib import suppress
from functools import cached_property
import xarray as xr
from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import get_legacy_chunk_size
CHUNK_SIZE = get_legacy_chunk_size()
[docs]
class GHRSSTL2FileHandler(BaseFileHandler):
"""File handler for GHRSST L2 netCDF files."""
def __init__(self, filename, filename_info, filetype_info, engine=None):
"""Initialize the file handler for GHRSST L2 netCDF data."""
super().__init__(filename, filename_info, filetype_info)
self._engine = engine
self._tarfile = None
self.filename_info["start_time"] = dt.datetime.strptime(
self.nc.start_time, "%Y%m%dT%H%M%SZ")
self.filename_info["end_time"] = dt.datetime.strptime(
self.nc.stop_time, "%Y%m%dT%H%M%SZ")
@cached_property
def nc(self):
"""Get the xarray Dataset for the filename."""
if os.fspath(self.filename).endswith("tar"):
file_obj = self._open_tarfile()
else:
file_obj = self.filename
nc = xr.open_dataset(file_obj,
decode_cf=True,
mask_and_scale=True,
engine=self._engine,
chunks={"ni": CHUNK_SIZE,
"nj": CHUNK_SIZE})
return nc.rename({"ni": "x", "nj": "y"})
[docs]
def _open_tarfile(self):
self._tarfile = tarfile.open(name=self.filename, mode="r")
sst_filename = next((name for name in self._tarfile.getnames()
if self._is_sst_file(name)))
file_obj = self._tarfile.extractfile(sst_filename)
return file_obj
[docs]
@staticmethod
def _is_sst_file(name):
"""Check if file in the tar archive is a valid SST file."""
return name.endswith("nc") and "GHRSST-SSTskin" in name
[docs]
def get_dataset(self, key, info):
"""Get any available dataset."""
stdname = info.get("standard_name")
return self.nc[stdname].squeeze()
@property
def start_time(self):
"""Get start time."""
return self.filename_info["start_time"]
@property
def end_time(self):
"""Get end time."""
return self.filename_info["end_time"]
@property
def sensor(self):
"""Get the sensor name."""
return self.nc.attrs["sensor"].lower()
def __del__(self):
"""Close the tarfile object."""
with suppress(AttributeError):
self._tarfile.close()