Source code for satpy.readers.maia

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Reader for NWPSAF AAPP MAIA Cloud product.

https://nwpsaf.eu/site/software/aapp/

Documentation reference:

    [NWPSAF-MF-UD-003] DATA Formats
    [NWPSAF-MF-UD-009] MAIA version 4 Scientific User Manual

"""
import logging
from datetime import timedelta

import dask.array as da
import h5py
import numpy as np
from xarray import DataArray

from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import get_legacy_chunk_size

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Chunk size passed to ``dask.array.from_array`` when wrapping the HDF5 datasets.
CHUNK_SIZE = get_legacy_chunk_size()


class MAIAFileHandler(BaseFileHandler):
    """File handler for Maia files.

    The HDF5 file is opened and indexed when the handler is created: every
    dataset of the ``DATA`` group is wrapped in a dask array, every dataset of
    the ``HEADER`` group is read into memory, and a few bit fields packed in
    ``CloudMask`` and ``CloudType`` are exposed as datasets of their own.
    """

    # (dataset name, lowest bit, number of bits) of the fields packed in the
    # ``CloudMask`` dataset.
    _CLOUD_MASK_FIELDS = (
        ("cma", 0, 3),                     # cloud mask on pixel
        ("cma_conf", 5, 2),                # cloud mask confidence
        ("cma_qual", 3, 2),                # cloud mask quality
        ("opaq_cloud", 21, 1),             # opaque cloud
        ("land_water_background", 15, 3),  # land / water background
    )

    def __init__(self, filename, filename_info, filetype_info):
        """Init the file handler.

        Args:
            filename: Path of the MAIA HDF5 file.
            filename_info: Dictionary of fields parsed from the file name. It
                must hold ``start_time`` and ``end_time`` datetimes and is
                updated in place with the corrected ``end_time``.
            filetype_info: File type description, forwarded to the base class.

        """
        super().__init__(filename, filename_info, filetype_info)
        self.finfo = filename_info
        start_time = self.finfo["start_time"]
        # set the day date part for end_time from the file name
        end_time = self.finfo["end_time"].replace(
            year=start_time.year,
            month=start_time.month,
            day=start_time.day)
        if end_time < start_time:
            # The end time is on the following day.  Adding a timedelta
            # (instead of replacing the day number) also works on the last day
            # of a month or year.
            end_time += timedelta(days=1)
        self.finfo["end_time"] = end_time
        self.selected = None
        self.read(self.filename)

    @staticmethod
    def _extract_bits(field, first_bit, n_bits):
        """Return the ``n_bits`` wide bit field of ``field`` starting at ``first_bit``.

        True division is used on purpose so that the result has the same
        (floating point) dtype as before this helper was introduced.
        """
        mask = ((1 << n_bits) - 1) << first_bit
        return (field & mask) / 2**first_bit

    def read(self, filename):
        """Read the file.

        Fills ``self.h5``, ``self.Lat``, ``self.Lon``, ``self.selected`` and
        ``self.file_content``.

        Args:
            filename: Path of the HDF5 file to open.

        """
        # The handle is stored on the instance and not closed here: the dask
        # arrays created below wrap its datasets.
        self.h5 = h5py.File(filename, "r")
        missing = -9999.
        self.Lat = da.from_array(self.h5["DATA/Latitude"],
                                 chunks=CHUNK_SIZE) / 10000.
        self.Lon = da.from_array(self.h5["DATA/Longitude"],
                                 chunks=CHUNK_SIZE) / 10000.
        # Pixels whose longitude is above the missing value marker.
        self.selected = (self.Lon > missing)

        self.file_content = {}
        for key in self.h5["DATA"].keys():
            self.file_content[key] = da.from_array(self.h5["DATA/" + key],
                                                   chunks=CHUNK_SIZE)
        for key in self.h5["HEADER"].keys():
            self.file_content[key] = self.h5["HEADER/" + key][:]

        # Unpack the bit fields of the cloud mask into separate datasets.
        cloud_mask = self.file_content["CloudMask"]
        for name, first_bit, n_bits in self._CLOUD_MASK_FIELDS:
            self.file_content[name] = self._extract_bits(
                cloud_mask, first_bit, n_bits)

        # CT (Actual CloudType): bits 4 to 8 of the CloudType dataset
        cloud_type = self._extract_bits(self.file_content["CloudType"], 4, 5)
        self.file_content["ct"] = cloud_type.astype(np.uint8)

    def get_platform(self, platform):
        """Get the platform.

        Args:
            platform: Unused.

        Returns:
            ``"viirs"`` when the ``sat_id`` header value is 14, ``"avhrr"``
            otherwise.

        """
        return "viirs" if self.file_content["sat_id"] in (14,) else "avhrr"

    @property
    def start_time(self):
        """Get the start time."""
        return self.finfo["start_time"]

    @property
    def end_time(self):
        """Get the end time."""
        return self.finfo["end_time"]

    def get_dataset(self, key, info, out=None):
        """Get a dataset from the file.

        Args:
            key: Dataset identifier; only ``key["name"]`` is used.
            info: Dataset metadata.  It is updated in place with the file name
                information and becomes the attributes of the result.
            out: Unused.

        Returns:
            A ``DataArray`` with dimensions ``("y", "x")`` in which the pixels
            that are not selected are replaced by NaN (by 0 for ``ct``).

        """
        name = key["name"]
        logger.debug("Reading %s.", name)
        values = self.file_content[name]
        # Keep the mask lazy; it is only evaluated together with the data.
        selected = self.selected

        if name in ("Latitude", "Longitude"):
            values = values / 10000.
        if name in ("Tsurf", "CloudTopPres", "CloudTopTemp"):
            # Also drop the pixels flagged as missing in the dataset itself.
            # The comparison is done on the raw values, before scaling.
            selected = selected & (values > -9998.)
        if name in ("Tsurf", "Alt_surface", "CloudTopTemp"):
            values = values / 100.
        if name == "CloudTopPres":
            values = values / 10.

        # update dataset info with file_info
        info.update(self.finfo)

        fill_value = np.nan
        if name == "ct":
            # ct is an unsigned integer dataset and cannot hold NaN.
            fill_value = 0
            info["_FillValue"] = 0

        return DataArray(values, dims=["y", "x"],
                         attrs=info).where(selected, fill_value)