Coverage for enpt/model/images/images_sensorgeo.py: 93%
334 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-07 11:39 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-07 11:39 +0000
1# -*- coding: utf-8 -*-
3# EnPT, EnMAP Processing Tool - A Python package for pre-processing of EnMAP Level-1B data
4#
5# Copyright (C) 2018-2024 Karl Segl (GFZ Potsdam, segl@gfz-potsdam.de), Daniel Scheffler
6# (GFZ Potsdam, danschef@gfz-potsdam.de), Niklas Bohn (GFZ Potsdam, nbohn@gfz-potsdam.de),
7# Stéphane Guillaso (GFZ Potsdam, stephane.guillaso@gfz-potsdam.de)
8#
9# This software was developed within the context of the EnMAP project supported
10# by the DLR Space Administration with funds of the German Federal Ministry of
11# Economic Affairs and Energy (on the basis of a decision by the German Bundestag:
12# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.
13#
14# This program is free software: you can redistribute it and/or modify it under
15# the terms of the GNU General Public License as published by the Free Software
16# Foundation, either version 3 of the License, or (at your option) any later
17# version. Please note the following exception: `EnPT` depends on tqdm, which
18# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
19# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
20# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
21#
22# This program is distributed in the hope that it will be useful, but WITHOUT
23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
25# details.
26#
27# You should have received a copy of the GNU Lesser General Public License along
28# with this program. If not, see <https://www.gnu.org/licenses/>.
30"""EnPT EnMAP objects in sensor geometry."""
32import logging
33from types import SimpleNamespace
34from typing import Tuple, Optional, List # noqa: F401
35from tempfile import TemporaryDirectory
36from zipfile import ZipFile
37import numpy as np
38from os import path, makedirs
39from glob import glob
40import utm
41from scipy.interpolate import RegularGridInterpolator
42from geoarray import GeoArray
43from py_tools_ds.geo.vector.topology import get_footprint_polygon
45from ...utils.logging import EnPT_Logger
46from .image_baseclasses import _EnMAP_Image
47from ...model.metadata import EnMAP_Metadata_L1B_SensorGeo, EnMAP_Metadata_L1B_Detector_SensorGeo
48from ...model.metadata import EnMAP_Metadata_L2A_MapGeo # noqa: F401 # only used for type hint
49from ...options.config import EnPTConfig
50from ...processors.dead_pixel_correction import Dead_Pixel_Corrector
51from ...processors.dem_preprocessing import DEM_Processor
52from ...processors.spatial_transform import compute_mapCoords_within_sensorGeoDims
54__author__ = ['Daniel Scheffler', 'Stéphane Guillaso', 'André Hollstein']
57class EnMAP_Detector_SensorGeo(_EnMAP_Image):
58 """Class representing a single detector of an EnMAP image (in sensor geometry).
60 NOTE:
61 - Inherits all attributes from _EnMAP_Image class.
62 - All functionality that VNIR and SWIR detectors (sensor geometry) have in common is to be implemented here.
64 Attributes:
65 - to be listed here. Check help(_EnMAP_Detector_SensorGeo) in the meanwhile!
67 """
69 def __init__(self, detector_name: str, root_dir: str, config: EnPTConfig, logger=None, meta=None) -> None:
70 """Get an instance of _EnMAP_Detector_SensorGeo.
72 :param detector_name: 'VNIR' or 'SWIR'
73 :param root_dir:
74 :param logger:
75 :param meta: import meta if already loaded
76 """
77 if detector_name not in ['VNIR', 'SWIR']:
78 raise ValueError("'detector_name' must be 'VNIR' or 'SWIR'. Received %s!" % detector_name)
80 self.cfg = config
81 self._root_dir = root_dir
82 self.detector_name = detector_name
83 self.logger = logger or logging.getLogger()
85 # get all attributes of base class "_EnMAP_Image"
86 super(EnMAP_Detector_SensorGeo, self).__init__()
88 # instance an empty metadata object
89 if meta is None:
90 self.detector_meta: EnMAP_Metadata_L1B_Detector_SensorGeo = \
91 EnMAP_Metadata_L1B_Detector_SensorGeo(self.detector_name, config=self.cfg, logger=self.logger)
92 else:
93 self.detector_meta: EnMAP_Metadata_L1B_Detector_SensorGeo = meta
95 def get_paths(self) -> SimpleNamespace:
96 """Get all file paths associated with the current instance of EnMAP_Detector_SensorGeo.
98 NOTE: This information is read from the detector_meta.
100 :return: paths
101 """
102 self.paths.root_dir = self._root_dir
103 self.paths.data = path.join(self._root_dir, self.detector_meta.filename_data)
105 def path_or_None(filename):
106 return path.join(self._root_dir, filename) if filename else None
108 self.paths.mask_landwater = path_or_None(self.detector_meta.filename_mask_landwater)
109 self.paths.mask_clouds = path_or_None(self.detector_meta.filename_mask_clouds)
110 self.paths.mask_cloudshadow = path_or_None(self.detector_meta.filename_mask_cloudshadow)
111 self.paths.mask_haze = path_or_None(self.detector_meta.filename_mask_haze)
112 self.paths.mask_snow = path_or_None(self.detector_meta.filename_mask_snow)
113 self.paths.mask_cirrus = path_or_None(self.detector_meta.filename_mask_cirrus)
114 self.paths.deadpixelmap = path_or_None(self.detector_meta.filename_deadpixelmap)
115 self.paths.quicklook = path_or_None(self.detector_meta.filename_quicklook)
117 return self.paths
119 def correct_dead_pixels(self):
120 """Correct dead pixels with respect to the dead pixel mask."""
121 algo = self.cfg.deadpix_P_algorithm
122 method_spectral, method_spatial = self.cfg.deadpix_P_interp_spectral, self.cfg.deadpix_P_interp_spatial
123 self.logger.info("Correcting dead pixels of %s detector...\n"
124 "Used algorithm: %s interpolation in the %s domain"
125 % (self.detector_name, method_spectral, algo if algo == 'spectral' else method_spatial))
127 self.data = \
128 Dead_Pixel_Corrector(algorithm=algo,
129 interp_spectral=method_spectral,
130 interp_spatial=method_spatial,
131 logger=self.logger)\
132 .correct(self.data, self.deadpixelmap)
134 def get_preprocessed_dem(self) -> GeoArray:
135 """Get a digital elevation model in EnMAP sensor geometry of the current detector."""
136 if self.cfg.path_dem:
137 self.logger.info('Pre-processing DEM for %s...' % self.detector_name)
138 DP = DEM_Processor(self.cfg.path_dem, enmapIm_cornerCoords=tuple(zip(self.detector_meta.lon_UL_UR_LL_LR,
139 self.detector_meta.lat_UL_UR_LL_LR)),
140 CPUs=self.cfg.CPUs)
141 DP.fill_gaps() # FIXME this will also be needed at other places
143 R, C = self.data.shape[:2]
144 if DP.dem.is_map_geo:
145 lons = self.detector_meta.lons
146 lats = self.detector_meta.lats
148 if not (lons.ndim == 2 and lats.ndim == 2) and not (lons.ndim == 3 and lats.ndim == 3):
149 raise ValueError((lons.ndim, lats.ndim), 'Geolayer must be either 2- or 3-dimensional.')
151 msg_bandinfo = ''
152 if lons.ndim == 3:
153 # 3D geolayer (the usual case for EnMAP data provided by DLR)
154 lons = lons[:, :, 0]
155 lats = lats[:, :, 0]
156 msg_bandinfo = ' (using first band of %s geolayer)' % self.detector_name
157 else:
158 # 2D geolayer (GFZ-internal test case)
159 # FIXME replace linear interpolation by native geolayers
160 if lons.shape != self.data.shape:
161 lons = self.detector_meta.interpolate_corners(*self.detector_meta.lon_UL_UR_LL_LR, nx=C, ny=R)
162 if lats.shape != self.data.shape:
163 lats = self.detector_meta.interpolate_corners(*self.detector_meta.lat_UL_UR_LL_LR, nx=C, ny=R)
165 self.logger.info(('Transforming DEM to %s sensor geometry%s...' % (self.detector_name, msg_bandinfo)))
166 self.dem = DP.to_sensor_geometry(lons=lons, lats=lats)
167 else:
168 self.dem = DP.dem
170 else:
171 self.logger.info('No DEM for the %s detector provided. Falling back to an average elevation of %d meters.'
172 % (self.detector_name, self.cfg.average_elevation))
173 self.dem = GeoArray(np.full(self.data.shape[:2], self.cfg.average_elevation).astype(np.int16))
175 return self.dem
177 def append_new_image(self, img2: 'EnMAP_Detector_SensorGeo', n_lines: int = None) -> None:
178 # TODO convert method to function?
179 """Check if a second image matches with the first image and if so, append the given number of lines below.
181 In this version we assume that the image will be added below. If it is the case, the method will create
182 temporary files that will be used in the following.
184 :param img2:
185 :param n_lines: number of lines to be added from the new image
186 :return: None
187 """
188 if not self.cfg.is_dummy_dataformat:
189 basename_img1 = self.detector_meta.filename_data.split('-SPECTRAL_IMAGE')[0] + '::%s' % self.detector_name
190 basename_img2 = img2.detector_meta.filename_data.split('-SPECTRAL_IMAGE')[0] + '::%s' % img2.detector_name
191 else:
192 basename_img1 = path.basename(self._root_dir)
193 basename_img2 = path.basename(img2._root_dir)
195 self.logger.info("Check new image for %s: %s " % (self.detector_name, basename_img2))
197 distance_min = 27.0
198 distance_max = 34.0
200 # compute distance between image1 LL and image2 UL
201 x1, y1, _, _ = utm.from_latlon(self.detector_meta.lat_UL_UR_LL_LR[2], self.detector_meta.lon_UL_UR_LL_LR[2])
202 x2, y2, _, _ = utm.from_latlon(img2.detector_meta.lat_UL_UR_LL_LR[0], img2.detector_meta.lon_UL_UR_LL_LR[0])
203 distance_left = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
205 # compute distance between image1 LR and image2 UR
206 x1, y1, _, _ = utm.from_latlon(self.detector_meta.lat_UL_UR_LL_LR[3], self.detector_meta.lon_UL_UR_LL_LR[3])
207 x2, y2, _, _ = utm.from_latlon(img2.detector_meta.lat_UL_UR_LL_LR[1], img2.detector_meta.lon_UL_UR_LL_LR[1])
208 distance_right = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
210 if distance_min < distance_left < distance_max and distance_min < distance_right < distance_max:
211 self.logger.info("Append new image to %s: %s" % (self.detector_name, basename_img2))
212 else:
213 self.logger.warning("%s and %s don't fit to be appended." % (basename_img1, basename_img2))
214 return
216 # set new number of line
217 n_lines = n_lines or img2.detector_meta.nrows
219 if n_lines > img2.detector_meta.nrows:
220 self.logger.warning("n_lines (%s) exceeds the total number of line of second image" % n_lines)
221 self.logger.warning("Set to the image number of line")
222 n_lines = img2.detector_meta.nrows
224 if n_lines < 50: # TODO: determine these values
225 self.logger.warning("A minimum of 50 lines is required, only %s were selected" % n_lines)
226 self.logger.warning("Set the number of line to 50")
227 n_lines = 50
229 self.detector_meta.nrows += n_lines
231 # Compute new lower coordinates
232 if not self.cfg.is_dummy_dataformat:
233 img2_cornerCoords = tuple(zip(img2.detector_meta.lon_UL_UR_LL_LR,
234 img2.detector_meta.lat_UL_UR_LL_LR))
235 elevation = DEM_Processor(img2.cfg.path_dem,
236 enmapIm_cornerCoords=img2_cornerCoords).dem \
237 if img2.cfg.path_dem else self.cfg.average_elevation
239 LL, LR = compute_mapCoords_within_sensorGeoDims(
240 sensorgeoCoords_YX=[(n_lines - 1, 0), # LL
241 (n_lines - 1, img2.detector_meta.ncols - 1)], # LR
242 rpc_coeffs=list(img2.detector_meta.rpc_coeffs.values())[0], # RPC coeffs of first band of the detector
243 elevation=elevation,
244 enmapIm_cornerCoords=img2_cornerCoords,
245 enmapIm_dims_sensorgeo=(img2.detector_meta.nrows, img2.detector_meta.ncols)
246 )
248 self.detector_meta.lon_UL_UR_LL_LR[2], self.detector_meta.lat_UL_UR_LL_LR[2] = LL
249 self.detector_meta.lon_UL_UR_LL_LR[3], self.detector_meta.lat_UL_UR_LL_LR[3] = LR
250 else:
251 cols_lowres, rows_lowres = [0, 1], [0, 1]
252 lats_lowres = [[img2.detector_meta.lat_UL_UR_LL_LR[0], img2.detector_meta.lat_UL_UR_LL_LR[2]],
253 [img2.detector_meta.lat_UL_UR_LL_LR[1], img2.detector_meta.lat_UL_UR_LL_LR[3]]] # [x, y]
254 lons_lowres = [[img2.detector_meta.lon_UL_UR_LL_LR[0], img2.detector_meta.lon_UL_UR_LL_LR[2]],
255 [img2.detector_meta.lon_UL_UR_LL_LR[1], img2.detector_meta.lon_UL_UR_LL_LR[3]]] # [x, y]
256 RGI_lats = RegularGridInterpolator(points=[cols_lowres, rows_lowres], values=lats_lowres, method='linear')
257 RGI_lons = RegularGridInterpolator(points=[cols_lowres, rows_lowres], values=lons_lowres, method='linear')
258 cols_full = np.array([[0, 1]])
259 rows_full = np.array([[int(n_lines / img2.detector_meta.nrows)]])
260 lats_LL_LR = RGI_lats(np.dstack(np.meshgrid(cols_full, rows_full))).flatten()
261 lons_LL_LR = RGI_lons(np.dstack(np.meshgrid(cols_full, rows_full))).flatten()
263 self.detector_meta.lat_UL_UR_LL_LR[2] = float(lats_LL_LR[0])
264 self.detector_meta.lat_UL_UR_LL_LR[3] = float(lats_LL_LR[1])
265 self.detector_meta.lon_UL_UR_LL_LR[2] = float(lons_LL_LR[0])
266 self.detector_meta.lon_UL_UR_LL_LR[3] = float(lons_LL_LR[1])
268 self.detector_meta.lats = (
269 self.detector_meta.interpolate_corners(
270 *self.detector_meta.lat_UL_UR_LL_LR,
271 self.detector_meta.ncols,
272 self.detector_meta.nrows)
273 )
274 self.detector_meta.lons = (
275 self.detector_meta.interpolate_corners(
276 *self.detector_meta.lon_UL_UR_LL_LR,
277 self.detector_meta.ncols,
278 self.detector_meta.nrows)
279 )
281 # update map polygon
282 self.detector_meta.ll_mapPoly = \
283 get_footprint_polygon(tuple(zip(self.detector_meta.lon_UL_UR_LL_LR,
284 self.detector_meta.lat_UL_UR_LL_LR)), fix_invalid=True)
286 # append the raster data
287 self.data = np.append(self.data[:], img2.data[0:n_lines, :, :], axis=0)
289 # only append masks for the VNIR as they are only provided in VNIR sensor geometry
290 if self.detector_name == 'VNIR':
291 for attrName in ['mask_landwater', 'mask_clouds', 'mask_cloudshadow',
292 'mask_haze', 'mask_snow', 'mask_cirrus']:
293 arr_img1 = getattr(self, attrName)
294 arr_img2 = getattr(img2, attrName)
296 if arr_img1 is not None and arr_img2 is not None:
297 setattr(self, attrName, np.append(arr_img1, arr_img2[0:n_lines, :], axis=0))
298 else:
299 # this mainly applies to the dummy data format that does not have all mask files
300 self.logger.warning("Could not append the '%s' attribute "
301 "as it does not exist in the current image." % attrName)
303 if not self.cfg.is_dummy_dataformat:
304 self.deadpixelmap = np.append(self.deadpixelmap[:], img2.deadpixelmap[0:n_lines, :], axis=0)
306 # NOTE: We leave the quicklook out here because merging the quicklook of adjacent scenes might cause a
307 # brightness jump that can be avoided by recomputing the quicklook after DN/radiance conversion.
309 def DN2TOARadiance(self):
310 """Convert DNs to TOA radiance.
312 Convert the radiometric unit of _EnMAP_Detector_SensorGeo.data from digital numbers to top-of-atmosphere
313 radiance.
315 :return: None
316 """
317 # TODO move to processors.radiometric_transform?
318 if self.detector_meta.unitcode == 'DN':
319 self.logger.info('Converting DN values to radiance [mW/m^2/sr/nm] for %s detector...' % self.detector_name)
321 if self.detector_meta.l_min is not None and self.detector_meta.l_max is not None:
322 # Lλ = (LMINλ + ((LMAXλ - LMINλ)/(QCALMAX-QCALMIN)) * (QCAL-QCALMIN))
323 # FIXME this asserts LMIN and LMAX in mW m^-2 sr^-1 nm^-1
325 QCALMIN = 1
326 QCALMAX = 2 ** 16 # 65535 (16-bit maximum value)
327 LMIN = self.detector_meta.l_min
328 LMAX = self.detector_meta.l_max
329 QCAL = self.data[:]
331 radiance = ((LMAX - LMIN)/(QCALMAX - QCALMIN)) * (QCAL - QCALMIN) + LMIN
333 elif self.detector_meta.gains is not None and self.detector_meta.offsets is not None:
334 # Lλ = QCAL * GAIN + OFFSET
335 # NOTE: - DLR provides gains between 2000 and 10000, so we have to DEVIDE by gains
336 # - DLR gains / offsets are provided in W/m2/sr/nm, so we have to multiply by 1000 to get
337 # mW/m2/sr/nm as needed later
338 radiance = 1000 * (self.data[:] * self.detector_meta.gains + self.detector_meta.offsets)
340 else:
341 raise ValueError("Neighter 'l_min'/'l_max' nor 'gains'/'offsets' "
342 "are available for radiance computation.")
344 self.data = radiance.astype(np.float32)
345 self.detector_meta.unit = "mW m^-2 sr^-1 nm^-1"
346 self.detector_meta.unitcode = "TOARad"
347 else:
348 self.logger.warning(
349 "No DN to Radiance conversion is performed because unitcode is not DN (found: {code}).".format(
350 code=self.detector_meta.unitcode)
351 )
353 def save_raster_attributes(self, attrNames: List[str], outputDir: str):
354 """Save the specified raster attributes to the given output directory.
356 :param attrNames: list of attribute names to be saved
357 :param outputDir: output directory
358 """
359 for attrName in attrNames:
360 attr: GeoArray = getattr(self, attrName)
361 fN = getattr(self.detector_meta, 'filename_%s' % attrName)
363 if attr is not None:
364 attr.save(path.join(outputDir, fN), fmt="GTiff")
365 else:
366 self.logger.warning("Could not save the %s attribute '%s' as it does not exist in the current image."
367 % (self.detector_name, attrName))
369 def _transform_raster_geometry_from_other_detector(self,
370 array: np.ndarray,
371 src_lons: np.ndarray,
372 src_lats: np.ndarray,
373 src_epsg: int,
374 resamp_alg: str = 'nearest',
375 respect_keystone: bool = False,
376 src_nodata: int = None,
377 tgt_nodata: int = None
378 ) -> np.ndarray:
379 """Transform the given input raster from VNIR to SWIR sensor geometry or vice-versa.
381 NOTE:
383 - The transformation target is always the EnMAP_Detector_SensorGeo instance sensor geometry
384 (e.g., VNIR sensorgeo if self.detector_name == 'VNIR').
385 - In case a 3D array is given and the array has the exact dimensions of the source detector,
386 the full geolayer is only used if 'respect_keystone' is set to True. This saves computation time
387 for input arrays where the keystone uncertainty does not matter.
390 :param array: input array to be transformed (2- or 3-dimensional)
391 :param src_lons: geolayer longitudes corresponding to the input array
392 (same nbands like the source detector)
393 :param src_lats: geolayer latitudes corresponding to the input array
394 (same nbands like the source detector)
395 :param src_epsg: projection EPSG code of the source array
396 :param resamp_alg: resampling algorithm ('nearest', 'near', 'bilinear', 'cubic', 'cubic_spline',
397 'lanczos', 'average', 'mode', 'max', 'min', 'med', 'q1', 'q3')
398 :param respect_keystone: whether to use the full geoarray (all bands) in case a 3D array
399 in the dimension of the source detector is passed (default: False)
400 :param src_nodata: nodata value of the input array
401 :param tgt_nodata: pixel value to be used for undetermined pixels in the output
402 :return:
403 """
404 detN = self.detector_name
405 if self.detector_meta.lons is None or self.detector_meta.lats is None:
406 raise RuntimeError(f"The {detN} geolayer must be computed first "
407 f"to transform arrays from {'SWIR' if detN == 'VNIR' else 'VNIR'} "
408 f"to {detN} sensor geometry.")
410 if src_lons.shape != src_lats.shape:
411 raise ValueError("'src_lons' must have the same shape as 'src_lats'.")
413 vnir_lons = self.detector_meta.lons if detN == 'VNIR' else src_lons
414 vnir_lats = self.detector_meta.lats if detN == 'VNIR' else src_lats
415 swir_lons = self.detector_meta.lons if detN == 'SWIR' else src_lons
416 swir_lats = self.detector_meta.lats if detN == 'SWIR' else src_lats
417 prj_vnir = self.detector_meta.epsg_ortho if detN == 'VNIR' else src_epsg
418 prj_swir = self.detector_meta.epsg_ortho if detN == 'SWIR' else src_epsg
420 # use first geolayer band if the input array has only one band
421 if array.ndim == 2 or \
422 array.shape[2] != src_lons.shape[2] or \
423 not respect_keystone:
424 vnir_lons = vnir_lons[:, :, 0]
425 vnir_lats = vnir_lats[:, :, 0]
426 swir_lons = swir_lons[:, :, 0]
427 swir_lats = swir_lats[:, :, 0]
429 from ...processors.spatial_transform import VNIR_SWIR_SensorGeometryTransformer
430 VS_SGT = VNIR_SWIR_SensorGeometryTransformer(lons_vnir=vnir_lons,
431 lats_vnir=vnir_lats,
432 lons_swir=swir_lons,
433 lats_swir=swir_lats,
434 prj_vnir=prj_vnir,
435 prj_swir=prj_swir,
436 res_vnir=(30, 30),
437 res_swir=(30, 30),
438 resamp_alg=resamp_alg,
439 src_nodata=src_nodata,
440 tgt_nodata=tgt_nodata,
441 nprocs=self.cfg.CPUs
442 )
443 if detN == 'VNIR':
444 return VS_SGT.transform_sensorgeo_SWIR_to_VNIR(array)
445 else:
446 return VS_SGT.transform_sensorgeo_VNIR_to_SWIR(array)
449class EnMAP_VNIR_SensorGeo(EnMAP_Detector_SensorGeo):
450 def __init__(self, root_dir: str, config: EnPTConfig, logger=None, meta=None) -> None:
451 super().__init__(detector_name='VNIR', root_dir=root_dir, config=config, logger=logger, meta=meta)
453 def read_masks(self):
454 """Read the L1B masks."""
455 self.logger.info('Reading image masks in VNIR sensor geometry.')
457 # water mask (0=backgr.; 1=land; 2=water)
458 if self.paths.mask_landwater:
459 self.mask_landwater = GeoArray(self.paths.mask_landwater)[:]
461 # cloud mask (0=none; 1=cloud)
462 if self.paths.mask_clouds:
463 self.mask_clouds = GeoArray(self.paths.mask_clouds)[:] == 1
465 # cloud shadow mask (0=none; 1=cloud shadow)
466 if self.paths.mask_cloudshadow:
467 self.mask_cloudshadow = GeoArray(self.paths.mask_cloudshadow)[:] == 1
469 # haze mask (0=none; 1=haze)
470 if self.paths.mask_landwater:
471 self.mask_haze = GeoArray(self.paths.mask_haze)[:] == 1
473 # snow mask (0=none; 1=snow)
474 if self.paths.mask_snow:
475 self.mask_snow = GeoArray(self.paths.mask_snow)[:] == 1
477 # cirrus mask (0=none; 1=thin, 2=medium, 3=thick)
478 if self.paths.mask_cirrus:
479 self.mask_cirrus = GeoArray(self.paths.mask_cirrus)[:]
481 def transform_swir_to_vnir_raster(self,
482 array_swirsensorgeo: np.ndarray,
483 swir_lons: np.ndarray,
484 swir_lats: np.ndarray,
485 swir_epsg: int,
486 resamp_alg: str = 'nearest',
487 respect_keystone: bool = False,
488 src_nodata: int = None,
489 tgt_nodata: int = None
490 ) -> np.ndarray:
491 """Transform the given SWIR sensor-geometry raster array into VNIR sensor geometry.
493 :param array_swirsensorgeo: source array in SWIR sensor geometry to be transformed
494 :param swir_lons: longitude geolayer array of the SWIR
495 :param swir_lats: latitude geolayer array of the SWIR
496 :param swir_epsg: EPSG code of the SWIR when transformed to map geometry
497 :param resamp_alg: resampling algorith ('nearest', 'near', 'bilinear', 'cubic', 'cubic_spline',
498 'lanczos', 'average', 'mode', 'max', 'min', 'med', 'q1', 'q3')
499 :param respect_keystone: whether to use the full geoarray (all bands) in case a 3D array
500 in the dimension of the SWIR detector is passed (default: False)
501 :param src_nodata: nodata value of the input array
502 :param tgt_nodata: pixel value to be used for undetermined pixels in the output
503 """
504 return self._transform_raster_geometry_from_other_detector(
505 array_swirsensorgeo, swir_lons, swir_lats, swir_epsg, resamp_alg, respect_keystone, src_nodata, tgt_nodata)
508class EnMAP_SWIR_SensorGeo(EnMAP_Detector_SensorGeo):
509 def __init__(self, root_dir: str, config: EnPTConfig, logger=None, meta=None) -> None:
510 super().__init__(detector_name='SWIR', root_dir=root_dir, config=config, logger=logger, meta=meta)
512 def __getattribute__(self, item): # called whenever an instance attribute is accessed
513 if item in ['mask_landwater', 'mask_clouds', 'mask_cloudshadow',
514 'mask_haze', 'mask_snow', 'mask_cirrus'] \
515 and getattr(self, '_%s' % item) is None:
516 self.logger.warning('The %s is not yet available in SWIR sensor geometry. '
517 'Use EnMAP_SWIR_SensorGeo.transform_vnir_to_swir_raster() to set it with a '
518 'transformed version of the one provided in VNIR sensor geometry.' % item)
519 return super().__getattribute__(item)
521 def transform_vnir_to_swir_raster(self,
522 array_vnirsensorgeo: np.ndarray,
523 vnir_lons: np.ndarray,
524 vnir_lats: np.ndarray,
525 vnir_epsg: int,
526 resamp_alg: str = 'nearest',
527 respect_keystone: bool = False,
528 src_nodata: int = None,
529 tgt_nodata: int = None
530 ) -> np.ndarray:
531 """Transform the given VNIR sensor-geometry raster array into SWIR sensor geometry.
533 :param array_vnirsensorgeo: source array in VNIR sensor geometry to be transformed
534 :param vnir_lons: longitude geolayer array of the VNIR
535 :param vnir_lats: latitude geolayer array of the VNIR
536 :param vnir_epsg: EPSG code of the VNIR when transformed to map geometry
537 :param resamp_alg: resampling algorith ('nearest', 'near', 'bilinear', 'cubic', 'cubic_spline',
538 'lanczos', 'average', 'mode', 'max', 'min', 'med', 'q1', 'q3')
539 :param respect_keystone: whether to use the full geoarray (all bands) in case a 3D array
540 in the dimension of the VNIR detector is passed (default: False)
541 :param src_nodata: nodata value of the input array
542 :param tgt_nodata: pixel value to be used for undetermined pixels in the output
543 """
544 return self._transform_raster_geometry_from_other_detector(
545 array_vnirsensorgeo, vnir_lons, vnir_lats, vnir_epsg, resamp_alg, respect_keystone, src_nodata, tgt_nodata)
548class EnMAPL1Product_SensorGeo(object):
549 """Class for EnPT EnMAP object in sensor geometry.
551 Attributes:
552 - logger:
553 - logging.Logger instance or subclass instance
554 - vnir
555 - instance of EnMAP_VNIR_SensorGeo class
556 - swir
557 - instance of EnMAP_SWIR_SensorGeo class
558 - paths:
559 - paths belonging to the EnMAP product
560 - meta:
561 - instance of EnMAP_Metadata_SensorGeo class
562 - detector_attrNames:
563 - list of attribute names for VNIR and SWIR detectors,
565 """
567 def __init__(self, root_dir: str, config: EnPTConfig, logger=None):
568 """Get instance of EnPT EnMAP object in sensor geometry.
570 :param root_dir: Root directory of EnMAP Level-1B product
571 :param config: EnPT configuration object
572 :param logger: None or logging instance to be appended to EnMAPL1Product_SensorGeo instance
573 (If None, a default EnPT_Logger is used).
574 """
575 # protected attributes
576 self._logger = None
578 # populate attributes
579 self.cfg = config
580 if logger:
581 self.logger = logger
583 # Read metadata here in order to get all information needed by to get paths.
584 if not self.cfg.is_dummy_dataformat:
585 self.meta = EnMAP_Metadata_L1B_SensorGeo(glob(path.join(root_dir, "*METADATA.XML"))[0],
586 config=self.cfg, logger=self.logger)
587 else:
588 self.meta = EnMAP_Metadata_L1B_SensorGeo(glob(path.join(root_dir, "*_header.xml"))[0],
589 config=self.cfg, logger=self.logger)
590 self.meta.read_metadata()
592 # define VNIR and SWIR detector
593 self.detector_attrNames = ['vnir', 'swir']
594 self.vnir = EnMAP_VNIR_SensorGeo(root_dir, config=self.cfg, logger=self.logger, meta=self.meta.vnir)
595 self.swir = EnMAP_SWIR_SensorGeo(root_dir, config=self.cfg, logger=self.logger, meta=self.meta.swir)
597 # Get the paths according information delivered in the metadata
598 self.paths = self.get_paths()
600 # associate raster attributes with file links (raster data is read lazily / on demand)
601 # or directly read here in case the user does not want to include all L1B bands into the processing
602 self.vnir.data = self.paths.vnir.data
603 self.swir.data = self.paths.swir.data
604 self.vnir.read_masks()
606 if self.cfg.drop_bad_bands:
607 self.vnir.data = self.vnir.data[:, :, self.meta.vnir.goodbands_inds]
608 self.swir.data = self.swir.data[:, :, self.meta.swir.goodbands_inds]
610 try:
611 vnir_dpm = GeoArray(self.paths.vnir.deadpixelmap)
612 swir_dpm = GeoArray(self.paths.swir.deadpixelmap)
614 if self.cfg.drop_bad_bands:
615 if vnir_dpm.ndim == 3:
616 self.vnir.deadpixelmap = vnir_dpm[:, :, self.meta.vnir.goodbands_inds]
617 self.swir.deadpixelmap = swir_dpm[:, :, self.meta.swir.goodbands_inds]
618 else: # 2D
619 self.vnir.deadpixelmap = vnir_dpm[self.meta.vnir.goodbands_inds, :]
620 self.swir.deadpixelmap = swir_dpm[self.meta.swir.goodbands_inds, :]
621 else:
622 self.vnir.deadpixelmap = vnir_dpm
623 self.swir.deadpixelmap = swir_dpm
625 except ValueError:
626 self.logger.warning("Unexpected dimensions of dead pixel mask. Setting all pixels to 'normal'.")
627 self.vnir.deadpixelmap = np.zeros(self.vnir.data.shape)
628 self.swir.deadpixelmap = np.zeros(self.swir.data.shape)
630 # NOTE: We leave the quicklook out here because merging the quicklook of adjacent scenes might cause a
631 # brightness jump that can be avoided by recomputing the quicklook after DN/radiance conversion.
633 # compute radiance
634 self.DN2TOARadiance()
636 def get_paths(self) -> SimpleNamespace:
637 """Get all file paths associated with the current instance of EnMAPL1Product_SensorGeo.
639 :return: paths.SimpleNamespace()
640 """
641 paths = SimpleNamespace()
642 paths.vnir = self.vnir.get_paths()
643 paths.swir = self.swir.get_paths()
644 paths.root_dir = self.meta.rootdir
645 paths.metaxml = self.meta.path_xml
647 return paths
649 @property
650 def logger(self) -> EnPT_Logger:
651 """Get a an instance of enpt.utils.logging.EnPT_Logger.
653 NOTE:
654 - The logging level will be set according to the user inputs of EnPT.
655 - The path of the log file is directly derived from the attributes of the _EnMAP_Image instance.
657 Usage:
658 - get the logger:
659 logger = self.logger
660 - set the logger
661 self.logger = logging.getLogger() # NOTE: only instances of logging.Logger are allowed here
662 - delete the logger:
663 del self.logger # or "self.logger = None"
665 :return: EnPT_Logger instance
666 """
667 if self._logger and self._logger.handlers[:]:
668 return self._logger
669 else:
670 basename = path.splitext(path.basename(self.cfg.path_l1b_enmap_image))[0]
671 path_logfile = path.join(self.cfg.output_dir, basename + '.log') \
672 if self.cfg.create_logfile and self.cfg.output_dir else ''
673 self._logger = EnPT_Logger('log__' + basename, fmt_suffix=None, path_logfile=path_logfile,
674 log_level=self.cfg.log_level, append=False)
676 return self._logger
678 @logger.setter
679 def logger(self, logger: logging.Logger):
680 assert isinstance(logger, logging.Logger) or logger in ['not set', None], \
681 "%s.logger can not be set to %s." % (self.__class__.__name__, logger)
683 # save prior logs
684 # if logger is None and self._logger is not None:
685 # self.log += self.logger.captured_stream
686 self._logger = logger
688 @property
689 def log(self) -> str:
690 """Return a string of all logged messages until now.
692 NOTE: self.log can also be set to a string.
693 """
694 return self.logger.captured_stream
696 @log.setter
697 def log(self, string: str):
698 assert isinstance(string, str), "'log' can only be set to a string. Got %s." % type(string)
699 self.logger.captured_stream = string
701 # @classmethod
702 # def from_L1B_provider_data(cls, path_enmap_image: str, config: EnPTConfig=None) -> EnMAPL1Product_SensorGeo:
703 # """
704 #
705 # :param path_enmap_image:
706 # :param config:
707 # :return:
708 # """
709 # # input validation
710 # from zipfile import is_zipfile
711 # if not path.isdir(path_enmap_image) and \
712 # not (path.exists(path_enmap_image) and is_zipfile(path_enmap_image)):
713 # raise ValueError("The parameter 'path_enmap_image' must be a directory or the path to an existing zip "
714 # "archive.")
715 #
716 # # extract L1B image archive if needed
717 # if is_zipfile(path_enmap_image):
718 # path_enmap_image = self.extract_zip_archive(path_enmap_image)
719 # if not path.isdir(path_enmap_image):
720 # raise NotADirectoryError(path_enmap_image)
721 #
722 # # run the reader
723 # from ..io.reader import L1B_Reader
724 # RD = L1B_Reader(config=config)
725 # L1_obj = RD.read_inputdata(path_enmap_image, observation_time=datetime(2015, 12, 7, 10))
726 #
727 # return L1_obj
729 def DN2TOARadiance(self):
730 """Convert self.vnir.data and self.swir.data from digital numbers to top-of-atmosphere radiance.
732 :return: None
733 """
734 if self.vnir.detector_meta.unitcode != 'TOARad':
735 self.vnir.DN2TOARadiance()
736 self.meta.vnir.unitcode = self.vnir.detector_meta.unitcode # FIXME possible duplicate?
737 self.meta.vnir.unit = self.vnir.detector_meta.unit # FIXME possible duplicate?
738 if self.swir.detector_meta.unitcode != 'TOARad':
739 self.swir.DN2TOARadiance()
740 self.meta.swir.unitcode = self.swir.detector_meta.unitcode
741 self.meta.swir.unit = self.swir.detector_meta.unit
743 def append_new_image(self, root_dir, n_line_ext):
744 """Append a second EnMAP Level-1B product below the last line of the current L1B product.
746 NOTE: We create new files that will be saved into a temporary directory and their path will be stored in the
747 paths of self. We assume that the dead pixel map does not change between two adjacent images.
749 :param root_dir: root directory of EnMAP Level-1B product to be appended
750 :param n_line_ext: number of lines to be added from the new image
751 :return:
752 """
753 l1b_ext_obj = EnMAPL1Product_SensorGeo(root_dir, config=self.cfg)
755 self.vnir.append_new_image(l1b_ext_obj.vnir, n_line_ext)
756 self.swir.append_new_image(l1b_ext_obj.swir, n_line_ext)
758 def calc_snr_from_radiance(self):
759 """Compute EnMAP SNR from radiance data.
761 SNR equation: SNR = p0 + p1 * LTOA + p2 * LTOA ^ 2 [W/(m^2 sr nm)].
762 """
763 with TemporaryDirectory(dir=self.cfg.working_dir) as tmpDir, \
764 ZipFile(self.cfg.path_l1b_snr_model, "r") as zf:
766 zf.extractall(tmpDir)
768 if self.meta.vnir.unitcode == 'TOARad':
769 self.vnir.detector_meta.calc_snr_from_radiance(rad_data=self.vnir.data, dir_snr_models=tmpDir)
771 if self.meta.swir.unitcode == 'TOARad':
772 self.swir.detector_meta.calc_snr_from_radiance(rad_data=self.swir.data, dir_snr_models=tmpDir)
774 def correct_dead_pixels(self):
775 """Correct dead pixels of both detectors."""
776 self.vnir.correct_dead_pixels()
777 self.swir.correct_dead_pixels()
779 # def correct_VNIR_SWIR_shift(self):
780 # # use first geolayer bands for VNIR and SWIR
781 # Vlons, Vlats = self.vnir.detector_meta.lons[:, :, 0], self.vnir.detector_meta.lats[:, :, 0]
782 # Slons, Slats = self.swir.detector_meta.lons[:, :, 0], self.swir.detector_meta.lats[:, :, 0]
783 #
784 # # get corner coordinates of VNIR and SWIR according to geolayer
785 # def get_coords(lons, lats):
786 # return tuple([(lons[Y, X], lats[Y, X]) for Y, X in [(0, 0), (0, -1), (-1, 0), (-1, -1)]])
787 #
788 # VUL, VUR, VLL, VLR = get_coords(Vlons, Vlats)
789 # SUL, SUR, SLL, SLR = get_coords(Slons, Slats)
790 #
791 # # get map coordinates of VNIR/SWIR overlap
792 # ovUL = max(VUL[0], SUL[0]), min(VUL[1], SUL[1])
793 # ovUR = min(VUR[0], SUR[0]), min(VUR[1], SUR[1])
794 # ovLL = max(VLL[0], SLL[0]), max(VLL[1], SLL[1])
795 # ovLR = min(VLR[0], SLR[0]), max(VLR[1], SLR[1])
796 #
797 # # find nearest image positions for VNIR and SWIR to the map coordinates of the VNIR/SWIR overlap
798 # def nearest_imCoord(lons_arr, lats_arr, lon, lat):
799 # dists = np.sqrt((lons_arr - lon) ** 2 + (lats_arr - lat) ** 2)
800 # row, col = np.unravel_index(dists.argmin(), dists.shape)
801 #
802 # return row, col
803 #
804 # overlapImVNIR = tuple([nearest_imCoord(Vlons, Vlats, *ovCoords) for ovCoords in [ovUL, ovUR, ovLL, ovLR]])
805 # overlapImSWIR = tuple([nearest_imCoord(Slons, Slats, *ovCoords) for ovCoords in [ovUL, ovUR, ovLL, ovLR]])
806 #
807 # raise NotImplementedError() # FIXME
808 # self.vnir.data.get_mapPos() # FIXME
810 def get_preprocessed_dem(self):
811 self.vnir.get_preprocessed_dem()
812 self.swir.get_preprocessed_dem()
814 def transform_vnir_to_swir_raster(self,
815 array_vnirsensorgeo: np.ndarray,
816 resamp_alg: str = 'nearest',
817 respect_keystone: bool = False,
818 src_nodata: int = None,
819 tgt_nodata: int = None
820 ) -> np.ndarray:
821 """Transform the given array from VNIR into SWIR sensor geometry.
823 :param array_vnirsensorgeo: raster array in VNIR sensor geometry to be transformed into SWIR sensor geometry
824 :param resamp_alg: resampling algorithm ('nearest', 'near', 'bilinear', 'cubic', 'cubic_spline',
825 'lanczos', 'average', 'mode', 'max', 'min', 'med', 'q1', 'q3')
826 :param respect_keystone: whether to use the full geoarray (all bands) in case a 3D array
827 in the dimension of the VNIR detector is passed (default: False)
828 :param src_nodata: nodata value of the input array
829 :param tgt_nodata: pixel value to be used for undetermined pixels in the output
830 """
831 if self.meta.vnir.lons is None or self.meta.vnir.lats is None or \
832 self.meta.swir.lons is None or self.meta.swir.lats is None:
833 raise RuntimeError('The VNIR/SWIR geolayers must be computed first '
834 'to transform arrays from VNIR to SWIR sensor geometry.')
836 return self.swir.transform_vnir_to_swir_raster(array_vnirsensorgeo=array_vnirsensorgeo,
837 vnir_lons=self.meta.vnir.lons,
838 vnir_lats=self.meta.vnir.lats,
839 vnir_epsg=self.meta.vnir.epsg_ortho,
840 resamp_alg=resamp_alg,
841 respect_keystone=respect_keystone,
842 src_nodata=src_nodata,
843 tgt_nodata=tgt_nodata
844 )
846 def transform_swir_to_vnir_raster(self, array_swirsensorgeo: np.ndarray,
847 resamp_alg: str = 'nearest',
848 respect_keystone: bool = False,
849 src_nodata: int = None,
850 tgt_nodata: int = None
851 ) -> np.ndarray:
852 """Transform the given array from SWIR into VNIR sensor geometry.
854 :param array_swirsensorgeo: raster array in SWIR sensor geometry to be transformed into VNIR sensor geometry
855 :param resamp_alg: resampling algorithm ('nearest', 'near', 'bilinear', 'cubic', 'cubic_spline',
856 'lanczos', 'average', 'mode', 'max', 'min', 'med', 'q1', 'q3')
857 :param respect_keystone: whether to use the full geoarray (all bands) in case a 3D array
858 in the dimension of the VNIR detector is passed (default: False)
859 :param src_nodata: nodata value of the input array
860 :param tgt_nodata: pixel value to be used for undetermined pixels in the output
861 """
862 if self.meta.vnir.lons is None or self.meta.vnir.lats is None or \
863 self.meta.swir.lons is None or self.meta.swir.lats is None:
864 raise RuntimeError('The VNIR/SWIR geolayers must be computed first '
865 'to transform arrays from VNIR to SWIR sensor geometry.')
867 return self.vnir.transform_swir_to_vnir_raster(array_swirsensorgeo=array_swirsensorgeo,
868 swir_lons=self.meta.swir.lons,
869 swir_lats=self.meta.swir.lats,
870 swir_epsg=self.meta.swir.epsg_ortho,
871 resamp_alg=resamp_alg,
872 respect_keystone=respect_keystone,
873 src_nodata=src_nodata,
874 tgt_nodata=tgt_nodata
875 )
877 def set_SWIRattr_with_transformedVNIRattr(self, attrName: str,
878 resamp_alg: str = 'nearest',
879 respect_keystone: bool = False,
880 src_nodata: int = None,
881 tgt_nodata: int = None
882 ) -> None:
883 """Set the specified SWIR raster attribute with a VNIR attribute transformed to SWIR sensor geometry.
885 :param attrName: name of the attribute to be set
886 :param resamp_alg: resampling algorithm ('nearest', 'near', 'bilinear', 'cubic', 'cubic_spline',
887 'lanczos', 'average', 'mode', 'max', 'min', 'med', 'q1', 'q3')
888 :param respect_keystone: whether to use the full geoarray (all bands) in case the attribute
889 to be transformed is 'data' (default: False)
890 :param src_nodata: nodata value of the input array
891 :param tgt_nodata: pixel value to be used for undetermined pixels in the output
892 """
893 self.logger.info("Transforming the '%s' attribute from VNIR to SWIR sensor geometry." % attrName)
895 vnir_rasterAttr = getattr(self.vnir, attrName)
897 if vnir_rasterAttr is None:
898 raise RuntimeError("%s.vnir.%s has not yet been set." % (self.__class__.__name__, attrName))
900 attr_transformed = self.transform_vnir_to_swir_raster(array_vnirsensorgeo=np.array(vnir_rasterAttr),
901 resamp_alg=resamp_alg,
902 respect_keystone=respect_keystone,
903 src_nodata=src_nodata,
904 tgt_nodata=tgt_nodata,
905 )
906 setattr(self.swir, attrName, attr_transformed)
908 def run_AC(self):
909 from ...processors.atmospheric_correction import AtmosphericCorrector
910 AC = AtmosphericCorrector(config=self.cfg)
911 AC.run_ac(self)
913 def save(self, outdir: str, suffix="") -> str:
914 """Save the product to disk using almost the same input format.
916 NOTE: Radiance is saved instead of DNs to avoid a radiometric jump between concatenated images.
918 :param outdir: path to the output directory
919 :param suffix: suffix to be appended to the output filename (???)
920 :return: root path (root directory) where products were written
921 """
922 product_dir = path.join(path.abspath(outdir),
923 "{name}{suffix}".format(name=self.meta.scene_basename, suffix=suffix))
925 self.logger.info("Write product to: %s" % product_dir)
926 makedirs(product_dir, exist_ok=True)
928 # write the VNIR
929 self.vnir.save_raster_attributes(['data', 'mask_landwater', 'mask_clouds', 'mask_cloudshadow',
930 'mask_haze', 'mask_snow', 'mask_cirrus', 'deadpixelmap'],
931 product_dir)
933 # FIXME we could also write the quicklook included in DLR L1B format instead of generating an own one
934 self.vnir.generate_quicklook(bands2use=self.vnir.detector_meta.preview_bands) \
935 .save(path.join(product_dir, path.splitext(self.meta.vnir.filename_quicklook)[0] + '.png'), fmt='PNG')
937 # write the SWIR
938 # NOTE: The masks only exist in VNIR sensor geometry (would have to be transformed before to match for the SWIR)
939 self.swir.save_raster_attributes(['data', 'deadpixelmap'], product_dir)
941 self.swir.generate_quicklook(bands2use=self.swir.detector_meta.preview_bands) \
942 .save(path.join(product_dir, path.splitext(self.meta.swir.filename_quicklook)[0] + '.png'), fmt='PNG')
944 # Update the xml file
945 metadata_string = self.meta.to_XML()
946 with open(product_dir + path.sep + path.basename(self.paths.metaxml), "w") as xml_file:
947 xml_file.write(metadata_string)
949 self.logger.info("L1B product successfully written!")
951 return product_dir