#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Interface to GEOCAT HDF4 or NetCDF4 products.
Note: GEOCAT files do not currently have projection information or precise
pixel resolution information. Additionally the longitude and latitude arrays
are stored as 16-bit integers which causes loss of precision. For this reason
the lon/lats can't be used as a reliable coordinate system to calculate the
projection X/Y coordinates.
Until GEOCAT adds projection information and X/Y coordinate arrays, this
reader will estimate the geostationary area the best it can. It currently
takes a single lon/lat point as reference and uses hardcoded resolution
and projection information to calculate the area extents.
"""
from __future__ import annotations
import logging
import numpy as np
from pyproj import Proj
from pyresample import geometry
from satpy.readers.netcdf_utils import NetCDF4FileHandler, netCDF4
LOG = logging.getLogger(__name__)
CF_UNITS = {
"none": "1",
}
# GEOCAT currently doesn't include projection information in it's files
GEO_PROJS = {
"GOES-16": "+proj=geos +lon_0={lon_0:0.02f} +h=35786023.0 +a=6378137.0 +b=6356752.31414 +sweep=x +units=m +no_defs",
"GOES-17": "+proj=geos +lon_0={lon_0:0.02f} +h=35786023.0 +a=6378137.0 +b=6356752.31414 +sweep=x +units=m +no_defs",
"HIMAWARI-8": "+proj=geos +over +lon_0=140.7 +h=35785863 +a=6378137 +b=6356752.299581327 +units=m +no_defs",
}
[docs]
class GEOCATFileHandler(NetCDF4FileHandler):
"""GEOCAT netCDF4 file handler.
**Loading data with decode_times=True**
By default, this reader will use ``xarray_kwargs={"engine": "netcdf4", "decode_times": False}``.
to match behavior of xarray when the geocat reader was first written. To use different options
use reader_kwargs when loading the Scene::
scene = satpy.Scene(filenames,
reader='geocat',
reader_kwargs={'xarray_kwargs': {'engine': 'netcdf4', 'decode_times': True}})
"""
def __init__(self, filename, filename_info, filetype_info,
**kwargs):
"""Open and perform initial investigation of NetCDF file."""
kwargs.setdefault("xarray_kwargs", {}).setdefault(
"engine", "netcdf4")
kwargs.setdefault("xarray_kwargs", {}).setdefault(
"decode_times", False)
super(GEOCATFileHandler, self).__init__(
filename, filename_info, filetype_info,
xarray_kwargs=kwargs["xarray_kwargs"])
sensors = {
"goes": "goes_imager",
"himawari8": "ahi",
"goes16": "abi", # untested
"goesr": "abi", # untested
}
platforms: dict[str, str] = {
}
resolutions = {
"abi": {
1: 1002.0086577437705,
2: 2004.0173154875411,
},
"ahi": {
1: 999.9999820317674, # assumption
2: 1999.999964063535,
4: 3999.99992812707,
}
}
[docs]
def get_sensor(self, sensor):
"""Get sensor."""
last_resort = None
for k, v in self.sensors.items():
if k == sensor:
return v
if k in sensor:
last_resort = v
if last_resort:
return last_resort
raise ValueError("Unknown sensor '{}'".format(sensor))
[docs]
def _get_proj(self, platform, ref_lon):
if platform == "GOES-16" and -76. < ref_lon < -74.:
# geocat file holds the *actual* subsatellite point, not the
# projection (-75.2 actual versus -75 projection)
ref_lon = -75.
return GEO_PROJS[platform].format(lon_0=ref_lon)
@property
def sensor_names(self):
"""Get sensor names."""
return [self.get_sensor(self["/attr/Sensor_Name"])]
@property
def start_time(self):
"""Get start time."""
return self.filename_info["start_time"]
@property
def end_time(self):
"""Get end time."""
return self.filename_info.get("end_time", self.start_time)
@property
def is_geo(self):
"""Check platform."""
platform = self.get_platform(self["/attr/Platform_Name"])
return platform in GEO_PROJS
@property
def resolution(self):
"""Get resolution."""
elem_res = self["/attr/Element_Resolution"]
return int(elem_res * 1000)
[docs]
def _calc_area_resolution(self, ds_res):
elem_res = round(ds_res / 1000.) # mimic 'Element_Resolution' attribute from above
sensor = self.get_sensor(self["/attr/Sensor_Name"])
return self.resolutions.get(sensor, {}).get(int(elem_res),
elem_res * 1000.)
[docs]
def available_datasets(self, configured_datasets=None):
"""Update information for or add datasets provided by this file.
If this file handler can load a dataset then it will supplement the
dataset info with the resolution and possibly coordinate datasets
needed to load it. Otherwise it will continue passing the dataset
information down the chain.
See
:meth:`satpy.readers.file_handlers.BaseFileHandler.available_datasets`
for details.
"""
res = self.resolution
coordinates = ("pixel_longitude", "pixel_latitude")
handled_variables = set()
# update previously configured datasets
for is_avail, ds_info in (configured_datasets or []):
this_res = ds_info.get("resolution")
this_coords = ds_info.get("coordinates")
# some other file handler knows how to load this
if is_avail is not None:
yield is_avail, ds_info
var_name = ds_info.get("file_key", ds_info["name"])
matches = self.file_type_matches(ds_info["file_type"])
# we can confidently say that we can provide this dataset and can
# provide more info
if matches and var_name in self and this_res != res:
handled_variables.add(var_name)
new_info = ds_info.copy() # don't mess up the above yielded
new_info["resolution"] = res
if not self.is_geo and this_coords is None:
new_info["coordinates"] = coordinates
yield True, new_info
elif is_avail is None:
# if we didn't know how to handle this dataset and no one else did
# then we should keep it going down the chain
yield is_avail, ds_info
# Provide new datasets
for var_name, val in self.file_content.items():
if var_name in handled_variables:
continue
if isinstance(val, netCDF4.Variable):
ds_info = {
"file_type": self.filetype_info["file_type"],
"resolution": res,
"name": var_name,
}
if not self.is_geo:
ds_info["coordinates"] = coordinates
yield True, ds_info
[docs]
def get_shape(self, dataset_id, ds_info):
"""Get shape."""
var_name = ds_info.get("file_key", dataset_id["name"])
return self[var_name + "/shape"]
[docs]
def _first_good_nav(self, lon_arr, lat_arr):
if hasattr(lon_arr, "mask"):
good_indexes = np.nonzero(~lon_arr.mask)
else:
# no masked values found in auto maskandscale
good_indexes = ([0], [0])
# nonzero returns (<ndarray of row indexes>, <ndarray of col indexes>)
return tuple(x[0] for x in good_indexes)
[docs]
def _get_extents(self, proj, res, lon_arr, lat_arr):
p = Proj(proj)
res = float(res)
first_good = self._first_good_nav(lon_arr, lat_arr)
one_x, one_y = p(lon_arr[first_good], lat_arr[first_good])
left_x = one_x - res * first_good[1]
right_x = left_x + res * lon_arr.shape[1]
top_y = one_y + res * first_good[0]
bot_y = top_y - res * lon_arr.shape[0]
half_x = res / 2.
half_y = res / 2.
return (left_x - half_x,
bot_y - half_y,
right_x + half_x,
top_y + half_y)
[docs]
def _load_nav(self, name):
nav = self[name]
factor = self[name + "/attr/scale_factor"]
offset = self[name + "/attr/add_offset"]
fill = self[name + "/attr/_FillValue"]
nav = nav[:]
mask = nav == fill
nav = np.ma.masked_array(nav * factor + offset, mask=mask)
return nav[:]
[docs]
def get_area_def(self, dsid):
"""Get area definition."""
if not self.is_geo:
raise NotImplementedError("Don't know how to get the Area Definition for this file")
platform = self.get_platform(self["/attr/Platform_Name"])
res = self._calc_area_resolution(dsid["resolution"])
proj = self._get_proj(platform, float(self["/attr/Subsatellite_Longitude"]))
area_name = "{} {} Area at {}m".format(
platform,
self.metadata.get("sector_id", ""),
int(res))
lon = self._load_nav("pixel_longitude")
lat = self._load_nav("pixel_latitude")
extents = self._get_extents(proj, res, lon, lat)
area_def = geometry.AreaDefinition(
area_name,
area_name,
area_name,
proj,
lon.shape[1],
lon.shape[0],
area_extent=extents,
)
return area_def
[docs]
def get_dataset(self, dataset_id, ds_info):
"""Get dataset."""
var_name = ds_info.get("file_key", dataset_id["name"])
# FUTURE: Metadata retrieval may be separate
info = self.get_metadata(dataset_id, ds_info)
data = self[var_name]
fill = self[var_name + "/attr/_FillValue"]
factor = self.get(var_name + "/attr/scale_factor")
offset = self.get(var_name + "/attr/add_offset")
valid_range = self.get(var_name + "/attr/valid_range")
data = data.where(data != fill)
if valid_range is not None:
data = data.where((data >= valid_range[0]) & (data <= valid_range[1]))
if factor is not None and offset is not None:
data = data * factor + offset
data.attrs.update(info)
data = data.rename({"lines": "y", "elements": "x"})
return data