#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2019 Satpy developers
#
# satpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# satpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with satpy. If not, see <http://www.gnu.org/licenses/>.
"""The fci_cld_l2_nc reader tests package."""
import os
import unittest
import uuid
from contextlib import suppress
from unittest import mock
import numpy as np
import pytest
from netCDF4 import Dataset
from pyresample import geometry
from satpy.readers.fci_l2_nc import FciL2NCFileHandler, FciL2NCSegmentFileHandler
from satpy.tests.utils import make_dataid
def _fci_full_disk_area(area_id, description, size):
    """Build a square full-disk geostationary reference area of ``size`` x ``size`` pixels."""
    # A fresh projection dict is created on every call so the areas never share it.
    projection = {'h': 35786400., 'lon_0': 0.0, 'ellps': 'WGS84', 'proj': 'geos', 'units': 'm'}
    extent = (-5567999.9942, 5567999.9942, 5567999.9942, -5567999.9942)
    return geometry.AreaDefinition(area_id, description, "", projection, size, size, extent)


# Reference area with 5568 x 5568 pixels (2 km grid).
AREA_DEF = _fci_full_disk_area(
    'mtg_fci_fdss_2km',
    'MTG FCI Full Disk Scanning Service area definition with 2 km resolution',
    5568,
)

# Reference area with 348 x 348 pixels (32 km grid), compared against the segment handler's area.
SEG_AREA_DEF = _fci_full_disk_area(
    'mtg_fci_fdss_32km',
    'MTG FCI Full Disk Scanning Service area definition with 32 km resolution',
    348,
)
class TestFciL2NCFileHandler(unittest.TestCase):
    """Test the FciL2NCFileHandler reader.

    Every test runs against a small netCDF file that is generated in ``setUp``
    and removed again in ``tearDown``.
    """

    def setUp(self):
        """Set up the test by creating a test file and opening it with the reader."""
        # Easiest way to test the reader is to create a test netCDF file on the fly
        # Create unique filenames to prevent race conditions when tests are run in parallel
        self.test_file = str(uuid.uuid4()) + ".nc"
        with Dataset(self.test_file, 'w') as nc:
            # Create dimensions
            nc.createDimension('number_of_columns', 10)
            nc.createDimension('number_of_rows', 100)
            nc.createDimension('maximum_number_of_layers', 2)

            # add global attributes
            nc.data_source = 'test_data_source'
            nc.platform = 'test_platform'

            # Add datasets
            x = nc.createVariable('x', np.float32, dimensions=('number_of_columns',))
            x.standard_name = 'projection_x_coordinate'
            x[:] = np.arange(10)

            y = nc.createVariable('y', np.float32, dimensions=('number_of_rows',))
            y.standard_name = 'projection_y_coordinate'
            y[:] = np.arange(100)

            # Scalar (dimensionless) variable
            s = nc.createVariable('product_quality', np.int8)
            s[:] = 99.

            one_layer_dataset = nc.createVariable('test_one_layer', np.float32,
                                                  dimensions=('number_of_rows', 'number_of_columns'))
            one_layer_dataset[:] = np.ones((100, 10))
            one_layer_dataset.test_attr = 'attr'
            one_layer_dataset.units = 'test_units'

            # Layer 0 is filled with ones, layer 1 with twos; no 'units' attribute is set
            two_layers_dataset = nc.createVariable('test_two_layers', np.float32,
                                                   dimensions=('maximum_number_of_layers',
                                                               'number_of_rows',
                                                               'number_of_columns'))
            two_layers_dataset[0, :, :] = np.ones((100, 10))
            two_layers_dataset[1, :, :] = 2 * np.ones((100, 10))

            mtg_geos_projection = nc.createVariable('mtg_geos_projection', int, dimensions=())
            mtg_geos_projection.longitude_of_projection_origin = 0.0
            mtg_geos_projection.semi_major_axis = 6378137.
            mtg_geos_projection.inverse_flattening = 298.257223563
            mtg_geos_projection.perspective_point_height = 35786400.

        # Open the handler only after the writer has been closed by the ``with`` block
        self.fh = FciL2NCFileHandler(filename=self.test_file, filename_info={}, filetype_info={})

    def tearDown(self):
        """Remove the previously created test file."""
        # First delete the file handler, forcing the file to be closed if still open
        del self.fh
        # Then we can safely remove the file from the system
        with suppress(OSError):
            os.remove(self.test_file)

    def test_all_basic(self):
        """Test all basic functionalities."""
        self.assertEqual(self.fh.spacecraft_name, 'test_platform')
        self.assertEqual(self.fh.sensor_name, 'test_data_source')
        self.assertEqual(self.fh.ssp_lon, 0.0)

        global_attributes = self.fh._get_global_attributes()
        expected_global_attributes = {
            'filename': self.test_file,
            'spacecraft_name': 'test_platform',
            'ssp_lon': 0.0,
            'sensor': 'test_data_source',
            'platform_name': 'test_platform'
        }
        self.assertEqual(global_attributes, expected_global_attributes)

    @mock.patch('satpy.readers.fci_l2_nc.geometry.AreaDefinition')
    @mock.patch('satpy.readers.fci_l2_nc.make_ext')
    def test_area_definition(self, me_, gad_):
        """Test the area definition computation."""
        # ``me_`` is the mocked ``make_ext`` and ``gad_`` the mocked ``AreaDefinition``
        self.fh._compute_area_def(make_dataid(name='test_area_def', resolution=2000))

        # Asserts that the make_ext function was called with the correct arguments
        me_.assert_called_once()
        args, kwargs = me_.call_args
        np.testing.assert_allclose(args, [-0.0, -515.6620, 5672.28217, 0.0, 35786400.])

        proj_dict = {'a': 6378137.,
                     'lon_0': 0.0,
                     'h': 35786400,
                     "rf": 298.257223563,
                     'proj': 'geos',
                     'units': 'm',
                     'sweep': 'y'}

        # Asserts that the AreaDefinition constructor was called with the correct arguments
        gad_.assert_called_once()
        args, kwargs = gad_.call_args
        self.assertEqual(args[0], 'mtg_fci_fdss_2km')
        self.assertEqual(args[1], 'MTG FCI Full Disk Scanning Service area definition with 2 km resolution')
        self.assertEqual(args[2], '')
        self.assertEqual(args[3], proj_dict)
        self.assertEqual(args[4], 10)
        self.assertEqual(args[5], 100)

    def test_dataset(self):
        """Test the correct execution of the get_dataset function with a valid file_key."""
        dataset = self.fh.get_dataset(make_dataid(name='test_one_layer', resolution=2000),
                                      {'name': 'test_one_layer',
                                       'file_key': 'test_one_layer',
                                       'fill_value': -999,
                                       'file_type': 'test_file_type'})

        np.testing.assert_allclose(dataset.values, np.ones((100, 10)))
        self.assertEqual(dataset.attrs['test_attr'], 'attr')
        self.assertEqual(dataset.attrs['units'], 'test_units')
        self.assertEqual(dataset.attrs['fill_value'], -999)

    def test_dataset_with_layer(self):
        """Check the correct execution of the get_dataset function with a valid file_key & layer."""
        dataset = self.fh.get_dataset(make_dataid(name='test_two_layers', resolution=2000),
                                      {'name': 'test_two_layers',
                                       'file_key': 'test_two_layers', 'layer': 1,
                                       'fill_value': -999,
                                       'file_type': 'test_file_type'})

        # Layer 1 of the test variable was filled with twos in setUp
        np.testing.assert_allclose(dataset.values, 2 * np.ones((100, 10)))
        # The two-layer test variable was written without a 'units' attribute
        self.assertIsNone(dataset.attrs['units'])
        self.assertEqual(dataset.attrs['spacecraft_name'], 'test_platform')

    def test_dataset_with_invalid_filekey(self):
        """Test the correct execution of the get_dataset function with an invalid file_key."""
        invalid_dataset = self.fh.get_dataset(make_dataid(name='test_invalid', resolution=2000),
                                              {'name': 'test_invalid',
                                               'file_key': 'test_invalid',
                                               'fill_value': -999,
                                               'file_type': 'test_file_type'})
        # Checks that the function returns None
        self.assertIsNone(invalid_dataset)

    def test_dataset_with_total_cot(self):
        """Test the correct execution of the get_dataset function for total COT (add contributions from two layers)."""
        dataset = self.fh.get_dataset(make_dataid(name='retrieved_cloud_optical_thickness', resolution=2000),
                                      {'name': 'retrieved_cloud_optical_thickness',
                                       'file_key': 'test_two_layers',
                                       'fill_value': -999,
                                       'file_type': 'test_file_type'})
        # Expected value: log10 of the summed powers of ten of the two layer values (1 and 2)
        expected_sum = np.empty((100, 10))
        expected_sum[:] = np.log10(10**2 + 10**1)
        np.testing.assert_allclose(dataset.values, expected_sum)

    def test_dataset_with_scalar(self):
        """Test the execution of the get_dataset function for scalar values."""
        # Checks returned scalar value
        dataset = self.fh.get_dataset(make_dataid(name='test_scalar'),
                                      {'name': 'product_quality',
                                       'file_key': 'product_quality',
                                       'file_type': 'test_file_type'})
        self.assertEqual(dataset.values, 99.)

        # Checks that no AreaDefinition is implemented for scalar values
        with pytest.raises(NotImplementedError):
            self.fh.get_area_def(None)
class TestFciL2NCSegmentFileHandler(unittest.TestCase):
    """Test the FciL2NCSegmentFileHandler reader.

    ``setUp`` only writes the test netCDF file; each test opens its own handler
    and stores it in ``self.fh`` so that ``tearDown`` can release it.
    """

    def setUp(self):
        """Set up the test by creating a test file and opening it with the reader."""
        # Easiest way to test the reader is to create a test netCDF file on the fly
        self.seg_test_file = str(uuid.uuid4()) + ".nc"
        with Dataset(self.seg_test_file, 'w') as nc:
            # Create dimensions
            nc.createDimension('number_of_FoR_cols', 348)
            nc.createDimension('number_of_FoR_rows', 348)
            nc.createDimension('number_of_channels', 8)
            nc.createDimension('number_of_categories', 6)

            # add global attributes
            nc.data_source = 'test_fci_data_source'
            nc.platform = 'test_fci_platform'

            # Add datasets
            x = nc.createVariable('x', np.float32, dimensions=('number_of_FoR_cols',))
            x.standard_name = 'projection_x_coordinate'
            x[:] = np.arange(348)

            y = nc.createVariable('y', np.float32, dimensions=('number_of_FoR_rows',))
            y.standard_name = 'projection_y_coordinate'
            y[:] = np.arange(348)

            # Scalar (dimensionless) variable
            s = nc.createVariable('product_quality', np.int8)
            s[:] = 99.

            chans = nc.createVariable('channels', np.float32, dimensions=('number_of_channels',))
            chans.standard_name = 'fci_channels'
            chans[:] = np.arange(8)

            cats = nc.createVariable('categories', np.float32, dimensions=('number_of_categories',))
            cats.standard_name = 'product_categories'
            cats[:] = np.arange(6)

            # Every (channel, category) pair gets its own value so that slicing errors are detectable
            test_dataset = nc.createVariable('test_values', np.float32,
                                             dimensions=('number_of_FoR_rows', 'number_of_FoR_cols',
                                                         'number_of_channels', 'number_of_categories'))
            test_dataset[:] = self._get_unique_array(range(8), range(6))
            test_dataset.test_attr = 'attr'
            test_dataset.units = 'test_units'

    def tearDown(self):
        """Remove the previously created test file."""
        # First delete the fh, forcing the file to be closed if still open.
        # The handler is created inside each test, so it may be missing when a
        # test failed early; that must neither hide the real failure nor skip
        # the file removal below.
        with suppress(AttributeError):
            del self.fh
        # Then can safely remove it from the system
        with suppress(OSError):
            os.remove(self.seg_test_file)

    def test_all_basic(self):
        """Test all basic functionalities."""
        self.fh = FciL2NCSegmentFileHandler(filename=self.seg_test_file, filename_info={}, filetype_info={})

        self.assertEqual(self.fh.spacecraft_name, 'test_fci_platform')
        self.assertEqual(self.fh.sensor_name, 'test_fci_data_source')
        self.assertEqual(self.fh.ssp_lon, 0.0)

        global_attributes = self.fh._get_global_attributes()
        expected_global_attributes = {
            'filename': self.seg_test_file,
            'spacecraft_name': 'test_fci_platform',
            'ssp_lon': 0.0,
            'sensor': 'test_fci_data_source',
            'platform_name': 'test_fci_platform'
        }
        self.assertEqual(global_attributes, expected_global_attributes)

    def test_dataset(self):
        """Test the correct execution of the get_dataset function with valid file_key."""
        self.fh = FciL2NCSegmentFileHandler(filename=self.seg_test_file, filename_info={}, filetype_info={})

        # Checks the correct execution of the get_dataset function with a valid file_key
        dataset = self.fh.get_dataset(make_dataid(name='test_values', resolution=32000),
                                      {'name': 'test_values',
                                       'file_key': 'test_values',
                                       'fill_value': -999, })
        expected_dataset = self._get_unique_array(range(8), range(6))
        np.testing.assert_allclose(dataset.values, expected_dataset)
        self.assertEqual(dataset.attrs['test_attr'], 'attr')
        self.assertEqual(dataset.attrs['units'], 'test_units')
        self.assertEqual(dataset.attrs['fill_value'], -999)

        # Checks that no AreaDefinition is implemented
        with pytest.raises(NotImplementedError):
            self.fh.get_area_def(None)

    def test_dataset_with_invalid_filekey(self):
        """Test the correct execution of the get_dataset function with an invalid file_key."""
        self.fh = FciL2NCSegmentFileHandler(filename=self.seg_test_file, filename_info={}, filetype_info={})

        # Checks the correct execution of the get_dataset function with an invalid file_key
        invalid_dataset = self.fh.get_dataset(make_dataid(name='test_invalid', resolution=32000),
                                              {'name': 'test_invalid',
                                               'file_key': 'test_invalid',
                                               'fill_value': -999, })
        # Checks that the function returns None
        self.assertIsNone(invalid_dataset)

    def test_dataset_with_adef(self):
        """Test the correct execution of the get_dataset function with `with_area_definition=True`."""
        self.fh = FciL2NCSegmentFileHandler(filename=self.seg_test_file, filename_info={}, filetype_info={},
                                            with_area_definition=True)

        # Checks the correct execution of the get_dataset function with a valid file_key
        dataset = self.fh.get_dataset(make_dataid(name='test_values', resolution=32000),
                                      {'name': 'test_values',
                                       'file_key': 'test_values',
                                       'fill_value': -999,
                                       'coordinates': ('test_lon', 'test_lat'), })
        expected_dataset = self._get_unique_array(range(8), range(6))
        np.testing.assert_allclose(dataset.values, expected_dataset)
        self.assertEqual(dataset.attrs['test_attr'], 'attr')
        self.assertEqual(dataset.attrs['units'], 'test_units')
        self.assertEqual(dataset.attrs['fill_value'], -999)

        # Checks returned AreaDefinition against reference
        adef = self.fh.get_area_def(None)
        self.assertEqual(adef, SEG_AREA_DEF)

    def test_dataset_with_adef_and_wrongs_dims(self):
        """Test the correct execution of the get_dataset function with dims that don't match expected AreaDefinition."""
        self.fh = FciL2NCSegmentFileHandler(filename=self.seg_test_file, filename_info={}, filetype_info={},
                                            with_area_definition=True)
        with pytest.raises(NotImplementedError):
            self.fh.get_dataset(make_dataid(name='test_wrong_dims', resolution=6000),
                                {'name': 'test_wrong_dims', 'file_key': 'test_values', 'fill_value': -999}
                                )

    def test_dataset_with_scalar(self):
        """Test the execution of the get_dataset function for scalar values."""
        self.fh = FciL2NCSegmentFileHandler(filename=self.seg_test_file, filename_info={}, filetype_info={})

        # Checks returned scalar value
        dataset = self.fh.get_dataset(make_dataid(name='test_scalar'),
                                      {'name': 'product_quality',
                                       'file_key': 'product_quality',
                                       'file_type': 'test_file_type'})
        self.assertEqual(dataset.values, 99.)

        # Checks that no AreaDefinition is implemented for scalar values
        with pytest.raises(NotImplementedError):
            self.fh.get_area_def(None)

    def test_dataset_slicing_catid(self):
        """Test the correct execution of the _slice_dataset function with 'category_id' set."""
        self.fh = FciL2NCSegmentFileHandler(filename=self.seg_test_file, filename_info={}, filetype_info={})

        dataset = self.fh.get_dataset(make_dataid(name='test_values', resolution=32000),
                                      {'name': 'test_values',
                                       'file_key': 'test_values',
                                       'fill_value': -999,
                                       'category_id': 5})
        expected_dataset = self._get_unique_array(range(8), 5)
        np.testing.assert_allclose(dataset.values, expected_dataset)

    def test_dataset_slicing_chid_catid(self):
        """Test the correct execution of the _slice_dataset function with 'channel_id' and 'category_id' set."""
        self.fh = FciL2NCSegmentFileHandler(filename=self.seg_test_file, filename_info={}, filetype_info={})

        dataset = self.fh.get_dataset(make_dataid(name='test_values', resolution=32000),
                                      {'name': 'test_values',
                                       'file_key': 'test_values',
                                       'fill_value': -999,
                                       'channel_id': 0, 'category_id': 1})
        expected_dataset = self._get_unique_array(0, 1)
        np.testing.assert_allclose(dataset.values, expected_dataset)

    def test_dataset_slicing_visid_catid(self):
        """Test the correct execution of the _slice_dataset function with 'vis_channel_id' and 'category_id' set."""
        self.fh = FciL2NCSegmentFileHandler(filename=self.seg_test_file, filename_info={}, filetype_info={})

        # Rename the channel dimension of the opened dataset to the name used for 'vis_channel_id'
        self.fh.nc = self.fh.nc.rename_dims({'number_of_channels': 'number_of_vis_channels'})
        dataset = self.fh.get_dataset(make_dataid(name='test_values', resolution=32000),
                                      {'name': 'test_values',
                                       'file_key': 'test_values',
                                       'fill_value': -999,
                                       'vis_channel_id': 3, 'category_id': 3})
        expected_dataset = self._get_unique_array(3, 3)
        np.testing.assert_allclose(dataset.values, expected_dataset)

    def test_dataset_slicing_irid(self):
        """Test the correct execution of the _slice_dataset function with 'ir_channel_id' set."""
        self.fh = FciL2NCSegmentFileHandler(filename=self.seg_test_file, filename_info={}, filetype_info={})

        # Rename the channel dimension of the opened dataset to the name used for 'ir_channel_id'
        self.fh.nc = self.fh.nc.rename_dims({'number_of_channels': 'number_of_ir_channels'})
        dataset = self.fh.get_dataset(make_dataid(name='test_values', resolution=32000),
                                      {'name': 'test_values',
                                       'file_key': 'test_values',
                                       'fill_value': -999,
                                       'ir_channel_id': 4})
        expected_dataset = self._get_unique_array(4, range(6))
        np.testing.assert_allclose(dataset.values, expected_dataset)

    @staticmethod
    def _get_unique_array(iarr, jarr):
        """Build the reference values for the requested channel/category indices.

        The element for channel index ``i`` and category index ``j`` is
        ``10 * i + j``, repeated over the 348 x 348 grid.

        Args:
            iarr: a single channel index or an iterable of channel indices.
            jarr: a single category index or an iterable of category indices.

        Returns:
            A float array of shape ``(348, 348, len(iarr), len(jarr))`` with all
            length-one axes squeezed out.
        """
        if not hasattr(iarr, '__iter__'):
            iarr = [iarr]
        if not hasattr(jarr, '__iter__'):
            jarr = [jarr]

        channel_ids = np.asarray(list(iarr), dtype=float)
        category_ids = np.asarray(list(jarr), dtype=float)
        # One value per (channel, category) pair, broadcast over the spatial grid
        pair_values = channel_ids[:, np.newaxis] * 10 + category_ids[np.newaxis, :]

        array = np.empty((348, 348) + pair_values.shape)
        array[...] = pair_values
        return np.squeeze(array)
class TestFciL2NCReadingByteData(unittest.TestCase):
    """Test the FciL2NCFileHandler when reading and extracting byte data."""

    def setUp(self):
        """Set up the test by creating a test file and opening it with the reader."""
        # Easiest way to test the reader is to create a test netCDF file on the fly
        self.test_byte_file = str(uuid.uuid4()) + ".nc"
        with Dataset(self.test_byte_file, 'w') as nc_byte:
            # Create dimensions
            nc_byte.createDimension('number_of_columns', 1)
            nc_byte.createDimension('number_of_rows', 1)

            # add global attributes
            nc_byte.data_source = 'test_data_source'
            nc_byte.platform = 'test_platform'

            # Add datasets
            x = nc_byte.createVariable('x', np.float32, dimensions=('number_of_columns',))
            x.standard_name = 'projection_x_coordinate'
            x[:] = np.arange(1)

            y = nc_byte.createVariable('y', np.float32, dimensions=('number_of_rows',))
            # The attribute belongs on ``y``; setting it on ``x`` would overwrite
            # the x coordinate's standard_name and leave ``y`` without one.
            y.standard_name = 'projection_y_coordinate'
            y[:] = np.arange(1)

            mtg_geos_projection = nc_byte.createVariable('mtg_geos_projection', int, dimensions=())
            mtg_geos_projection.longitude_of_projection_origin = 0.0
            mtg_geos_projection.semi_major_axis = 6378137.
            mtg_geos_projection.inverse_flattening = 298.257223563
            mtg_geos_projection.perspective_point_height = 35786400.

            test_dataset = nc_byte.createVariable('cloud_mask_test_flag', np.float32,
                                                  dimensions=('number_of_rows', 'number_of_columns',))

            # This number was chosen as we know the expected byte values
            test_dataset[:] = 4544767

        # Open the handler only after the writer has been closed by the ``with`` block
        self.byte_reader = FciL2NCFileHandler(
            filename=self.test_byte_file,
            filename_info={},
            filetype_info={}
        )

    def tearDown(self):
        """Remove the previously created test file."""
        # First delete the file handler, forcing the file to be closed if still open
        del self.byte_reader
        # Then can safely remove it from the system
        with suppress(OSError):
            os.remove(self.test_byte_file)