Coverage for enpt/execution/controller.py: 86%

117 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-03-07 11:39 +0000

1# -*- coding: utf-8 -*- 

2 

3# EnPT, EnMAP Processing Tool - A Python package for pre-processing of EnMAP Level-1B data 

4# 

5# Copyright (C) 2018-2024 Karl Segl (GFZ Potsdam, segl@gfz-potsdam.de), Daniel Scheffler 

6# (GFZ Potsdam, danschef@gfz-potsdam.de), Niklas Bohn (GFZ Potsdam, nbohn@gfz-potsdam.de), 

7# Stéphane Guillaso (GFZ Potsdam, stephane.guillaso@gfz-potsdam.de) 

8# 

9# This software was developed within the context of the EnMAP project supported 

10# by the DLR Space Administration with funds of the German Federal Ministry of 

11# Economic Affairs and Energy (on the basis of a decision by the German Bundestag: 

12# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG. 

13# 

14# This program is free software: you can redistribute it and/or modify it under 

15# the terms of the GNU General Public License as published by the Free Software 

16# Foundation, either version 3 of the License, or (at your option) any later 

17# version. Please note the following exception: `EnPT` depends on tqdm, which 

18# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files 

19# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore". 

20# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE. 

21# 

22# This program is distributed in the hope that it will be useful, but WITHOUT 

23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

25# details. 

26# 

27# You should have received a copy of the GNU Lesser General Public License along 

28# with this program. If not, see <https://www.gnu.org/licenses/>. 

29 

30"""EnPT process controller module.""" 

31 

32import os 

33import sys 

34import tempfile 

35import zipfile 

36import shutil 

37import weakref 

38import warnings 

39import pickle 

40from typing import Optional 

41from time import time 

42from datetime import timedelta 

43from glob import glob 

44 

45from ..options.config import EnPTConfig 

46from ..io.reader import L1B_Reader 

47from ..model.images import EnMAPL1Product_SensorGeo, EnMAPL2Product_MapGeo # noqa: F401 

48 

49__author__ = 'Daniel Scheffler' 

50 

51 

class EnPT_Controller(object):
    """Class of EnPT process controller.

    The controller holds the configuration (``cfg``), a temporary working directory (``tempDir``) that is
    created at initialization, and the currently processed L1 / L2 objects (``L1_obj`` / ``L2_obj``).
    The temporary directory is removed by :meth:`cleanup` or, as a fallback, by a weakref finalizer.
    """

    def __init__(self, config: Optional['EnPTConfig'] = None, **config_kwargs):
        """Initialize the Process Controller.

        :param config:          an instance of the EnPTConfig class (overrides config_kwargs)
        :param config_kwargs:   configuration parameters to be passed to EnPTConfig class
        """
        self.cfg: EnPTConfig = config or EnPTConfig(**config_kwargs)

        # generate temporary directory (must be deleted at the end of the pipeline)
        self.tempDir = tempfile.mkdtemp()

        # setup a finalizer that destroys remaining data (directories, etc.) in case of unexpected exit
        # NOTE: self._cleanup is a classmethod, so the finalizer does not keep a reference to self
        self._finalizer = weakref.finalize(self, self._cleanup, self.tempDir,
                                           warn_message="Implicitly cleaning up {!r}".format(self))

        # record startup time
        self._time_startup = time()

        # defaults
        self.L1_obj: Optional[EnMAPL1Product_SensorGeo] = None
        self.L2_obj: Optional[EnMAPL2Product_MapGeo] = None

    def extract_zip_archive(self, path_zipfile: str, subdir: str = '') -> str:
        """Extract the given EnMAP image zip archive and return the L1B image root directory path.

        In case of the non-dummy data format, an archive that extracts to nothing but a single folder is
        flattened, i.e., the complete content of that folder (including hidden files) is moved one level up.

        :param path_zipfile:    /path/to/zipfile.zip
        :param subdir:          subdirectory name to be created within temporary directory
                                (not used in case of the dummy data format)
        :return:                /tmp/tmpk2qp0yri/rootdir/
        :raises NotADirectoryError:  if the extraction directory does not exist after the extraction
        """
        is_dummy = self.cfg.is_dummy_dataformat
        outdir = self.tempDir if is_dummy else os.path.join(self.tempDir, subdir)

        with zipfile.ZipFile(path_zipfile, "r") as zf:
            zf.extractall(outdir)

        # move the data one level up in case they are within a sub-folder in the zip file
        if not is_dummy:
            content = glob(os.path.join(outdir, '*'))

            if len(content) == 1 and os.path.isdir(content[0]):
                # Park the wrapping folder in a fresh staging directory first. This way, an entry that has
                # the same name as the wrapping folder itself can be moved to outdir without a name clash.
                staging_dir = tempfile.mkdtemp(dir=self.tempDir)
                wrapper_dir = shutil.move(content[0], staging_dir)

                # os.listdir() also returns hidden files which a '*' glob pattern would skip
                for entry in os.listdir(wrapper_dir):
                    shutil.move(os.path.join(wrapper_dir, entry), outdir)

                shutil.rmtree(staging_dir)

        if not os.path.isdir(outdir):
            raise NotADirectoryError(outdir)

        if is_dummy:
            # the dummy format is extracted directly into tempDir; return <tempDir>/<zip file name without extension>
            return os.path.join(self.tempDir, os.path.splitext(os.path.basename(path_zipfile))[0])

        return outdir

    def read_L1B_data(self) -> None:
        """Read the provided L1B data given in config and store the resulting EnMAP image object in self.L1_obj.

        Zip archives (main image and, if given, gap fill image) are extracted to the temporary directory
        before they are passed to the reader.

        :raises ValueError:  if 'path_l1b_enmap_image' is neither a directory nor an existing zip archive
        """
        path_enmap_image = self.cfg.path_l1b_enmap_image
        path_enmap_image_gapfill = self.cfg.path_l1b_enmap_image_gapfill

        # input validation
        if not os.path.isdir(path_enmap_image) and \
           not (os.path.exists(path_enmap_image) and zipfile.is_zipfile(path_enmap_image)):
            raise ValueError("The parameter 'path_enmap_image' must be a directory or the path to an existing zip "
                             "archive. Received %s." % path_enmap_image)

        # extract L1B image archive if needed
        if zipfile.is_zipfile(path_enmap_image):
            path_enmap_image = self.extract_zip_archive(path_enmap_image, subdir='image_main')

        # extract L1B gap fill image archive if needed
        if path_enmap_image_gapfill and zipfile.is_zipfile(path_enmap_image_gapfill):
            path_enmap_image_gapfill = self.extract_zip_archive(path_enmap_image_gapfill, subdir='image_gapfill')

        # run the reader
        RD = L1B_Reader(config=self.cfg)
        self.L1_obj = RD.read_inputdata(root_dir_main=path_enmap_image, root_dir_ext=path_enmap_image_gapfill,
                                        n_line_ext=self.cfg.n_lines_to_append)

    def run_toaRad2toaRef(self):
        """Run conversion from TOA radiance to TOA reflectance."""
        # get a new instance of radiometric transformer
        from ..processors.radiometric_transform import Radiometric_Transformer
        RT = Radiometric_Transformer(self.cfg)

        # run transformation to TOARef
        self.L1_obj = RT.transform_TOARad2TOARef(self.L1_obj)

    def run_dem_processor(self):
        """Run the DEM processor, i.e., let the L1 object get its preprocessed DEM."""
        self.L1_obj.get_preprocessed_dem()

    def run_spatial_optimization(self):
        """Run the spatial optimization, i.e., let Spatial_Optimizer optimize the geolayer of the L1 object."""
        # get a new instance of Spatial_Optimizer
        from ..processors.spatial_optimization import Spatial_Optimizer
        SpO = Spatial_Optimizer(self.cfg)

        # run optimization
        self.L1_obj = SpO.optimize_geolayer(self.L1_obj)

    def run_atmospheric_correction(self):
        """Run atmospheric correction only."""
        self.L1_obj.run_AC()

    def run_orthorectification(self):
        """Run orthorectification to transform L1 sensor geometry image to L2 map geometry."""
        # run transformation from sensor to map geometry
        self.L2_obj = EnMAPL2Product_MapGeo.from_L1B_sensorgeo(config=self.cfg, enmap_ImageL1=self.L1_obj)

    def write_output(self):
        """Save the L2 object (or the L1 object if there is no L2 object) to the configured output directory.

        Nothing is written if no output directory is configured.
        """
        if self.cfg.output_dir:
            if self.L2_obj is not None:
                self.L2_obj.save(self.cfg.output_dir)
            else:
                self.L1_obj.save(self.cfg.output_dir)

    def run_all_processors(self):
        """Run all processors at once.

        If the environment variable IS_ENPT_GUI_TEST is set to "1", no processing is done. Instead, the
        received configuration is printed and dumped to 'received_args_kwargs.pkl' within the output directory.

        The temporary directory is removed at the end in any case, also if an exception is raised.

        :raises NotADirectoryError:  in GUI test mode, if the configured output directory does not exist
        """
        try:
            if os.getenv('IS_ENPT_GUI_TEST') != "1":
                if os.getenv('IS_ENPT_GUI_CALL') == "1":
                    self._write_to_stdout_stderr()

                if self.cfg.log_level == 'DEBUG':
                    self._print_received_configuration()

                self.read_L1B_data()
                if self.cfg.run_deadpix_P:
                    self.L1_obj.correct_dead_pixels()
                if self.cfg.enable_absolute_coreg:
                    # self.run_toaRad2toaRef()  # this is only needed for geometry processor but AC expects radiance
                    self.run_spatial_optimization()
                self.run_dem_processor()
                if self.cfg.enable_ac:
                    self.run_atmospheric_correction()

                    if self.cfg.run_deadpix_P:
                        # re-apply dead pixel correction
                        self.L1_obj.logger.info(
                            'Re-applying dead pixel correction to correct for spectral spikes due to fringe effect.')
                        self.L1_obj.correct_dead_pixels()
                else:
                    self.L1_obj.logger.info('Skipping atmospheric correction as configured and '
                                            'computing top-of-atmosphere reflectance instead.')
                    self.run_toaRad2toaRef()
                # self.run_spatial_optimization()
                self.run_orthorectification()
                self.write_output()

                self.L1_obj.logger.info('Total runtime of the processing chain: %s'
                                        % timedelta(seconds=time() - self._time_startup))

            else:
                self._print_received_configuration()

                if not os.path.isdir(self.cfg.output_dir):
                    raise NotADirectoryError(self.cfg.output_dir)

                with open(os.path.join(self.cfg.output_dir, 'received_args_kwargs.pkl'), 'wb') as outF:
                    pickle.dump(
                        dict(
                            json_config=self.cfg.json_config,
                            kwargs=self.cfg.kwargs
                        ),
                        outF)

        finally:
            # release the temporary directory in both modes instead of leaving it to the finalizer
            self.cleanup()

    @staticmethod
    def _write_to_stdout_stderr():
        """Write to STDOUT and STDERR to reveal Queue.name of these streams to enpt_enmapboxapp."""
        sys.stdout.write('Connecting to EnPT STDOUT stream.\n')
        sys.stdout.flush()
        sys.stderr.write('Connecting to EnPT STDERR stream.\n')
        sys.stderr.flush()

    def _print_received_configuration(self):
        """Print the representation of the received configuration to STDOUT."""
        print('EnPT Controller received the following configuration:')
        print(repr(self.cfg))

    @classmethod
    def _cleanup(cls, tempDir, warn_message):
        """Clean up implicitly (not to be called directly).

        :param tempDir:         path of the temporary directory to be removed
        :param warn_message:    message of the ResourceWarning to be issued
        """
        # the directory may already be gone - a finalizer should not fail in that case
        if tempDir and os.path.isdir(tempDir):
            shutil.rmtree(tempDir)
        warnings.warn(warn_message, ResourceWarning)

    def cleanup(self):
        """Clean up (to be called directly).

        Subsequent calls do nothing because the finalizer is already detached then.
        """
        if self._finalizer.detach():
            # skip a directory that is already gone, so that a cleanup running in a 'finally' clause
            # does not mask the original exception with a FileNotFoundError
            if self.tempDir and os.path.isdir(self.tempDir):
                shutil.rmtree(self.tempDir)