Coverage for enpt/model/metadata/metadata_sensorgeo.py: 98%

321 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-03-07 11:39 +0000

1# -*- coding: utf-8 -*- 

2 

3# EnPT, EnMAP Processing Tool - A Python package for pre-processing of EnMAP Level-1B data 

4# 

5# Copyright (C) 2018-2024 Karl Segl (GFZ Potsdam, segl@gfz-potsdam.de), Daniel Scheffler 

6# (GFZ Potsdam, danschef@gfz-potsdam.de), Niklas Bohn (GFZ Potsdam, nbohn@gfz-potsdam.de), 

7# Stéphane Guillaso (GFZ Potsdam, stephane.guillaso@gfz-potsdam.de) 

8# 

9# This software was developed within the context of the EnMAP project supported 

10# by the DLR Space Administration with funds of the German Federal Ministry of 

11# Economic Affairs and Energy (on the basis of a decision by the German Bundestag: 

12# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG. 

13# 

14# This program is free software: you can redistribute it and/or modify it under 

15# the terms of the GNU General Public License as published by the Free Software 

16# Foundation, either version 3 of the License, or (at your option) any later 

17# version. Please note the following exception: `EnPT` depends on tqdm, which 

18# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files 

19# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore". 

20# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE. 

21# 

22# This program is distributed in the hope that it will be useful, but WITHOUT 

23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

25# details. 

26# 

27# You should have received a copy of the GNU Lesser General Public License along 

28# with this program. If not, see <https://www.gnu.org/licenses/>. 

29 

30"""EnPT metadata objects for EnMAP data in sensor geometry.""" 

31 

32from datetime import datetime 

33from lxml import etree as ElementTree 

34import logging 

35import os 

36import fnmatch 

37from typing import Union, List, Tuple, Optional # noqa: F401 

38from collections import OrderedDict 

39from packaging.version import parse as parse_version 

40import numpy as np 

41from py_tools_ds.geo.vector.topology import Polygon, get_footprint_polygon # noqa: F401 # flake8 issue 

42from geoarray import GeoArray 

43 

44from ...options.config import EnPTConfig 

45from ..srf import SRF 

46from ...processors.spatial_transform import RPC_3D_Geolayer_Generator 

47 

48__author__ = ['Daniel Scheffler', 'Stéphane Guillaso', 'André Hollstein'] 

49 

50 

51class EnMAP_Metadata_L1B_Detector_SensorGeo(object): 

52 """Class for all EnMAP metadata associated with a single EnMAP detector in sensor geometry. 

53 

54 NOTE: 

55 - All metadata that have VNIR and SWIR detector in sensor geometry in common should be included here. 

56 

57 """ 

58 

59 def __init__(self, detector_name: str, config: EnPTConfig, logger: logging.Logger = None): 

60 """Get a metadata object containing the metadata of a single EnMAP detector in sensor geometry. 

61 

62 :param detector_name: Name of the detector (VNIR or SWIR) 

63 :param config: EnPT configuration object 

64 :param logger: instance of logging.logger or subclassed 

65 """ 

66 from . import L1B_product_props, L1B_product_props_DLR 

67 self.cfg = config 

68 self.detector_name: str = detector_name 

69 if not self.cfg.is_dummy_dataformat: 

70 self.detector_label = L1B_product_props_DLR['xml_detector_label'][detector_name] 

71 else: 

72 self.detector_label = L1B_product_props['xml_detector_label'][detector_name] 

73 self.logger = logger or logging.getLogger() 

74 

75 # These lines are used to load path information 

76 self.filename_data: Optional[str] = '' # detector data filename 

77 self.scene_basename: Optional[str] = '' # basename of the EnMAP image 

78 self.filename_deadpixelmap: Optional[str] = '' # filename of the dead pixel file 

79 self.filename_quicklook: Optional[str] = '' # filename of the quicklook file 

80 # FIXME masks of BOTH detectors 

81 self.filename_mask_landwater: Optional[str] = '' # filename of the land/water mask file 

82 self.filename_mask_snow: Optional[str] = '' # filename of the snow mask file 

83 self.filename_mask_cloudshadow: Optional[str] = '' # filename of the cloud shadow mask file 

84 self.filename_mask_clouds: Optional[str] = '' # filename of the cloud mask file 

85 self.filename_mask_haze: Optional[str] = '' # filename of the haze mask file 

86 self.filename_mask_cirrus: Optional[str] = '' # filename of the cirrus mask file 

87 

88 self.wvl_center: Optional[np.ndarray] = None # Center wavelengths for each EnMAP band 

89 self.fwhm: Optional[np.ndarray] = None # Full width half maximmum for each EnMAP band 

90 self.srf: Optional[SRF] = None # SRF object holding the spectral response functions for each EnMAP band 

91 self.solar_irrad: Optional[np.ndarray] = None # solar irradiance in [W/m2/nm] for each band 

92 self.nwvl: Optional[int] = None # Number of wave bands 

93 self.nrows: Optional[int] = None # number of rows 

94 self.ncols: Optional[int] = None # number of columns 

95 self.smile_coef: Optional[np.ndarray] = None # smile coefficients needed for smile computation 

96 self.nsmile_coef: Optional[int] = None # number of smile coefficients 

97 self.smile: Optional[np.ndarray] = None # smile for each EnMAP image column 

98 self.gains: Optional[np.ndarray] = None # band-wise gains for computing radiance from DNs 

99 self.offsets: Optional[np.ndarray] = None # band-wise offsets for computing radiance from DNs 

100 self.l_min: Optional[np.ndarray] = None # band-wise l-min for computing radiance from DNs 

101 self.l_max: Optional[np.ndarray] = None # band-wise l-max for computing radiance from DNs 

102 self.goodbands_inds: Optional[List] = None # list of band indices included in the processing (all other bands are removed) # noqa 

103 self.lat_UL_UR_LL_LR: Optional[List[float, float, float, float]] = None # latitude coords for UL, UR, LL, LR 

104 self.lon_UL_UR_LL_LR: Optional[List[float, float, float, float]] = None # longitude coords for UL, UR, LL, LR 

105 self.epsg_ortho: Optional[int] = None # EPSG code of the orthorectified image 

106 self.rpc_coeffs: OrderedDict = OrderedDict() # RPC coefficients for geolayer computation 

107 self.ll_mapPoly: Optional[Polygon] = None # footprint polygon in longitude/latitude map coordinates 

108 self.lats: Optional[np.ndarray] = None # 2D/3D array of latitude coordinates 

109 self.lons: Optional[np.ndarray] = None # 2D/3D array of longitude coordinates 

110 self.geolayer_has_keystone: Optional[bool] = None # indicates if lon/lat geolayer considers keystone (3D array) 

111 self.unit: str = '' # radiometric unit of pixel values 

112 self.unitcode: str = '' # code of radiometric unit 

113 self.preview_wvls: Optional[List[float]] = None # wavelengths to be used for quicklook images 

114 self.preview_bands: Optional[List[int]] = None # band indices to be used for quicklook images 

115 self.snr: Optional[np.ndarray] = None # Signal to noise ratio as computed from radiance data 

116 

117 def read_metadata(self, path_xml): 

118 """Read the metadata of a specific EnMAP detector in sensor geometry. 

119 

120 :param path_xml: file path of the metadata file 

121 :return: None 

122 """ 

123 xml = ElementTree.parse(path_xml).getroot() 

124 

125 if not self.cfg.is_dummy_dataformat: 

126 lbl = self.detector_label 

127 self.logger.info("Reading metadata for %s detector..." % self.detector_name) 

128 

129 # read data filenames 

130 all_filenames = [ele.text for ele in xml.findall("product/productFileInformation/file/name")] 

131 

132 def get_filename(matching_exp: str): 

133 matches = [] 

134 for ext in ['', '.TIF', '.GEOTIFF', '.BSQ', '.BIL', '.BIP', 'JPEG2000', '.JP2', '.jp2']: 

135 matches.extend(fnmatch.filter(all_filenames, f'{matching_exp}{ext}')) 

136 

137 if matches: 

138 break 

139 

140 if not matches: 

141 raise FileNotFoundError("Could not find a file that matches the expression '%s'." % matching_exp) 

142 

143 return matches[0] 

144 

145 self.filename_data = xml.find("product/image/%s/name" % lbl).text 

146 self.scene_basename = self.filename_data.split('-SPECTRAL_IMAGE')[0] 

147 self.filename_quicklook = xml.find("product/quicklook/%s/name" % lbl).text 

148 self.filename_deadpixelmap = get_filename('*QL_PIXELMASK_%s' % self.detector_name) 

149 self.filename_mask_landwater = get_filename('*QL_QUALITY_CLASSES') 

150 self.filename_mask_snow = get_filename('*QL_QUALITY_SNOW') 

151 self.filename_mask_cloudshadow = get_filename('*QL_QUALITY_CLOUDSHADOW') 

152 self.filename_mask_clouds = get_filename('*-QL_QUALITY_CLOUD') 

153 self.filename_mask_haze = get_filename('*QL_QUALITY_HAZE') 

154 self.filename_mask_cirrus = get_filename('*QL_QUALITY_CIRRUS') 

155 

156 # FIXME combine different cloud masks? 

157 # TODO: Add test flags layer. 

158 

159 # read some basic information concerning the detector 

160 self.nrows = int(xml.find("product/image/%s/dimension/rows" % lbl).text) 

161 self.ncols = int(xml.find("product/image/%s/dimension/columns" % lbl).text) 

162 self.unitcode = 'DN' 

163 self.unit = '' 

164 

165 # Read image coordinates 

166 # FIXME why do we have the same corner coordinates for VNIR and SWIR? 

167 points = xml.findall("base/spatialCoverage/boundingPolygon/point") 

168 coords_xy = {p.find('frame').text: (float(p.find('longitude').text), 

169 float(p.find('latitude').text)) 

170 for p in points} 

171 coord_tags = ['upper_left', 'upper_right', 'lower_left', 'lower_right'] 

172 self.lon_UL_UR_LL_LR = [coords_xy[ct][0] for ct in coord_tags] 

173 self.lat_UL_UR_LL_LR = [coords_xy[ct][1] for ct in coord_tags] 

174 

175 # read the band related information: wavelength, fwhm 

176 self.nwvl = int(xml.find("product/image/%s/channels" % lbl).text) 

177 # FIXME hardcoded - DLR does not provide any smile information 

178 # => smile coefficients are set to zero until we have valid ones 

179 self.nsmile_coef = 5 

180 self.smile_coef = np.zeros((self.nwvl, self.nsmile_coef), dtype=float) 

181 

182 startband = 0 if self.detector_name == 'VNIR' else int(xml.find("product/image/vnir/channels").text) 

183 subset = slice(startband, startband + self.nwvl) 

184 bi = "specific/bandCharacterisation/bandID/" 

185 self.wvl_center = np.array([float(ele.text) for ele in xml.findall(bi + "wavelengthCenterOfBand")[subset]]) 

186 self.fwhm = np.array([float(ele.text) for ele in xml.findall(bi + "FWHMOfBand")[subset]]) 

187 self.gains = np.array([float(ele.text) for ele in xml.findall(bi + "GainOfBand")[subset]]) 

188 self.offsets = np.array([float(ele.text) for ele in xml.findall(bi + "OffsetOfBand")[subset]]) 

189 

190 # read preview bands 

191 wvl_red = float(xml.find("product/image/%s/qlChannels/red" % lbl).text) 

192 wvl_green = float(xml.find("product/image/%s/qlChannels/green" % lbl).text) 

193 wvl_blue = float(xml.find("product/image/%s/qlChannels/blue" % lbl).text) 

194 self.preview_wvls = [wvl_red, wvl_green, wvl_blue] 

195 self.preview_bands = np.array([np.argmin(np.abs(self.wvl_center - wvl)) for wvl in self.preview_wvls]) 

196 

197 # read RPC coefficients 

198 for bID in xml.findall("product/navigation/RPC/bandID")[subset]: 

199 bN = 'band_%d' % np.int64(bID.attrib['number']) 

200 

201 keys2combine = ('row_num', 'row_den', 'col_num', 'col_den') 

202 

203 tmp = OrderedDict([(ele.tag.lower(), float(ele.text)) for ele in bID.findall('./')]) 

204 self.rpc_coeffs[bN] = {k: v for k, v in tmp.items() if not k.startswith(keys2combine)} 

205 

206 for n in keys2combine: 

207 self.rpc_coeffs[bN]['%s_coeffs' % n.lower()] = \ 

208 np.array([v for k, v in tmp.items() if k.startswith(n)]) 

209 

210 else: 

211 lbl = self.detector_label 

212 self.logger.info("Reading metadata for %s detector..." % self.detector_name) 

213 

214 # read data filenames 

215 self.filename_data = xml.findall("ProductComponent/%s/Data/Filename" % lbl)[0].text 

216 self.scene_basename = os.path.splitext(self.filename_data)[0] 

217 self.filename_deadpixelmap = xml.findall("ProductComponent/%s/Sensor/DeadPixel/Filename" % lbl)[0].text 

218 self.filename_quicklook = xml.findall("ProductComponent/%s/Preview/Filename" % lbl)[0].text 

219 self.filename_mask_clouds = xml.findall("ProductComponent/%s/Data/CloudMaskMap/Filename" % lbl)[0].text 

220 

221 # read preview bands 

222 self.preview_bands = np.zeros(3, dtype=int) 

223 self.preview_bands[0] = int(xml.findall("ProductComponent/%s/Preview/Bands/Red" % lbl)[0].text) 

224 self.preview_bands[1] = int(xml.findall("ProductComponent/%s/Preview/Bands/Green" % lbl)[0].text) 

225 self.preview_bands[2] = int(xml.findall("ProductComponent/%s/Preview/Bands/Blue" % lbl)[0].text) 

226 

227 # read some basic information concerning the detector 

228 self.nrows = int(xml.findall("ProductComponent/%s/Data/Size/NRows" % lbl)[0].text) 

229 self.ncols = int(xml.findall("ProductComponent/%s/Data/Size/NCols" % lbl)[0].text) 

230 self.unitcode = xml.findall("ProductComponent/%s/Data/Type/UnitCode" % lbl)[0].text 

231 self.unit = xml.findall("ProductComponent/%s/Data/Type/Unit" % lbl)[0].text 

232 

233 # Read image coordinates 

234 scene_corner_coordinates = xml.findall("ProductComponent/%s/Data/SceneInformation/" 

235 "SceneCornerCoordinates" % lbl) 

236 self.lat_UL_UR_LL_LR = [ 

237 float(scene_corner_coordinates[0].findall("Latitude")[0].text), 

238 float(scene_corner_coordinates[1].findall("Latitude")[0].text), 

239 float(scene_corner_coordinates[2].findall("Latitude")[0].text), 

240 float(scene_corner_coordinates[3].findall("Latitude")[0].text) 

241 ] 

242 self.lon_UL_UR_LL_LR = [ 

243 float(scene_corner_coordinates[0].findall("Longitude")[0].text), 

244 float(scene_corner_coordinates[1].findall("Longitude")[0].text), 

245 float(scene_corner_coordinates[2].findall("Longitude")[0].text), 

246 float(scene_corner_coordinates[3].findall("Longitude")[0].text) 

247 ] 

248 

249 # read the band related information: wavelength, fwhm 

250 self.nwvl = int(xml.findall("ProductComponent/%s/Data/BandInformationList/NumberOfBands" % lbl)[0].text) 

251 self.nsmile_coef = int(xml.findall( 

252 "ProductComponent/%s/Data/BandInformationList/SmileInformation/NumberOfCoefficients" % lbl)[0].text) 

253 self.fwhm = np.zeros(self.nwvl, dtype=float) 

254 self.wvl_center = np.zeros(self.nwvl, dtype=float) 

255 self.smile_coef = np.zeros((self.nwvl, self.nsmile_coef), dtype=float) 

256 self.l_min = np.zeros(self.nwvl, dtype=float) 

257 self.l_max = np.zeros(self.nwvl, dtype=float) 

258 band_informations = xml.findall("ProductComponent/%s/Data/BandInformationList/BandInformation" % lbl) 

259 for bi in band_informations: 

260 k = np.int64(bi.attrib['Id']) - 1 

261 self.wvl_center[k] = float(bi.findall("CenterWavelength")[0].text) 

262 self.fwhm[k] = float(bi.findall("FullWidthHalfMaximum")[0].text) 

263 self.l_min[k] = float(bi.findall("L_min")[0].text) 

264 self.l_max[k] = float(bi.findall("L_max")[0].text) 

265 scl = bi.findall("Smile/Coefficient") 

266 for sc in scl: 

267 self.smile_coef[k, np.int64(sc.attrib['exponent'])] = float(sc.text) 

268 

269 self.lats = self.interpolate_corners(*self.lat_UL_UR_LL_LR, self.ncols, self.nrows) 

270 self.lons = self.interpolate_corners(*self.lon_UL_UR_LL_LR, self.ncols, self.nrows) 

271 self.geolayer_has_keystone = False 

272 self.preview_wvls = np.array([self.wvl_center[i] for i in self.preview_bands]) 

273 

274 # drop bad bands from metadata if desired 

275 if self.cfg.drop_bad_bands: 

276 wvls = list(self.wvl_center) 

277 self.goodbands_inds = gbl = [wvls.index(wvl) for wvl in wvls 

278 if not (1358 < wvl < 1453 or 

279 1814 < wvl < 1961)] 

280 

281 if len(gbl) < len(wvls): 

282 for attrN in ['wvl_center', 'fwhm', 'offsets', 'gains', 'l_min', 'l_max']: 

283 if getattr(self, attrN) is not None: 

284 setattr(self, attrN, getattr(self, attrN)[gbl]) 

285 

286 self.nwvl = len(gbl) 

287 self.smile_coef = self.smile_coef[gbl, :] 

288 self.rpc_coeffs = OrderedDict([(k, v) for i, (k, v) in enumerate(self.rpc_coeffs.items()) 

289 if i in gbl]) 

290 

291 # compute metadata derived from read data 

292 self.smile = self.calc_smile() 

293 self.srf = SRF.from_cwl_fwhm(self.wvl_center, self.fwhm) 

294 self.solar_irrad = self.calc_solar_irradiance_CWL_FWHM_per_band() 

295 self.ll_mapPoly = get_footprint_polygon(tuple(zip(self.lon_UL_UR_LL_LR, 

296 self.lat_UL_UR_LL_LR)), fix_invalid=True) 

297 from ...processors.spatial_transform import get_UTMEPSG_from_LonLat_cornersXY 

298 # NOTE: self.cfg.target_epsg is set if user-provided or in case of Lon/Lat coordinates 

299 self.epsg_ortho = \ 

300 self.cfg.target_epsg or \ 

301 get_UTMEPSG_from_LonLat_cornersXY(lons=self.lon_UL_UR_LL_LR, lats=self.lat_UL_UR_LL_LR) 

302 

303 def calc_smile(self): 

304 """Compute smile for each EnMAP column. 

305 

306 The sum in (1) is expressed as inner product of two arrays with inner dimension as the polynomial smile 

307 coefficients shape = (ncols, nsmile_coef) of polynomial x 

308 

309 :return: 

310 """ 

311 # smile(icol, iwvl) = sum_(p=0)^(nsmile_coef-1) smile_coef[iwvl, p] * icol**p (1) 

312 return np.inner( 

313 np.array([[icol ** p for p in np.arange(self.nsmile_coef)] for icol in np.arange(self.ncols)]), 

314 self.smile_coef # shape = (nwvl, nsmile_coef) 

315 ) # shape = (ncols, nwvl) 

316 

317 def calc_snr_from_radiance(self, rad_data: Union[GeoArray, np.ndarray], dir_snr_models: str): 

318 """Compute EnMAP SNR from radiance data for the given detector. 

319 

320 SNR equation: SNR = p0 + p1 * LTOA + p2 * LTOA ^ 2 [W/(m^2 sr nm)]. 

321 

322 NOTE: The SNR model files (SNR_D1.bsq/SNR_D2.bsq) contain polynomial coefficients needed to compute SNR. 

323 

324 SNR_D1.bsq: SNR model for EnMAP VNIR detector (contains high gain and low gain model coefficients) 

325 

326 - 1000 columns (for 1000 EnMAP columns) 

327 - 88 bands for 88 EnMAP VNIR bands 

328 - 7 lines: - 3 lines: high gain coefficients 

329 - 3 lines: low gain coefficients 

330 - 1 line: threshold needed to decide about high gain or low gain 

331 

332 SNR_D2.bsq: SNR model for EnMAP SWIR detector 

333 

334 - 1000 columns (for 1000 EnMAP columns) 

335 - x bands for x EnMAP SWIR bands 

336 - 3 lines for 3 coefficients 

337 

338 :param rad_data: image radiance data of EnMAP_Detector_SensorGeo 

339 :param dir_snr_models: root directory where SNR model data is stored (must contain SNR_D1.bsq/SNR_D2.bsq) 

340 """ 

341 rad_data = np.array(rad_data) 

342 assert self.unitcode == 'TOARad' 

343 self.logger.info(f"Computing SNR from {self.detector_name} TOA radiance.") 

344 

345 if self.detector_name == 'VNIR': 

346 gA = self._get_snr_model(dir_snr_models) 

347 

348 coeffs_highgain = gA[0:3, :, :] # [3 x ncols x nbands] 

349 coeffs_lowgain = gA[3:6, :, :] # [3 x ncols x nbands] 

350 gain_threshold = np.squeeze(gA[6, :, :]) 

351 

352 self.snr = np.zeros(rad_data.shape) 

353 for irow in range(self.nrows): 

354 highgain_mask = rad_data[irow, :, :] < gain_threshold # a single row 

355 rad_highgain = rad_data[irow, highgain_mask] 

356 self.snr[irow, :, :][highgain_mask] = \ 

357 coeffs_highgain[0, highgain_mask] + \ 

358 coeffs_highgain[1, highgain_mask] * rad_highgain + \ 

359 coeffs_highgain[2, highgain_mask] * rad_highgain ** 2 

360 

361 lowgain_mask = rad_data[irow, :, :] >= gain_threshold 

362 rad_lowgain = rad_data[irow, lowgain_mask] 

363 self.snr[irow, :, :][lowgain_mask] = \ 

364 coeffs_lowgain[0, lowgain_mask] + \ 

365 coeffs_lowgain[1, lowgain_mask] * rad_lowgain + \ 

366 coeffs_lowgain[2, lowgain_mask] * rad_lowgain ** 2 

367 

368 else: 

369 gA = self._get_snr_model(dir_snr_models) 

370 coeffs = gA[:] 

371 self.snr = coeffs[0, :, :] + coeffs[1, :, :] * rad_data[:, :, :] + coeffs[2, :, :] * rad_data[:, :, :] ** 2 

372 

373 def _get_snr_model(self, dir_snr_models: str) -> GeoArray: 

374 """Get the SNR model coefficients for the current detector. 

375 

376 NOTE: Missing bands are linearly interpolated. 

377 

378 :param dir_snr_models: directory containing the SNR models 

379 """ 

380 path_snr_model = os.path.join(dir_snr_models, "SNR_D1.bsq" if self.detector_name == 'VNIR' else "SNR_D2.bsq") 

381 gA = GeoArray(path_snr_model) 

382 

383 if gA.bands == self.nwvl: 

384 return gA 

385 

386 else: 

387 from scipy.interpolate import make_interp_spline 

388 # equivalent to legacy scipy.interpolate.interp1d 

389 # interp1d(wvl, gA[:], axis=2, kind='linear', fill_value="extrapolate", bounds_error=False)(self.wvl_center) 

390 coeffs_interp = make_interp_spline(gA.meta.band_meta['wavelength'], gA[:], k=1, axis=2)(self.wvl_center) 

391 gA_ = GeoArray(coeffs_interp) 

392 gA_.meta.band_meta['wavelength'] = self.wvl_center 

393 

394 return gA_ 

395 

396 @staticmethod 

397 def interpolate_corners(ul: float, ur: float, ll: float, lr: float, nx: int, ny: int): 

398 """Compute interpolated field from corner values of a scalar field given at: ul, ur, ll, lr. 

399 

400 :param ul: tbd 

401 :param ur: tbd 

402 :param ll: tbd 

403 :param lr: tbd 

404 :param nx: final shape (x-axis direction) 

405 :param ny: final shape (y-axis direction) 

406 """ 

407 # FIXME this method must later be replaced by the geolayer provided by the ground segment 

408 # => a linear interpolation between the EnMAP corner coordinates is NOT sufficient for modelling the 

409 # geometry of VNIR and SWIR 

410 # - especially at off-nadir acquisitions and with some terrain present, a linear interpolation leads 

411 # to large deviations (> 180 m y-coordinate offset for the EnPT test dataset) 

412 

413 # TODO ensure that lons/lats represent UL coordinates not pixel coordinate centers (as given by Karl / DLR(?)) 

414 

415 corner_coords = np.array([[ul, ur], 

416 [ll, lr]]) 

417 rowpos, colpos = [0, 1], [0, 1] 

418 

419 from scipy.interpolate import RegularGridInterpolator 

420 rgi = RegularGridInterpolator([rowpos, colpos], corner_coords, method='linear') 

421 out_rows_grid, out_cols_grid = np.meshgrid(np.linspace(0, 1, ny), 

422 np.linspace(0, 1, nx), 

423 indexing='ij') 

424 

425 coords = rgi(np.dstack([out_rows_grid, out_cols_grid])) 

426 

427 return coords 

428 

429 def compute_geolayer_for_cube(self): 

430 self.logger.info('Computing %s geolayer...' % self.detector_name) 

431 GeolayerGen = \ 

432 RPC_3D_Geolayer_Generator( 

433 rpc_coeffs_per_band=self.rpc_coeffs, 

434 elevation=self.cfg.path_dem if self.cfg.path_dem else self.cfg.average_elevation, 

435 enmapIm_cornerCoords=tuple(zip(self.lon_UL_UR_LL_LR, self.lat_UL_UR_LL_LR)), 

436 enmapIm_dims_sensorgeo=(self.nrows, self.ncols), 

437 CPUs=self.cfg.CPUs 

438 ) 

439 lons, lats = GeolayerGen.compute_geolayer() 

440 

441 self.geolayer_has_keystone = len(GeolayerGen.bandgroups_with_unique_rpc_coeffs) > 1 

442 

443 return lons, lats 

444 

445 def calc_solar_irradiance_CWL_FWHM_per_band(self) -> np.array: 

446 from ...io.reader import read_solar_irradiance 

447 

448 self.logger.debug('Calculating solar irradiance...') 

449 

450 sol_irr = read_solar_irradiance(path_solar_irr_model=self.cfg.path_solar_irr, wvl_min_nm=350, wvl_max_nm=2500) 

451 

452 irr_bands = [] 

453 for band in self.srf.bands: 

454 WVL_band = self.srf.srfs_wvl if self.srf.wvl_unit == 'nanometers' else self.srf.srfs_wvl * 1000 

455 RSP_band = self.srf.srfs_norm01[band] 

456 sol_irr_at_WVL = np.interp(WVL_band, sol_irr[:, 0], sol_irr[:, 1], left=0, right=0) 

457 

458 irr_bands.append(np.round(np.sum(sol_irr_at_WVL * RSP_band) / np.sum(RSP_band), 2)) 

459 

460 return np.array(irr_bands) 

461 

462 

463class EnMAP_Metadata_L1B_SensorGeo(object): 

464 """EnMAP Metadata class for the metadata of the complete EnMAP L1B product in sensor geometry incl. VNIR and SWIR. 

465 

466 Attributes: 

467 - logger(logging.Logger): None or logging instance 

468 - observation_datetime(datetime.datetime): datetime of observation time 

469 - geom_view_zenith: viewing zenith angle 

470 - geom_view_azimuth: viewing azimuth angle 

471 - geom_sun_zenith: sun zenith angle 

472 - geom_sun_azimuth: sun azimuth angle 

473 - mu_sun: needed by SICOR for TOARad > TOARef conversion 

474 - vnir(EnMAP_Metadata_VNIR_SensorGeo) 

475 - swir(EnMAP_Metadata_SWIR_SensorGeo) 

476 - detector_attrNames: attribute names of the detector objects 

477 """ 

478 

479 def __init__(self, path_metaxml, config: EnPTConfig, logger=None): 

480 """Get a metadata object instance for the given EnMAP L1B product in sensor geometry. 

481 

482 :param path_metaxml: file path of the EnMAP L1B metadata XML file 

483 :para, config: EnPT configuration object 

484 :param logger: instance of logging.logger or subclassed 

485 """ 

486 self.cfg = config 

487 self.logger = logger or logging.getLogger() 

488 self.path_xml = path_metaxml 

489 self.rootdir = os.path.dirname(path_metaxml) 

490 

491 # defaults - Common 

492 self.proc_level: Optional[str] = None # Dataset processing level 

493 self.version_provider: Optional[str] = '' # version of ground segment processing system 

494 self.observation_datetime: Optional[datetime] = None # Date and Time of image observation 

495 self.geom_view_zenith: Optional[float] = None # viewing zenith angle 

496 self.geom_view_azimuth: Optional[float] = None # viewing azimuth angle 

497 self.geom_sun_zenith: Optional[float] = None # sun zenith angle 

498 self.geom_sun_azimuth: Optional[float] = None # sun azimuth angle 

499 self.mu_sun: Optional[float] = None # needed by SICOR for TOARad > TOARef conversion 

500 self.earthSunDist: Optional[float] = None # earth-sun distance 

501 self.aot: Optional[float] = None # scene aerosol optical thickness 

502 self.water_vapour: Optional[float] = None # scene water vapour [cm] 

503 self.vnir: Optional[EnMAP_Metadata_L1B_Detector_SensorGeo] = None # metadata of VNIR only 

504 self.swir: Optional[EnMAP_Metadata_L1B_Detector_SensorGeo] = None # metadata of SWIR only 

505 self.detector_attrNames: list = ['vnir', 'swir'] # attribute names of the detector objects 

506 self.filename_metaxml: Optional[str] = None # filename of XML metadata file 

507 

508 self._scene_basename: Optional[str] = None # basename of the EnMAP image 

509 

510 @property 

511 def scene_basename(self): 

512 if self.vnir: 

513 self._scene_basename = self.vnir.scene_basename 

514 

515 return self._scene_basename 

516 

517 def read_common_meta(self, path_xml): 

518 """Read the common metadata, principally stored in General Info. 

519 

520 - the acquisition time 

521 - the geometrical observation and illumination 

522 

523 :param path_xml: path to the main xml file 

524 :return: None 

525 """ 

526 # load the metadata xml file 

527 xml = ElementTree.parse(path_xml).getroot() 

528 

529 self.filename_metaxml = os.path.basename(path_xml) 

530 

531 if not self.cfg.is_dummy_dataformat: 

532 # read processing level 

533 self.proc_level = xml.find("base/level").text 

534 if self.proc_level != 'L1B': 

535 raise RuntimeError(self.proc_level, "Unexpected input data processing level. Expected 'L1B'.") 

536 

537 # read version of ground segment processing system 

538 self.version_provider = xml.find("base/revision").text 

539 

540 # raise a warning in case of old processing version (de-striping was implemented in version 01.02.00) 

541 if parse_version(self.version_provider) < parse_version('01.02.00'): 

542 self.logger.warning( 

543 f"The input EnMAP Level-1B image was processed with an old version of the ground segment " 

544 f"processing system (version {self.version_provider}), which, e.g. did not include de-striping. " 

545 f"It is highly recommended to re-download the dataset in the latest processing version from the " 

546 f"archive via the EOWEB GeoPortal (www.eoweb.dlr.de) before passing it to EnPT.") 

547 

548 # read the acquisition time 

549 self.observation_datetime = \ 

550 datetime.strptime(xml.find("base/temporalCoverage/startTime").text, '%Y-%m-%dT%H:%M:%S.%fZ') 

551 

552 # get the distance earth sun from the acquisition date 

553 self.earthSunDist = self.get_earth_sun_distance(self.observation_datetime) 

554 

555 # read Geometry (observation/illumination) angle 

556 # NOTE: EnMAP metadata provide also the angles for the image corners 

557 # -> would allow even more precise computation (e.g., specific/sunElevationAngle/upper_left) 

558 # NOTE: alongOffNadirAngle is always near 0 and therefore ignored here (not relevant for AC) 

559 # FIXME VZA may be negative in DLR L1B data -> correct to always use the absolute value for SICOR? 

560 self.geom_view_zenith = np.abs(float(xml.find("specific/acrossOffNadirAngle/center").text)) 

561 # FIXME correct to directly use sceneAzimuthAngle (14.24 (DLR) vs. 101.1 (AlpineTest) 

562 self.geom_view_azimuth = float(xml.find("specific/sceneAzimuthAngle/center").text) 

563 self.geom_sun_zenith = 90 - float(xml.find("specific/sunElevationAngle/center").text) 

564 self.geom_sun_azimuth = float(xml.find("specific/sunAzimuthAngle/center").text) 

565 self.mu_sun = np.cos(np.deg2rad(self.geom_sun_zenith)) 

566 self.aot = float(xml.find("specific/qualityFlag/sceneAOT").text) / 1000 # scale factor is 1000 

567 self.water_vapour = float(xml.find("specific/qualityFlag/sceneWV").text) / 1000 # scale factor is 1000 

568 else: 

569 # read the acquisition time 

570 self.observation_datetime = \ 

571 datetime.strptime(xml.findall("GeneralInfo/ProductInfo/ProductStartTime")[0].text, 

572 '%Y-%m-%dT%H:%M:%S.%fZ') 

573 

574 # get the distance earth sun from the acquisition date 

575 self.earthSunDist = self.get_earth_sun_distance(self.observation_datetime) 

576 

577 # read Geometry (observation/illumination) angle 

578 self.geom_view_zenith = float(xml.findall("GeneralInfo/Geometry/Observation/ZenithAngle")[0].text) 

579 self.geom_view_azimuth = float(xml.findall("GeneralInfo/Geometry/Observation/AzimuthAngle")[0].text) 

580 self.geom_sun_zenith = float(xml.findall("GeneralInfo/Geometry/Illumination/ZenithAngle")[0].text) 

581 self.geom_sun_azimuth = float(xml.findall("GeneralInfo/Geometry/Illumination/AzimuthAngle")[0].text) 

582 self.mu_sun = np.cos(np.deg2rad(self.geom_sun_zenith)) 

583 

584 def get_earth_sun_distance(self, acqDate: datetime): 

585 """Get earth-sun distance (requires file of pre-calculated earth sun distance per day). 

586 

587 :param acqDate: 

588 """ 

589 if not os.path.exists(self.cfg.path_earthSunDist): 

590 self.logger.warning("\n\t WARNING: Earth Sun Distance is assumed to be " 

591 "1.0 because no database can be found at %s.""" % self.cfg.path_earthSunDist) 

592 return 1.0 

593 if not acqDate: 

594 self.logger.warning("\n\t WARNING: Earth Sun Distance is assumed to be 1.0 because " 

595 "acquisition date could not be read from metadata.") 

596 return 1.0 

597 

598 with open(self.cfg.path_earthSunDist, "r") as EA_dist_f: 

599 EA_dist_dict = {} 

600 for line in EA_dist_f: 

601 date, EA = [item.strip() for item in line.split(",")] 

602 EA_dist_dict[date] = EA 

603 

604 return float(EA_dist_dict[acqDate.strftime('%Y-%m-%d')]) 

605 

606 def read_metadata(self): 

607 """Read the metadata of the entire EnMAP L1B product in sensor geometry.""" 

608 # first read common metadata 

609 self.read_common_meta(self.path_xml) 

610 

611 # define and read the VNIR metadata 

612 self.vnir = EnMAP_Metadata_L1B_Detector_SensorGeo('VNIR', config=self.cfg, logger=self.logger) 

613 self.vnir.read_metadata(self.path_xml) 

614 

615 # define and read the SWIR metadata 

616 self.swir = EnMAP_Metadata_L1B_Detector_SensorGeo('SWIR', config=self.cfg, logger=self.logger) 

617 self.swir.read_metadata(self.path_xml) 

618 

619 def to_XML(self) -> str: 

620 """Generate an XML metadata string from the L1B metadata.""" 

621 from . import L1B_product_props, L1B_product_props_DLR 

622 xml = ElementTree.parse(self.path_xml).getroot() 

623 

624 if not self.cfg.is_dummy_dataformat: 

625 for detName, detMeta in zip(['VNIR', 'SWIR'], [self.vnir, self.swir]): 

626 lbl = L1B_product_props_DLR['xml_detector_label'][detName] 

627 xml.find("product/image/%s/dimension/rows" % lbl).text = str(detMeta.nrows) 

628 xml.find("product/image/%s/dimension/columns" % lbl).text = str(detMeta.ncols) 

629 xml.find("product/quicklook/%s/dimension/rows" % lbl).text = str(detMeta.nrows) 

630 xml.find("product/quicklook/%s/dimension/columns" % lbl).text = str(detMeta.ncols) 

631 

632 else: 

633 for detName, detMeta in zip(['VNIR', 'SWIR'], [self.vnir, self.swir]): 

634 lbl = L1B_product_props['xml_detector_label'][detName] 

635 xml.find("ProductComponent/%s/Data/Size/NRows" % lbl).text = str(detMeta.nrows) 

636 xml.find("ProductComponent/%s/Data/Type/UnitCode" % lbl).text = detMeta.unitcode 

637 xml.find("ProductComponent/%s/Data/Type/Unit" % lbl).text = detMeta.unit 

638 

639 xml_string = ElementTree.tostring(xml, encoding='unicode', pretty_print=True) 

640 

641 return xml_string