Coverage for enpt/execution/controller.py: 86%
117 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-07 11:39 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-07 11:39 +0000
1# -*- coding: utf-8 -*-
3# EnPT, EnMAP Processing Tool - A Python package for pre-processing of EnMAP Level-1B data
4#
5# Copyright (C) 2018-2024 Karl Segl (GFZ Potsdam, segl@gfz-potsdam.de), Daniel Scheffler
6# (GFZ Potsdam, danschef@gfz-potsdam.de), Niklas Bohn (GFZ Potsdam, nbohn@gfz-potsdam.de),
7# Stéphane Guillaso (GFZ Potsdam, stephane.guillaso@gfz-potsdam.de)
8#
9# This software was developed within the context of the EnMAP project supported
10# by the DLR Space Administration with funds of the German Federal Ministry of
11# Economic Affairs and Energy (on the basis of a decision by the German Bundestag:
12# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.
13#
14# This program is free software: you can redistribute it and/or modify it under
15# the terms of the GNU General Public License as published by the Free Software
16# Foundation, either version 3 of the License, or (at your option) any later
17# version. Please note the following exception: `EnPT` depends on tqdm, which
18# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
19# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
20# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
21#
22# This program is distributed in the hope that it will be useful, but WITHOUT
23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
25# details.
26#
27# You should have received a copy of the GNU Lesser General Public License along
28# with this program. If not, see <https://www.gnu.org/licenses/>.
30"""EnPT process controller module."""
32import os
33import sys
34import tempfile
35import zipfile
36import shutil
37import weakref
38import warnings
39import pickle
40from typing import Optional
41from time import time
42from datetime import timedelta
43from glob import glob
45from ..options.config import EnPTConfig
46from ..io.reader import L1B_Reader
47from ..model.images import EnMAPL1Product_SensorGeo, EnMAPL2Product_MapGeo # noqa: F401
49__author__ = 'Daniel Scheffler'
52class EnPT_Controller(object):
53 """Class of EnPT process controller."""
55 def __init__(self, config: EnPTConfig = None, **config_kwargs):
56 """Initialize the Process Controller.
58 :param config: an instance of the EnPTConfig class (overrides config_kwargs)
59 :param config_kwargs: configuration parameters to be passed to EnPTConfig class
60 """
61 self.cfg: EnPTConfig = config or EnPTConfig(**config_kwargs)
63 # generate temporary directory (must be deleted at the end of the pipeline)
64 self.tempDir = tempfile.mkdtemp()
66 # setup a finalizer that destroys remaining data (directories, etc.) in case of unexpected exit
67 self._finalizer = weakref.finalize(self, self._cleanup, self.tempDir,
68 warn_message="Implicitly cleaning up {!r}".format(self))
70 # record startup time
71 self._time_startup = time()
73 # defaults
74 self.L1_obj: Optional[EnMAPL1Product_SensorGeo] = None
75 self.L2_obj: Optional[EnMAPL2Product_MapGeo] = None
77 def extract_zip_archive(self, path_zipfile: str, subdir: str = '') -> str:
78 """Extract the given EnMAP image zip archive and return the L1B image root directory path.
80 :param path_zipfile: /path/to/zipfile.zip
81 :param subdir: subdirectory name to be created within temporary directory
82 :return: /tmp/tmpk2qp0yri/rootdir/
83 """
84 outdir = os.path.join(self.tempDir, subdir) if not self.cfg.is_dummy_dataformat else self.tempDir
86 with zipfile.ZipFile(path_zipfile, "r") as zf:
87 zf.extractall(outdir)
89 # move the data one level up in case they are within a sub-folder in the zip file
90 if not self.cfg.is_dummy_dataformat:
91 content = glob(os.path.join(outdir, '*'))
93 if len(content) == 1 and os.path.isdir(content[0]):
94 for fp in glob(os.path.join(outdir, '**', '*')):
95 shutil.move(fp, outdir)
96 shutil.rmtree(content[0])
98 if not os.path.isdir(outdir):
99 raise NotADirectoryError(outdir)
101 if not self.cfg.is_dummy_dataformat:
102 return outdir
103 else:
104 return os.path.join(self.tempDir, os.path.splitext(os.path.basename(path_zipfile))[0])
106 def read_L1B_data(self) -> None:
107 """Read the provider L1B data given in config and return an EnMAP image object."""
108 path_enmap_image = self.cfg.path_l1b_enmap_image
109 path_enmap_image_gapfill = self.cfg.path_l1b_enmap_image_gapfill
111 # input validation
112 if not os.path.isdir(path_enmap_image) and \
113 not (os.path.exists(path_enmap_image) and zipfile.is_zipfile(path_enmap_image)):
114 raise ValueError("The parameter 'path_enmap_image' must be a directory or the path to an existing zip "
115 "archive. Received %s." % path_enmap_image)
117 # extract L1B image archive if needed
118 if zipfile.is_zipfile(path_enmap_image):
119 path_enmap_image = self.extract_zip_archive(path_enmap_image, subdir='image_main')
121 # extract L1B gap fill image archive if needed
122 if path_enmap_image_gapfill and zipfile.is_zipfile(path_enmap_image_gapfill):
123 path_enmap_image_gapfill = self.extract_zip_archive(path_enmap_image_gapfill, subdir='image_gapfill')
125 # run the reader
126 RD = L1B_Reader(config=self.cfg)
127 self.L1_obj = RD.read_inputdata(root_dir_main=path_enmap_image, root_dir_ext=path_enmap_image_gapfill,
128 n_line_ext=self.cfg.n_lines_to_append)
130 def run_toaRad2toaRef(self):
131 """Run conversion from TOA radiance to TOA reflectance."""
132 # get a new instance of radiometric transformer
133 from ..processors.radiometric_transform import Radiometric_Transformer
134 RT = Radiometric_Transformer(self.cfg)
136 # run transformation to TOARef
137 self.L1_obj = RT.transform_TOARad2TOARef(self.L1_obj)
139 def run_dem_processor(self):
140 self.L1_obj.get_preprocessed_dem()
142 def run_spatial_optimization(self):
143 # get a new instance of Spatial_Optimizer
144 from ..processors.spatial_optimization import Spatial_Optimizer
145 SpO = Spatial_Optimizer(self.cfg)
147 # run optimization
148 self.L1_obj = SpO.optimize_geolayer(self.L1_obj)
150 def run_atmospheric_correction(self):
151 """Run atmospheric correction only."""
152 self.L1_obj.run_AC()
154 def run_orthorectification(self):
155 """Run orthorectification to transform L1 sensor geometry image to L2 map geometry."""
156 # run transformation from sensor to map geometry
157 self.L2_obj = EnMAPL2Product_MapGeo.from_L1B_sensorgeo(config=self.cfg, enmap_ImageL1=self.L1_obj)
159 def write_output(self):
160 if self.cfg.output_dir:
161 if self.L2_obj is not None:
162 self.L2_obj.save(self.cfg.output_dir)
163 else:
164 self.L1_obj.save(self.cfg.output_dir)
166 def run_all_processors(self):
167 """Run all processors at once."""
168 if os.getenv('IS_ENPT_GUI_TEST') != "1":
169 try:
170 if os.getenv('IS_ENPT_GUI_CALL') == "1":
171 self._write_to_stdout_stderr()
173 if self.cfg.log_level == 'DEBUG':
174 self._print_received_configuration()
176 self.read_L1B_data()
177 if self.cfg.run_deadpix_P:
178 self.L1_obj.correct_dead_pixels()
179 if self.cfg.enable_absolute_coreg:
180 # self.run_toaRad2toaRef() # this is only needed for geometry processor but AC expects radiance
181 self.run_spatial_optimization()
182 self.run_dem_processor()
183 if self.cfg.enable_ac:
184 self.run_atmospheric_correction()
186 if self.cfg.run_deadpix_P:
187 # re-apply dead pixel correction
188 self.L1_obj.logger.info(
189 'Re-applying dead pixel correction to correct for spectral spikes due to fringe effect.')
190 self.L1_obj.correct_dead_pixels()
191 else:
192 self.L1_obj.logger.info('Skipping atmospheric correction as configured and '
193 'computing top-of-atmosphere reflectance instead.')
194 self.run_toaRad2toaRef()
195 # self.run_spatial_optimization()
196 self.run_orthorectification()
197 self.write_output()
199 self.L1_obj.logger.info('Total runtime of the processing chain: %s'
200 % timedelta(seconds=time() - self._time_startup))
201 finally:
202 self.cleanup()
204 else:
205 self._print_received_configuration()
207 if not os.path.isdir(self.cfg.output_dir):
208 raise NotADirectoryError(self.cfg.output_dir)
210 with open(os.path.join(self.cfg.output_dir, 'received_args_kwargs.pkl'), 'wb') as outF:
211 pickle.dump(
212 dict(
213 json_config=self.cfg.json_config,
214 kwargs=self.cfg.kwargs
215 ),
216 outF)
218 @staticmethod
219 def _write_to_stdout_stderr():
220 """Write to STDOUT and STDERR to reveal Queue.name of these streams to enpt_enmapboxapp."""
221 sys.stdout.write('Connecting to EnPT STDOUT stream.\n')
222 sys.stdout.flush()
223 sys.stderr.write('Connecting to EnPT STDERR stream.\n')
224 sys.stderr.flush()
226 def _print_received_configuration(self):
227 print('EnPT Controller received the following configuration:')
228 print(repr(self.cfg))
230 @classmethod
231 def _cleanup(cls, tempDir, warn_message):
232 """Clean up implicitly (not to be called directly)."""
233 if tempDir:
234 shutil.rmtree(tempDir)
235 warnings.warn(warn_message, ResourceWarning)
237 def cleanup(self):
238 """Clean up (to be called directly)."""
239 if self._finalizer.detach():
240 if self.tempDir:
241 shutil.rmtree(self.tempDir)