Source code for satpy.readers.core.landsat

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2024 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Landsat reader.

Details of the data format can be found here:
  https://d9-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/media/files/LSDS-1822_Landsat8-9-OLI-TIRS_C2_L1_DataFormatControlBook-v7.pdf
  https://d9-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/atoms/files/LSDS-1414_Landsat7ETM-C2-L1-DFCB-v3.pdf
  https://d9-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/atoms/files/LSDS-1415_Landsat4-5-TM-C2-L1-DFCB-v3.pdf
  https://d9-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/atoms/files/LSDS-1416_LandsatMSS-C2-L1-DFCB-v3.pdf
  https://d9-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/media/files/LSDS-1328_Landsat8-9_OLI-TIRS-C2-L2_DFCB-v7.pdf
  https://d9-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/media/files/LSDS-1337_Landsat7ETM-C2-L2-DFCB-v6.pdf
  https://d9-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/atoms/files/LSDS-1336_Landsat4-5-TM-C2-L2-DFCB-v4.pdf
  https://d9-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/media/files/LSDS-1927_Landsat7-Data-Users-Handbook-v3.pdf
  https://www.usgs.gov/landsat-missions/using-usgs-landsat-level-1-data-product
  https://www.usgs.gov/landsat-missions/landsat-collection-2-level-2-science-products

NOTE: The scene geometry data (SZA, VZA, SAA, VAA) is retrieved from the L1 TIFF files, which are derived from Band 04.
The geometry differs between bands, so if you need precise geometry you should calculate this from the metadata instead.

"""

import logging
from datetime import datetime, timezone

import dask.array as da
import defusedxml.ElementTree as ET
import numpy as np
import rioxarray  # noqa: F401  # need by xarray with the engine rasterio
import xarray as xr

from satpy.readers.core.file_handlers import BaseFileHandler
from satpy.readers.core.remote import open_file_or_filename

logger = logging.getLogger(__name__)

PLATFORMS = {"01": "Landsat-1",
             "02": "Landsat-2",
             "03": "Landsat-3",
             "04": "Landsat-4",
             "05": "Landsat-5",
             "07": "Landsat-7",
             "08": "Landsat-8",
             "09": "Landsat-9"}

PAN_BANDLIST = ["B8"]

ANGLIST = ["satellite_azimuth_angle",
           "satellite_zenith_angle",
           "solar_azimuth_angle",
           "solar_zenith_angle"]

ANGLIST_CHAN = ["sza", "saa", "vaa", "vza"]
QALIST_CHAN = ["qa", "qa_radsat", "qa_aerosol", "qa_atmos_opacity", "qa_cloud", "qa_st"]


[docs] class BaseLandsatReader(BaseFileHandler): """Basic file handler for Landsat files (tif)."""
[docs] @staticmethod def get_btype(file_type): """Return the band type from the file type.""" if "_B" in file_type: pos = file_type.rfind("_B") else: pos = file_type.rfind("_") if pos == -1: raise ValueError(f"Invalid file type: {file_type}") else: return file_type[pos+1:]
@property def start_time(self): """Return start time.""" return self._mda.start_time @property def end_time(self): """Return end time.""" return self._mda.end_time @property def spectral_bands(self): """Landsat bands.""" raise NotImplementedError() @property def thermal_bands(self): """Landsat thermal bands.""" raise NotImplementedError() @property def sensor(self): """Sensor name.""" raise NotImplementedError()
[docs] def __init__(self, filename, filename_info, filetype_info, mda, **kwargs): """Initialize the reader.""" super().__init__(filename, filename_info, filetype_info) # Check we have landsat data if filename_info["platform_type"] != "L": raise ValueError("This reader only supports Landsat data") # Get the channel name self.channel = self.get_btype(filetype_info["file_type"]) # Data can be VIS, TIR or Combined. This flag denotes what the granule contains (O, T or C respectively). self.chan_selector = filename_info["data_type"] self._obs_date = filename_info["observation_date"] self._mda = mda # Retrieve some per-band useful metadata self.bsat = self._mda.band_saturation self.calinfo = self._mda.band_calibration self.platform_name = PLATFORMS[filename_info["spacecraft_id"]] self.collection = filename_info["collection_id"]
[docs] def get_dataset(self, key, info): """Load a dataset.""" self._check_channel_availability(key["name"]) logger.debug("Reading %s.", key["name"]) # xarray use the engine 'rasterio' to open the file, but # its actually rioxarray used in the backend. # however, error is not explicit enough (see https://github.com/pydata/xarray/issues/7831) data = xr.open_dataarray(open_file_or_filename(self.filename), engine="rasterio", chunks={"band": 1, "y": "auto", "x": "auto"}, mask_and_scale=False).squeeze() self._mask_data(data, key["name"]) attrs = self._collect_attrs(data, key["name"], info) # Rename to Satpy convention data = data.rename({"band": "bands"}) data.attrs.update(attrs) # Calibrate if we're using a band rather than a QA or geometry dataset if "calibration" in key: data = self.calibrate(data, key["calibration"]) self._scale_angle_data(data, key["name"]) return data
[docs] def _check_channel_availability(self, name): self._check_channel_and_reader(name) # OLI-TIRS sensor data sometimes can contain only OLI or only TIRS data self._check_oli_tirs_spectral(name) self._check_oli_tirs_thermal(name)
[docs] def _check_channel_and_reader(self, name): if self.channel == name: return if self.channel in ANGLIST_CHAN: return if name in QALIST_CHAN: return raise ValueError(f"Requested channel {name} does not match the reader channel {self.channel}")
[docs] def _check_oli_tirs_spectral(self, name): if self.sensor != "OLI_TIRS": return if name not in self.spectral_bands: return if self.chan_selector in ["O", "C"]: return raise ValueError(f"Requested channel {name} is not available in this granule")
[docs] def _check_oli_tirs_thermal(self, name): if self.sensor != "OLI_TIRS": return if name not in self.thermal_bands: return if self.chan_selector in ["T", "C"]: return raise ValueError(f"Requested channel {name} is not available in this granule")
[docs] def _mask_data(self, data, name): # For calibration simplicity convert the fill value to np.nan bands_9999 = ["TRAD", "DRAD", "URAD", "ATRAN", "EMIS", "EMSD", "CDIST", "qa_st", "qa_atmos_opacity"] if self.collection == "02" and name in bands_9999: # The fill value for several Landsat L2 bands is '-9999' data.data = da.where(data.data == -9999, np.float32(np.nan), data.data) else: # The fill value for Landsat is '0' data.data = da.where(data.data == 0, np.float32(np.nan), data.data)
[docs] def _collect_attrs(self, data, name, info): attrs = data.attrs.copy() # Add useful metadata to the attributes. attrs["perc_cloud_cover"] = self._mda.cloud_cover # Add platform / sensor attributes attrs["platform_name"] = self.platform_name attrs["sensor"] = self.sensor # Apply attrs from YAML if "standard_name" in info: attrs["standard_name"] = info["standard_name"] if "units" in info: attrs["units"] = info["units"] # Only OLI bands have a saturation flag if name in self.bsat: attrs["saturated"] = self.bsat[name] return attrs
[docs] def _scale_angle_data(self, data, name): if name in ANGLIST: data.data = data.data * 0.01
[docs] def calibrate(self, data, calibration): """Calibrate the data from counts into the desired units.""" raise NotImplementedError()
[docs] def get_area_def(self, dsid): """Get area definition of the image from the metadata.""" return self._mda.build_area_def(dsid["name"])
[docs] class BaseLandsatL1Reader(BaseLandsatReader): """Basic file handler for Landsat L1 files (tif)."""
[docs] def calibrate(self, data, calibration): """Calibrate the data from counts into the desired units.""" if calibration == "counts": return data if calibration in ["radiance", "brightness_temperature"]: data.data = data.data * self.calinfo[self.channel][0] + self.calinfo[self.channel][1] if calibration == "radiance": data.data = data.data.astype(np.float32) return data if calibration == "reflectance": data.data = data.data * self.calinfo[self.channel][2] + self.calinfo[self.channel][3] data.data = data.data.astype(np.float32) * 100 return data if calibration == "brightness_temperature": data.data = (self.calinfo[self.channel][3] / np.log((self.calinfo[self.channel][2] / data.data) + 1)) data.data = data.data.astype(np.float32) return data
[docs] class BaseLandsatL2Reader(BaseLandsatReader): """Basic file handler for Landsat L2 files (tif)."""
[docs] def calibrate(self, data, calibration): """Calibrate the data from counts into the desired units.""" if calibration == "counts": return data if calibration in ["reflectance", "brightness_temperature"]: data.data = data.data * self.calinfo[self.channel][0] + self.calinfo[self.channel][1] if calibration == "reflectance": data.data = data.data * 100 data.data = data.data.astype(np.float32) return data if calibration == "radiance": data.data = data.data * 0.001 data.data = data.data.astype(np.float32) return data if calibration in ["emissivity", "atmospheric_transmittance"]: data.data = data.data * 0.0001 data.data = data.data.astype(np.float32) return data if calibration in ["qa_brightness_temperature", "cloud_distance"]: data.data = data.data * 0.01 data.data = data.data.astype(np.float32) return data
[docs] class OLITIRSCHReader(BaseLandsatL1Reader): """File handler for Landsat OLI-TIRS L1 files (tif).""" @property def spectral_bands(self): """Landsat bands.""" return ["B1", "B2", "B3", "B4", "B5", "B6", "B7", "B8", "B9"] @property def thermal_bands(self): """Landsat thermal bands.""" return ["B10", "B11"] @property def sensor(self): """Sensor name.""" return "OLI_TIRS"
[docs] class OLITIRSL2CHReader(BaseLandsatL2Reader): """File handler for Landsat OLI-TIRS L2 files (tif).""" @property def spectral_bands(self): """Landsat bands.""" return ["B1", "B2", "B3", "B4", "B5", "B6", "B7"] @property def thermal_bands(self): """Landsat thermal bands.""" return ["B10"] @property def sensor(self): """Sensor name.""" return "OLI_TIRS"
[docs] class ETMCHReader(BaseLandsatL1Reader): """File handler for Landsat ETM+ L1 files (tif).""" @property def spectral_bands(self): """Landsat bands.""" return ["B1", "B2", "B3", "B4", "B5", "B7", "B8"] @property def thermal_bands(self): """Landsat thermal bands.""" return ["B6_VCID_1", "B6_VCID_2"] @property def sensor(self): """Sensor name.""" return "ETM+"
[docs] class ETML2CHReader(BaseLandsatL2Reader): """File handler for Landsat ETM+ L2 files (tif).""" @property def spectral_bands(self): """Landsat bands.""" return ["B1", "B2", "B3", "B4", "B5", "B7"] @property def thermal_bands(self): """Landsat thermal bands.""" return ["B6"] @property def sensor(self): """Sensor name.""" return "ETM+"
[docs] class TMCHReader(BaseLandsatL1Reader): """File handler for Landsat TM L1 files (tif).""" @property def spectral_bands(self): """Landsat bands.""" return ["B1", "B2", "B3", "B4", "B5", "B7"] @property def thermal_bands(self): """Landsat thermal bands.""" return ["B6"] @property def sensor(self): """Sensor name.""" return "TM"
[docs] class TML2CHReader(BaseLandsatL2Reader): """File handler for Landsat TM L2 files (tif).""" @property def spectral_bands(self): """Landsat bands.""" return ["B1", "B2", "B3", "B4", "B5", "B7"] @property def thermal_bands(self): """Landsat thermal bands.""" return ["B6"] @property def sensor(self): """Sensor name.""" return "TM"
[docs] class MSSCHReader(BaseLandsatL1Reader): """File handler for Landsat MSS L1 files (tif).""" @property def spectral_bands(self): """Landsat bands.""" if self.platform_name in ["Landsat-1", "Landsat-2", "Landsat-3"]: return ["B4", "B5", "B6", "B7"] elif self.platform_name in ["Landsat-4", "Landsat-5"]: return ["B1", "B2", "B3", "B4"] @property def thermal_bands(self): """Landsat thermal bands.""" return [] @property def sensor(self): """Sensor name.""" return "MSS"
[docs] def available_datasets(self, configured_datasets=None): """Set up wavelength to B4 band.""" # update previously configured datasets for is_avail, ds_info in (configured_datasets or []): availability, info = self._get_dataset_info(ds_info, is_avail) yield availability, info
[docs] def _get_dataset_info(self, ds_info, is_avail): # some other file handler knows how to load this # don't override what they've done if is_avail is not None: return is_avail, ds_info matches = self.file_type_matches(ds_info["file_type"]) if matches: info = self._get_matched_dataset_info(ds_info) return True, info # we don't know what to do with this # see if another future file handler does return is_avail, ds_info
[docs] def _get_matched_dataset_info(self, ds_info): if ds_info.get("name") == "B4": return self._get_modified_wavelength_info(ds_info) return True, ds_info
[docs] def _get_modified_wavelength_info(self, ds_info): # Modify the dataset's wavelength dynamically new_info = ds_info.copy() if self.platform_name in ["Landsat-1", "Landsat-2", "Landsat-3"]: new_info["wavelength"] = [0.5, 0.55, 0.6] # Green elif self.platform_name in ["Landsat-4", "Landsat-5"]: new_info["wavelength"] = [0.8, 0.95, 1.1] # NIR return new_info
[docs] class BaseLandsatMDReader(BaseFileHandler): """Metadata file handler for Landsat files (tif)."""
[docs] def __init__(self, filename, filename_info, filetype_info): """Init the reader.""" super().__init__(filename, filename_info, filetype_info) # Check we have landsat data if filename_info["platform_type"] != "L": raise ValueError("This reader only supports Landsat data") self.platform_name = PLATFORMS[filename_info["spacecraft_id"]] self._obs_date = filename_info["observation_date"] self.root = ET.parse(open_file_or_filename(self.filename)) self.process_level = filename_info["process_level_correction"] import bottleneck # noqa import geotiepoints # noqa
[docs] def get_cal_params(self, top_key, key_1, key_2): """Read the requested calibration parameters.""" gain_flags = {} add_flags = {} for elem in self.root.findall(f".//{top_key}/*"): if elem.tag.startswith(f"{key_1}_BAND_"): band_num = _parse_band_num(elem, key_1) gain_flags[band_num] = float(elem.text) if elem.tag.startswith(f"{key_2}_BAND_"): band_num = _parse_band_num(elem, key_2) add_flags[band_num] = float(elem.text) return {key: tuple([gain_flags[key], add_flags[key]]) for key in gain_flags}
@property def center_time(self): """Return center time.""" return datetime.strptime(self.root.find(".//IMAGE_ATTRIBUTES/SCENE_CENTER_TIME").text[:-2], "%H:%M:%S.%f").replace(tzinfo=timezone.utc) @property def start_time(self): """Return start time. This is actually the scene center time, as we don't have the start time. It is constructed from the observation date (from the filename) and the center time (from the metadata). """ return datetime(self._obs_date.year, self._obs_date.month, self._obs_date.day, self.center_time.hour, self.center_time.minute, self.center_time.second, tzinfo=timezone.utc) @property def end_time(self): """Return end time. This is actually the scene center time, as we don't have the end time. It is constructed from the observation date (from the filename) and the center time (from the metadata). """ return datetime(self._obs_date.year, self._obs_date.month, self._obs_date.day, self.center_time.hour, self.center_time.minute, self.center_time.second, tzinfo=timezone.utc) @property def cloud_cover(self): """Return estimated granule cloud cover percentage.""" return float(self.root.find(".//IMAGE_ATTRIBUTES/CLOUD_COVER").text) @property def band_saturation(self): """Return per-band saturation flag.""" flags = {} for elem in self.root.findall(".//IMAGE_ATTRIBUTES/*"): if elem.tag.startswith("SATURATION_BAND_"): band_num = elem.tag.replace("SATURATION_BAND_", "B") flags[band_num] = (elem.text == "Y") return flags @property def band_calibration(self): """Return per-band saturation flag.""" raise NotImplementedError()
[docs] def earth_sun_distance(self): """Return Earth-Sun distance.""" return float(self.root.find(".//IMAGE_ATTRIBUTES/EARTH_SUN_DISTANCE").text)
[docs] def build_area_def(self, bname): """Build area definition from metadata.""" from pyresample.geometry import AreaDefinition # Here we assume that the thermal bands have the same resolution as the reflective bands, # with only the panchromatic band (b08) having a different resolution. if bname in PAN_BANDLIST and self.platform_name in ["Landsat-7", "Landsat-8", "Landsat-9"]: pixoff = float(self.root.find(".//PROJECTION_ATTRIBUTES/GRID_CELL_SIZE_PANCHROMATIC").text) / 2. x_size = float(self.root.find(".//PROJECTION_ATTRIBUTES/PANCHROMATIC_SAMPLES").text) y_size = float(self.root.find(".//PROJECTION_ATTRIBUTES/PANCHROMATIC_LINES").text) else: pixoff = float(self.root.find(".//PROJECTION_ATTRIBUTES/GRID_CELL_SIZE_REFLECTIVE").text) / 2. x_size = float(self.root.find(".//PROJECTION_ATTRIBUTES/REFLECTIVE_SAMPLES").text) y_size = float(self.root.find(".//PROJECTION_ATTRIBUTES/REFLECTIVE_LINES").text) # Get remaining geoinfo from file datum = self.root.find(".//PROJECTION_ATTRIBUTES/DATUM").text # Reading utm zone or get specific crs for arctic and antarctic if self.root.find(".//PROJECTION_ATTRIBUTES/UTM_ZONE") is not None: utm_zone = self.root.find(".//PROJECTION_ATTRIBUTES/UTM_ZONE").text pcs_id = f"{datum} / UTM zone {utm_zone}N" proj_code = f"EPSG:326{utm_zone.zfill(2)}" else: lat_ts = self.root.find(".//PROJECTION_ATTRIBUTES/TRUE_SCALE_LAT").text if lat_ts == "-71.00000": # Antarctic proj_code = "EPSG:3031" if lat_ts == "71.00000": # Arctic proj_code = "EPSG:3995" pcs_id = f"{datum} / EPSG: {proj_code[5:]}N" # We need to subtract / add half a pixel from the corner to get the correct extent (pixel centers) ext_p1 = float(self.root.find(".//PROJECTION_ATTRIBUTES/CORNER_UL_PROJECTION_X_PRODUCT").text) - pixoff ext_p2 = float(self.root.find(".//PROJECTION_ATTRIBUTES/CORNER_LR_PROJECTION_Y_PRODUCT").text) - pixoff ext_p3 = float(self.root.find(".//PROJECTION_ATTRIBUTES/CORNER_LR_PROJECTION_X_PRODUCT").text) + pixoff ext_p4 = float(self.root.find(".//PROJECTION_ATTRIBUTES/CORNER_UL_PROJECTION_Y_PRODUCT").text) + pixoff # Create area definition area_extent = (ext_p1, ext_p2, ext_p3, ext_p4) # Return the area extent return AreaDefinition(f"EPSG_{proj_code[5:]}", pcs_id, pcs_id, proj_code, x_size, y_size, area_extent)
[docs] def _parse_band_num(elem, key): band_num = elem.tag.replace(f"{key}_BAND_", "") if band_num.startswith("ST_"): band_num = band_num.replace("ST_", "") else: band_num = "B" + band_num return band_num
[docs] class LandsatL1MDReader(BaseLandsatMDReader): """Metadata file handler for Landsat L1 files (tif).""" @property def band_calibration(self): """Return per-band calibration parameters.""" radcal = self.get_cal_params("LEVEL1_RADIOMETRIC_RESCALING", "RADIANCE_MULT", "RADIANCE_ADD") viscal = self.get_cal_params("LEVEL1_RADIOMETRIC_RESCALING", "REFLECTANCE_MULT", "REFLECTANCE_ADD") tircal = self.get_cal_params("LEVEL1_THERMAL_CONSTANTS", "K1_CONSTANT", "K2_CONSTANT") topcal = viscal | tircal return {key: tuple([*radcal[key], *topcal[key]]) for key in radcal}
[docs] class LandsatL2MDReader(BaseLandsatMDReader): """Metadata file handler for Landsat L2 files (tif).""" @property def band_calibration(self): """Return per-band calibration parameters.""" viscal = self.get_cal_params("LEVEL2_SURFACE_REFLECTANCE_PARAMETERS", "REFLECTANCE_MULT", "REFLECTANCE_ADD") tircal = self.get_cal_params("LEVEL2_SURFACE_TEMPERATURE_PARAMETERS", "TEMPERATURE_MULT", "TEMPERATURE_ADD") return viscal | tircal