#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Reader for NWPSAF AAPP MAIA Cloud product.
https://nwpsaf.eu/site/software/aapp/
Documentation reference:
[NWPSAF-MF-UD-003] DATA Formats
[NWPSAF-MF-UD-009] MAIA version 4 Scientific User Manual
"""
import logging
import dask.array as da
import h5py
import numpy as np
from xarray import DataArray
from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import get_legacy_chunk_size
logger = logging.getLogger(__name__)
CHUNK_SIZE = get_legacy_chunk_size()
[docs]
class MAIAFileHandler(BaseFileHandler):
"""File handler for Maia files."""
def __init__(self, filename, filename_info, filetype_info):
"""Init the file handler."""
super(MAIAFileHandler, self).__init__(
filename, filename_info, filetype_info)
self.finfo = filename_info
# set the day date part for end_time from the file name
self.finfo["end_time"] = self.finfo["end_time"].replace(
year=self.finfo["start_time"].year,
month=self.finfo["start_time"].month,
day=self.finfo["start_time"].day)
if self.finfo["end_time"] < self.finfo["start_time"]:
myday = self.finfo["end_time"].day
self.finfo["end_time"] = self.finfo["end_time"].replace(
day=myday + 1)
self.selected = None
self.read(self.filename)
[docs]
def read(self, filename):
"""Read the file."""
self.h5 = h5py.File(filename, "r")
missing = -9999.
self.Lat = da.from_array(self.h5[u"DATA/Latitude"], chunks=CHUNK_SIZE) / 10000.
self.Lon = da.from_array(self.h5[u"DATA/Longitude"], chunks=CHUNK_SIZE) / 10000.
self.selected = (self.Lon > missing)
self.file_content = {}
for key in self.h5["DATA"].keys():
self.file_content[key] = da.from_array(self.h5[u"DATA/" + key], chunks=CHUNK_SIZE)
for key in self.h5[u"HEADER"].keys():
self.file_content[key] = self.h5[u"HEADER/" + key][:]
# Cloud Mask on pixel
mask = 2**0 + 2**1 + 2**2
lst = self.file_content[u"CloudMask"] & mask
lst = lst / 2**0
self.file_content[u"cma"] = lst
# Cloud Mask confidence
mask = 2**5 + 2**6
lst = self.file_content[u"CloudMask"] & mask
lst = lst / 2**5
self.file_content[u"cma_conf"] = lst
# Cloud Mask Quality
mask = 2**3 + 2**4
lst = self.file_content[u"CloudMask"] & mask
lst = lst / 2**3
self.file_content[u"cma_qual"] = lst
# Opaque Cloud
mask = 2**21
lst = self.file_content[u"CloudMask"] & mask
lst = lst / 2**21
self.file_content[u"opaq_cloud"] = lst
# land /water Background
mask = 2**15 + 2**16 + 2**17
lst = self.file_content[u"CloudMask"] & mask
lst = lst / 2**15
self.file_content[u"land_water_background"] = lst
# CT (Actual CloudType)
mask = 2**4 + 2**5 + 2**6 + 2**7 + 2**8
classif = self.file_content[u"CloudType"] & mask
classif = classif / 2**4
self.file_content["ct"] = classif.astype(np.uint8)
@property
def start_time(self):
"""Get the start time."""
return self.finfo["start_time"]
@property
def end_time(self):
"""Get the end time."""
return self.finfo["end_time"]
[docs]
def get_dataset(self, key, info, out=None):
"""Get a dataset from the file."""
logger.debug("Reading %s.", key["name"])
values = self.file_content[key["name"]]
selected = np.array(self.selected)
if key["name"] in ("Latitude", "Longitude"):
values = values / 10000.
if key["name"] in ("Tsurf", "CloudTopPres", "CloudTopTemp"):
goods = values > -9998.
selected = np.array(selected & goods)
if key["name"] in ("Tsurf", "Alt_surface", "CloudTopTemp"):
values = values / 100.
if key["name"] in ("CloudTopPres"):
values = values / 10.
else:
selected = self.selected
info.update(self.finfo)
fill_value = np.nan
if key["name"] == "ct":
fill_value = 0
info["_FillValue"] = 0
ds = DataArray(values, dims=["y", "x"], attrs=info).where(selected, fill_value)
# update dataset info with file_info
return ds