#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2020 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Advanced Himawari Imager (AHI) gridded format data reader.
This data comes in a flat binary format on a fixed grid, and needs to have
calibration coefficients applied to it in order to retrieve reflectance or BT.
LUTs can be downloaded at: ftp://hmwr829gr.cr.chiba-u.ac.jp/gridded/FD/support/
This data is gridded from the original Himawari geometry. To our knowledge,
only full disk grids are available, not for the Meso or Japan rapid scans.
References:
- AHI gridded data website:
http://www.cr.chiba-u.jp/databases/GEO/H8_9/FD/index_jp.html
"""
import logging
import os
import dask.array as da
import numpy as np
import xarray as xr
from platformdirs import AppDirs
from pyresample import geometry
from satpy.readers.file_handlers import BaseFileHandler
from satpy.readers.utils import unzip_file
from satpy.utils import get_legacy_chunk_size
CHUNK_SIZE = get_legacy_chunk_size()
# Hardcoded address of the reflectance and BT look-up tables
AHI_REMOTE_LUTS = "http://www.cr.chiba-u.jp/databases/GEO/H8_9/FD/count2tbb_v102.tgz"
# Full disk image sizes for each spatial resolution
AHI_FULLDISK_SIZES = {0.005: {"x_size": 24000,
"y_size": 24000},
0.01: {"x_size": 12000,
"y_size": 12000},
0.02: {"x_size": 6000,
"y_size": 6000}}
# Geographic extent of the full disk area in degrees
AHI_FULLDISK_EXTENT = [85., -60., 205., 60.]
# Resolutions of each channel type
AHI_CHANNEL_RES = {"vis": 0.01,
"ext": 0.005,
"sir": 0.02,
"tir": 0.02}
# List of LUT filenames
AHI_LUT_NAMES = ["ext.01", "vis.01", "vis.02", "vis.03",
"sir.01", "sir.02", "tir.01", "tir.02",
"tir.03", "tir.04", "tir.05", "tir.06",
"tir.07", "tir.08", "tir.09", "tir.10"]
logger = logging.getLogger("ahi_grid")
[docs]
class AHIGriddedFileHandler(BaseFileHandler):
"""AHI gridded format reader.
This data is flat binary, big endian unsigned short.
It covers the region 85E -> 205E, 60N -> 60S at variable resolution:
- 0.005 degrees for Band 3
- 0.01 degrees for Bands 1, 2 and 4
- 0.02 degrees for all other bands.
These are approximately equivalent to 0.5, 1 and 2km.
Files can either be zipped with bz2 compression (like the HSD format
data), or can be uncompressed flat binary.
"""
def __init__(self, filename, filename_info, filetype_info):
"""Initialize the reader."""
super(AHIGriddedFileHandler, self).__init__(filename, filename_info,
filetype_info)
self._unzipped = unzip_file(self.filename)
# Assume file is not zipped
if self._unzipped:
# But if it is, set the filename to point to unzipped temp file
self.filename = self._unzipped
# Get the band name, needed for finding area and dimensions
self.product_name = filetype_info["file_type"]
self.areaname = filename_info["area"]
self.sensor = "ahi"
self.res = AHI_CHANNEL_RES[self.product_name[:3]]
if self.areaname == "fld":
self.nlines = AHI_FULLDISK_SIZES[self.res]["y_size"]
self.ncols = AHI_FULLDISK_SIZES[self.res]["x_size"]
else:
raise NotImplementedError("Only full disk data is supported.")
# Set up directory path for the LUTs
app_dirs = AppDirs("ahi_gridded_luts", "satpy", "1.0.2")
self.lut_dir = os.path.expanduser(app_dirs.user_data_dir) + "/"
self.area = None
def __del__(self):
"""Delete the object."""
if self._unzipped and os.path.exists(self.filename):
os.remove(self.filename)
[docs]
def _load_lut(self):
"""Determine if LUT is available and, if not, download it."""
# First, check that the LUT is available. If not, download it.
lut_file = self.lut_dir + self.product_name
if not os.path.exists(lut_file):
self._get_luts()
try:
# Load file, it has 2 columns: DN + Refl/BT. We only need latter.
lut = np.loadtxt(lut_file)[:, 1]
except FileNotFoundError:
raise FileNotFoundError("No LUT file found:", lut_file)
return lut
[docs]
def _calibrate(self, data):
"""Load calibration from LUT and apply."""
lut = self._load_lut()
# LUT may truncate NaN values, so manually set those in data
lut_len = len(lut)
data = np.where(data < lut_len - 1, data, np.nan)
return lut[data.astype(np.uint16)]
[docs]
@staticmethod
def _download_luts(file_name):
"""Download LUTs from remote server."""
import shutil
import urllib
# Set up an connection and download
with urllib.request.urlopen(AHI_REMOTE_LUTS) as response: # nosec
with open(file_name, "wb") as out_file:
shutil.copyfileobj(response, out_file)
[docs]
@staticmethod
def _untar_luts(tarred_file, outdir):
"""Uncompress downloaded LUTs, which are a tarball."""
import tarfile
tar = tarfile.open(tarred_file)
tar.extractall(outdir) # nosec
tar.close()
os.remove(tarred_file)
[docs]
def _get_luts(self):
"""Download the LUTs needed for count->Refl/BT conversion."""
import pathlib
import shutil
from satpy import config
# Check that the LUT directory exists
pathlib.Path(self.lut_dir).mkdir(parents=True, exist_ok=True)
logger.info("Download AHI LUTs files and store in directory %s",
self.lut_dir)
tempdir = config["tmp_dir"]
fname = os.path.join(tempdir, "tmp.tgz")
# Download the LUTs
self._download_luts(fname)
# The file is tarred, untar and remove the downloaded file
self._untar_luts(fname, tempdir)
lut_dl_dir = os.path.join(tempdir, "count2tbb_v102/")
# Loop over the LUTs and copy to the correct location
for lutfile in AHI_LUT_NAMES:
shutil.move(os.path.join(lut_dl_dir, lutfile), os.path.join(self.lut_dir, lutfile))
shutil.rmtree(lut_dl_dir)
[docs]
def get_dataset(self, key, info):
"""Get the dataset."""
return self.read_band(key, info)
[docs]
def get_area_def(self, dsid):
"""Get the area definition.
This is fixed, but not defined in the file. So we must
generate it ourselves with some assumptions.
"""
if self.areaname == "fld":
area_extent = AHI_FULLDISK_EXTENT
else:
raise NotImplementedError("Reader only supports full disk data.")
proj_param = "EPSG:4326"
area = geometry.AreaDefinition("gridded_himawari",
"A gridded Himawari area",
"longlat",
proj_param,
self.ncols,
self.nlines,
area_extent)
self.area = area
return area
[docs]
def _read_data(self, fp_):
"""Read raw binary data from file."""
return da.from_array(np.memmap(self.filename,
offset=fp_.tell(),
dtype=">u2",
shape=(self.nlines, self.ncols),
mode="r"),
chunks=CHUNK_SIZE)
[docs]
def read_band(self, key, info):
"""Read the data."""
with open(self.filename, "rb") as fp_:
res = self._read_data(fp_)
# Calibrate
res = self.calibrate(res, key["calibration"])
# Update metadata
new_info = dict(
units=info["units"],
standard_name=info["standard_name"],
wavelength=info["wavelength"],
resolution=info["resolution"],
id=key,
name=key["name"],
sensor=self.sensor,
)
res = xr.DataArray(res, attrs=new_info, dims=["y", "x"])
return res
[docs]
def calibrate(self, data, calib):
"""Calibrate the data."""
if calib == "counts":
return data
if calib == "reflectance" or calib == "brightness_temperature":
return self._calibrate(data)
raise NotImplementedError("ERROR: Unsupported calibration.",
"Only counts, reflectance and ",
"brightness_temperature calibration",
"are supported.")