# -*- coding: utf-8 -*-
# EnPT, EnMAP Processing Tool - A Python package for pre-processing of EnMAP Level-1B data
#
# Copyright (C) 2018-2024 Karl Segl (GFZ Potsdam, segl@gfz-potsdam.de), Daniel Scheffler
# (GFZ Potsdam, danschef@gfz-potsdam.de), Niklas Bohn (GFZ Potsdam, nbohn@gfz-potsdam.de),
# Stéphane Guillaso (GFZ Potsdam, stephane.guillaso@gfz-potsdam.de)
#
# This software was developed within the context of the EnMAP project supported
# by the DLR Space Administration with funds of the German Federal Ministry of
# Economic Affairs and Energy (on the basis of a decision by the German Bundestag:
# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version. Please note the following exception: `EnPT` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <https://www.gnu.org/licenses/>.
"""EnPT process controller module."""
import os
import sys
import tempfile
import zipfile
import shutil
import weakref
import warnings
import pickle
from typing import Optional
from time import time
from datetime import timedelta
from glob import glob
from ..options.config import EnPTConfig
from ..io.reader import L1B_Reader
from ..model.images import EnMAPL1Product_SensorGeo, EnMAPL2Product_MapGeo # noqa: F401
__author__ = 'Daniel Scheffler'
[docs]
class EnPT_Controller(object):
"""Class of EnPT process controller."""
def __init__(self, config: EnPTConfig = None, **config_kwargs):
"""Initialize the Process Controller.
:param config: an instance of the EnPTConfig class (overrides config_kwargs)
:param config_kwargs: configuration parameters to be passed to EnPTConfig class
"""
self.cfg: EnPTConfig = config or EnPTConfig(**config_kwargs)
# generate temporary directory (must be deleted at the end of the pipeline)
self.tempDir = tempfile.mkdtemp()
# setup a finalizer that destroys remaining data (directories, etc.) in case of unexpected exit
self._finalizer = weakref.finalize(self, self._cleanup, self.tempDir,
warn_message="Implicitly cleaning up {!r}".format(self))
# record startup time
self._time_startup = time()
# defaults
self.L1_obj: Optional[EnMAPL1Product_SensorGeo] = None
self.L2_obj: Optional[EnMAPL2Product_MapGeo] = None
[docs]
def read_L1B_data(self) -> None:
"""Read the provider L1B data given in config and return an EnMAP image object."""
path_enmap_image = self.cfg.path_l1b_enmap_image
path_enmap_image_gapfill = self.cfg.path_l1b_enmap_image_gapfill
# input validation
if not os.path.isdir(path_enmap_image) and \
not (os.path.exists(path_enmap_image) and zipfile.is_zipfile(path_enmap_image)):
raise ValueError("The parameter 'path_enmap_image' must be a directory or the path to an existing zip "
"archive. Received %s." % path_enmap_image)
# extract L1B image archive if needed
if zipfile.is_zipfile(path_enmap_image):
path_enmap_image = self.extract_zip_archive(path_enmap_image, subdir='image_main')
# extract L1B gap fill image archive if needed
if path_enmap_image_gapfill and zipfile.is_zipfile(path_enmap_image_gapfill):
path_enmap_image_gapfill = self.extract_zip_archive(path_enmap_image_gapfill, subdir='image_gapfill')
# run the reader
RD = L1B_Reader(config=self.cfg)
self.L1_obj = RD.read_inputdata(root_dir_main=path_enmap_image, root_dir_ext=path_enmap_image_gapfill,
n_line_ext=self.cfg.n_lines_to_append)
[docs]
def run_toaRad2toaRef(self):
"""Run conversion from TOA radiance to TOA reflectance."""
# get a new instance of radiometric transformer
from ..processors.radiometric_transform import Radiometric_Transformer
RT = Radiometric_Transformer(self.cfg)
# run transformation to TOARef
self.L1_obj = RT.transform_TOARad2TOARef(self.L1_obj)
[docs]
def run_dem_processor(self):
self.L1_obj.get_preprocessed_dem()
[docs]
def run_spatial_optimization(self):
# get a new instance of Spatial_Optimizer
from ..processors.spatial_optimization import Spatial_Optimizer
SpO = Spatial_Optimizer(self.cfg)
# run optimization
self.L1_obj = SpO.optimize_geolayer(self.L1_obj)
[docs]
def run_atmospheric_correction(self):
"""Run atmospheric correction only."""
self.L1_obj.run_AC()
[docs]
def run_orthorectification(self):
"""Run orthorectification to transform L1 sensor geometry image to L2 map geometry."""
# run transformation from sensor to map geometry
self.L2_obj = EnMAPL2Product_MapGeo.from_L1B_sensorgeo(config=self.cfg, enmap_ImageL1=self.L1_obj)
[docs]
def write_output(self):
if self.cfg.output_dir:
if self.L2_obj is not None:
self.L2_obj.save(self.cfg.output_dir)
else:
self.L1_obj.save(self.cfg.output_dir)
[docs]
def run_all_processors(self):
"""Run all processors at once."""
if os.getenv('IS_ENPT_GUI_TEST') != "1":
try:
if os.getenv('IS_ENPT_GUI_CALL') == "1":
self._write_to_stdout_stderr()
if self.cfg.log_level == 'DEBUG':
self._print_received_configuration()
self.read_L1B_data()
if self.cfg.run_deadpix_P:
self.L1_obj.correct_dead_pixels()
if self.cfg.enable_absolute_coreg:
# self.run_toaRad2toaRef() # this is only needed for geometry processor but AC expects radiance
self.run_spatial_optimization()
self.run_dem_processor()
if self.cfg.enable_ac:
self.run_atmospheric_correction()
if self.cfg.run_deadpix_P:
# re-apply dead pixel correction
self.L1_obj.logger.info(
'Re-applying dead pixel correction to correct for spectral spikes due to fringe effect.')
self.L1_obj.correct_dead_pixels()
else:
self.L1_obj.logger.info('Skipping atmospheric correction as configured and '
'computing top-of-atmosphere reflectance instead.')
self.run_toaRad2toaRef()
# self.run_spatial_optimization()
self.run_orthorectification()
self.write_output()
self.L1_obj.logger.info('Total runtime of the processing chain: %s'
% timedelta(seconds=time() - self._time_startup))
finally:
self.cleanup()
else:
self._print_received_configuration()
if not os.path.isdir(self.cfg.output_dir):
raise NotADirectoryError(self.cfg.output_dir)
with open(os.path.join(self.cfg.output_dir, 'received_args_kwargs.pkl'), 'wb') as outF:
pickle.dump(
dict(
json_config=self.cfg.json_config,
kwargs=self.cfg.kwargs
),
outF)
[docs]
@staticmethod
def _write_to_stdout_stderr():
"""Write to STDOUT and STDERR to reveal Queue.name of these streams to enpt_enmapboxapp."""
sys.stdout.write('Connecting to EnPT STDOUT stream.\n')
sys.stdout.flush()
sys.stderr.write('Connecting to EnPT STDERR stream.\n')
sys.stderr.flush()
[docs]
def _print_received_configuration(self):
print('EnPT Controller received the following configuration:')
print(repr(self.cfg))
[docs]
@classmethod
def _cleanup(cls, tempDir, warn_message):
"""Clean up implicitly (not to be called directly)."""
if tempDir:
shutil.rmtree(tempDir)
warnings.warn(warn_message, ResourceWarning)
[docs]
def cleanup(self):
"""Clean up (to be called directly)."""
if self._finalizer.detach():
if self.tempDir:
shutil.rmtree(self.tempDir)