Coverage for enpt/processors/spatial_transform/spatial_transform.py: 94%

250 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-03-07 11:39 +0000

1# -*- coding: utf-8 -*- 

2 

3# EnPT, EnMAP Processing Tool - A Python package for pre-processing of EnMAP Level-1B data 

4# 

5# Copyright (C) 2018-2024 Karl Segl (GFZ Potsdam, segl@gfz-potsdam.de), Daniel Scheffler 

6# (GFZ Potsdam, danschef@gfz-potsdam.de), Niklas Bohn (GFZ Potsdam, nbohn@gfz-potsdam.de), 

7# Stéphane Guillaso (GFZ Potsdam, stephane.guillaso@gfz-potsdam.de) 

8# 

9# This software was developed within the context of the EnMAP project supported 

10# by the DLR Space Administration with funds of the German Federal Ministry of 

11# Economic Affairs and Energy (on the basis of a decision by the German Bundestag: 

12# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG. 

13# 

14# This program is free software: you can redistribute it and/or modify it under 

15# the terms of the GNU General Public License as published by the Free Software 

16# Foundation, either version 3 of the License, or (at your option) any later 

17# version. Please note the following exception: `EnPT` depends on tqdm, which 

18# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files 

19# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore". 

20# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE. 

21# 

22# This program is distributed in the hope that it will be useful, but WITHOUT 

23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

25# details. 

26# 

27# You should have received a copy of the GNU Lesser General Public License along 

28# with this program. If not, see <https://www.gnu.org/licenses/>. 

29 

30"""EnPT module 'spatial transform', containing everything related to spatial transformations.""" 

31from typing import Union, Tuple, List, Optional, Sequence # noqa: F401 

32from multiprocessing import Pool, cpu_count 

33from collections import OrderedDict 

34import numpy as np 

35from scipy.interpolate import LinearNDInterpolator, make_interp_spline 

36from scipy.spatial import Delaunay 

37from geoarray import GeoArray 

38from natsort import natsorted 

39import numpy_indexed as npi 

40from pyproj import CRS 

41 

42from sensormapgeo import Transformer 

43from sensormapgeo.pyresample_backend.transformer_2d import AreaDefinition 

44from py_tools_ds.geo.projection import prj_equal 

45from py_tools_ds.geo.coord_grid import find_nearest 

46from py_tools_ds.geo.coord_trafo import transform_any_prj, transform_coordArray 

47 

48from ...options.config import enmap_coordinate_grid_utm 

49 

50__author__ = 'Daniel Scheffler' 

51 

52 

53class Geometry_Transformer(Transformer): 

54 # use Sentinel-2 grid (30m grid with origin at 0/0) 

55 # EnMAP geolayer contains pixel center coordinate 

56 

57 def to_sensor_geometry(self, 

58 path_or_geoarray_mapgeo: Union[str, GeoArray], 

59 src_gt: Tuple[float, float, float, float, float, float] = None, 

60 src_prj: Union[str, int] = None, 

61 src_nodata: int = None, 

62 tgt_nodata: int = None 

63 ) -> np.ndarray: 

64 data_mapgeo = GeoArray(path_or_geoarray_mapgeo) 

65 

66 if not data_mapgeo.is_map_geo: 

67 raise RuntimeError('The dataset to be transformed into sensor geometry already represents sensor geometry.') 

68 

69 return super().to_sensor_geometry( 

70 data_mapgeo[:], 

71 src_gt=src_gt or data_mapgeo.gt, 

72 src_prj=src_prj or data_mapgeo.prj, 

73 src_nodata=src_nodata, 

74 tgt_nodata=tgt_nodata 

75 ) 

76 

77 def to_map_geometry(self, 

78 path_or_geoarray_sensorgeo: Union[str, GeoArray, np.ndarray], 

79 tgt_prj: Union[str, int], 

80 tgt_extent: Tuple[float, float, float, float] = None, 

81 tgt_res: Tuple = None, 

82 tgt_coordgrid: Tuple[Tuple, Tuple] = None, 

83 src_nodata: int = None, 

84 tgt_nodata: int = None, 

85 area_definition: AreaDefinition = None 

86 ) -> Tuple[np.ndarray, tuple, str]: 

87 data_sensorgeo = GeoArray(path_or_geoarray_sensorgeo) 

88 

89 if data_sensorgeo.is_map_geo and not data_sensorgeo.is_rotated: 

90 raise RuntimeError('The dataset to be transformed into map geometry already represents map geometry.') 

91 

92 # run transformation (output extent/area definition etc. is internally computed from the geolayers if not given) 

93 out_data, out_gt, out_prj = \ 

94 super().to_map_geometry( 

95 data_sensorgeo[:], 

96 tgt_prj=tgt_prj, 

97 tgt_extent=tgt_extent, 

98 tgt_res=tgt_res, 

99 tgt_coordgrid=tgt_coordgrid, 

100 src_nodata=src_nodata, 

101 tgt_nodata=tgt_nodata, 

102 area_definition=area_definition 

103 ) 

104 

105 return out_data, out_gt, out_prj 

106 

107 

108class VNIR_SWIR_SensorGeometryTransformer(object): 

109 """Class to transform between EnMAP VNIR and SWIR sensor geometry.""" 

110 

111 def __init__(self, 

112 lons_vnir: np.ndarray, 

113 lats_vnir: np.ndarray, 

114 lons_swir: np.ndarray, 

115 lats_swir: np.ndarray, 

116 prj_vnir: Union[str, int], 

117 prj_swir: Union[str, int], 

118 res_vnir: Tuple[float, float], 

119 res_swir: Tuple[float, float], 

120 resamp_alg: str = 'nearest', 

121 src_nodata: int = None, 

122 tgt_nodata: int = None, 

123 nprocs: int = cpu_count()) -> None: 

124 """Get an instance of VNIR_SWIR_SensorGeometryTransformer. 

125 

126 :param lons_vnir: VNIR longitude array corresponding to the sensor geometry arrays passed later (2D or 3D) 

127 :param lats_vnir: VNIR latitude array corresponding to the sensor geometry arrays passed later (2D or 3D) 

128 :param lons_swir: SWIR longitude array corresponding to the sensor geometry arrays passed later (2D or 3D) 

129 :param lats_swir: SWIR latitude array corresponding to the sensor geometry arrays passed later (2D or 3D) 

130 :param prj_vnir: projection of the VNIR if it would be transformed to map geometry (WKT string or EPSG code) 

131 :param prj_swir: projection of the SWIR if it would be transformed to map geometry (WKT string or EPSG code) 

132 :param res_vnir: pixel dimensions of the VNIR if it would be transformed to map geometry (X, Y) 

133 :param res_swir: pixel dimensions of the SWIR if it would be transformed to map geometry (X, Y) 

134 :param resamp_alg: resampling algorithm ('nearest', 'near', 'bilinear', 'cubic', 'cubic_spline', 

135 'lanczos', 'average', 'mode', 'max', 'min', 'med', 'q1', 'q3') 

136 :param src_nodata: nodata value of the input array 

137 :param tgt_nodata: pixel value to be used for undetermined pixels in the output 

138 :param nprocs: number of CPU cores to use 

139 """ 

140 self.vnir_meta = dict( 

141 lons=lons_vnir, 

142 lats=lats_vnir, 

143 prj=prj_vnir, 

144 res=res_vnir 

145 ) 

146 self.swir_meta = dict( 

147 lons=lons_swir, 

148 lats=lats_swir, 

149 prj=prj_swir, 

150 res=res_swir 

151 ) 

152 self.resamp_alg = resamp_alg 

153 self.src_nodata = src_nodata 

154 self.tgt_nodata = tgt_nodata 

155 self.cpus = nprocs 

156 

157 def transform_sensorgeo_VNIR_to_SWIR(self, data_vnirsensorgeo: np.ndarray) -> np.ndarray: 

158 """Transform any array in VNIR sensor geometry to SWIR sensor geometry to remove geometric shifts. 

159 

160 :param data_vnirsensorgeo: input array in VNIR sensor geometry 

161 :return: input array resampled to SWIR sensor geometry 

162 """ 

163 return self._transform_sensorgeo(data_vnirsensorgeo, inputgeo='vnir') 

164 

165 def transform_sensorgeo_SWIR_to_VNIR(self, data_swirsensorgeo: np.ndarray) -> np.ndarray: 

166 """Transform any array in SWIR sensor geometry to VNIR sensor geometry to remove geometric shifts. 

167 

168 :param data_swirsensorgeo: input array in SWIR sensor geometry 

169 :return: input array resampled to VNIR sensor geometry 

170 """ 

171 return self._transform_sensorgeo(data_swirsensorgeo, inputgeo='swir') 

172 

173 def _transform_sensorgeo(self, 

174 data2transform: np.ndarray, 

175 inputgeo: str, 

176 band_outputgeo: int = 0 

177 ) -> np.ndarray: 

178 """Transform the input array between VNIR and SWIR sensor geometry. 

179 

180 :param data2transform: input array to be transformed 

181 :param inputgeo: 'vnir' if data2transform is given in VNIR sensor geometry 

182 'swir' if data2transform is given in SWIR sensor geometry 

183 :param band_outputgeo: band index of the band from the sensor geometry should be used to generate the output 

184 """ 

185 # TODO: Avoid the resampling here, maybe by replacing the lon/lat arrays by image coordinates for the source 

186 # geometry and by image coordinate differences for the target geometry. Maybe also the proj string for 

187 # local coordinate systems helps (see SensorMapGeometryTransformer class). 

188 

189 if inputgeo not in ['vnir', 'swir']: 

190 raise ValueError(inputgeo) 

191 

192 src, tgt = (self.vnir_meta, self.swir_meta) if inputgeo == 'vnir' else (self.swir_meta, self.vnir_meta) 

193 

194 # get GeoTransformer 

195 # NOTE: We can only use a 3D geolayer to temporarily transform to map geometry but not back to sensor geometry 

196 # of the other detector. For this, we need to select the band of which the sensor geometry should be used. 

197 def ensure_2d(coord_array): 

198 return coord_array if coord_array.ndim == 2 else coord_array[:, :, band_outputgeo] 

199 

200 if data2transform.ndim == 3 and src['lons'].ndim == 3 and src['lons'].shape[2] == data2transform.shape[2]: 

201 src_lons, src_lats = src['lons'], src['lats'] 

202 tgt_lons, tgt_lats = ensure_2d(tgt['lons']), ensure_2d(tgt['lats']) 

203 else: 

204 src_lons, src_lats = ensure_2d(src['lons']), ensure_2d(src['lats']) 

205 tgt_lons, tgt_lats = ensure_2d(tgt['lons']), ensure_2d(tgt['lats']) 

206 

207 GT_src = Geometry_Transformer(src_lons, src_lats, backend='gdal', resamp_alg=self.resamp_alg, nprocs=self.cpus) 

208 GT_tgt = Geometry_Transformer(tgt_lons, tgt_lats, backend='gdal', resamp_alg=self.resamp_alg, nprocs=self.cpus) 

209 

210 # temporarily transform the input sensor geometry array to map geometry 

211 gA_mapgeo = GeoArray( 

212 *GT_src.to_map_geometry( 

213 data2transform, 

214 tgt_prj=src['prj'], 

215 src_nodata=self.src_nodata, 

216 tgt_nodata=self.tgt_nodata 

217 ), 

218 nodata=self.tgt_nodata 

219 ) 

220 

221 # generate the target sensor geometry array (target lons/lats define the target swath definition 

222 tgt_data_sensorgeo =\ 

223 GT_tgt.to_sensor_geometry( 

224 gA_mapgeo, 

225 src_nodata=gA_mapgeo._nodata, # noqa 

226 tgt_nodata=self.tgt_nodata 

227 ) 

228 

229 return tgt_data_sensorgeo 

230 

231 

232def move_extent_to_coord_grid(extent_utm: Tuple[float, float, float, float], 

233 tgt_xgrid: Sequence[float], 

234 tgt_ygrid: Sequence[float], 

235 ) -> Tuple[float, float, float, float]: 

236 """Move the given coordinate extent to a coordinate grid. 

237 

238 :param extent_utm: xmin, ymin, xmax, ymax coordinates 

239 :param tgt_xgrid: target X coordinate grid, e.g. [0, 30] 

240 :param tgt_ygrid: target Y coordinate grid, e.g. [0, 30] 

241 """ 

242 xmin, ymin, xmax, ymax = extent_utm 

243 tgt_xmin = find_nearest(tgt_xgrid, xmin, roundAlg='off', extrapolate=True) 

244 tgt_xmax = find_nearest(tgt_xgrid, xmax, roundAlg='on', extrapolate=True) 

245 tgt_ymin = find_nearest(tgt_ygrid, ymin, roundAlg='off', extrapolate=True) 

246 tgt_ymax = find_nearest(tgt_ygrid, ymax, roundAlg='on', extrapolate=True) 

247 

248 return tgt_xmin, tgt_ymin, tgt_xmax, tgt_ymax 

249 

250 

251class RPC_Geolayer_Generator(object): 

252 """Class for creating pixel-wise longitude/latitude arrays based on rational polynomial coefficients (RPC).""" 

253 

254 def __init__(self, 

255 rpc_coeffs: dict, 

256 elevation: Union[str, GeoArray, int, float], 

257 enmapIm_cornerCoords: Tuple[Tuple[float, float]], 

258 enmapIm_dims_sensorgeo: Tuple[int, int]): 

259 """Get an instance of RPC_Geolayer_Generator. 

260 

261 :param rpc_coeffs: RPC coefficients for a single EnMAP band 

262 :param elevation: digital elevation model in map geometry (file path or instance of GeoArray) OR 

263 average elevation in meters as integer or float 

264 :param enmapIm_cornerCoords: corner coordinates as tuple of lon/lat pairs 

265 :param enmapIm_dims_sensorgeo: dimensions of the EnMAP image in sensor geometry (rows, columns) 

266 """ 

267 self.row_off = rpc_coeffs['row_off'] 

268 self.col_off = rpc_coeffs['col_off'] 

269 self.lat_off = rpc_coeffs['lat_off'] 

270 self.lon_off = rpc_coeffs['long_off'] 

271 self.height_off = rpc_coeffs['height_off'] 

272 self.row_scale = rpc_coeffs['row_scale'] 

273 self.col_scale = rpc_coeffs['col_scale'] 

274 self.lat_scale = rpc_coeffs['lat_scale'] 

275 self.lon_scale = rpc_coeffs['long_scale'] 

276 self.height_scale = rpc_coeffs['height_scale'] 

277 self.row_num_coeffs = rpc_coeffs['row_num_coeffs'] 

278 self.row_den_coeffs = rpc_coeffs['row_den_coeffs'] 

279 self.col_num_coeffs = rpc_coeffs['col_num_coeffs'] 

280 self.col_den_coeffs = rpc_coeffs['col_den_coeffs'] 

281 

282 self.elevation = elevation if isinstance(elevation, (int, float)) else GeoArray(elevation) 

283 self.enmapIm_cornerCoords = enmapIm_cornerCoords 

284 self.enmapIm_dims_sensorgeo = enmapIm_dims_sensorgeo 

285 

286 def _normalize_map_coordinates(self, 

287 lon: np.ndarray, 

288 lat: np.ndarray, 

289 height: np.ndarray) -> (np.ndarray, np.ndarray, np.ndarray): 

290 """Normalize map coordinates to [-1, +1] to improve numerical precision. 

291 

292 :param lon: longitude array 

293 :param lat: latitude array 

294 :param height: elevation array 

295 """ 

296 if not lon.shape == lat.shape == height.shape: 

297 raise ValueError((lon.shape, lat.shape, height.shape), 

298 'Longitude, latitude and height arrays are expected to have the same dimensions.') 

299 

300 lon_norm = (lon - self.lon_off) / self.lon_scale # longitude 

301 lat_norm = (lat - self.lat_off) / self.lat_scale # latitude 

302 height_norm = (height - self.height_off) / self.height_scale # elevation 

303 

304 msg = 'Coordinate normalization yields significantly out-of-range values for %s. ' \ 

305 'Check the coordinates and RPC coefficients.' 

306 # for llh, name in zip([lon_norm, lat_norm, height_norm], ['longitudes', 'latitudes', 'heights']): 

307 for llh, name in zip([lon_norm, lat_norm, height_norm], ['longitudes', 'latitudes']): 

308 if llh.min() < -1.1 or llh.max() > 1.1: 

309 raise RuntimeError((llh.min(), llh.max()), msg % name) 

310 

311 return lon_norm, lat_norm, height_norm 

312 

313 def _compute_normalized_image_coordinates(self, 

314 lon_norm: np.ndarray, 

315 lat_norm: np.ndarray, 

316 height_norm: np.ndarray) -> (np.ndarray, np.ndarray): 

317 """Compute normalized sensor geometry coordinates for the given lon/lat/height positions. 

318 

319 :param lon_norm: normalized longitudes 

320 :param lat_norm: normalized latitudes 

321 :param height_norm: normalized elevation 

322 :return: 

323 """ 

324 P = lat_norm.flatten() 

325 L = lon_norm.flatten() 

326 H = height_norm.flatten() 

327 

328 u = np.zeros((P.size, 20)) 

329 

330 u_data = (ui for ui in [ 

331 1, 

332 L, 

333 P, 

334 H, 

335 L * P, 

336 L * H, 

337 P * H, 

338 L ** 2, 

339 P ** 2, 

340 H ** 2, 

341 P * L * H, 

342 L ** 3, 

343 L * P ** 2, 

344 L * H ** 2, 

345 L ** 2 * P, 

346 P ** 3, 

347 P * H ** 2, 

348 L ** 2 * H, 

349 P ** 2 * H, 

350 H ** 3 

351 ]) 

352 

353 for i, ud in enumerate(u_data): 

354 u[:, i] = ud 

355 

356 num_row_norm = np.sum(self.row_num_coeffs * u, axis=1) 

357 den_row_norm = np.sum(self.row_den_coeffs * u, axis=1) 

358 num_col_norm = np.sum(self.col_num_coeffs * u, axis=1) 

359 den_col_norm = np.sum(self.col_den_coeffs * u, axis=1) 

360 

361 row_norm = num_row_norm / den_row_norm 

362 col_norm = num_col_norm / den_col_norm 

363 

364 return row_norm, col_norm 

365 

366 def _denormalize_image_coordinates(self, 

367 row_norm: np.ndarray, 

368 col_norm: np.ndarray) -> (np.ndarray, np.ndarray): 

369 """De-normalize normalized sensor geometry coordinates to get valid image coordinates. 

370 

371 :param row_norm: normalized rows 

372 :param col_norm: normalized columns 

373 :return: de-normalized rows array, de-normalized columns array, 

374 """ 

375 rows = row_norm * self.row_scale + self.row_off 

376 cols = col_norm * self.col_scale + self.col_off 

377 

378 return rows, cols 

379 

380 def transform_LonLatHeight_to_RowCol(self, 

381 lon: np.ndarray, 

382 lat: np.ndarray, 

383 height: np.ndarray) -> (np.ndarray, np.ndarray): 

384 """Get sensor geometry image coordinates for the given 3D map coordinate positions using RPC coefficients. 

385 

386 :param lon: longitude array 

387 :param lat: latitude array 

388 :param height: elevation array 

389 :return: rows array, columns array 

390 """ 

391 # TODO add reshaping 

392 

393 lon_norm, lat_norm, height_norm = \ 

394 self._normalize_map_coordinates(lon=lon, lat=lat, height=height) 

395 row_norm, col_norm = \ 

396 self._compute_normalized_image_coordinates(lon_norm=lon_norm, lat_norm=lat_norm, height_norm=height_norm) 

397 rows, cols = \ 

398 self._denormalize_image_coordinates(row_norm=row_norm, col_norm=col_norm) 

399 

400 return rows, cols 

401 

402 @staticmethod 

403 def _fill_nans_at_corners(arr: np.ndarray, along_axis: int = 0) -> np.ndarray: 

404 if not arr.ndim == 2: 

405 raise ValueError(arr.ndim, '2D numpy array expected.') 

406 if along_axis not in [0, 1]: 

407 raise ValueError(along_axis, "The 'axis' parameter must be set to 0 or 1") 

408 arr[0, 0] = np.nan 

409 arr[0, 1] = np.nan 

410 arr[1, 1] = np.nan 

411 nans = np.isnan(arr) 

412 

413 if along_axis == 0: 

414 # extrapolate linearly in top/bottom direction 

415 cols_with_nan = np.arange(arr.shape[1])[np.any(nans, axis=0)] 

416 

417 for col in cols_with_nan: 

418 data = arr[:, col] 

419 idx_goodvals = np.argwhere(~nans[:, col]).flatten() 

420 arr[:, col] = make_interp_spline(idx_goodvals, data[idx_goodvals], k=1)(range(data.size)) 

421 else: 

422 # extrapolate linearly in left/right direction 

423 rows_with_nan = np.arange(arr.shape[0])[np.any(nans, axis=1)] 

424 

425 for row in rows_with_nan: 

426 data = arr[row, :] 

427 idx_goodvals = np.argwhere(~nans[row, :]).flatten() 

428 arr[row, :] = make_interp_spline(idx_goodvals, data[idx_goodvals], k=1)(range(data.size)) 

429 

430 return arr 

431 

432 def compute_geolayer(self) -> (np.ndarray, np.ndarray): 

433 """Compute pixel-wise lon/lat arrays based on RPC coefficients, corner coordinates and image dimensions. 

434 

435 :return: (2D longitude array, 2D latitude array) 

436 """ 

437 # transform corner coordinates of EnMAP image to UTM 

438 grid_utm_epsg = get_UTMEPSG_from_LonLat(*get_center_coord(self.enmapIm_cornerCoords)) 

439 cornerCoordsUTM = np.array([transform_any_prj(4326, grid_utm_epsg, lon, lat) 

440 for lon, lat in self.enmapIm_cornerCoords]) 

441 xmin, xmax = cornerCoordsUTM[:, 0].min(), cornerCoordsUTM[:, 0].max() 

442 ymin, ymax = cornerCoordsUTM[:, 1].min(), cornerCoordsUTM[:, 1].max() 

443 

444 # get UTM bounding box and move it to the EnMAP grid 

445 xmin, ymin, xmax, ymax = \ 

446 move_extent_to_coord_grid((xmin, ymin, xmax, ymax), 

447 enmap_coordinate_grid_utm['x'], enmap_coordinate_grid_utm['y']) 

448 

449 # set up a regular grid of UTM points with a specific mesh width 

450 meshwidth = 300 # 10 EnMAP pixels 

451 y_grid_utm, x_grid_utm = np.meshgrid(np.arange(ymax, ymin - meshwidth, -meshwidth), 

452 np.arange(xmin, xmax + meshwidth, meshwidth), 

453 indexing='ij') 

454 

455 if not isinstance(self.elevation, (int, float)): 

456 # transform UTM grid to DEM projection 

457 x_grid_demPrj, y_grid_demPrj = \ 

458 (x_grid_utm, y_grid_utm) if prj_equal(grid_utm_epsg, self.elevation.epsg) else \ 

459 transform_coordArray(CRS(grid_utm_epsg).to_wkt(), 

460 CRS(self.elevation.epsg).to_wkt(), 

461 x_grid_utm, y_grid_utm) 

462 

463 # retrieve corresponding heights from DEM 

464 # -> resample DEM to EnMAP grid? 

465 xy_pairs_demPrj = np.vstack([x_grid_demPrj.flatten(), y_grid_demPrj.flatten()]).T 

466 heights = self.elevation.read_pointData(xy_pairs_demPrj).flatten() 

467 else: 

468 heights = np.full_like(x_grid_utm.flatten(), self.elevation) 

469 

470 # transform UTM points to lon/lat 

471 lon_grid, lat_grid = \ 

472 transform_coordArray(CRS(grid_utm_epsg).to_wkt(), CRS(4326).to_wkt(), x_grid_utm, y_grid_utm) 

473 lons = lon_grid.flatten() 

474 lats = lat_grid.flatten() 

475 

476 # compute floating point EnMAP image coordinates for the selected UTM points 

477 rows, cols = self.transform_LonLatHeight_to_RowCol(lon=lons, lat=lats, height=heights) 

478 

479 # interpolate lon/lats to get lon/lat coordinates integer image coordinates of EnMAP image 

480 rows_im, cols_im = self.enmapIm_dims_sensorgeo 

481 out_rows_grid, out_cols_grid = np.meshgrid(range(rows_im), range(cols_im), indexing='ij') 

482 out_xy_pairs = np.vstack([out_cols_grid.flatten(), out_rows_grid.flatten()]).T 

483 in_xy_pairs = np.array((cols, rows)).T 

484 

485 # Compute the triangulation (that takes time and can be computed for all values to be interpolated at once), 

486 # then run the interpolation 

487 triangulation = Delaunay(in_xy_pairs) 

488 lons_interp = LinearNDInterpolator(triangulation, lons)(out_xy_pairs).reshape(*out_rows_grid.shape) 

489 lats_interp = LinearNDInterpolator(triangulation, lats)(out_xy_pairs).reshape(*out_rows_grid.shape) 

490 

491 # lons_interp / lats_interp may contain NaN values in case xmin, ymin, xmax, ymax has been set too small 

492 # to account for RPC transformation errors 

493 # => fix that by extrapolation at NaN value positions 

494 # FIXME: can this be avoided by modified xmin/ymin/xmy/ymax coords? 

495 

496 lons_interp = self._fill_nans_at_corners(lons_interp, along_axis=0) # extrapolate in left/right direction 

497 lats_interp = self._fill_nans_at_corners(lats_interp, along_axis=1) 

498 

499 # return a geolayer in the exact dimensions like the EnMAP detector array 

500 return lons_interp, lats_interp 

501 

502 def __call__(self, *args, **kwargs): 

503 return self.compute_geolayer() 

504 

505 

506global_dem_sensorgeo: Optional[GeoArray] = None 

507 

508 

509def _initialize_mp(elevation: Union[float, np.ndarray]): 

510 """Declare global variables needed for RPC_3D_Geolayer_Generator._compute_geolayer_for_unique_coeffgroup(). 

511 

512 :param elevation: elevation - either as average value (float) or as a numpy array 

513 """ 

514 global global_dem_sensorgeo 

515 global_dem_sensorgeo = elevation 

516 

517 

518class RPC_3D_Geolayer_Generator(object): 

519 """Class for creating band- AND pixel-wise longitude/latitude arrays based on rational polynomial coeff. (RPC).""" 

520 

521 def __init__(self, 

522 rpc_coeffs_per_band: dict, 

523 elevation: Union[str, GeoArray, int, float], 

524 enmapIm_cornerCoords: Tuple[Tuple[float, float]], 

525 enmapIm_dims_sensorgeo: Tuple[int, int], 

526 CPUs: int = None): 

527 """Get an instance of RPC_3D_Geolayer_Generator. 

528 

529 :param rpc_coeffs_per_band: dictionary of RPC coefficients for each EnMAP band 

530 ({'band_1': <rpc_coeffs_dict>, 

531 'band_2': <rpc_coeffs_dict>, 

532 ...}) 

533 :param elevation: digital elevation model in MAP geometry (file path or instance of GeoArray) OR 

534 average elevation in meters as integer or float 

535 :param enmapIm_cornerCoords: corner coordinates as tuple of lon/lat pairs 

536 :param enmapIm_dims_sensorgeo: dimensions of the EnMAP image in sensor geometry (rows, colunms) 

537 :param CPUs: number of CPUs to use 

538 """ 

539 self.rpc_coeffs_per_band = OrderedDict(natsorted(rpc_coeffs_per_band.items())) 

540 self.elevation = elevation 

541 self.enmapIm_cornerCoords = enmapIm_cornerCoords 

542 self.enmapIm_dims_sensorgeo = enmapIm_dims_sensorgeo 

543 self.CPUs = CPUs or cpu_count() 

544 

545 if not isinstance(elevation, (int, float)): 

546 # get validated DEM in map geometry 

547 # self.logger.debug('Verifying DEM...') # TODO 

548 from ..dem_preprocessing import DEM_Processor 

549 self.elevation = DEM_Processor(dem_path_geoarray=elevation, 

550 enmapIm_cornerCoords=enmapIm_cornerCoords).dem 

551 # TODO clip DEM to needed area 

552 self.elevation.to_mem() 

553 # self.elevation.reproject_to_new_grid() 

554 

555 self.bandgroups_with_unique_rpc_coeffs = self._get_bandgroups_with_unique_rpc_coeffs() 

556 

557 def _get_bandgroups_with_unique_rpc_coeffs(self) -> List[List]: 

558 # combine RPC coefficients of all bands in a single numpy array 

559 band_inds = list(range(len(self.rpc_coeffs_per_band))) 

560 coeffs_first_band = list(self.rpc_coeffs_per_band.values())[0] 

561 keys_float = [k for k in coeffs_first_band 

562 if isinstance(coeffs_first_band[k], float)] 

563 keys_npa = [k for k in coeffs_first_band 

564 if isinstance(coeffs_first_band[k], np.ndarray)] 

565 

566 coeffs_allbands = None 

567 for i, coeffdict in enumerate(self.rpc_coeffs_per_band.values()): 

568 coeffs_curband = np.hstack([[coeffdict[k] for k in keys_float], 

569 *(coeffdict[k] for k in keys_npa)]) 

570 

571 if coeffs_allbands is None: 

572 coeffs_allbands = np.zeros((len(band_inds), 

573 1 + len(coeffs_curband))) 

574 coeffs_allbands[:, 0] = band_inds 

575 

576 coeffs_allbands[i, 1:] = coeffs_curband 

577 

578 # get groups of band indices where bands have the same RPC coefficients 

579 groups = npi.group_by(coeffs_allbands[:, 1:]).split(coeffs_allbands[:, 0]) 

580 groups_bandinds = [group.astype(int).tolist() for group in groups] 

581 

582 return groups_bandinds 

583 

584 @staticmethod 

585 def _compute_geolayer_for_unique_coeffgroup(kwargs): 

586 lons, lats = \ 

587 RPC_Geolayer_Generator(rpc_coeffs=kwargs['rpc_coeffs'], 

588 elevation=global_dem_sensorgeo, 

589 enmapIm_cornerCoords=kwargs['enmapIm_cornerCoords'], 

590 enmapIm_dims_sensorgeo=kwargs['enmapIm_dims_sensorgeo'] 

591 ).compute_geolayer() 

592 

593 return lons, lats, kwargs['group_idx'] 

594 

595 def compute_geolayer(self): 

596 rows, cols = self.enmapIm_dims_sensorgeo 

597 bands = len(self.rpc_coeffs_per_band) 

598 lons = np.empty((rows, cols, bands), dtype=float) 

599 lats = np.empty((rows, cols, bands), dtype=float) 

600 

601 rpc_coeffs_list = list(self.rpc_coeffs_per_band.values()) 

602 

603 # get kwargs for each group of unique RPC coefficients 

604 kwargs_list = [dict(rpc_coeffs=rpc_coeffs_list[group_bandinds[0]], 

605 enmapIm_cornerCoords=self.enmapIm_cornerCoords, 

606 enmapIm_dims_sensorgeo=self.enmapIm_dims_sensorgeo, 

607 group_idx=gi) 

608 for gi, group_bandinds in enumerate(self.bandgroups_with_unique_rpc_coeffs)] 

609 

610 # compute the geolayer ONLY FOR ONE BAND per group with unique RPC coefficients 

611 global global_dem_sensorgeo 

612 global_dem_sensorgeo = self.elevation 

613 

614 if len(self.bandgroups_with_unique_rpc_coeffs) == 1: 

615 lons_oneband, lats_oneband = self._compute_geolayer_for_unique_coeffgroup(kwargs_list[0])[:2] 

616 

617 lons = np.repeat(lons_oneband[:, :, np.newaxis], bands, axis=2) 

618 lats = np.repeat(lats_oneband[:, :, np.newaxis], bands, axis=2) 

619 

620 else: 

621 if self.CPUs > 1: 

622 # multiprocessing (only in case there are multiple unique sets of RPC coefficients) 

623 

624 # FIXME: pickling back large lon/lat arrays to the main process may be an issue on small machines 

625 # -> results could be temporarily written to disk in that case 

626 # NOTE: With the small test dataset pickling has only a small effect on processing time. 

627 with Pool(self.CPUs, initializer=_initialize_mp, initargs=[self.elevation]) as pool: 

628 results = list(pool.imap_unordered(self._compute_geolayer_for_unique_coeffgroup, kwargs_list)) 

629 pool.close() # needed for coverage to work in multiprocessing 

630 pool.join() 

631 

632 else: 

633 # singleprocessing 

634 results = [self._compute_geolayer_for_unique_coeffgroup(kwargs_list[gi]) 

635 for gi, group_bandinds in enumerate(self.bandgroups_with_unique_rpc_coeffs)] 

636 

637 for res in results: 

638 band_lons, band_lats, group_idx = res 

639 bandinds_to_assign = self.bandgroups_with_unique_rpc_coeffs[group_idx] 

640 nbands_to_assign = len(bandinds_to_assign) 

641 

642 lons[:, :, bandinds_to_assign] = np.repeat(band_lons[:, :, np.newaxis], nbands_to_assign, axis=2) 

643 lats[:, :, bandinds_to_assign] = np.repeat(band_lats[:, :, np.newaxis], nbands_to_assign, axis=2) 

644 

645 return lons, lats 

646 

647 

648def compute_mapCoords_within_sensorGeoDims(sensorgeoCoords_YX: List[Tuple[float, float]], 

649 rpc_coeffs: dict, 

650 elevation: Union[str, GeoArray, int, float], 

651 enmapIm_cornerCoords: Tuple[Tuple[float, float]], 

652 enmapIm_dims_sensorgeo: Tuple[int, int], 

653 ) -> List[Tuple[float, float]]: 

654 """Compute map coordinates for a given image coordinate-pair of an EnMAP image in sensor geometry. 

655 

656 :param sensorgeoCoords_YX: list of requested sensor geometry positions [(row, column), (row, column), ...] 

657 :param rpc_coeffs: RPC coefficients describing the relation between sensor and map geometry 

658 :param elevation: digital elevation model in MAP geometry (file path or instance of GeoArray) OR 

659 average elevation in meters as integer or float 

660 :param enmapIm_cornerCoords: MAP coordinates of the EnMAP image 

661 :param enmapIm_dims_sensorgeo: dimensions of the sensor geometry EnMAP image (rows, columns) 

662 :return: 

663 """ 

664 # compute coordinate array 

665 RPCGG = RPC_Geolayer_Generator(rpc_coeffs=rpc_coeffs, 

666 elevation=elevation, 

667 enmapIm_cornerCoords=enmapIm_cornerCoords, # order does not matter 

668 enmapIm_dims_sensorgeo=enmapIm_dims_sensorgeo) 

669 lons, lats = RPCGG.compute_geolayer() 

670 

671 # extract the new corner coordinate from the coordinate arrays computed via RPCs 

672 rows, cols = enmapIm_dims_sensorgeo 

673 

674 ul, ur, ll, lr = enmapIm_cornerCoords 

675 

676 lonlats = [] 

677 for imYX in sensorgeoCoords_YX: 

678 lonlat = \ 

679 ul if imYX == (0, 0) else \ 

680 ur if imYX == (0, cols - 1) else \ 

681 ll if imYX == (rows - 1, 0) else \ 

682 lr if imYX == (rows - 1, cols - 1) else \ 

683 (lons[imYX], lats[imYX]) 

684 

685 lonlats.append(lonlat) 

686 

687 return lonlats 

688 

689 

690def get_UTMEPSG_from_LonLat(lon: float, lat: float) -> int: 

691 zoneNr = int(1 + (lon + 180.0) / 6.0) 

692 isNorth = lat >= 0 

693 

694 return int('326' + str(zoneNr)) if isNorth else int('327' + str(zoneNr)) 

695 

696 

697def get_center_coord(cornerCoordsXY): 

698 # FIXME center coord is not equal to center of bounding box 

699 cornerCoordsXY = np.array(cornerCoordsXY) 

700 x_center = float(np.mean([cornerCoordsXY[:, 0].min(), cornerCoordsXY[:, 0].max()])) 

701 y_center = float(np.mean([cornerCoordsXY[:, 1].min(), cornerCoordsXY[:, 1].max()])) 

702 

703 return x_center, y_center 

704 

705 

706def get_UTMEPSG_from_LonLat_cornersXY(lons: List[float], lats: List[float]): 

707 return get_UTMEPSG_from_LonLat(*get_center_coord(list(zip(lons, lats))))