Source code for enpt_enmapboxapp._enpt_alg_base

# -*- coding: utf-8 -*-

# enpt_enmapboxapp, A QGIS EnMAPBox plugin providing a GUI for the EnMAP processing tools (EnPT)
# Copyright (C) 2018-2024 Daniel Scheffler (GFZ Potsdam,
# This software was developed within the context of the EnMAP project supported
# by the DLR Space Administration with funds of the German Federal Ministry of
# Economic Affairs and Energy (on the basis of a decision by the German Bundestag:
# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <>.

"""This module provides the base class for EnPTAlgorithm and ExternalEnPTAlgorithm."""

import os
from os.path import expanduser
import psutil
from importlib.util import find_spec
from datetime import date
from multiprocessing import cpu_count
from threading import Thread
from queue import Queue
from subprocess import Popen, PIPE
from glob import glob
from warnings import warn

from qgis.core import (

from .version import __version__

[docs] class _EnPTBaseAlgorithm(QgsProcessingAlgorithm): # NOTE: The parameter assignments made here follow the parameter names in enpt/options/ # Input parameters P_json_config = 'json_config' P_CPUs = 'CPUs' P_path_l1b_enmap_image = 'path_l1b_enmap_image' P_path_l1b_enmap_image_gapfill = 'path_l1b_enmap_image_gapfill' P_path_dem = 'path_dem' P_average_elevation = 'average_elevation' P_output_dir = 'output_dir' P_working_dir = 'working_dir' P_n_lines_to_append = 'n_lines_to_append' P_drop_bad_bands = 'drop_bad_bands' P_disable_progress_bars = 'disable_progress_bars' P_output_format = 'output_format' P_output_interleave = 'output_interleave' P_output_nodata_value = 'output_nodata_value' P_path_earthSunDist = 'path_earthSunDist' P_path_solar_irr = 'path_solar_irr' P_scale_factor_toa_ref = 'scale_factor_toa_ref' P_enable_keystone_correction = 'enable_keystone_correction' P_enable_vnir_swir_coreg = 'enable_vnir_swir_coreg' P_path_reference_image = 'path_reference_image' P_enable_ac = 'enable_ac' P_mode_ac = 'mode_ac' P_polymer_additional_results = 'polymer_additional_results' P_polymer_root = 'polymer_root' P_threads = 'threads' P_blocksize = 'blocksize' P_auto_download_ecmwf = 'auto_download_ecmwf' P_scale_factor_boa_ref = 'scale_factor_boa_ref' P_run_smile_P = 'run_smile_P' P_run_deadpix_P = 'run_deadpix_P' P_deadpix_P_algorithm = 'deadpix_P_algorithm' P_deadpix_P_interp_spectral = 'deadpix_P_interp_spectral' P_deadpix_P_interp_spatial = 'deadpix_P_interp_spatial' P_ortho_resampAlg = 'ortho_resampAlg' P_vswir_overlap_algorithm = 'vswir_overlap_algorithm' P_target_projection_type = 'target_projection_type' P_target_epsg = 'target_epsg' # # Output parameters P_OUTPUT_RASTER = 'outraster' # P_OUTPUT_VECTOR = 'outvector' # P_OUTPUT_FILE = 'outfile' P_OUTPUT_FOLDER = 'outfolder'
[docs] def group(self): return 'Pre-Processing'
[docs] def groupId(self): return 'PreProcessing'
[docs] def name(self): return 'EnPTAlgorithm'
[docs] def displayName(self): return f'EnPT - EnMAP Processing Tool (v{__version__})'
[docs] def createInstance(self, *args, **kwargs): return type(self)()
[docs] @staticmethod def _get_default_polymer_root(): if not find_spec('polymer'): return '' elif not find_spec('polymer').origin: # this, e.g., happens when installing POLYMER with 'pip install' instead of 'pip install -e' print('POLYMER package found, but it is not correctly installed.') warn('POLYMER does not seem to be correctly installed. Find the installation instructions here: ' '' 'installation.html#optional-install-polymer-for-advanced-atmospheric-correction-over-water-surfaces') return '' else: return os.path.abspath(os.path.join(os.path.dirname(find_spec('polymer').origin), os.pardir))
[docs] @staticmethod def _get_default_output_dir(): userhomedir = expanduser('~') default_enpt_dir = \ os.path.join(userhomedir, 'Documents', 'EnPT', 'Output') if == 'nt' else\ os.path.join(userhomedir, 'EnPT', 'Output') outdir_nocounter = os.path.join(default_enpt_dir,'%Y%m%d')) counter = 1 while os.path.isdir('%s__%s' % (outdir_nocounter, counter)): counter += 1 return '%s__%s' % (outdir_nocounter, counter)
[docs] def addParameter(self, param, *args, advanced=False, **kwargs): """Add a parameter to the QgsProcessingAlgorithm. This overrides the parent method to make it accept an 'advanced' parameter. :param param: the parameter to be added :param args: arguments to be passed to the parent method :param advanced: whether the parameter should be flagged as 'advanced' :param kwargs: keyword arguments to be passed to the parent method """ if advanced: param.setFlags(param.flags() | QgsProcessingParameterDefinition.FlagAdvanced) super(_EnPTBaseAlgorithm, self).addParameter(param, *args, **kwargs)
[docs] def initAlgorithm(self, configuration=None): self.addParameter( QgsProcessingParameterFile( name=self.P_json_config, description='Configuration JSON template file', behavior=QgsProcessingParameterFile.File, extension='json', defaultValue=None, optional=True)) self.addParameter( QgsProcessingParameterNumber( name=self.P_CPUs, description='Number of CPU cores to be used for processing', type=QgsProcessingParameterNumber.Integer, defaultValue=cpu_count(), minValue=0, maxValue=cpu_count()), advanced=True) self.addParameter( QgsProcessingParameterFile( name=self.P_path_l1b_enmap_image, description='EnMAP Level-1B image (zip-archive or root directory)')) self.addParameter( QgsProcessingParameterFile( name=self.P_path_l1b_enmap_image_gapfill, description='Adjacent EnMAP Level-1B image to be used for gap-filling (zip-archive or root directory)', optional=True), advanced=True) self.addParameter( QgsProcessingParameterRasterLayer( name=self.P_path_dem, description='Input path of digital elevation model in map or sensor geometry; GDAL compatible file ' 'format \n(must cover the EnMAP L1B data completely if given in map geometry or must have ' 'the same \npixel dimensions like the EnMAP L1B data if given in sensor geometry)', optional=True)) self.addParameter( QgsProcessingParameterNumber( name=self.P_average_elevation, description='Average elevation in meters above sea level \n' '(may be provided if no DEM is available and ignored if DEM is given)', type=QgsProcessingParameterNumber.Integer, defaultValue=0), advanced=True) self.addParameter( QgsProcessingParameterFolderDestination( name=self.P_output_dir, description='Output directory where processed data and log files are saved', defaultValue=self._get_default_output_dir(), optional=True)) self.addParameter( QgsProcessingParameterFile( name=self.P_working_dir, description='Directory to be used for temporary files', behavior=QgsProcessingParameterFile.Folder, defaultValue=None, optional=True)) self.addParameter( QgsProcessingParameterNumber( name=self.P_n_lines_to_append, description='Number of lines to be added to the main image [if not given, use the whole imgap]', type=QgsProcessingParameterNumber.Integer, defaultValue=None, optional=True), advanced=True) self.addParameter( QgsProcessingParameterBoolean( name=self.P_drop_bad_bands, description='Do not include bad bands (water absorption bands 1358-1453 nm / 1814-1961 nm) ' 'in the L2A product', defaultValue=True), advanced=True) self.addParameter( QgsProcessingParameterBoolean( name=self.P_disable_progress_bars, description='Disable all progress bars during processing', defaultValue=True), advanced=True) self.addParameter( QgsProcessingParameterEnum( name=self.P_output_format, description="Output format (file format of all raster output files)", options=['GTiff', 'ENVI'], defaultValue=0), advanced=True) self.addParameter( QgsProcessingParameterEnum( name=self.P_output_interleave, description="Output raster data interleaving type", options=['band (BSQ)', 'line (BIL)', 'pixel (BIP)'], defaultValue=2), advanced=True) self.addParameter( QgsProcessingParameterNumber( name=self.P_output_nodata_value, description="Output no-data/background value (should be within the signed integer 16-bit range)", type=QgsProcessingParameterNumber.Integer, defaultValue=-32768, optional=True), advanced=True) self.addParameter( QgsProcessingParameterFile( name=self.P_path_earthSunDist, description='Input path of the earth sun distance model', defaultValue=None, optional=True), advanced=True) self.addParameter( QgsProcessingParameterFile( name=self.P_path_solar_irr, description='Input path of the solar irradiance model', defaultValue=None, optional=True), advanced=True) self.addParameter( QgsProcessingParameterNumber( name=self.P_scale_factor_toa_ref, description='Scale factor to be applied to TOA reflectance result', type=QgsProcessingParameterNumber.Integer, defaultValue=10000, minValue=0), advanced=True) # self.addParameter( # QgsProcessingParameterBoolean( # name=self.P_enable_keystone_correction, # description='Keystone correction', # defaultValue=False)) # self.addParameter( # QgsProcessingParameterBoolean( # name=self.P_enable_vnir_swir_coreg, # description='VNIR/SWIR co-registration', # defaultValue=False)) self.addParameter( QgsProcessingParameterRasterLayer( name=self.P_path_reference_image, description='Reference image for absolute co-registration.', defaultValue=None, optional=True)) self.addParameter( QgsProcessingParameterBoolean( name=self.P_enable_ac, description='Enable atmospheric correction', defaultValue=True)) self.addParameter( QgsProcessingParameterEnum( name=self.P_mode_ac, description="Atmospheric correction mode", options=['land - SICOR is applied to land AND water', 'water - POLYMER is applied to water only; land is cleared ', 'combined - SICOR is applied to land and POLYMER to water'], defaultValue=2)) self.addParameter( QgsProcessingParameterBoolean( name=self.P_polymer_additional_results, description="Enable generation of additional results from ACwater/POLYMER (if executed)", defaultValue=True)) self.addParameter( QgsProcessingParameterFile( name=self.P_polymer_root, description='Polymer root directory (that contains the subdirectory for ancillary data)', behavior=QgsProcessingParameterFile.Folder, defaultValue=self._get_default_polymer_root(), optional=True), advanced=True) self.addParameter( QgsProcessingParameterNumber( name=self.P_threads, description='Number of threads for multiprocessing when running ACwater/Polymer \n' "('0: no threads', '-1: automatic', '>0: number of threads')", type=QgsProcessingParameterNumber.Integer, defaultValue=-1, minValue=-1, maxValue=cpu_count()), advanced=True) self.addParameter( QgsProcessingParameterNumber( name=self.P_blocksize, description='Block size for multiprocessing when running ACwater/Polymer', type=QgsProcessingParameterNumber.Integer, defaultValue=100, minValue=1), advanced=True) self.addParameter( QgsProcessingParameterBoolean( name=self.P_auto_download_ecmwf, description='Automatically download ECMWF data for atmospheric correction ' 'of water surfaces in ACwater/Polymer', defaultValue=True), advanced=True) self.addParameter( QgsProcessingParameterNumber( name=self.P_scale_factor_boa_ref, description='Scale factor to be applied to BOA reflectance result', type=QgsProcessingParameterNumber.Integer, defaultValue=10000, minValue=0), advanced=True) # self.addParameter( # QgsProcessingParameterBoolean( # name=self.P_run_smile_P, # description='Smile detection and correction (provider smile coefficients are ignored)', # defaultValue=False)) self.addParameter( QgsProcessingParameterBoolean( name=self.P_run_deadpix_P, description='Dead pixel correction (based on L1B dead pixel map)', defaultValue=False)) self.addParameter( QgsProcessingParameterEnum( name=self.P_deadpix_P_algorithm, description="Algorithm for dead pixel correction", options=['spectral', 'spatial'], defaultValue=1), advanced=True) self.addParameter( QgsProcessingParameterEnum( name=self.P_deadpix_P_interp_spectral, description="Spectral interpolation algorithm to be used during dead pixel correction ", options=['linear', 'quadratic', 'cubic'], defaultValue=0), advanced=True) self.addParameter( QgsProcessingParameterEnum( name=self.P_deadpix_P_interp_spatial, description="Spatial interpolation algorithm to be used during dead pixel correction", options=['linear', 'bilinear', 'cubic', 'spline'], defaultValue=0), advanced=True) self.addParameter( QgsProcessingParameterEnum( name=self.P_ortho_resampAlg, description="Ortho-rectification resampling algorithm", options=['nearest', 'bilinear', 'gauss', 'cubic', 'cubic_spline', 'lanczos', 'average'], defaultValue=1), advanced=True) self.addParameter( QgsProcessingParameterEnum( name=self.P_vswir_overlap_algorithm, description="Algorithm specifying how to deal with the spectral bands " "in the VNIR/SWIR spectral overlap region", options=['VNIR and SWIR bands, order by wavelength', 'average VNIR and SWIR bands', 'VNIR bands only', 'SWIR bands only'], defaultValue=3), advanced=True) self.addParameter( QgsProcessingParameterEnum( self.P_target_projection_type, description='Projection type of the raster output files', options=['UTM', 'Geographic'], defaultValue=0), advanced=True) self.addParameter( QgsProcessingParameterNumber( name=self.P_target_epsg, description='Custom EPSG code of the target projection (overrides target_projection_type)', type=QgsProcessingParameterNumber.Integer, defaultValue=None, optional=True), advanced=True)
# TODO: # "target_coord_grid": "None" /*custom target coordinate grid to which the output is resampled # ([x0, x1, y0, y1], e.g., [0, 30, 0, 30])*/
[docs] @staticmethod def shortHelpString(*args, **kwargs): """Display help string. Example: '<p>Here comes the HTML documentation.</p>' \ '<h3>With Headers...</h3>' \ '<p>and Hyperlinks: <a href="">Google</a></p>' :param args: :param kwargs: """ text = \ '<p>General information about this EnMAP box app can be found ' \ '<a href="">here</a>. ' \ 'For details, e.g., about all the algorithms implemented in EnPT, take a look at the ' \ '<a href="">EnPT backend ' \ 'documentation</a>.</p>' \ '<p>Type <i>enpt -h</i> into a shell to get further information about individual parameters or check out ' \ 'the <a href="' \ 'command-line-utilities">documentation</a>.</p>' return text
[docs] def helpString(self): return self.shortHelpString()
[docs] @staticmethod def helpUrl(*args, **kwargs): return ''
[docs] @staticmethod def _get_preprocessed_parameters(parameters): # replace Enum parameters with corresponding strings (not needed in case of unittest) for n, opts in [ ('output_format', {0: 'GTiff', 1: 'ENVI'}), ('output_interleave', {0: 'band', 1: 'line', 2: 'pixel'}), ('mode_ac', {0: 'land', 1: 'water', 2: 'combined'}), ('deadpix_P_algorithm', {0: 'spectral', 1: 'spatial'}), ('deadpix_P_interp_spectral', {0: 'linear', 1: 'quadratic', 2: 'cubic'}), ('deadpix_P_interp_spatial', {0: 'linear', 1: 'bilinear', 2: 'cubic', 3: 'spline'}), ('ortho_resampAlg', {0: 'nearest', 1: 'bilinear', 2: 'gauss', 3: 'cubic', 4: 'cubic_spline', 5: 'lanczos', 6: 'average'}), ('vswir_overlap_algorithm', {0: 'order_by_wvl', 1: 'average', 2: 'vnir_only', 3: 'swir_only'}), ('target_projection_type', {0: 'UTM', 1: 'Geographic'}), ]: if isinstance(parameters[n], int): parameters[n] = opts[parameters[n]] # remove all parameters not to be forwarded to the EnPT CLI parameters = {k: v for k, v in parameters.items() if k not in ['conda_root'] and v not in [None, NULL, 'NULL', '']} return parameters
[docs] @staticmethod def _run_cmd(cmd, qgis_feedback=None, **kwargs): """Execute external command and get its stdout, exitcode and stderr. Code based on: :param cmd: a normal shell command including parameters """ def reader(pipe, queue): try: with pipe: for line in iter(pipe.readline, b''): queue.put((pipe, line)) finally: queue.put(None) process = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True, **kwargs) q = Queue() Thread(target=reader, args=[process.stdout, q]).start() Thread(target=reader, args=[process.stderr, q]).start() stdout_qname = None stderr_qname = None # for _ in range(2): for source, line in iter(q.get, None): if qgis_feedback.isCanceled(): # qgis_feedback.reportError('CANCELED') proc2kill = psutil.Process( for proc in proc2kill.children(recursive=True): proc.kill() proc2kill.kill() raise KeyboardInterrupt linestr = line.decode('latin-1').rstrip() # print("%s: %s" % (source, linestr)) # source name seems to be platfor/environment specific, so grab it from dummy STDOUT/STDERR messages. if linestr == 'Connecting to EnPT STDOUT stream.': stdout_qname = continue if linestr == 'Connecting to EnPT STDERR stream.': stderr_qname = continue if == stdout_qname: qgis_feedback.pushInfo(linestr) elif == stderr_qname: qgis_feedback.reportError(linestr) else: qgis_feedback.reportError(linestr) exitcode = process.poll() return exitcode
[docs] def _handle_results(self, parameters: dict, feedback, exitcode: int) -> dict: success = False if exitcode: feedback.reportError("\n" + "=" * 60 + "\n" + "An exception occurred. Processing failed.") # list output dir if 'output_dir' in parameters: outdir = parameters['output_dir'] outraster_matches = \ glob(os.path.join(outdir, '*', '*SPECTRAL_IMAGE.TIF')) or \ glob(os.path.join(outdir, '*', '*SPECTRAL_IMAGE.bsq')) or \ glob(os.path.join(outdir, '*', '*SPECTRAL_IMAGE.bil')) or \ glob(os.path.join(outdir, '*', '*SPECTRAL_IMAGE.bip')) outraster = outraster_matches[0] if len(outraster_matches) > 0 else None if os.path.isdir(outdir): if os.listdir(outdir): feedback.pushInfo("The output folder '%s' contains:\n" % outdir) feedback.pushCommandInfo('\n'.join([os.path.basename(f) for f in os.listdir(outdir)]) + '\n') if outraster: subdir = os.path.dirname(outraster_matches[0]) feedback.pushInfo(subdir) feedback.pushInfo("...where the folder '%s' contains:\n" % os.path.split(subdir)[-1]) feedback.pushCommandInfo('\n'.join(sorted([os.path.basename(f) for f in os.listdir(subdir)])) + '\n') success = True else: feedback.reportError("No output raster was written.") else: feedback.reportError("The output folder is empty.") else: feedback.reportError("No output folder created.") # return outputs if success: return { 'success': True, self.P_OUTPUT_RASTER: outraster, # self.P_OUTPUT_VECTOR: parameters[self.P_OUTPUT_RASTER], # self.P_OUTPUT_FILE: parameters[self.P_OUTPUT_RASTER], self.P_OUTPUT_FOLDER: outdir } else: return {'success': False} else: feedback.pushInfo('The output was skipped according to user setting.') return {'success': True}