"""Unit tests for GMS-5 VISSR reader."""
import datetime as dt
import gzip
import fsspec
import numpy as np
import pytest
import xarray as xr
from pyresample.geometry import AreaDefinition
import satpy.tests.reader_tests.gms.test_gms5_vissr_data as real_world
from satpy.readers import FSFile
from satpy.tests.reader_tests.utils import get_jit_methods
from satpy.tests.utils import make_dataid, skip_numba_unstable_if_missing
try:
    import satpy.readers.gms.gms5_vissr_format as fmt
    import satpy.readers.gms.gms5_vissr_l1b as vissr
    import satpy.readers.gms.gms5_vissr_navigation as nav
except ImportError as err:
    # Re-raise the import failure unless the helper says it may be skipped.
    if not skip_numba_unstable_if_missing():
        raise
    # pytest.skip() raises, so the whole module is skipped from here.
    pytest.skip(f"Numba is not compatible with unstable NumPy: {err!s}", allow_module_level=True)
@pytest.fixture(params=[False, True], autouse=True)
def _disable_jit(request, monkeypatch):
    """Run each test twice: with the jit methods as they are and with jit disabled.

    Reason: Coverage report is only accurate with jit disabled.
    """
    if not request.param:
        return
    # Replace every jit method by its pure Python implementation.
    for target, jit_method in get_jit_methods(vissr).items():
        monkeypatch.setattr(target, jit_method.py_func)
class TestEarthMask:
    """Tests for the earth mask."""

    def test_get_earth_mask(self):
        """Compare the computed earth mask with the expected one."""
        expected_mask = np.array(
            [
                [0, 0, 0, 0],
                [0, 1, 1, 1],
                [1, 1, 1, 0],
                [0, 0, 0, 0],
            ]
        )
        first_pixels = np.array([-1, 1, 0, -1])
        last_pixels = np.array([-1, 3, 2, -1])
        actual_mask = vissr.get_earth_mask(
            expected_mask.shape, (first_pixels, last_pixels)
        )
        np.testing.assert_equal(actual_mask, expected_mask)
class TestFileHandler:
    """Tests for the VISSR file handler."""

    @pytest.fixture(autouse=True)
    def _patch_number_of_pixels_per_scanline(self, monkeypatch):
        """Patch the image data types so that a scanline holds two pixels."""
        pixels_per_line = 2
        block_vis = np.dtype(
            [
                ("LCW", fmt.LINE_CONTROL_WORD),
                ("DOC", fmt.U1, (64,)),
                ("image_data", fmt.U1, (pixels_per_line,)),
            ]
        )
        block_ir = np.dtype(
            [
                ("LCW", fmt.LINE_CONTROL_WORD),
                ("DOC", fmt.U1, (256,)),
                ("image_data", fmt.U1, pixels_per_line),
            ]
        )
        image_data_spec = {
            fmt.VIS_CHANNEL: {
                "offset": 6 * fmt.BLOCK_SIZE_VIS,
                "dtype": block_vis,
            },
            fmt.IR_CHANNEL: {
                "offset": 18 * fmt.BLOCK_SIZE_IR,
                "dtype": block_ir,
            },
        }
        patches = [
            ("satpy.readers.gms.gms5_vissr_format.IMAGE_DATA_BLOCK_IR", block_ir),
            ("satpy.readers.gms.gms5_vissr_format.IMAGE_DATA_BLOCK_VIS", block_vis),
            ("satpy.readers.gms.gms5_vissr_format.IMAGE_DATA", image_data_spec),
        ]
        for target, replacement in patches:
            monkeypatch.setattr(target, replacement)

    @pytest.fixture(
        params=[
            make_dataid(name="VIS", calibration="reflectance", resolution=1250),
            make_dataid(
                name="IR1", calibration="brightness_temperature", resolution=5000
            ),
            make_dataid(name="IR1", calibration="counts", resolution=5000),
        ]
    )
    def dataset_id(self, request):
        """Get the ID of the dataset under test."""
        return request.param

    @pytest.fixture(params=[True, False])
    def mask_space(self, request):
        """Toggle masking of space pixels."""
        return request.param

    @pytest.fixture(params=[True, False])
    def with_compression(self, request):
        """Toggle compression of the test file."""
        return request.param

    @pytest.fixture
    def open_function(self, with_compression):
        """Get the function used to open the test file for writing."""
        if with_compression:
            return gzip.open
        return open

    @pytest.fixture
    def vissr_file(self, dataset_id, file_contents, open_function, tmp_path):
        """Write the test VISSR file and return its path."""
        path = tmp_path / "vissr_file"
        channel_type = fmt.CHANNEL_TYPES[dataset_id["name"]]
        VissrFileWriter(channel_type, open_function).write(path, file_contents)
        return path

    @pytest.fixture
    def file_contents(self, control_block, image_parameters, image_data):
        """Collect all parts of the VISSR file."""
        contents = {}
        contents["control_block"] = control_block
        contents["image_parameters"] = image_parameters
        contents["image_data"] = image_data
        return contents

    @pytest.fixture
    def control_block(self, dataset_id):
        """Get the VISSR control block for the current channel."""
        parameter_block_sizes = {"IR1": 16, "VIS": 4}
        block = np.zeros(1, dtype=fmt.CONTROL_BLOCK)
        block["parameter_block_size"] = parameter_block_sizes[dataset_id["name"]]
        block["available_block_size_of_image_data"] = 2
        return block

    @pytest.fixture
    def image_parameters(self, mode_block, cal_params, nav_params):
        """Merge mode block, calibration and navigation parameters."""
        return {"mode": mode_block, **cal_params, **nav_params}

    @pytest.fixture
    def nav_params(
        self,
        coordinate_conversion,
        attitude_prediction,
        orbit_prediction,
    ):
        """Merge all navigation parameters into one dictionary."""
        return {
            **attitude_prediction,
            **orbit_prediction,
            **coordinate_conversion,
        }

    @pytest.fixture
    def cal_params(
        self,
        vis_calibration,
        ir1_calibration,
        ir2_calibration,
        wv_calibration,
    ):
        """Collect the calibration blocks of all channels."""
        params = {}
        params["vis_calibration"] = vis_calibration
        params["ir1_calibration"] = ir1_calibration
        params["ir2_calibration"] = ir2_calibration
        params["wv_calibration"] = wv_calibration
        return params

    @pytest.fixture
    def mode_block(self):
        """Get the VISSR mode block describing a 2x2 image."""
        block = np.zeros(1, dtype=fmt.MODE_BLOCK)
        block["satellite_name"] = b"GMS-5 "
        block["spin_rate"] = 99.21774
        block["observation_time_mjd"] = 50000.0
        block["ssp_longitude"] = 140.0
        block["satellite_height"] = 123456.0
        for frame in ("ir_frame_parameters", "vis_frame_parameters"):
            block[frame]["number_of_lines"] = 2
            block[frame]["number_of_pixels"] = 2
        return block

    @pytest.fixture
    def coordinate_conversion(self, coord_conv, simple_coord_conv_table):
        """Collect both kinds of coordinate conversion parameters."""
        return {
            "coordinate_conversion": coord_conv,
            "simple_coordinate_conversion_table": simple_coord_conv_table,
        }

    @pytest.fixture
    def coord_conv(self):
        """Get the coordinate conversion parameters.

        The central pixel number is adjusted so that the first column is at
        the image center. That way tests can use very small 2x2 images;
        otherwise, all pixels would be in space.
        """
        params = np.zeros(1, dtype=fmt.COORDINATE_CONVERSION_PARAMETERS)

        # Field name -> (IR1 value, VIS value)
        per_channel_values = {
            "central_line_number_of_vissr_frame": (1378.5, 5513.0),
            # instead of 1672.5 (IR1) and 6688.5 (VIS)
            "central_pixel_number_of_vissr_frame": (0.5, 0.5),
            "number_of_sensor_elements": (1, 4),
            "sampling_angle_along_pixel": (9.5719995e-05, 2.3929999e-05),
            "stepping_angle_along_line": (0.00014000005, 3.5000005e-05),
        }
        for field, (ir1_value, vis_value) in per_channel_values.items():
            channel_values = params[field]
            channel_values["IR1"] = ir1_value
            channel_values["VIS"] = vis_value

        params["scheduled_observation_time"] = 50130.979089568464
        params["matrix_of_misalignment"] = np.array(
            [
                [9.9999917e-01, -5.1195198e-04, -1.2135329e-03],
                [5.1036407e-04, 9.9999905e-01, -1.3083406e-03],
                [1.2142011e-03, 1.3077201e-03, 9.9999845e-01],
            ],
            dtype=np.float32,
        )

        earth_params = params["parameters"]
        earth_params["equatorial_radius"] = 6377397.0
        earth_params["oblateness_of_earth"] = 0.003342773

        orbital_params = params["orbital_parameters"]
        orbital_params["longitude_of_ssp"] = 141.0
        orbital_params["latitude_of_ssp"] = 1.0
        return params

    @pytest.fixture
    def attitude_prediction(self):
        """Get the attitude prediction block."""
        prediction = np.zeros(1, dtype=fmt.ATTITUDE_PREDICTION)
        prediction["data"] = real_world.ATTITUDE_PREDICTION
        return {"attitude_prediction": prediction}

    @pytest.fixture
    def orbit_prediction(self, orbit_prediction_1, orbit_prediction_2):
        """Collect both blocks of orbit prediction data."""
        return {
            "orbit_prediction_1": orbit_prediction_1,
            "orbit_prediction_2": orbit_prediction_2,
        }

    @pytest.fixture
    def orbit_prediction_1(self):
        """Get the first orbit prediction block."""
        prediction = np.zeros(1, dtype=fmt.ORBIT_PREDICTION)
        prediction["data"] = real_world.ORBIT_PREDICTION_1
        return prediction

    @pytest.fixture
    def orbit_prediction_2(self):
        """Get the second orbit prediction block."""
        prediction = np.zeros(1, dtype=fmt.ORBIT_PREDICTION)
        prediction["data"] = real_world.ORBIT_PREDICTION_2
        return prediction

    @pytest.fixture
    def vis_calibration(self):
        """Get the VIS calibration block with four table entries set."""
        block = np.zeros(1, dtype=fmt.VIS_CALIBRATION)
        vis1_table = block["vis1_calibration_table"]
        albedo_table = vis1_table["brightness_albedo_conversion_table"]
        albedo_table[0, :4] = np.array([0, 0.25, 0.5, 1])
        return block

    @pytest.fixture
    def ir1_calibration(self):
        """Get the IR1 calibration block with four table entries set."""
        block = np.zeros(1, dtype=fmt.IR_CALIBRATION)
        temperature_table = block[
            "conversion_table_of_equivalent_black_body_temperature"
        ]
        temperature_table[0, :4] = np.array([0, 100, 200, 300])
        return block

    @pytest.fixture
    def ir2_calibration(self):
        """Get the (all-zero) IR2 calibration block."""
        return np.zeros(1, dtype=fmt.IR_CALIBRATION)

    @pytest.fixture
    def wv_calibration(self):
        """Get the (all-zero) WV calibration block."""
        return np.zeros(1, dtype=fmt.IR_CALIBRATION)

    @pytest.fixture
    def simple_coord_conv_table(self):
        """Get the simple coordinate conversion table."""
        conv_table = np.zeros(1, dtype=fmt.SIMPLE_COORDINATE_CONVERSION_TABLE)
        conv_table["satellite_height"] = 123457.0
        return conv_table

    @pytest.fixture
    def image_data(self, dataset_id, image_data_ir1, image_data_vis):
        """Select the image data matching the current channel."""
        data_by_channel = {"IR1": image_data_ir1, "VIS": image_data_vis}
        return data_by_channel[dataset_id["name"]]

    @pytest.fixture
    def image_data_ir1(self):
        """Get two scanlines of IR1 image data."""
        scanlines = np.zeros(2, fmt.IMAGE_DATA_BLOCK_IR)
        line_control = scanlines["LCW"]
        line_control["line_number"] = [686, 2089]
        line_control["scan_time"] = [50000, 50000]
        line_control["west_side_earth_edge"] = [0, 0]
        line_control["east_side_earth_edge"] = [1, 1]
        scanlines["image_data"] = [[0, 1], [2, 3]]
        return scanlines

    @pytest.fixture
    def image_data_vis(self):
        """Get two scanlines of VIS image data."""
        scanlines = np.zeros(2, fmt.IMAGE_DATA_BLOCK_VIS)
        line_control = scanlines["LCW"]
        line_control["line_number"] = [2744, 8356]
        line_control["scan_time"] = [50000, 50000]
        line_control["west_side_earth_edge"] = [-1, 0]
        line_control["east_side_earth_edge"] = [-1, 1]
        scanlines["image_data"] = [[0, 1], [2, 3]]
        return scanlines

    @pytest.fixture
    def vissr_file_like(self, vissr_file, with_compression):
        """Get the test file as path or, if compressed, as FSFile."""
        if not with_compression:
            return vissr_file
        return FSFile(fsspec.open(vissr_file, compression="gzip"))

    @pytest.fixture
    def file_handler(self, vissr_file_like, mask_space):
        """Create the file handler under test."""
        return vissr.GMS5VISSRFileHandler(
            vissr_file_like, {}, {}, mask_space=mask_space
        )

    @pytest.fixture
    def vis_refl_exp(self, mask_space, lons_lats_exp):
        """Get the expected VIS reflectance."""
        lons, lats = lons_lats_exp
        # Only the first line differs between the masked and unmasked case.
        first_line = [np.nan, np.nan] if mask_space else [0, 25]
        acq_times = [dt.datetime(1995, 10, 10), dt.datetime(1995, 10, 10)]
        return xr.DataArray(
            [first_line, [50, 100]],
            dims=("y", "x"),
            coords={
                "lon": lons,
                "lat": lats,
                "acq_time": ("y", acq_times),
                "line_number": ("y", [2744, 8356]),
            },
        )

    @pytest.fixture
    def ir1_counts_exp(self, lons_lats_exp):
        """Get the expected IR1 counts."""
        lons, lats = lons_lats_exp
        acq_times = [dt.datetime(1995, 10, 10), dt.datetime(1995, 10, 10)]
        return xr.DataArray(
            [[0, 1], [2, 3]],
            dims=("y", "x"),
            coords={
                "lon": lons,
                "lat": lats,
                "acq_time": ("y", acq_times),
                "line_number": ("y", [686, 2089]),
            },
        )

    @pytest.fixture
    def ir1_bt_exp(self, lons_lats_exp):
        """Get the expected IR1 brightness temperature."""
        lons, lats = lons_lats_exp
        acq_times = [dt.datetime(1995, 10, 10), dt.datetime(1995, 10, 10)]
        return xr.DataArray(
            [[0, 100], [200, 300]],
            dims=("y", "x"),
            coords={
                "lon": lons,
                "lat": lats,
                "acq_time": ("y", acq_times),
                "line_number": ("y", [686, 2089]),
            },
        )

    @pytest.fixture
    def lons_lats_exp(self, dataset_id):
        """Get the expected longitudes and latitudes.

        Computed with JMA's Msial library for 2 pixels near the central column
        (6688.5/1672.5 for VIS/IR).

        VIS:

        pix = [6688, 6688, 6689, 6689]
        lin = [2744, 8356, 2744, 8356]

        IR1:

        pix = [1672, 1672, 1673, 1673]
        lin = [686, 2089, 686, 2089]
        """
        expected_by_channel = {
            "IR1": {
                "lons": [[139.680120, 139.718902],
                         [140.307367, 140.346062]],
                "lats": [[35.045132, 35.045361],
                         [-34.971012, -34.970738]],
            },
            "VIS": {
                "lons": [[139.665133, 139.674833],
                         [140.292579, 140.302249]],
                "lats": [[35.076113, 35.076170],
                         [-34.940439, -34.940370]],
            },
        }
        expected = expected_by_channel[dataset_id["name"]]
        return tuple(
            xr.DataArray(expected[key], dims=("y", "x"))
            for key in ("lons", "lats")
        )

    @pytest.fixture
    def dataset_exp(self, dataset_id, ir1_counts_exp, ir1_bt_exp, vis_refl_exp):
        """Select the expected dataset for the current dataset ID."""
        expected_by_id = {
            make_dataid(
                name="IR1", calibration="counts", resolution=5000
            ): ir1_counts_exp,
            make_dataid(
                name="IR1", calibration="brightness_temperature", resolution=5000
            ): ir1_bt_exp,
            make_dataid(
                name="VIS", calibration="reflectance", resolution=1250
            ): vis_refl_exp,
        }
        return expected_by_id[dataset_id]

    @pytest.fixture
    def area_def_exp(self, dataset_id):
        """Get the expected area definition for the current channel."""
        if dataset_id["name"] == "IR1":
            resol, size = 5, 2366
            extent = (-20438.1468, -20438.1468, 20455.4306, 20455.4306)
        else:
            resol, size = 1, 9464
            extent = (-20444.6235, -20444.6235, 20448.9445, 20448.9445)
        area_id = f"gms-5_vissr_western-pacific_{resol}km"
        desc = f"GMS-5 VISSR Western Pacific area definition with {resol} km resolution"
        projection = {
            "a": nav.EARTH_EQUATORIAL_RADIUS,
            "b": nav.EARTH_POLAR_RADIUS,
            "h": "123456",
            "lon_0": "140",
            "no_defs": "None",
            "proj": "geos",
            "type": "crs",
            "units": "m",
            "x_0": "0",
            "y_0": "0",
        }
        return AreaDefinition(
            area_id=area_id,
            description=desc,
            proj_id=area_id,
            projection=projection,
            area_extent=extent,
            width=size,
            height=size,
        )

    @pytest.fixture
    def attrs_exp(self, area_def_exp):
        """Get the expected dataset attributes."""
        time_parameters = {
            "nominal_start_time": dt.datetime(1995, 10, 10),
            "nominal_end_time": dt.datetime(1995, 10, 10, 0, 25),
        }
        orbital_parameters = {
            "satellite_nominal_longitude": 140.0,
            "satellite_nominal_latitude": 0.0,
            "satellite_nominal_altitude": 123456.0,
            "satellite_actual_longitude": 141.0,
            "satellite_actual_latitude": 1.0,
            "satellite_actual_altitude": 123457.0,
        }
        return {
            "yaml": "info",
            "platform": "GMS-5",
            "sensor": "VISSR",
            "time_parameters": time_parameters,
            "orbital_parameters": orbital_parameters,
            "area_def_uniform_sampling": area_def_exp,
        }

    def test_get_dataset(self, file_handler, dataset_id, dataset_exp, attrs_exp):
        """Check data, coordinates and attributes of the loaded dataset."""
        loaded = file_handler.get_dataset(dataset_id, {"yaml": "info"})
        xr.testing.assert_allclose(loaded.compute(), dataset_exp, atol=1e-6)
        assert loaded.attrs == attrs_exp

    def test_time_attributes(self, file_handler, attrs_exp):
        """Check start and end time of the file handler."""
        time_params = attrs_exp["time_parameters"]
        assert file_handler.start_time == time_params["nominal_start_time"]
        assert file_handler.end_time == time_params["nominal_end_time"]
class TestCorruptFile:
    """Tests for reading corrupt files."""

    @pytest.fixture
    def file_contents(self):
        """Get contents of a corrupt file (everything is zero)."""
        return {
            "control_block": np.zeros(1, dtype=fmt.CONTROL_BLOCK),
            "image_parameters": {},
            "image_data": np.zeros(1, dtype=fmt.IMAGE_DATA_BLOCK_IR),
        }

    @pytest.fixture
    def corrupt_file(self, file_contents, tmp_path):
        """Write the corrupt VISSR file and return its path."""
        path = tmp_path / "my_vissr_file"
        VissrFileWriter(ch_type="VIS", open_function=open).write(path, file_contents)
        return path

    def test_corrupt_file(self, corrupt_file):
        """Check that creating a file handler for a corrupt file fails."""
        with pytest.raises(ValueError, match=r".* corrupt .*"):
            vissr.GMS5VISSRFileHandler(corrupt_file, {}, {})
class VissrFileWriter:
    """Write data in VISSR archive format.

    A file consists of the control block, followed by the image parameter
    blocks and the image data. Image parameters and image data are placed at
    the offsets defined in the format module; gaps are filled with blanks.
    """

    # Image parameter blocks in the order in which they are written.
    image_params_order = [
        "mode",
        "coordinate_conversion",
        "attitude_prediction",
        "orbit_prediction_1",
        "orbit_prediction_2",
        "vis_calibration",
        "ir1_calibration",
        "ir2_calibration",
        "wv_calibration",
        "simple_coordinate_conversion_table",
    ]

    def __init__(self, ch_type, open_function):
        """Initialize the writer.

        Args:
            ch_type: Channel type (VIS or IR)
            open_function: Open function to be used (e.g. open or gzip.open)
        """
        self.ch_type = ch_type
        self.open_function = open_function

    def write(self, filename, contents):
        """Write file contents to disk.

        Args:
            filename: Name of the file to be written.
            contents: Dictionary with the items "control_block",
                "image_parameters" (dictionary of parameter blocks by name)
                and "image_data".
        """
        with self.open_function(filename, mode="wb") as fd:
            self._write_control_block(fd, contents)
            self._write_image_parameters(fd, contents)
            self._write_image_data(fd, contents)

    def _write_control_block(self, fd, contents):
        """Write the control block at the current file position."""
        self._write(fd, contents["control_block"])

    def _write_image_parameters(self, fd, contents):
        """Write all image parameter blocks that are present in the contents.

        Only missing blocks are skipped. The blocks are numpy arrays, so
        their truth value must not be used here: an all-zero block with one
        element would be falsy (and silently dropped), and a block with
        multiple elements would raise a ValueError.
        """
        image_parameters = contents["image_parameters"]
        for name in self.image_params_order:
            im_param = image_parameters.get(name)
            if im_param is not None:
                self._write_image_parameter(fd, im_param, name)

    def _write_image_parameter(self, fd, im_param, name):
        """Write one image parameter block at the offset for this channel type."""
        offset = fmt.IMAGE_PARAMS[name]["offset"][self.ch_type]
        self._write(fd, im_param, offset)

    def _write_image_data(self, fd, contents):
        """Write the image data at the offset for this channel type."""
        offset = fmt.IMAGE_DATA[self.ch_type]["offset"]
        self._write(fd, contents["image_data"], offset)

    def _write(self, fd, data, offset=None):
        """Write data to file.

        If specified, prepend with 'offset' placeholder bytes.

        Args:
            fd: File object opened for binary writing.
            data: Object providing ``tobytes()`` (e.g. a numpy array).
            offset: Optional byte position at which the data shall start.
        """
        if offset is not None:
            self._fill(fd, offset)
        fd.write(data.tobytes())

    def _fill(self, fd, target_byte):
        """Write placeholders from current position to target byte.

        Nothing is written if the current position is already at or beyond
        the target byte (a non-positive repeat count yields empty bytes).
        """
        nbytes = target_byte - fd.tell()
        fd.write(b" " * nbytes)