Source code for enpt.model.metadata.metadata_sensorgeo
# -*- coding: utf-8 -*-
# EnPT, EnMAP Processing Tool - A Python package for pre-processing of EnMAP Level-1B data
#
# Copyright (C) 2018-2024 Karl Segl (GFZ Potsdam, segl@gfz-potsdam.de), Daniel Scheffler
# (GFZ Potsdam, danschef@gfz-potsdam.de), Niklas Bohn (GFZ Potsdam, nbohn@gfz-potsdam.de),
# Stéphane Guillaso (GFZ Potsdam, stephane.guillaso@gfz-potsdam.de)
#
# This software was developed within the context of the EnMAP project supported
# by the DLR Space Administration with funds of the German Federal Ministry of
# Economic Affairs and Energy (on the basis of a decision by the German Bundestag:
# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version. Please note the following exception: `EnPT` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <https://www.gnu.org/licenses/>.
"""EnPT metadata objects for EnMAP data in sensor geometry."""
from datetime import datetime
from lxml import etree as ElementTree
import logging
import os
import fnmatch
from typing import Union, List, Tuple, Optional # noqa: F401
from collections import OrderedDict
from packaging.version import parse as parse_version
import numpy as np
from py_tools_ds.geo.vector.topology import Polygon, get_footprint_polygon # noqa: F401 # flake8 issue
from geoarray import GeoArray
from ...options.config import EnPTConfig
from ..srf import SRF
from ...processors.spatial_transform import RPC_3D_Geolayer_Generator
__author__ = ['Daniel Scheffler', 'Stéphane Guillaso', 'André Hollstein']
[docs]
class EnMAP_Metadata_L1B_Detector_SensorGeo(object):
"""Class for all EnMAP metadata associated with a single EnMAP detector in sensor geometry.
NOTE:
- All metadata that have VNIR and SWIR detector in sensor geometry in common should be included here.
"""
def __init__(self, detector_name: str, config: EnPTConfig, logger: logging.Logger = None):
"""Get a metadata object containing the metadata of a single EnMAP detector in sensor geometry.
:param detector_name: Name of the detector (VNIR or SWIR)
:param config: EnPT configuration object
:param logger: instance of logging.logger or subclassed
"""
from . import L1B_product_props, L1B_product_props_DLR
self.cfg = config
self.detector_name: str = detector_name
if not self.cfg.is_dummy_dataformat:
self.detector_label = L1B_product_props_DLR['xml_detector_label'][detector_name]
else:
self.detector_label = L1B_product_props['xml_detector_label'][detector_name]
self.logger = logger or logging.getLogger()
# These lines are used to load path information
self.filename_data: Optional[str] = '' # detector data filename
self.scene_basename: Optional[str] = '' # basename of the EnMAP image
self.filename_deadpixelmap: Optional[str] = '' # filename of the dead pixel file
self.filename_quicklook: Optional[str] = '' # filename of the quicklook file
# FIXME masks of BOTH detectors
self.filename_mask_landwater: Optional[str] = '' # filename of the land/water mask file
self.filename_mask_snow: Optional[str] = '' # filename of the snow mask file
self.filename_mask_cloudshadow: Optional[str] = '' # filename of the cloud shadow mask file
self.filename_mask_clouds: Optional[str] = '' # filename of the cloud mask file
self.filename_mask_haze: Optional[str] = '' # filename of the haze mask file
self.filename_mask_cirrus: Optional[str] = '' # filename of the cirrus mask file
self.wvl_center: Optional[np.ndarray] = None # Center wavelengths for each EnMAP band
self.fwhm: Optional[np.ndarray] = None # Full width half maximmum for each EnMAP band
self.srf: Optional[SRF] = None # SRF object holding the spectral response functions for each EnMAP band
self.solar_irrad: Optional[np.ndarray] = None # solar irradiance in [W/m2/nm] for each band
self.nwvl: Optional[int] = None # Number of wave bands
self.nrows: Optional[int] = None # number of rows
self.ncols: Optional[int] = None # number of columns
self.smile_coef: Optional[np.ndarray] = None # smile coefficients needed for smile computation
self.nsmile_coef: Optional[int] = None # number of smile coefficients
self.smile: Optional[np.ndarray] = None # smile for each EnMAP image column
self.gains: Optional[np.ndarray] = None # band-wise gains for computing radiance from DNs
self.offsets: Optional[np.ndarray] = None # band-wise offsets for computing radiance from DNs
self.l_min: Optional[np.ndarray] = None # band-wise l-min for computing radiance from DNs
self.l_max: Optional[np.ndarray] = None # band-wise l-max for computing radiance from DNs
self.goodbands_inds: Optional[List] = None # list of band indices included in the processing (all other bands are removed) # noqa
self.lat_UL_UR_LL_LR: Optional[List[float, float, float, float]] = None # latitude coords for UL, UR, LL, LR
self.lon_UL_UR_LL_LR: Optional[List[float, float, float, float]] = None # longitude coords for UL, UR, LL, LR
self.epsg_ortho: Optional[int] = None # EPSG code of the orthorectified image
self.rpc_coeffs: OrderedDict = OrderedDict() # RPC coefficients for geolayer computation
self.ll_mapPoly: Optional[Polygon] = None # footprint polygon in longitude/latitude map coordinates
self.lats: Optional[np.ndarray] = None # 2D/3D array of latitude coordinates
self.lons: Optional[np.ndarray] = None # 2D/3D array of longitude coordinates
self.geolayer_has_keystone: Optional[bool] = None # indicates if lon/lat geolayer considers keystone (3D array)
self.unit: str = '' # radiometric unit of pixel values
self.unitcode: str = '' # code of radiometric unit
self.preview_wvls: Optional[List[float]] = None # wavelengths to be used for quicklook images
self.preview_bands: Optional[List[int]] = None # band indices to be used for quicklook images
self.snr: Optional[np.ndarray] = None # Signal to noise ratio as computed from radiance data
[docs]
def read_metadata(self, path_xml):
"""Read the metadata of a specific EnMAP detector in sensor geometry.
:param path_xml: file path of the metadata file
:return: None
"""
xml = ElementTree.parse(path_xml).getroot()
if not self.cfg.is_dummy_dataformat:
lbl = self.detector_label
self.logger.info("Reading metadata for %s detector..." % self.detector_name)
# read data filenames
all_filenames = [ele.text for ele in xml.findall("product/productFileInformation/file/name")]
def get_filename(matching_exp: str):
matches = []
for ext in ['', '.TIF', '.GEOTIFF', '.BSQ', '.BIL', '.BIP', 'JPEG2000', '.JP2', '.jp2']:
matches.extend(fnmatch.filter(all_filenames, f'{matching_exp}{ext}'))
if matches:
break
if not matches:
raise FileNotFoundError("Could not find a file that matches the expression '%s'." % matching_exp)
return matches[0]
self.filename_data = xml.find("product/image/%s/name" % lbl).text
self.scene_basename = self.filename_data.split('-SPECTRAL_IMAGE')[0]
self.filename_quicklook = xml.find("product/quicklook/%s/name" % lbl).text
self.filename_deadpixelmap = get_filename('*QL_PIXELMASK_%s' % self.detector_name)
self.filename_mask_landwater = get_filename('*QL_QUALITY_CLASSES')
self.filename_mask_snow = get_filename('*QL_QUALITY_SNOW')
self.filename_mask_cloudshadow = get_filename('*QL_QUALITY_CLOUDSHADOW')
self.filename_mask_clouds = get_filename('*-QL_QUALITY_CLOUD')
self.filename_mask_haze = get_filename('*QL_QUALITY_HAZE')
self.filename_mask_cirrus = get_filename('*QL_QUALITY_CIRRUS')
# FIXME combine different cloud masks?
# TODO: Add test flags layer.
# read some basic information concerning the detector
self.nrows = int(xml.find("product/image/%s/dimension/rows" % lbl).text)
self.ncols = int(xml.find("product/image/%s/dimension/columns" % lbl).text)
self.unitcode = 'DN'
self.unit = ''
# Read image coordinates
# FIXME why do we have the same corner coordinates for VNIR and SWIR?
points = xml.findall("base/spatialCoverage/boundingPolygon/point")
coords_xy = {p.find('frame').text: (float(p.find('longitude').text),
float(p.find('latitude').text))
for p in points}
coord_tags = ['upper_left', 'upper_right', 'lower_left', 'lower_right']
self.lon_UL_UR_LL_LR = [coords_xy[ct][0] for ct in coord_tags]
self.lat_UL_UR_LL_LR = [coords_xy[ct][1] for ct in coord_tags]
# read the band related information: wavelength, fwhm
self.nwvl = int(xml.find("product/image/%s/channels" % lbl).text)
# FIXME hardcoded - DLR does not provide any smile information
# => smile coefficients are set to zero until we have valid ones
self.nsmile_coef = 5
self.smile_coef = np.zeros((self.nwvl, self.nsmile_coef), dtype=float)
startband = 0 if self.detector_name == 'VNIR' else int(xml.find("product/image/vnir/channels").text)
subset = slice(startband, startband + self.nwvl)
bi = "specific/bandCharacterisation/bandID/"
self.wvl_center = np.array([float(ele.text) for ele in xml.findall(bi + "wavelengthCenterOfBand")[subset]])
self.fwhm = np.array([float(ele.text) for ele in xml.findall(bi + "FWHMOfBand")[subset]])
self.gains = np.array([float(ele.text) for ele in xml.findall(bi + "GainOfBand")[subset]])
self.offsets = np.array([float(ele.text) for ele in xml.findall(bi + "OffsetOfBand")[subset]])
# read preview bands
wvl_red = float(xml.find("product/image/%s/qlChannels/red" % lbl).text)
wvl_green = float(xml.find("product/image/%s/qlChannels/green" % lbl).text)
wvl_blue = float(xml.find("product/image/%s/qlChannels/blue" % lbl).text)
self.preview_wvls = [wvl_red, wvl_green, wvl_blue]
self.preview_bands = np.array([np.argmin(np.abs(self.wvl_center - wvl)) for wvl in self.preview_wvls])
# read RPC coefficients
for bID in xml.findall("product/navigation/RPC/bandID")[subset]:
bN = 'band_%d' % np.int64(bID.attrib['number'])
keys2combine = ('row_num', 'row_den', 'col_num', 'col_den')
tmp = OrderedDict([(ele.tag.lower(), float(ele.text)) for ele in bID.findall('./')])
self.rpc_coeffs[bN] = {k: v for k, v in tmp.items() if not k.startswith(keys2combine)}
for n in keys2combine:
self.rpc_coeffs[bN]['%s_coeffs' % n.lower()] = \
np.array([v for k, v in tmp.items() if k.startswith(n)])
else:
lbl = self.detector_label
self.logger.info("Reading metadata for %s detector..." % self.detector_name)
# read data filenames
self.filename_data = xml.findall("ProductComponent/%s/Data/Filename" % lbl)[0].text
self.scene_basename = os.path.splitext(self.filename_data)[0]
self.filename_deadpixelmap = xml.findall("ProductComponent/%s/Sensor/DeadPixel/Filename" % lbl)[0].text
self.filename_quicklook = xml.findall("ProductComponent/%s/Preview/Filename" % lbl)[0].text
self.filename_mask_clouds = xml.findall("ProductComponent/%s/Data/CloudMaskMap/Filename" % lbl)[0].text
# read preview bands
self.preview_bands = np.zeros(3, dtype=int)
self.preview_bands[0] = int(xml.findall("ProductComponent/%s/Preview/Bands/Red" % lbl)[0].text)
self.preview_bands[1] = int(xml.findall("ProductComponent/%s/Preview/Bands/Green" % lbl)[0].text)
self.preview_bands[2] = int(xml.findall("ProductComponent/%s/Preview/Bands/Blue" % lbl)[0].text)
# read some basic information concerning the detector
self.nrows = int(xml.findall("ProductComponent/%s/Data/Size/NRows" % lbl)[0].text)
self.ncols = int(xml.findall("ProductComponent/%s/Data/Size/NCols" % lbl)[0].text)
self.unitcode = xml.findall("ProductComponent/%s/Data/Type/UnitCode" % lbl)[0].text
self.unit = xml.findall("ProductComponent/%s/Data/Type/Unit" % lbl)[0].text
# Read image coordinates
scene_corner_coordinates = xml.findall("ProductComponent/%s/Data/SceneInformation/"
"SceneCornerCoordinates" % lbl)
self.lat_UL_UR_LL_LR = [
float(scene_corner_coordinates[0].findall("Latitude")[0].text),
float(scene_corner_coordinates[1].findall("Latitude")[0].text),
float(scene_corner_coordinates[2].findall("Latitude")[0].text),
float(scene_corner_coordinates[3].findall("Latitude")[0].text)
]
self.lon_UL_UR_LL_LR = [
float(scene_corner_coordinates[0].findall("Longitude")[0].text),
float(scene_corner_coordinates[1].findall("Longitude")[0].text),
float(scene_corner_coordinates[2].findall("Longitude")[0].text),
float(scene_corner_coordinates[3].findall("Longitude")[0].text)
]
# read the band related information: wavelength, fwhm
self.nwvl = int(xml.findall("ProductComponent/%s/Data/BandInformationList/NumberOfBands" % lbl)[0].text)
self.nsmile_coef = int(xml.findall(
"ProductComponent/%s/Data/BandInformationList/SmileInformation/NumberOfCoefficients" % lbl)[0].text)
self.fwhm = np.zeros(self.nwvl, dtype=float)
self.wvl_center = np.zeros(self.nwvl, dtype=float)
self.smile_coef = np.zeros((self.nwvl, self.nsmile_coef), dtype=float)
self.l_min = np.zeros(self.nwvl, dtype=float)
self.l_max = np.zeros(self.nwvl, dtype=float)
band_informations = xml.findall("ProductComponent/%s/Data/BandInformationList/BandInformation" % lbl)
for bi in band_informations:
k = np.int64(bi.attrib['Id']) - 1
self.wvl_center[k] = float(bi.findall("CenterWavelength")[0].text)
self.fwhm[k] = float(bi.findall("FullWidthHalfMaximum")[0].text)
self.l_min[k] = float(bi.findall("L_min")[0].text)
self.l_max[k] = float(bi.findall("L_max")[0].text)
scl = bi.findall("Smile/Coefficient")
for sc in scl:
self.smile_coef[k, np.int64(sc.attrib['exponent'])] = float(sc.text)
self.lats = self.interpolate_corners(*self.lat_UL_UR_LL_LR, self.ncols, self.nrows)
self.lons = self.interpolate_corners(*self.lon_UL_UR_LL_LR, self.ncols, self.nrows)
self.geolayer_has_keystone = False
self.preview_wvls = np.array([self.wvl_center[i] for i in self.preview_bands])
# drop bad bands from metadata if desired
if self.cfg.drop_bad_bands:
wvls = list(self.wvl_center)
self.goodbands_inds = gbl = [wvls.index(wvl) for wvl in wvls
if not (1358 < wvl < 1453 or
1814 < wvl < 1961)]
if len(gbl) < len(wvls):
for attrN in ['wvl_center', 'fwhm', 'offsets', 'gains', 'l_min', 'l_max']:
if getattr(self, attrN) is not None:
setattr(self, attrN, getattr(self, attrN)[gbl])
self.nwvl = len(gbl)
self.smile_coef = self.smile_coef[gbl, :]
self.rpc_coeffs = OrderedDict([(k, v) for i, (k, v) in enumerate(self.rpc_coeffs.items())
if i in gbl])
# compute metadata derived from read data
self.smile = self.calc_smile()
self.srf = SRF.from_cwl_fwhm(self.wvl_center, self.fwhm)
self.solar_irrad = self.calc_solar_irradiance_CWL_FWHM_per_band()
self.ll_mapPoly = get_footprint_polygon(tuple(zip(self.lon_UL_UR_LL_LR,
self.lat_UL_UR_LL_LR)), fix_invalid=True)
from ...processors.spatial_transform import get_UTMEPSG_from_LonLat_cornersXY
# NOTE: self.cfg.target_epsg is set if user-provided or in case of Lon/Lat coordinates
self.epsg_ortho = \
self.cfg.target_epsg or \
get_UTMEPSG_from_LonLat_cornersXY(lons=self.lon_UL_UR_LL_LR, lats=self.lat_UL_UR_LL_LR)
[docs]
def calc_smile(self):
"""Compute smile for each EnMAP column.
The sum in (1) is expressed as inner product of two arrays with inner dimension as the polynomial smile
coefficients shape = (ncols, nsmile_coef) of polynomial x
:return:
"""
# smile(icol, iwvl) = sum_(p=0)^(nsmile_coef-1) smile_coef[iwvl, p] * icol**p (1)
return np.inner(
np.array([[icol ** p for p in np.arange(self.nsmile_coef)] for icol in np.arange(self.ncols)]),
self.smile_coef # shape = (nwvl, nsmile_coef)
) # shape = (ncols, nwvl)
[docs]
def calc_snr_from_radiance(self, rad_data: Union[GeoArray, np.ndarray], dir_snr_models: str):
"""Compute EnMAP SNR from radiance data for the given detector.
SNR equation: SNR = p0 + p1 * LTOA + p2 * LTOA ^ 2 [W/(m^2 sr nm)].
NOTE: The SNR model files (SNR_D1.bsq/SNR_D2.bsq) contain polynomial coefficients needed to compute SNR.
SNR_D1.bsq: SNR model for EnMAP VNIR detector (contains high gain and low gain model coefficients)
- 1000 columns (for 1000 EnMAP columns)
- 88 bands for 88 EnMAP VNIR bands
- 7 lines: - 3 lines: high gain coefficients
- 3 lines: low gain coefficients
- 1 line: threshold needed to decide about high gain or low gain
SNR_D2.bsq: SNR model for EnMAP SWIR detector
- 1000 columns (for 1000 EnMAP columns)
- x bands for x EnMAP SWIR bands
- 3 lines for 3 coefficients
:param rad_data: image radiance data of EnMAP_Detector_SensorGeo
:param dir_snr_models: root directory where SNR model data is stored (must contain SNR_D1.bsq/SNR_D2.bsq)
"""
rad_data = np.array(rad_data)
assert self.unitcode == 'TOARad'
self.logger.info(f"Computing SNR from {self.detector_name} TOA radiance.")
if self.detector_name == 'VNIR':
gA = self._get_snr_model(dir_snr_models)
coeffs_highgain = gA[0:3, :, :] # [3 x ncols x nbands]
coeffs_lowgain = gA[3:6, :, :] # [3 x ncols x nbands]
gain_threshold = np.squeeze(gA[6, :, :])
self.snr = np.zeros(rad_data.shape)
for irow in range(self.nrows):
highgain_mask = rad_data[irow, :, :] < gain_threshold # a single row
rad_highgain = rad_data[irow, highgain_mask]
self.snr[irow, :, :][highgain_mask] = \
coeffs_highgain[0, highgain_mask] + \
coeffs_highgain[1, highgain_mask] * rad_highgain + \
coeffs_highgain[2, highgain_mask] * rad_highgain ** 2
lowgain_mask = rad_data[irow, :, :] >= gain_threshold
rad_lowgain = rad_data[irow, lowgain_mask]
self.snr[irow, :, :][lowgain_mask] = \
coeffs_lowgain[0, lowgain_mask] + \
coeffs_lowgain[1, lowgain_mask] * rad_lowgain + \
coeffs_lowgain[2, lowgain_mask] * rad_lowgain ** 2
else:
gA = self._get_snr_model(dir_snr_models)
coeffs = gA[:]
self.snr = coeffs[0, :, :] + coeffs[1, :, :] * rad_data[:, :, :] + coeffs[2, :, :] * rad_data[:, :, :] ** 2
[docs]
def _get_snr_model(self, dir_snr_models: str) -> GeoArray:
"""Get the SNR model coefficients for the current detector.
NOTE: Missing bands are linearly interpolated.
:param dir_snr_models: directory containing the SNR models
"""
path_snr_model = os.path.join(dir_snr_models, "SNR_D1.bsq" if self.detector_name == 'VNIR' else "SNR_D2.bsq")
gA = GeoArray(path_snr_model)
if gA.bands == self.nwvl:
return gA
else:
from scipy.interpolate import make_interp_spline
# equivalent to legacy scipy.interpolate.interp1d
# interp1d(wvl, gA[:], axis=2, kind='linear', fill_value="extrapolate", bounds_error=False)(self.wvl_center)
coeffs_interp = make_interp_spline(gA.meta.band_meta['wavelength'], gA[:], k=1, axis=2)(self.wvl_center)
gA_ = GeoArray(coeffs_interp)
gA_.meta.band_meta['wavelength'] = self.wvl_center
return gA_
[docs]
@staticmethod
def interpolate_corners(ul: float, ur: float, ll: float, lr: float, nx: int, ny: int):
"""Compute interpolated field from corner values of a scalar field given at: ul, ur, ll, lr.
:param ul: tbd
:param ur: tbd
:param ll: tbd
:param lr: tbd
:param nx: final shape (x-axis direction)
:param ny: final shape (y-axis direction)
"""
# FIXME this method must later be replaced by the geolayer provided by the ground segment
# => a linear interpolation between the EnMAP corner coordinates is NOT sufficient for modelling the
# geometry of VNIR and SWIR
# - especially at off-nadir acquisitions and with some terrain present, a linear interpolation leads
# to large deviations (> 180 m y-coordinate offset for the EnPT test dataset)
# TODO ensure that lons/lats represent UL coordinates not pixel coordinate centers (as given by Karl / DLR(?))
corner_coords = np.array([[ul, ur],
[ll, lr]])
rowpos, colpos = [0, 1], [0, 1]
from scipy.interpolate import RegularGridInterpolator
rgi = RegularGridInterpolator([rowpos, colpos], corner_coords, method='linear')
out_rows_grid, out_cols_grid = np.meshgrid(np.linspace(0, 1, ny),
np.linspace(0, 1, nx),
indexing='ij')
coords = rgi(np.dstack([out_rows_grid, out_cols_grid]))
return coords
[docs]
def compute_geolayer_for_cube(self):
self.logger.info('Computing %s geolayer...' % self.detector_name)
GeolayerGen = \
RPC_3D_Geolayer_Generator(
rpc_coeffs_per_band=self.rpc_coeffs,
elevation=self.cfg.path_dem if self.cfg.path_dem else self.cfg.average_elevation,
enmapIm_cornerCoords=tuple(zip(self.lon_UL_UR_LL_LR, self.lat_UL_UR_LL_LR)),
enmapIm_dims_sensorgeo=(self.nrows, self.ncols),
CPUs=self.cfg.CPUs
)
lons, lats = GeolayerGen.compute_geolayer()
self.geolayer_has_keystone = len(GeolayerGen.bandgroups_with_unique_rpc_coeffs) > 1
return lons, lats
[docs]
def calc_solar_irradiance_CWL_FWHM_per_band(self) -> np.array:
from ...io.reader import read_solar_irradiance
self.logger.debug('Calculating solar irradiance...')
sol_irr = read_solar_irradiance(path_solar_irr_model=self.cfg.path_solar_irr, wvl_min_nm=350, wvl_max_nm=2500)
irr_bands = []
for band in self.srf.bands:
WVL_band = self.srf.srfs_wvl if self.srf.wvl_unit == 'nanometers' else self.srf.srfs_wvl * 1000
RSP_band = self.srf.srfs_norm01[band]
sol_irr_at_WVL = np.interp(WVL_band, sol_irr[:, 0], sol_irr[:, 1], left=0, right=0)
irr_bands.append(np.round(np.sum(sol_irr_at_WVL * RSP_band) / np.sum(RSP_band), 2))
return np.array(irr_bands)
[docs]
class EnMAP_Metadata_L1B_SensorGeo(object):
"""EnMAP Metadata class for the metadata of the complete EnMAP L1B product in sensor geometry incl. VNIR and SWIR.
Attributes:
- logger(logging.Logger): None or logging instance
- observation_datetime(datetime.datetime): datetime of observation time
- geom_view_zenith: viewing zenith angle
- geom_view_azimuth: viewing azimuth angle
- geom_sun_zenith: sun zenith angle
- geom_sun_azimuth: sun azimuth angle
- mu_sun: needed by SICOR for TOARad > TOARef conversion
- vnir(EnMAP_Metadata_VNIR_SensorGeo)
- swir(EnMAP_Metadata_SWIR_SensorGeo)
- detector_attrNames: attribute names of the detector objects
"""
def __init__(self, path_metaxml, config: EnPTConfig, logger=None):
"""Get a metadata object instance for the given EnMAP L1B product in sensor geometry.
:param path_metaxml: file path of the EnMAP L1B metadata XML file
:para, config: EnPT configuration object
:param logger: instance of logging.logger or subclassed
"""
self.cfg = config
self.logger = logger or logging.getLogger()
self.path_xml = path_metaxml
self.rootdir = os.path.dirname(path_metaxml)
# defaults - Common
self.proc_level: Optional[str] = None # Dataset processing level
self.version_provider: Optional[str] = '' # version of ground segment processing system
self.observation_datetime: Optional[datetime] = None # Date and Time of image observation
self.geom_view_zenith: Optional[float] = None # viewing zenith angle
self.geom_view_azimuth: Optional[float] = None # viewing azimuth angle
self.geom_sun_zenith: Optional[float] = None # sun zenith angle
self.geom_sun_azimuth: Optional[float] = None # sun azimuth angle
self.mu_sun: Optional[float] = None # needed by SICOR for TOARad > TOARef conversion
self.earthSunDist: Optional[float] = None # earth-sun distance
self.aot: Optional[float] = None # scene aerosol optical thickness
self.water_vapour: Optional[float] = None # scene water vapour [cm]
self.vnir: Optional[EnMAP_Metadata_L1B_Detector_SensorGeo] = None # metadata of VNIR only
self.swir: Optional[EnMAP_Metadata_L1B_Detector_SensorGeo] = None # metadata of SWIR only
self.detector_attrNames: list = ['vnir', 'swir'] # attribute names of the detector objects
self.filename_metaxml: Optional[str] = None # filename of XML metadata file
self._scene_basename: Optional[str] = None # basename of the EnMAP image
@property
def scene_basename(self):
if self.vnir:
self._scene_basename = self.vnir.scene_basename
return self._scene_basename
[docs]
def read_common_meta(self, path_xml):
"""Read the common metadata, principally stored in General Info.
- the acquisition time
- the geometrical observation and illumination
:param path_xml: path to the main xml file
:return: None
"""
# load the metadata xml file
xml = ElementTree.parse(path_xml).getroot()
self.filename_metaxml = os.path.basename(path_xml)
if not self.cfg.is_dummy_dataformat:
# read processing level
self.proc_level = xml.find("base/level").text
if self.proc_level != 'L1B':
raise RuntimeError(self.proc_level, "Unexpected input data processing level. Expected 'L1B'.")
# read version of ground segment processing system
self.version_provider = xml.find("base/revision").text
# raise a warning in case of old processing version (de-striping was implemented in version 01.02.00)
if parse_version(self.version_provider) < parse_version('01.02.00'):
self.logger.warning(
f"The input EnMAP Level-1B image was processed with an old version of the ground segment "
f"processing system (version {self.version_provider}), which, e.g. did not include de-striping. "
f"It is highly recommended to re-download the dataset in the latest processing version from the "
f"archive via the EOWEB GeoPortal (www.eoweb.dlr.de) before passing it to EnPT.")
# read the acquisition time
self.observation_datetime = \
datetime.strptime(xml.find("base/temporalCoverage/startTime").text, '%Y-%m-%dT%H:%M:%S.%fZ')
# get the distance earth sun from the acquisition date
self.earthSunDist = self.get_earth_sun_distance(self.observation_datetime)
# read Geometry (observation/illumination) angle
# NOTE: EnMAP metadata provide also the angles for the image corners
# -> would allow even more precise computation (e.g., specific/sunElevationAngle/upper_left)
# NOTE: alongOffNadirAngle is always near 0 and therefore ignored here (not relevant for AC)
# FIXME VZA may be negative in DLR L1B data -> correct to always use the absolute value for SICOR?
self.geom_view_zenith = np.abs(float(xml.find("specific/acrossOffNadirAngle/center").text))
# FIXME correct to directly use sceneAzimuthAngle (14.24 (DLR) vs. 101.1 (AlpineTest)
self.geom_view_azimuth = float(xml.find("specific/sceneAzimuthAngle/center").text)
self.geom_sun_zenith = 90 - float(xml.find("specific/sunElevationAngle/center").text)
self.geom_sun_azimuth = float(xml.find("specific/sunAzimuthAngle/center").text)
self.mu_sun = np.cos(np.deg2rad(self.geom_sun_zenith))
self.aot = float(xml.find("specific/qualityFlag/sceneAOT").text) / 1000 # scale factor is 1000
self.water_vapour = float(xml.find("specific/qualityFlag/sceneWV").text) / 1000 # scale factor is 1000
else:
# read the acquisition time
self.observation_datetime = \
datetime.strptime(xml.findall("GeneralInfo/ProductInfo/ProductStartTime")[0].text,
'%Y-%m-%dT%H:%M:%S.%fZ')
# get the distance earth sun from the acquisition date
self.earthSunDist = self.get_earth_sun_distance(self.observation_datetime)
# read Geometry (observation/illumination) angle
self.geom_view_zenith = float(xml.findall("GeneralInfo/Geometry/Observation/ZenithAngle")[0].text)
self.geom_view_azimuth = float(xml.findall("GeneralInfo/Geometry/Observation/AzimuthAngle")[0].text)
self.geom_sun_zenith = float(xml.findall("GeneralInfo/Geometry/Illumination/ZenithAngle")[0].text)
self.geom_sun_azimuth = float(xml.findall("GeneralInfo/Geometry/Illumination/AzimuthAngle")[0].text)
self.mu_sun = np.cos(np.deg2rad(self.geom_sun_zenith))
[docs]
def get_earth_sun_distance(self, acqDate: datetime):
"""Get earth-sun distance (requires file of pre-calculated earth sun distance per day).
:param acqDate:
"""
if not os.path.exists(self.cfg.path_earthSunDist):
self.logger.warning("\n\t WARNING: Earth Sun Distance is assumed to be "
"1.0 because no database can be found at %s.""" % self.cfg.path_earthSunDist)
return 1.0
if not acqDate:
self.logger.warning("\n\t WARNING: Earth Sun Distance is assumed to be 1.0 because "
"acquisition date could not be read from metadata.")
return 1.0
with open(self.cfg.path_earthSunDist, "r") as EA_dist_f:
EA_dist_dict = {}
for line in EA_dist_f:
date, EA = [item.strip() for item in line.split(",")]
EA_dist_dict[date] = EA
return float(EA_dist_dict[acqDate.strftime('%Y-%m-%d')])
[docs]
def read_metadata(self):
"""Read the metadata of the entire EnMAP L1B product in sensor geometry."""
# first read common metadata
self.read_common_meta(self.path_xml)
# define and read the VNIR metadata
self.vnir = EnMAP_Metadata_L1B_Detector_SensorGeo('VNIR', config=self.cfg, logger=self.logger)
self.vnir.read_metadata(self.path_xml)
# define and read the SWIR metadata
self.swir = EnMAP_Metadata_L1B_Detector_SensorGeo('SWIR', config=self.cfg, logger=self.logger)
self.swir.read_metadata(self.path_xml)
[docs]
def to_XML(self) -> str:
"""Generate an XML metadata string from the L1B metadata."""
from . import L1B_product_props, L1B_product_props_DLR
xml = ElementTree.parse(self.path_xml).getroot()
if not self.cfg.is_dummy_dataformat:
for detName, detMeta in zip(['VNIR', 'SWIR'], [self.vnir, self.swir]):
lbl = L1B_product_props_DLR['xml_detector_label'][detName]
xml.find("product/image/%s/dimension/rows" % lbl).text = str(detMeta.nrows)
xml.find("product/image/%s/dimension/columns" % lbl).text = str(detMeta.ncols)
xml.find("product/quicklook/%s/dimension/rows" % lbl).text = str(detMeta.nrows)
xml.find("product/quicklook/%s/dimension/columns" % lbl).text = str(detMeta.ncols)
else:
for detName, detMeta in zip(['VNIR', 'SWIR'], [self.vnir, self.swir]):
lbl = L1B_product_props['xml_detector_label'][detName]
xml.find("ProductComponent/%s/Data/Size/NRows" % lbl).text = str(detMeta.nrows)
xml.find("ProductComponent/%s/Data/Type/UnitCode" % lbl).text = detMeta.unitcode
xml.find("ProductComponent/%s/Data/Type/Unit" % lbl).text = detMeta.unit
xml_string = ElementTree.tostring(xml, encoding='unicode', pretty_print=True)
return xml_string