#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2020 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""SAFE MSI L1C/L2A reader.
The MSI data has a special value for saturated pixels. By default, these
pixels are set to np.inf, but for some applications it might be desirable
to have these pixels left untouched.
For this case, the `mask_saturated` flag is available in the reader, and can be
toggled with ``reader_kwargs`` upon Scene creation::
scene = satpy.Scene(filenames,
reader='msi_safe',
reader_kwargs={'mask_saturated': False})
scene.load(['B01'])
L1C/L2A format description for the files read here:
https://sentinels.copernicus.eu/documents/247904/685211/S2-PDGS-TAS-DI-PSD-V14.9.pdf/3d3b6c9c-4334-dcc4-3aa7-f7c0deffbaf7?t=1643013091529
NOTE: At present, L1B data is not supported. If the user needs radiance data instead of counts or reflectances, these
are retrieved by first calculating the reflectance and then working back to the radiance. L1B radiance data support
will be added once the data is published onto the Copernicus data ecosystem.
"""
import logging
from datetime import datetime
import dask.array as da
import defusedxml.ElementTree as ET
import numpy as np
import xarray as xr
from pyresample import geometry
from satpy._compat import cached_property
from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import get_legacy_chunk_size
logger = logging.getLogger(__name__)
CHUNK_SIZE = get_legacy_chunk_size()
PLATFORMS = {"S2A": "Sentinel-2A",
"S2B": "Sentinel-2B",
"S2C": "Sentinel-2C",
"S2D": "Sentinel-2D"}
[docs]
class SAFEMSIL1C(BaseFileHandler):
"""File handler for SAFE MSI files (jp2)."""
def __init__(self, filename, filename_info, filetype_info, mda, tile_mda,
mask_saturated=True):
"""Initialize the reader."""
super(SAFEMSIL1C, self).__init__(filename, filename_info,
filetype_info)
del mask_saturated
self._channel = filename_info["band_name"]
self.process_level = filename_info["process_level"]
if self.process_level not in ["L1C", "L2A"]:
raise ValueError(f"Unsupported process level: {self.process_level}")
self._tile_mda = tile_mda
self._mda = mda
self.platform_name = PLATFORMS[filename_info["fmission_id"]]
self._start_time = self._tile_mda.start_time()
self._end_time = filename_info["observation_time"]
[docs]
def get_dataset(self, key, info):
"""Load a dataset."""
if self._channel != key["name"]:
return
logger.debug("Reading %s.", key["name"])
proj = self._read_from_file(key)
if proj is None:
return
proj.attrs = info.copy()
proj.attrs["platform_name"] = self.platform_name
return proj
[docs]
def _read_from_file(self, key):
proj = xr.open_dataset(self.filename, engine="rasterio", chunks=CHUNK_SIZE)["band_data"]
proj = proj.squeeze("band")
if key["calibration"] == "reflectance":
return self._mda.calibrate_to_reflectances(proj, self._channel)
if key["calibration"] == "radiance":
# The calibration procedure differs for L1B and L1C/L2A data!
if self.process_level in ["L1C", "L2A"]:
# For higher level data, radiances must be computed from the reflectance.
# By default, we use the mean solar angles so that the user does not need to resample,
# but the user can also choose to use the solar angles from the tile metadata.
# This is on a coarse grid so for most bands must be resampled before use.
dq = dict(name="solar_zenith_angle", resolution=key["resolution"])
zen = self._tile_mda.get_dataset(dq, dict(xml_tag="Sun_Angles_Grid/Zenith"))
tmp_refl = self._mda.calibrate_to_reflectances(proj, self._channel)
return self._mda.calibrate_to_radiances(tmp_refl, zen, self._channel)
else:
# For L1B the radiances can be directly computed from the digital counts.
return self._mda.calibrate_to_radiances_l1b(proj, self._channel)
if key["calibration"] == "counts":
return self._mda._sanitize_data(proj)
if key["calibration"] in ["aerosol_thickness", "water_vapor"]:
return self._mda.calibrate_to_atmospheric(proj, self._channel)
@property
def start_time(self):
"""Get the start time."""
return self._start_time
@property
def end_time(self):
"""Get the end time."""
return self._start_time
[docs]
def get_area_def(self, dsid):
"""Get the area def."""
if self._channel != dsid["name"]:
return
return self._tile_mda.get_area_def(dsid)
[docs]
class SAFEMSIMDXML(SAFEMSIXMLMetadata):
"""File handle for sentinel 2 safe XML generic metadata."""
[docs]
def calibrate_to_reflectances(self, data, band_name):
"""Calibrate *data* using the radiometric information for the metadata."""
quantification = int(self.root.find(".//QUANTIFICATION_VALUE").text) if self.process_level[:2] == "L1" else \
int(self.root.find(".//BOA_QUANTIFICATION_VALUE").text)
data = self._sanitize_data(data)
return (data + self.band_offset(band_name)) / quantification * 100
[docs]
def calibrate_to_atmospheric(self, data, band_name):
"""Calibrate L2A AOT/WVP product."""
atmospheric_bands = ["AOT", "WVP"]
if self.process_level == "L1C" or self.process_level == "L1B":
return
elif self.process_level == "L2A" and band_name not in atmospheric_bands:
return
quantification = float(self.root.find(f".//{band_name}_QUANTIFICATION_VALUE").text)
data = self._sanitize_data(data)
return data / quantification
[docs]
def _sanitize_data(self, data):
data = data.where(data != self.no_data)
if self.mask_saturated:
data = data.where(data != self.saturated, np.inf)
return data
[docs]
def band_offset(self, band):
"""Get the band offset for *band*."""
band_index = self._band_index(band)
return self.band_offsets.get(band_index, 0)
[docs]
def _band_index(self, band):
band_indices = self.band_indices
band_conversions = {"B01": "B1", "B02": "B2", "B03": "B3", "B04": "B4", "B05": "B5", "B06": "B6", "B07": "B7",
"B08": "B8", "B8A": "B8A", "B09": "B9", "B10": "B10", "B11": "B11", "B12": "B12"}
band_index = band_indices[band_conversions[band]]
return band_index
@cached_property
def band_indices(self):
"""Get the band indices from the metadata."""
spectral_info = self.root.findall(".//Spectral_Information")
band_indices = {spec.attrib["physicalBand"]: int(spec.attrib["bandId"]) for spec in spectral_info}
return band_indices
@cached_property
def band_offsets(self):
"""Get the band offsets from the metadata."""
offsets = self.root.find(".//Radiometric_Offset_List") if self.process_level[:2] == "L1" else \
self.root.find(".//BOA_ADD_OFFSET_VALUES_LIST")
if offsets is not None:
band_offsets = {int(off.attrib["band_id"]): float(off.text) for off in offsets}
else:
band_offsets = {}
return band_offsets
[docs]
def solar_irradiance(self, band_name):
"""Get the solar irradiance for a given *band_name*."""
band_index = self._band_index(band_name)
return self.solar_irradiances[band_index]
@cached_property
def solar_irradiances(self):
"""Get the TOA solar irradiance values from the metadata."""
irrads = self.root.find(".//Solar_Irradiance_List")
if irrads is not None:
solar_irrad = {int(irr.attrib["bandId"]): float(irr.text) for irr in irrads}
if len(solar_irrad) > 0:
return solar_irrad
raise ValueError("No solar irradiance values were found in the metadata.")
@cached_property
def sun_earth_dist(self):
"""Get the sun-earth distance from the metadata."""
sed = self.root.find(".//U")
if sed.text is not None:
return float(sed.text)
raise ValueError("Sun-Earth distance in metadata is missing.")
@cached_property
def special_values(self):
"""Get the special values from the metadata."""
special_values = self.root.findall(".//Special_Values")
special_values_dict = {value[0].text: float(value[1].text) for value in special_values}
return special_values_dict
@property
def no_data(self):
"""Get the nodata value from the metadata."""
return self.special_values["NODATA"]
@property
def saturated(self):
"""Get the saturated value from the metadata."""
return self.special_values["SATURATED"]
[docs]
def calibrate_to_radiances_l1b(self, data, band_name):
"""Calibrate *data* to radiance using the radiometric information for the metadata."""
physical_gain = self.physical_gain(band_name)
data = self._sanitize_data(data)
return (data + self.band_offset(band_name)) / physical_gain
[docs]
def calibrate_to_radiances(self, data, solar_zenith, band_name):
"""Calibrate *data* to radiance using the radiometric information for the metadata."""
sed = self.sun_earth_dist
solar_irrad_band = self.solar_irradiance(band_name)
solar_zenith = np.deg2rad(solar_zenith)
return (data / 100.) * solar_irrad_band * np.cos(solar_zenith) / (np.pi * sed * sed)
[docs]
def physical_gain(self, band_name):
"""Get the physical gain for a given *band_name*."""
band_index = self._band_index(band_name)
return self.physical_gains[band_index]
@cached_property
def physical_gains(self):
"""Get the physical gains dictionary."""
physical_gains = {int(elt.attrib["bandId"]): float(elt.text) for elt in self.root.findall(".//PHYSICAL_GAINS")}
return physical_gains
[docs]
def _fill_swath_edges(angles):
"""Fill gaps at edges of swath."""
darr = xr.DataArray(angles, dims=["y", "x"])
darr = darr.bfill("x")
darr = darr.ffill("x")
darr = darr.bfill("y")
darr = darr.ffill("y")
angles = darr.data
return angles
[docs]
class SAFEMSITileMDXML(SAFEMSIXMLMetadata):
"""File handle for sentinel 2 safe XML tile metadata."""
def __init__(self, filename, filename_info, filetype_info, mask_saturated=True):
"""Init the reader."""
super().__init__(filename, filename_info, filetype_info, mask_saturated)
self.geocoding = self.root.find(".//Tile_Geocoding")
[docs]
def get_area_def(self, dsid):
"""Get the area definition of the dataset."""
area_extent = self._area_extent(dsid["resolution"])
cols, rows = self._shape(dsid["resolution"])
area = geometry.AreaDefinition(
self.tile,
"On-the-fly area",
self.tile,
self.projection,
cols,
rows,
area_extent)
return area
@cached_property
def projection(self):
"""Get the geographic projection."""
from pyproj import CRS
epsg = self.geocoding.find("HORIZONTAL_CS_CODE").text
return CRS(epsg)
[docs]
def _area_extent(self, resolution):
cols, rows = self._shape(resolution)
geoposition = self.geocoding.find('Geoposition[@resolution="' + str(resolution) + '"]')
ulx = float(geoposition.find("ULX").text)
uly = float(geoposition.find("ULY").text)
xdim = float(geoposition.find("XDIM").text)
ydim = float(geoposition.find("YDIM").text)
area_extent = (ulx, uly + rows * ydim, ulx + cols * xdim, uly)
return area_extent
[docs]
def _shape(self, resolution):
rows = int(self.geocoding.find('Size[@resolution="' + str(resolution) + '"]/NROWS').text)
cols = int(self.geocoding.find('Size[@resolution="' + str(resolution) + '"]/NCOLS').text)
return cols, rows
[docs]
def start_time(self):
"""Get the observation time from the tile metadata."""
timestr = self.root.find(".//SENSING_TIME").text
return datetime.strptime(timestr, "%Y-%m-%dT%H:%M:%S.%fZ")
[docs]
@staticmethod
def _do_interp(minterp, xcoord, ycoord):
interp_points2 = np.vstack((ycoord.ravel(), xcoord.ravel()))
res = minterp(interp_points2)
return res.reshape(xcoord.shape)
[docs]
def interpolate_angles(self, angles, resolution):
"""Interpolate the angles."""
from geotiepoints.multilinear import MultilinearInterpolator
cols, rows = self._shape(resolution)
smin = [0, 0]
smax = np.array(angles.shape) - 1
orders = angles.shape
minterp = MultilinearInterpolator(smin, smax, orders)
minterp.set_values(da.atleast_2d(angles.ravel()))
y = da.arange(rows, dtype=angles.dtype, chunks=CHUNK_SIZE) / (rows-1) * (angles.shape[0] - 1)
x = da.arange(cols, dtype=angles.dtype, chunks=CHUNK_SIZE) / (cols-1) * (angles.shape[1] - 1)
xcoord, ycoord = da.meshgrid(x, y)
return da.map_blocks(self._do_interp, minterp, xcoord, ycoord, dtype=angles.dtype, chunks=xcoord.chunks)
[docs]
def _get_coarse_dataset(self, key, info):
"""Get the coarse dataset refered to by `key` from the XML data."""
angles = self.root.find(".//Tile_Angles")
if key["name"] in ["solar_zenith_angle", "solar_azimuth_angle"]:
angles = self._get_solar_angles(angles, info)
elif key["name"] in ["satellite_zenith_angle", "satellite_azimuth_angle"]:
angles = self._get_satellite_angles(angles, info)
else:
angles = None
return angles
[docs]
def _get_solar_angles(self, angles, info):
angles = self._get_values_from_tag(angles, info["xml_tag"])
return angles
[docs]
@staticmethod
def _get_values_from_tag(xml_tree, xml_tag):
elts = xml_tree.findall(xml_tag + "/Values_List/VALUES")
return np.array([[val for val in elt.text.split()] for elt in elts],
dtype=np.float64)
[docs]
def _get_satellite_angles(self, angles, info):
arrays = []
elts = angles.findall(info["xml_tag"] + '[@bandId="1"]')
for elt in elts:
arrays.append(self._get_values_from_tag(elt, info["xml_item"]))
angles = np.nanmean(np.dstack(arrays), -1)
return angles
[docs]
def get_dataset(self, key, info):
"""Get the dataset referred to by `key`."""
angles = self._get_coarse_dataset(key, info)
if angles is None:
return None
angles = _fill_swath_edges(angles)
res = self.interpolate_angles(angles, key["resolution"])
proj = xr.DataArray(res, dims=["y", "x"])
proj.attrs = info.copy()
proj.attrs["units"] = "degrees"
proj.attrs["platform_name"] = self.platform_name
return proj