Source code for satpy.readers.ahi_l1b_gridded_bin

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2020 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Advanced Himawari Imager (AHI) gridded format data reader.

This data comes in a flat binary format on a fixed grid, and needs to have
calibration coefficients applied to it in order to retrieve reflectance or BT.
LUTs can be downloaded at: ftp://hmwr829gr.cr.chiba-u.ac.jp/gridded/FD/support/

This data is gridded from the original Himawari geometry. To our knowledge,
only full disk grids are available, not for the Meso or Japan rapid scans.

References:
 - AHI gridded data website:
        http://www.cr.chiba-u.jp/databases/GEO/H8_9/FD/index_jp.html


"""

import logging
import os

import dask.array as da
import numpy as np
import xarray as xr
from appdirs import AppDirs
from pyresample import geometry

from satpy.readers.file_handlers import BaseFileHandler
from satpy.readers.utils import unzip_file
from satpy.utils import get_legacy_chunk_size

CHUNK_SIZE = get_legacy_chunk_size()

# Hardcoded address of the reflectance and BT look-up tables
AHI_REMOTE_LUTS = 'http://www.cr.chiba-u.jp/databases/GEO/H8_9/FD/count2tbb_v102.tgz'

# Full disk image sizes for each spatial resolution
AHI_FULLDISK_SIZES = {0.005: {'x_size': 24000,
                              'y_size': 24000},
                      0.01: {'x_size': 12000,
                             'y_size': 12000},
                      0.02: {'x_size': 6000,
                             'y_size': 6000}}

# Geographic extent of the full disk area in degrees
AHI_FULLDISK_EXTENT = [85., -60., 205., 60.]

# Resolutions of each channel type
AHI_CHANNEL_RES = {'vis': 0.01,
                   'ext': 0.005,
                   'sir': 0.02,
                   'tir': 0.02}

# List of LUT filenames
AHI_LUT_NAMES = ['ext.01', 'vis.01', 'vis.02', 'vis.03',
                 'sir.01', 'sir.02', 'tir.01', 'tir.02',
                 'tir.03', 'tir.04', 'tir.05', 'tir.06',
                 'tir.07', 'tir.08', 'tir.09', 'tir.10']

logger = logging.getLogger('ahi_grid')


[docs] class AHIGriddedFileHandler(BaseFileHandler): """AHI gridded format reader. This data is flat binary, big endian unsigned short. It covers the region 85E -> 205E, 60N -> 60S at variable resolution: - 0.005 degrees for Band 3 - 0.01 degrees for Bands 1, 2 and 4 - 0.02 degrees for all other bands. These are approximately equivalent to 0.5, 1 and 2km. Files can either be zipped with bz2 compression (like the HSD format data), or can be uncompressed flat binary. """ def __init__(self, filename, filename_info, filetype_info): """Initialize the reader.""" super(AHIGriddedFileHandler, self).__init__(filename, filename_info, filetype_info) self._unzipped = unzip_file(self.filename) # Assume file is not zipped if self._unzipped: # But if it is, set the filename to point to unzipped temp file self.filename = self._unzipped # Get the band name, needed for finding area and dimensions self.product_name = filetype_info['file_type'] self.areaname = filename_info['area'] self.sensor = 'ahi' self.res = AHI_CHANNEL_RES[self.product_name[:3]] if self.areaname == 'fld': self.nlines = AHI_FULLDISK_SIZES[self.res]['y_size'] self.ncols = AHI_FULLDISK_SIZES[self.res]['x_size'] else: raise NotImplementedError("Only full disk data is supported.") # Set up directory path for the LUTs app_dirs = AppDirs('ahi_gridded_luts', 'satpy', '1.0.2') self.lut_dir = os.path.expanduser(app_dirs.user_data_dir) + '/' self.area = None def __del__(self): """Delete the object.""" if self._unzipped and os.path.exists(self.filename): os.remove(self.filename)
[docs] def _load_lut(self): """Determine if LUT is available and, if not, download it.""" # First, check that the LUT is available. If not, download it. lut_file = self.lut_dir + self.product_name if not os.path.exists(lut_file): self._get_luts() try: # Load file, it has 2 columns: DN + Refl/BT. We only need latter. lut = np.loadtxt(lut_file)[:, 1] except FileNotFoundError: raise FileNotFoundError("No LUT file found:", lut_file) return lut
[docs] def _calibrate(self, data): """Load calibration from LUT and apply.""" lut = self._load_lut() # LUT may truncate NaN values, so manually set those in data lut_len = len(lut) data = np.where(data < lut_len - 1, data, np.nan) return lut[data.astype(np.uint16)]
[docs] @staticmethod def _download_luts(file_name): """Download LUTs from remote server.""" import shutil import urllib # Set up an connection and download with urllib.request.urlopen(AHI_REMOTE_LUTS) as response: # nosec with open(file_name, 'wb') as out_file: shutil.copyfileobj(response, out_file)
[docs] @staticmethod def _untar_luts(tarred_file, outdir): """Uncompress downloaded LUTs, which are a tarball.""" import tarfile tar = tarfile.open(tarred_file) tar.extractall(outdir) # nosec tar.close() os.remove(tarred_file)
[docs] def _get_luts(self): """Download the LUTs needed for count->Refl/BT conversion.""" import pathlib import shutil from satpy import config # Check that the LUT directory exists pathlib.Path(self.lut_dir).mkdir(parents=True, exist_ok=True) logger.info("Download AHI LUTs files and store in directory %s", self.lut_dir) tempdir = config["tmp_dir"] fname = os.path.join(tempdir, 'tmp.tgz') # Download the LUTs self._download_luts(fname) # The file is tarred, untar and remove the downloaded file self._untar_luts(fname, tempdir) lut_dl_dir = os.path.join(tempdir, 'count2tbb_v102/') # Loop over the LUTs and copy to the correct location for lutfile in AHI_LUT_NAMES: shutil.move(os.path.join(lut_dl_dir, lutfile), os.path.join(self.lut_dir, lutfile)) shutil.rmtree(lut_dl_dir)
[docs] def get_dataset(self, key, info): """Get the dataset.""" return self.read_band(key, info)
[docs] def get_area_def(self, dsid): """Get the area definition. This is fixed, but not defined in the file. So we must generate it ourselves with some assumptions. """ if self.areaname == 'fld': area_extent = AHI_FULLDISK_EXTENT else: raise NotImplementedError("Reader only supports full disk data.") proj_param = 'EPSG:4326' area = geometry.AreaDefinition('gridded_himawari', 'A gridded Himawari area', 'longlat', proj_param, self.ncols, self.nlines, area_extent) self.area = area return area
[docs] def _read_data(self, fp_): """Read raw binary data from file.""" return da.from_array(np.memmap(self.filename, offset=fp_.tell(), dtype='>u2', shape=(self.nlines, self.ncols), mode='r'), chunks=CHUNK_SIZE)
[docs] def read_band(self, key, info): """Read the data.""" with open(self.filename, "rb") as fp_: res = self._read_data(fp_) # Calibrate res = self.calibrate(res, key['calibration']) # Update metadata new_info = dict( units=info['units'], standard_name=info['standard_name'], wavelength=info['wavelength'], resolution=info['resolution'], id=key, name=key['name'], sensor=self.sensor, ) res = xr.DataArray(res, attrs=new_info, dims=['y', 'x']) return res
[docs] def calibrate(self, data, calib): """Calibrate the data.""" if calib == 'counts': return data if calib == 'reflectance' or calib == 'brightness_temperature': return self._calibrate(data) raise NotImplementedError("ERROR: Unsupported calibration.", "Only counts, reflectance and ", "brightness_temperature calibration", "are supported.")