Coverage for enpt/model/metadata/metadata_sensorgeo.py: 98%
321 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-07 11:39 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-07 11:39 +0000
1# -*- coding: utf-8 -*-
3# EnPT, EnMAP Processing Tool - A Python package for pre-processing of EnMAP Level-1B data
4#
5# Copyright (C) 2018-2024 Karl Segl (GFZ Potsdam, segl@gfz-potsdam.de), Daniel Scheffler
6# (GFZ Potsdam, danschef@gfz-potsdam.de), Niklas Bohn (GFZ Potsdam, nbohn@gfz-potsdam.de),
7# Stéphane Guillaso (GFZ Potsdam, stephane.guillaso@gfz-potsdam.de)
8#
9# This software was developed within the context of the EnMAP project supported
10# by the DLR Space Administration with funds of the German Federal Ministry of
11# Economic Affairs and Energy (on the basis of a decision by the German Bundestag:
12# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.
13#
14# This program is free software: you can redistribute it and/or modify it under
15# the terms of the GNU General Public License as published by the Free Software
16# Foundation, either version 3 of the License, or (at your option) any later
17# version. Please note the following exception: `EnPT` depends on tqdm, which
18# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
19# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
20# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
21#
22# This program is distributed in the hope that it will be useful, but WITHOUT
23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
25# details.
26#
27# You should have received a copy of the GNU Lesser General Public License along
28# with this program. If not, see <https://www.gnu.org/licenses/>.
30"""EnPT metadata objects for EnMAP data in sensor geometry."""
32from datetime import datetime
33from lxml import etree as ElementTree
34import logging
35import os
36import fnmatch
37from typing import Union, List, Tuple, Optional # noqa: F401
38from collections import OrderedDict
39from packaging.version import parse as parse_version
40import numpy as np
41from py_tools_ds.geo.vector.topology import Polygon, get_footprint_polygon # noqa: F401 # flake8 issue
42from geoarray import GeoArray
44from ...options.config import EnPTConfig
45from ..srf import SRF
46from ...processors.spatial_transform import RPC_3D_Geolayer_Generator
48__author__ = ['Daniel Scheffler', 'Stéphane Guillaso', 'André Hollstein']
51class EnMAP_Metadata_L1B_Detector_SensorGeo(object):
52 """Class for all EnMAP metadata associated with a single EnMAP detector in sensor geometry.
54 NOTE:
55 - All metadata that have VNIR and SWIR detector in sensor geometry in common should be included here.
57 """
59 def __init__(self, detector_name: str, config: EnPTConfig, logger: logging.Logger = None):
60 """Get a metadata object containing the metadata of a single EnMAP detector in sensor geometry.
62 :param detector_name: Name of the detector (VNIR or SWIR)
63 :param config: EnPT configuration object
64 :param logger: instance of logging.logger or subclassed
65 """
66 from . import L1B_product_props, L1B_product_props_DLR
67 self.cfg = config
68 self.detector_name: str = detector_name
69 if not self.cfg.is_dummy_dataformat:
70 self.detector_label = L1B_product_props_DLR['xml_detector_label'][detector_name]
71 else:
72 self.detector_label = L1B_product_props['xml_detector_label'][detector_name]
73 self.logger = logger or logging.getLogger()
75 # These lines are used to load path information
76 self.filename_data: Optional[str] = '' # detector data filename
77 self.scene_basename: Optional[str] = '' # basename of the EnMAP image
78 self.filename_deadpixelmap: Optional[str] = '' # filename of the dead pixel file
79 self.filename_quicklook: Optional[str] = '' # filename of the quicklook file
80 # FIXME masks of BOTH detectors
81 self.filename_mask_landwater: Optional[str] = '' # filename of the land/water mask file
82 self.filename_mask_snow: Optional[str] = '' # filename of the snow mask file
83 self.filename_mask_cloudshadow: Optional[str] = '' # filename of the cloud shadow mask file
84 self.filename_mask_clouds: Optional[str] = '' # filename of the cloud mask file
85 self.filename_mask_haze: Optional[str] = '' # filename of the haze mask file
86 self.filename_mask_cirrus: Optional[str] = '' # filename of the cirrus mask file
88 self.wvl_center: Optional[np.ndarray] = None # Center wavelengths for each EnMAP band
89 self.fwhm: Optional[np.ndarray] = None # Full width half maximmum for each EnMAP band
90 self.srf: Optional[SRF] = None # SRF object holding the spectral response functions for each EnMAP band
91 self.solar_irrad: Optional[np.ndarray] = None # solar irradiance in [W/m2/nm] for each band
92 self.nwvl: Optional[int] = None # Number of wave bands
93 self.nrows: Optional[int] = None # number of rows
94 self.ncols: Optional[int] = None # number of columns
95 self.smile_coef: Optional[np.ndarray] = None # smile coefficients needed for smile computation
96 self.nsmile_coef: Optional[int] = None # number of smile coefficients
97 self.smile: Optional[np.ndarray] = None # smile for each EnMAP image column
98 self.gains: Optional[np.ndarray] = None # band-wise gains for computing radiance from DNs
99 self.offsets: Optional[np.ndarray] = None # band-wise offsets for computing radiance from DNs
100 self.l_min: Optional[np.ndarray] = None # band-wise l-min for computing radiance from DNs
101 self.l_max: Optional[np.ndarray] = None # band-wise l-max for computing radiance from DNs
102 self.goodbands_inds: Optional[List] = None # list of band indices included in the processing (all other bands are removed) # noqa
103 self.lat_UL_UR_LL_LR: Optional[List[float, float, float, float]] = None # latitude coords for UL, UR, LL, LR
104 self.lon_UL_UR_LL_LR: Optional[List[float, float, float, float]] = None # longitude coords for UL, UR, LL, LR
105 self.epsg_ortho: Optional[int] = None # EPSG code of the orthorectified image
106 self.rpc_coeffs: OrderedDict = OrderedDict() # RPC coefficients for geolayer computation
107 self.ll_mapPoly: Optional[Polygon] = None # footprint polygon in longitude/latitude map coordinates
108 self.lats: Optional[np.ndarray] = None # 2D/3D array of latitude coordinates
109 self.lons: Optional[np.ndarray] = None # 2D/3D array of longitude coordinates
110 self.geolayer_has_keystone: Optional[bool] = None # indicates if lon/lat geolayer considers keystone (3D array)
111 self.unit: str = '' # radiometric unit of pixel values
112 self.unitcode: str = '' # code of radiometric unit
113 self.preview_wvls: Optional[List[float]] = None # wavelengths to be used for quicklook images
114 self.preview_bands: Optional[List[int]] = None # band indices to be used for quicklook images
115 self.snr: Optional[np.ndarray] = None # Signal to noise ratio as computed from radiance data
117 def read_metadata(self, path_xml):
118 """Read the metadata of a specific EnMAP detector in sensor geometry.
120 :param path_xml: file path of the metadata file
121 :return: None
122 """
123 xml = ElementTree.parse(path_xml).getroot()
125 if not self.cfg.is_dummy_dataformat:
126 lbl = self.detector_label
127 self.logger.info("Reading metadata for %s detector..." % self.detector_name)
129 # read data filenames
130 all_filenames = [ele.text for ele in xml.findall("product/productFileInformation/file/name")]
132 def get_filename(matching_exp: str):
133 matches = []
134 for ext in ['', '.TIF', '.GEOTIFF', '.BSQ', '.BIL', '.BIP', 'JPEG2000', '.JP2', '.jp2']:
135 matches.extend(fnmatch.filter(all_filenames, f'{matching_exp}{ext}'))
137 if matches:
138 break
140 if not matches:
141 raise FileNotFoundError("Could not find a file that matches the expression '%s'." % matching_exp)
143 return matches[0]
145 self.filename_data = xml.find("product/image/%s/name" % lbl).text
146 self.scene_basename = self.filename_data.split('-SPECTRAL_IMAGE')[0]
147 self.filename_quicklook = xml.find("product/quicklook/%s/name" % lbl).text
148 self.filename_deadpixelmap = get_filename('*QL_PIXELMASK_%s' % self.detector_name)
149 self.filename_mask_landwater = get_filename('*QL_QUALITY_CLASSES')
150 self.filename_mask_snow = get_filename('*QL_QUALITY_SNOW')
151 self.filename_mask_cloudshadow = get_filename('*QL_QUALITY_CLOUDSHADOW')
152 self.filename_mask_clouds = get_filename('*-QL_QUALITY_CLOUD')
153 self.filename_mask_haze = get_filename('*QL_QUALITY_HAZE')
154 self.filename_mask_cirrus = get_filename('*QL_QUALITY_CIRRUS')
156 # FIXME combine different cloud masks?
157 # TODO: Add test flags layer.
159 # read some basic information concerning the detector
160 self.nrows = int(xml.find("product/image/%s/dimension/rows" % lbl).text)
161 self.ncols = int(xml.find("product/image/%s/dimension/columns" % lbl).text)
162 self.unitcode = 'DN'
163 self.unit = ''
165 # Read image coordinates
166 # FIXME why do we have the same corner coordinates for VNIR and SWIR?
167 points = xml.findall("base/spatialCoverage/boundingPolygon/point")
168 coords_xy = {p.find('frame').text: (float(p.find('longitude').text),
169 float(p.find('latitude').text))
170 for p in points}
171 coord_tags = ['upper_left', 'upper_right', 'lower_left', 'lower_right']
172 self.lon_UL_UR_LL_LR = [coords_xy[ct][0] for ct in coord_tags]
173 self.lat_UL_UR_LL_LR = [coords_xy[ct][1] for ct in coord_tags]
175 # read the band related information: wavelength, fwhm
176 self.nwvl = int(xml.find("product/image/%s/channels" % lbl).text)
177 # FIXME hardcoded - DLR does not provide any smile information
178 # => smile coefficients are set to zero until we have valid ones
179 self.nsmile_coef = 5
180 self.smile_coef = np.zeros((self.nwvl, self.nsmile_coef), dtype=float)
182 startband = 0 if self.detector_name == 'VNIR' else int(xml.find("product/image/vnir/channels").text)
183 subset = slice(startband, startband + self.nwvl)
184 bi = "specific/bandCharacterisation/bandID/"
185 self.wvl_center = np.array([float(ele.text) for ele in xml.findall(bi + "wavelengthCenterOfBand")[subset]])
186 self.fwhm = np.array([float(ele.text) for ele in xml.findall(bi + "FWHMOfBand")[subset]])
187 self.gains = np.array([float(ele.text) for ele in xml.findall(bi + "GainOfBand")[subset]])
188 self.offsets = np.array([float(ele.text) for ele in xml.findall(bi + "OffsetOfBand")[subset]])
190 # read preview bands
191 wvl_red = float(xml.find("product/image/%s/qlChannels/red" % lbl).text)
192 wvl_green = float(xml.find("product/image/%s/qlChannels/green" % lbl).text)
193 wvl_blue = float(xml.find("product/image/%s/qlChannels/blue" % lbl).text)
194 self.preview_wvls = [wvl_red, wvl_green, wvl_blue]
195 self.preview_bands = np.array([np.argmin(np.abs(self.wvl_center - wvl)) for wvl in self.preview_wvls])
197 # read RPC coefficients
198 for bID in xml.findall("product/navigation/RPC/bandID")[subset]:
199 bN = 'band_%d' % np.int64(bID.attrib['number'])
201 keys2combine = ('row_num', 'row_den', 'col_num', 'col_den')
203 tmp = OrderedDict([(ele.tag.lower(), float(ele.text)) for ele in bID.findall('./')])
204 self.rpc_coeffs[bN] = {k: v for k, v in tmp.items() if not k.startswith(keys2combine)}
206 for n in keys2combine:
207 self.rpc_coeffs[bN]['%s_coeffs' % n.lower()] = \
208 np.array([v for k, v in tmp.items() if k.startswith(n)])
210 else:
211 lbl = self.detector_label
212 self.logger.info("Reading metadata for %s detector..." % self.detector_name)
214 # read data filenames
215 self.filename_data = xml.findall("ProductComponent/%s/Data/Filename" % lbl)[0].text
216 self.scene_basename = os.path.splitext(self.filename_data)[0]
217 self.filename_deadpixelmap = xml.findall("ProductComponent/%s/Sensor/DeadPixel/Filename" % lbl)[0].text
218 self.filename_quicklook = xml.findall("ProductComponent/%s/Preview/Filename" % lbl)[0].text
219 self.filename_mask_clouds = xml.findall("ProductComponent/%s/Data/CloudMaskMap/Filename" % lbl)[0].text
221 # read preview bands
222 self.preview_bands = np.zeros(3, dtype=int)
223 self.preview_bands[0] = int(xml.findall("ProductComponent/%s/Preview/Bands/Red" % lbl)[0].text)
224 self.preview_bands[1] = int(xml.findall("ProductComponent/%s/Preview/Bands/Green" % lbl)[0].text)
225 self.preview_bands[2] = int(xml.findall("ProductComponent/%s/Preview/Bands/Blue" % lbl)[0].text)
227 # read some basic information concerning the detector
228 self.nrows = int(xml.findall("ProductComponent/%s/Data/Size/NRows" % lbl)[0].text)
229 self.ncols = int(xml.findall("ProductComponent/%s/Data/Size/NCols" % lbl)[0].text)
230 self.unitcode = xml.findall("ProductComponent/%s/Data/Type/UnitCode" % lbl)[0].text
231 self.unit = xml.findall("ProductComponent/%s/Data/Type/Unit" % lbl)[0].text
233 # Read image coordinates
234 scene_corner_coordinates = xml.findall("ProductComponent/%s/Data/SceneInformation/"
235 "SceneCornerCoordinates" % lbl)
236 self.lat_UL_UR_LL_LR = [
237 float(scene_corner_coordinates[0].findall("Latitude")[0].text),
238 float(scene_corner_coordinates[1].findall("Latitude")[0].text),
239 float(scene_corner_coordinates[2].findall("Latitude")[0].text),
240 float(scene_corner_coordinates[3].findall("Latitude")[0].text)
241 ]
242 self.lon_UL_UR_LL_LR = [
243 float(scene_corner_coordinates[0].findall("Longitude")[0].text),
244 float(scene_corner_coordinates[1].findall("Longitude")[0].text),
245 float(scene_corner_coordinates[2].findall("Longitude")[0].text),
246 float(scene_corner_coordinates[3].findall("Longitude")[0].text)
247 ]
249 # read the band related information: wavelength, fwhm
250 self.nwvl = int(xml.findall("ProductComponent/%s/Data/BandInformationList/NumberOfBands" % lbl)[0].text)
251 self.nsmile_coef = int(xml.findall(
252 "ProductComponent/%s/Data/BandInformationList/SmileInformation/NumberOfCoefficients" % lbl)[0].text)
253 self.fwhm = np.zeros(self.nwvl, dtype=float)
254 self.wvl_center = np.zeros(self.nwvl, dtype=float)
255 self.smile_coef = np.zeros((self.nwvl, self.nsmile_coef), dtype=float)
256 self.l_min = np.zeros(self.nwvl, dtype=float)
257 self.l_max = np.zeros(self.nwvl, dtype=float)
258 band_informations = xml.findall("ProductComponent/%s/Data/BandInformationList/BandInformation" % lbl)
259 for bi in band_informations:
260 k = np.int64(bi.attrib['Id']) - 1
261 self.wvl_center[k] = float(bi.findall("CenterWavelength")[0].text)
262 self.fwhm[k] = float(bi.findall("FullWidthHalfMaximum")[0].text)
263 self.l_min[k] = float(bi.findall("L_min")[0].text)
264 self.l_max[k] = float(bi.findall("L_max")[0].text)
265 scl = bi.findall("Smile/Coefficient")
266 for sc in scl:
267 self.smile_coef[k, np.int64(sc.attrib['exponent'])] = float(sc.text)
269 self.lats = self.interpolate_corners(*self.lat_UL_UR_LL_LR, self.ncols, self.nrows)
270 self.lons = self.interpolate_corners(*self.lon_UL_UR_LL_LR, self.ncols, self.nrows)
271 self.geolayer_has_keystone = False
272 self.preview_wvls = np.array([self.wvl_center[i] for i in self.preview_bands])
274 # drop bad bands from metadata if desired
275 if self.cfg.drop_bad_bands:
276 wvls = list(self.wvl_center)
277 self.goodbands_inds = gbl = [wvls.index(wvl) for wvl in wvls
278 if not (1358 < wvl < 1453 or
279 1814 < wvl < 1961)]
281 if len(gbl) < len(wvls):
282 for attrN in ['wvl_center', 'fwhm', 'offsets', 'gains', 'l_min', 'l_max']:
283 if getattr(self, attrN) is not None:
284 setattr(self, attrN, getattr(self, attrN)[gbl])
286 self.nwvl = len(gbl)
287 self.smile_coef = self.smile_coef[gbl, :]
288 self.rpc_coeffs = OrderedDict([(k, v) for i, (k, v) in enumerate(self.rpc_coeffs.items())
289 if i in gbl])
291 # compute metadata derived from read data
292 self.smile = self.calc_smile()
293 self.srf = SRF.from_cwl_fwhm(self.wvl_center, self.fwhm)
294 self.solar_irrad = self.calc_solar_irradiance_CWL_FWHM_per_band()
295 self.ll_mapPoly = get_footprint_polygon(tuple(zip(self.lon_UL_UR_LL_LR,
296 self.lat_UL_UR_LL_LR)), fix_invalid=True)
297 from ...processors.spatial_transform import get_UTMEPSG_from_LonLat_cornersXY
298 # NOTE: self.cfg.target_epsg is set if user-provided or in case of Lon/Lat coordinates
299 self.epsg_ortho = \
300 self.cfg.target_epsg or \
301 get_UTMEPSG_from_LonLat_cornersXY(lons=self.lon_UL_UR_LL_LR, lats=self.lat_UL_UR_LL_LR)
303 def calc_smile(self):
304 """Compute smile for each EnMAP column.
306 The sum in (1) is expressed as inner product of two arrays with inner dimension as the polynomial smile
307 coefficients shape = (ncols, nsmile_coef) of polynomial x
309 :return:
310 """
311 # smile(icol, iwvl) = sum_(p=0)^(nsmile_coef-1) smile_coef[iwvl, p] * icol**p (1)
312 return np.inner(
313 np.array([[icol ** p for p in np.arange(self.nsmile_coef)] for icol in np.arange(self.ncols)]),
314 self.smile_coef # shape = (nwvl, nsmile_coef)
315 ) # shape = (ncols, nwvl)
317 def calc_snr_from_radiance(self, rad_data: Union[GeoArray, np.ndarray], dir_snr_models: str):
318 """Compute EnMAP SNR from radiance data for the given detector.
320 SNR equation: SNR = p0 + p1 * LTOA + p2 * LTOA ^ 2 [W/(m^2 sr nm)].
322 NOTE: The SNR model files (SNR_D1.bsq/SNR_D2.bsq) contain polynomial coefficients needed to compute SNR.
324 SNR_D1.bsq: SNR model for EnMAP VNIR detector (contains high gain and low gain model coefficients)
326 - 1000 columns (for 1000 EnMAP columns)
327 - 88 bands for 88 EnMAP VNIR bands
328 - 7 lines: - 3 lines: high gain coefficients
329 - 3 lines: low gain coefficients
330 - 1 line: threshold needed to decide about high gain or low gain
332 SNR_D2.bsq: SNR model for EnMAP SWIR detector
334 - 1000 columns (for 1000 EnMAP columns)
335 - x bands for x EnMAP SWIR bands
336 - 3 lines for 3 coefficients
338 :param rad_data: image radiance data of EnMAP_Detector_SensorGeo
339 :param dir_snr_models: root directory where SNR model data is stored (must contain SNR_D1.bsq/SNR_D2.bsq)
340 """
341 rad_data = np.array(rad_data)
342 assert self.unitcode == 'TOARad'
343 self.logger.info(f"Computing SNR from {self.detector_name} TOA radiance.")
345 if self.detector_name == 'VNIR':
346 gA = self._get_snr_model(dir_snr_models)
348 coeffs_highgain = gA[0:3, :, :] # [3 x ncols x nbands]
349 coeffs_lowgain = gA[3:6, :, :] # [3 x ncols x nbands]
350 gain_threshold = np.squeeze(gA[6, :, :])
352 self.snr = np.zeros(rad_data.shape)
353 for irow in range(self.nrows):
354 highgain_mask = rad_data[irow, :, :] < gain_threshold # a single row
355 rad_highgain = rad_data[irow, highgain_mask]
356 self.snr[irow, :, :][highgain_mask] = \
357 coeffs_highgain[0, highgain_mask] + \
358 coeffs_highgain[1, highgain_mask] * rad_highgain + \
359 coeffs_highgain[2, highgain_mask] * rad_highgain ** 2
361 lowgain_mask = rad_data[irow, :, :] >= gain_threshold
362 rad_lowgain = rad_data[irow, lowgain_mask]
363 self.snr[irow, :, :][lowgain_mask] = \
364 coeffs_lowgain[0, lowgain_mask] + \
365 coeffs_lowgain[1, lowgain_mask] * rad_lowgain + \
366 coeffs_lowgain[2, lowgain_mask] * rad_lowgain ** 2
368 else:
369 gA = self._get_snr_model(dir_snr_models)
370 coeffs = gA[:]
371 self.snr = coeffs[0, :, :] + coeffs[1, :, :] * rad_data[:, :, :] + coeffs[2, :, :] * rad_data[:, :, :] ** 2
373 def _get_snr_model(self, dir_snr_models: str) -> GeoArray:
374 """Get the SNR model coefficients for the current detector.
376 NOTE: Missing bands are linearly interpolated.
378 :param dir_snr_models: directory containing the SNR models
379 """
380 path_snr_model = os.path.join(dir_snr_models, "SNR_D1.bsq" if self.detector_name == 'VNIR' else "SNR_D2.bsq")
381 gA = GeoArray(path_snr_model)
383 if gA.bands == self.nwvl:
384 return gA
386 else:
387 from scipy.interpolate import make_interp_spline
388 # equivalent to legacy scipy.interpolate.interp1d
389 # interp1d(wvl, gA[:], axis=2, kind='linear', fill_value="extrapolate", bounds_error=False)(self.wvl_center)
390 coeffs_interp = make_interp_spline(gA.meta.band_meta['wavelength'], gA[:], k=1, axis=2)(self.wvl_center)
391 gA_ = GeoArray(coeffs_interp)
392 gA_.meta.band_meta['wavelength'] = self.wvl_center
394 return gA_
396 @staticmethod
397 def interpolate_corners(ul: float, ur: float, ll: float, lr: float, nx: int, ny: int):
398 """Compute interpolated field from corner values of a scalar field given at: ul, ur, ll, lr.
400 :param ul: tbd
401 :param ur: tbd
402 :param ll: tbd
403 :param lr: tbd
404 :param nx: final shape (x-axis direction)
405 :param ny: final shape (y-axis direction)
406 """
407 # FIXME this method must later be replaced by the geolayer provided by the ground segment
408 # => a linear interpolation between the EnMAP corner coordinates is NOT sufficient for modelling the
409 # geometry of VNIR and SWIR
410 # - especially at off-nadir acquisitions and with some terrain present, a linear interpolation leads
411 # to large deviations (> 180 m y-coordinate offset for the EnPT test dataset)
413 # TODO ensure that lons/lats represent UL coordinates not pixel coordinate centers (as given by Karl / DLR(?))
415 corner_coords = np.array([[ul, ur],
416 [ll, lr]])
417 rowpos, colpos = [0, 1], [0, 1]
419 from scipy.interpolate import RegularGridInterpolator
420 rgi = RegularGridInterpolator([rowpos, colpos], corner_coords, method='linear')
421 out_rows_grid, out_cols_grid = np.meshgrid(np.linspace(0, 1, ny),
422 np.linspace(0, 1, nx),
423 indexing='ij')
425 coords = rgi(np.dstack([out_rows_grid, out_cols_grid]))
427 return coords
429 def compute_geolayer_for_cube(self):
430 self.logger.info('Computing %s geolayer...' % self.detector_name)
431 GeolayerGen = \
432 RPC_3D_Geolayer_Generator(
433 rpc_coeffs_per_band=self.rpc_coeffs,
434 elevation=self.cfg.path_dem if self.cfg.path_dem else self.cfg.average_elevation,
435 enmapIm_cornerCoords=tuple(zip(self.lon_UL_UR_LL_LR, self.lat_UL_UR_LL_LR)),
436 enmapIm_dims_sensorgeo=(self.nrows, self.ncols),
437 CPUs=self.cfg.CPUs
438 )
439 lons, lats = GeolayerGen.compute_geolayer()
441 self.geolayer_has_keystone = len(GeolayerGen.bandgroups_with_unique_rpc_coeffs) > 1
443 return lons, lats
445 def calc_solar_irradiance_CWL_FWHM_per_band(self) -> np.array:
446 from ...io.reader import read_solar_irradiance
448 self.logger.debug('Calculating solar irradiance...')
450 sol_irr = read_solar_irradiance(path_solar_irr_model=self.cfg.path_solar_irr, wvl_min_nm=350, wvl_max_nm=2500)
452 irr_bands = []
453 for band in self.srf.bands:
454 WVL_band = self.srf.srfs_wvl if self.srf.wvl_unit == 'nanometers' else self.srf.srfs_wvl * 1000
455 RSP_band = self.srf.srfs_norm01[band]
456 sol_irr_at_WVL = np.interp(WVL_band, sol_irr[:, 0], sol_irr[:, 1], left=0, right=0)
458 irr_bands.append(np.round(np.sum(sol_irr_at_WVL * RSP_band) / np.sum(RSP_band), 2))
460 return np.array(irr_bands)
463class EnMAP_Metadata_L1B_SensorGeo(object):
464 """EnMAP Metadata class for the metadata of the complete EnMAP L1B product in sensor geometry incl. VNIR and SWIR.
466 Attributes:
467 - logger(logging.Logger): None or logging instance
468 - observation_datetime(datetime.datetime): datetime of observation time
469 - geom_view_zenith: viewing zenith angle
470 - geom_view_azimuth: viewing azimuth angle
471 - geom_sun_zenith: sun zenith angle
472 - geom_sun_azimuth: sun azimuth angle
473 - mu_sun: needed by SICOR for TOARad > TOARef conversion
474 - vnir(EnMAP_Metadata_VNIR_SensorGeo)
475 - swir(EnMAP_Metadata_SWIR_SensorGeo)
476 - detector_attrNames: attribute names of the detector objects
477 """
479 def __init__(self, path_metaxml, config: EnPTConfig, logger=None):
480 """Get a metadata object instance for the given EnMAP L1B product in sensor geometry.
482 :param path_metaxml: file path of the EnMAP L1B metadata XML file
483 :para, config: EnPT configuration object
484 :param logger: instance of logging.logger or subclassed
485 """
486 self.cfg = config
487 self.logger = logger or logging.getLogger()
488 self.path_xml = path_metaxml
489 self.rootdir = os.path.dirname(path_metaxml)
491 # defaults - Common
492 self.proc_level: Optional[str] = None # Dataset processing level
493 self.version_provider: Optional[str] = '' # version of ground segment processing system
494 self.observation_datetime: Optional[datetime] = None # Date and Time of image observation
495 self.geom_view_zenith: Optional[float] = None # viewing zenith angle
496 self.geom_view_azimuth: Optional[float] = None # viewing azimuth angle
497 self.geom_sun_zenith: Optional[float] = None # sun zenith angle
498 self.geom_sun_azimuth: Optional[float] = None # sun azimuth angle
499 self.mu_sun: Optional[float] = None # needed by SICOR for TOARad > TOARef conversion
500 self.earthSunDist: Optional[float] = None # earth-sun distance
501 self.aot: Optional[float] = None # scene aerosol optical thickness
502 self.water_vapour: Optional[float] = None # scene water vapour [cm]
503 self.vnir: Optional[EnMAP_Metadata_L1B_Detector_SensorGeo] = None # metadata of VNIR only
504 self.swir: Optional[EnMAP_Metadata_L1B_Detector_SensorGeo] = None # metadata of SWIR only
505 self.detector_attrNames: list = ['vnir', 'swir'] # attribute names of the detector objects
506 self.filename_metaxml: Optional[str] = None # filename of XML metadata file
508 self._scene_basename: Optional[str] = None # basename of the EnMAP image
510 @property
511 def scene_basename(self):
512 if self.vnir:
513 self._scene_basename = self.vnir.scene_basename
515 return self._scene_basename
517 def read_common_meta(self, path_xml):
518 """Read the common metadata, principally stored in General Info.
520 - the acquisition time
521 - the geometrical observation and illumination
523 :param path_xml: path to the main xml file
524 :return: None
525 """
526 # load the metadata xml file
527 xml = ElementTree.parse(path_xml).getroot()
529 self.filename_metaxml = os.path.basename(path_xml)
531 if not self.cfg.is_dummy_dataformat:
532 # read processing level
533 self.proc_level = xml.find("base/level").text
534 if self.proc_level != 'L1B':
535 raise RuntimeError(self.proc_level, "Unexpected input data processing level. Expected 'L1B'.")
537 # read version of ground segment processing system
538 self.version_provider = xml.find("base/revision").text
540 # raise a warning in case of old processing version (de-striping was implemented in version 01.02.00)
541 if parse_version(self.version_provider) < parse_version('01.02.00'):
542 self.logger.warning(
543 f"The input EnMAP Level-1B image was processed with an old version of the ground segment "
544 f"processing system (version {self.version_provider}), which, e.g. did not include de-striping. "
545 f"It is highly recommended to re-download the dataset in the latest processing version from the "
546 f"archive via the EOWEB GeoPortal (www.eoweb.dlr.de) before passing it to EnPT.")
548 # read the acquisition time
549 self.observation_datetime = \
550 datetime.strptime(xml.find("base/temporalCoverage/startTime").text, '%Y-%m-%dT%H:%M:%S.%fZ')
552 # get the distance earth sun from the acquisition date
553 self.earthSunDist = self.get_earth_sun_distance(self.observation_datetime)
555 # read Geometry (observation/illumination) angle
556 # NOTE: EnMAP metadata provide also the angles for the image corners
557 # -> would allow even more precise computation (e.g., specific/sunElevationAngle/upper_left)
558 # NOTE: alongOffNadirAngle is always near 0 and therefore ignored here (not relevant for AC)
559 # FIXME VZA may be negative in DLR L1B data -> correct to always use the absolute value for SICOR?
560 self.geom_view_zenith = np.abs(float(xml.find("specific/acrossOffNadirAngle/center").text))
561 # FIXME correct to directly use sceneAzimuthAngle (14.24 (DLR) vs. 101.1 (AlpineTest)
562 self.geom_view_azimuth = float(xml.find("specific/sceneAzimuthAngle/center").text)
563 self.geom_sun_zenith = 90 - float(xml.find("specific/sunElevationAngle/center").text)
564 self.geom_sun_azimuth = float(xml.find("specific/sunAzimuthAngle/center").text)
565 self.mu_sun = np.cos(np.deg2rad(self.geom_sun_zenith))
566 self.aot = float(xml.find("specific/qualityFlag/sceneAOT").text) / 1000 # scale factor is 1000
567 self.water_vapour = float(xml.find("specific/qualityFlag/sceneWV").text) / 1000 # scale factor is 1000
568 else:
569 # read the acquisition time
570 self.observation_datetime = \
571 datetime.strptime(xml.findall("GeneralInfo/ProductInfo/ProductStartTime")[0].text,
572 '%Y-%m-%dT%H:%M:%S.%fZ')
574 # get the distance earth sun from the acquisition date
575 self.earthSunDist = self.get_earth_sun_distance(self.observation_datetime)
577 # read Geometry (observation/illumination) angle
578 self.geom_view_zenith = float(xml.findall("GeneralInfo/Geometry/Observation/ZenithAngle")[0].text)
579 self.geom_view_azimuth = float(xml.findall("GeneralInfo/Geometry/Observation/AzimuthAngle")[0].text)
580 self.geom_sun_zenith = float(xml.findall("GeneralInfo/Geometry/Illumination/ZenithAngle")[0].text)
581 self.geom_sun_azimuth = float(xml.findall("GeneralInfo/Geometry/Illumination/AzimuthAngle")[0].text)
582 self.mu_sun = np.cos(np.deg2rad(self.geom_sun_zenith))
584 def get_earth_sun_distance(self, acqDate: datetime):
585 """Get earth-sun distance (requires file of pre-calculated earth sun distance per day).
587 :param acqDate:
588 """
589 if not os.path.exists(self.cfg.path_earthSunDist):
590 self.logger.warning("\n\t WARNING: Earth Sun Distance is assumed to be "
591 "1.0 because no database can be found at %s.""" % self.cfg.path_earthSunDist)
592 return 1.0
593 if not acqDate:
594 self.logger.warning("\n\t WARNING: Earth Sun Distance is assumed to be 1.0 because "
595 "acquisition date could not be read from metadata.")
596 return 1.0
598 with open(self.cfg.path_earthSunDist, "r") as EA_dist_f:
599 EA_dist_dict = {}
600 for line in EA_dist_f:
601 date, EA = [item.strip() for item in line.split(",")]
602 EA_dist_dict[date] = EA
604 return float(EA_dist_dict[acqDate.strftime('%Y-%m-%d')])
606 def read_metadata(self):
607 """Read the metadata of the entire EnMAP L1B product in sensor geometry."""
608 # first read common metadata
609 self.read_common_meta(self.path_xml)
611 # define and read the VNIR metadata
612 self.vnir = EnMAP_Metadata_L1B_Detector_SensorGeo('VNIR', config=self.cfg, logger=self.logger)
613 self.vnir.read_metadata(self.path_xml)
615 # define and read the SWIR metadata
616 self.swir = EnMAP_Metadata_L1B_Detector_SensorGeo('SWIR', config=self.cfg, logger=self.logger)
617 self.swir.read_metadata(self.path_xml)
619 def to_XML(self) -> str:
620 """Generate an XML metadata string from the L1B metadata."""
621 from . import L1B_product_props, L1B_product_props_DLR
622 xml = ElementTree.parse(self.path_xml).getroot()
624 if not self.cfg.is_dummy_dataformat:
625 for detName, detMeta in zip(['VNIR', 'SWIR'], [self.vnir, self.swir]):
626 lbl = L1B_product_props_DLR['xml_detector_label'][detName]
627 xml.find("product/image/%s/dimension/rows" % lbl).text = str(detMeta.nrows)
628 xml.find("product/image/%s/dimension/columns" % lbl).text = str(detMeta.ncols)
629 xml.find("product/quicklook/%s/dimension/rows" % lbl).text = str(detMeta.nrows)
630 xml.find("product/quicklook/%s/dimension/columns" % lbl).text = str(detMeta.ncols)
632 else:
633 for detName, detMeta in zip(['VNIR', 'SWIR'], [self.vnir, self.swir]):
634 lbl = L1B_product_props['xml_detector_label'][detName]
635 xml.find("ProductComponent/%s/Data/Size/NRows" % lbl).text = str(detMeta.nrows)
636 xml.find("ProductComponent/%s/Data/Type/UnitCode" % lbl).text = detMeta.unitcode
637 xml.find("ProductComponent/%s/Data/Type/Unit" % lbl).text = detMeta.unit
639 xml_string = ElementTree.tostring(xml, encoding='unicode', pretty_print=True)
641 return xml_string