Coverage for enpt/processors/spatial_transform/spatial_transform.py: 94%
250 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-07 11:39 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-07 11:39 +0000
1# -*- coding: utf-8 -*-
3# EnPT, EnMAP Processing Tool - A Python package for pre-processing of EnMAP Level-1B data
4#
5# Copyright (C) 2018-2024 Karl Segl (GFZ Potsdam, segl@gfz-potsdam.de), Daniel Scheffler
6# (GFZ Potsdam, danschef@gfz-potsdam.de), Niklas Bohn (GFZ Potsdam, nbohn@gfz-potsdam.de),
7# Stéphane Guillaso (GFZ Potsdam, stephane.guillaso@gfz-potsdam.de)
8#
9# This software was developed within the context of the EnMAP project supported
10# by the DLR Space Administration with funds of the German Federal Ministry of
11# Economic Affairs and Energy (on the basis of a decision by the German Bundestag:
12# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.
13#
14# This program is free software: you can redistribute it and/or modify it under
15# the terms of the GNU General Public License as published by the Free Software
16# Foundation, either version 3 of the License, or (at your option) any later
17# version. Please note the following exception: `EnPT` depends on tqdm, which
18# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
19# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
20# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
21#
22# This program is distributed in the hope that it will be useful, but WITHOUT
23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
25# details.
26#
27# You should have received a copy of the GNU Lesser General Public License along
28# with this program. If not, see <https://www.gnu.org/licenses/>.
30"""EnPT module 'spatial transform', containing everything related to spatial transformations."""
31from typing import Union, Tuple, List, Optional, Sequence # noqa: F401
32from multiprocessing import Pool, cpu_count
33from collections import OrderedDict
34import numpy as np
35from scipy.interpolate import LinearNDInterpolator, make_interp_spline
36from scipy.spatial import Delaunay
37from geoarray import GeoArray
38from natsort import natsorted
39import numpy_indexed as npi
40from pyproj import CRS
42from sensormapgeo import Transformer
43from sensormapgeo.pyresample_backend.transformer_2d import AreaDefinition
44from py_tools_ds.geo.projection import prj_equal
45from py_tools_ds.geo.coord_grid import find_nearest
46from py_tools_ds.geo.coord_trafo import transform_any_prj, transform_coordArray
48from ...options.config import enmap_coordinate_grid_utm
50__author__ = 'Daniel Scheffler'
53class Geometry_Transformer(Transformer):
54 # use Sentinel-2 grid (30m grid with origin at 0/0)
55 # EnMAP geolayer contains pixel center coordinate
57 def to_sensor_geometry(self,
58 path_or_geoarray_mapgeo: Union[str, GeoArray],
59 src_gt: Tuple[float, float, float, float, float, float] = None,
60 src_prj: Union[str, int] = None,
61 src_nodata: int = None,
62 tgt_nodata: int = None
63 ) -> np.ndarray:
64 data_mapgeo = GeoArray(path_or_geoarray_mapgeo)
66 if not data_mapgeo.is_map_geo:
67 raise RuntimeError('The dataset to be transformed into sensor geometry already represents sensor geometry.')
69 return super().to_sensor_geometry(
70 data_mapgeo[:],
71 src_gt=src_gt or data_mapgeo.gt,
72 src_prj=src_prj or data_mapgeo.prj,
73 src_nodata=src_nodata,
74 tgt_nodata=tgt_nodata
75 )
77 def to_map_geometry(self,
78 path_or_geoarray_sensorgeo: Union[str, GeoArray, np.ndarray],
79 tgt_prj: Union[str, int],
80 tgt_extent: Tuple[float, float, float, float] = None,
81 tgt_res: Tuple = None,
82 tgt_coordgrid: Tuple[Tuple, Tuple] = None,
83 src_nodata: int = None,
84 tgt_nodata: int = None,
85 area_definition: AreaDefinition = None
86 ) -> Tuple[np.ndarray, tuple, str]:
87 data_sensorgeo = GeoArray(path_or_geoarray_sensorgeo)
89 if data_sensorgeo.is_map_geo and not data_sensorgeo.is_rotated:
90 raise RuntimeError('The dataset to be transformed into map geometry already represents map geometry.')
92 # run transformation (output extent/area definition etc. is internally computed from the geolayers if not given)
93 out_data, out_gt, out_prj = \
94 super().to_map_geometry(
95 data_sensorgeo[:],
96 tgt_prj=tgt_prj,
97 tgt_extent=tgt_extent,
98 tgt_res=tgt_res,
99 tgt_coordgrid=tgt_coordgrid,
100 src_nodata=src_nodata,
101 tgt_nodata=tgt_nodata,
102 area_definition=area_definition
103 )
105 return out_data, out_gt, out_prj
108class VNIR_SWIR_SensorGeometryTransformer(object):
109 """Class to transform between EnMAP VNIR and SWIR sensor geometry."""
111 def __init__(self,
112 lons_vnir: np.ndarray,
113 lats_vnir: np.ndarray,
114 lons_swir: np.ndarray,
115 lats_swir: np.ndarray,
116 prj_vnir: Union[str, int],
117 prj_swir: Union[str, int],
118 res_vnir: Tuple[float, float],
119 res_swir: Tuple[float, float],
120 resamp_alg: str = 'nearest',
121 src_nodata: int = None,
122 tgt_nodata: int = None,
123 nprocs: int = cpu_count()) -> None:
124 """Get an instance of VNIR_SWIR_SensorGeometryTransformer.
126 :param lons_vnir: VNIR longitude array corresponding to the sensor geometry arrays passed later (2D or 3D)
127 :param lats_vnir: VNIR latitude array corresponding to the sensor geometry arrays passed later (2D or 3D)
128 :param lons_swir: SWIR longitude array corresponding to the sensor geometry arrays passed later (2D or 3D)
129 :param lats_swir: SWIR latitude array corresponding to the sensor geometry arrays passed later (2D or 3D)
130 :param prj_vnir: projection of the VNIR if it would be transformed to map geometry (WKT string or EPSG code)
131 :param prj_swir: projection of the SWIR if it would be transformed to map geometry (WKT string or EPSG code)
132 :param res_vnir: pixel dimensions of the VNIR if it would be transformed to map geometry (X, Y)
133 :param res_swir: pixel dimensions of the SWIR if it would be transformed to map geometry (X, Y)
134 :param resamp_alg: resampling algorithm ('nearest', 'near', 'bilinear', 'cubic', 'cubic_spline',
135 'lanczos', 'average', 'mode', 'max', 'min', 'med', 'q1', 'q3')
136 :param src_nodata: nodata value of the input array
137 :param tgt_nodata: pixel value to be used for undetermined pixels in the output
138 :param nprocs: number of CPU cores to use
139 """
140 self.vnir_meta = dict(
141 lons=lons_vnir,
142 lats=lats_vnir,
143 prj=prj_vnir,
144 res=res_vnir
145 )
146 self.swir_meta = dict(
147 lons=lons_swir,
148 lats=lats_swir,
149 prj=prj_swir,
150 res=res_swir
151 )
152 self.resamp_alg = resamp_alg
153 self.src_nodata = src_nodata
154 self.tgt_nodata = tgt_nodata
155 self.cpus = nprocs
157 def transform_sensorgeo_VNIR_to_SWIR(self, data_vnirsensorgeo: np.ndarray) -> np.ndarray:
158 """Transform any array in VNIR sensor geometry to SWIR sensor geometry to remove geometric shifts.
160 :param data_vnirsensorgeo: input array in VNIR sensor geometry
161 :return: input array resampled to SWIR sensor geometry
162 """
163 return self._transform_sensorgeo(data_vnirsensorgeo, inputgeo='vnir')
165 def transform_sensorgeo_SWIR_to_VNIR(self, data_swirsensorgeo: np.ndarray) -> np.ndarray:
166 """Transform any array in SWIR sensor geometry to VNIR sensor geometry to remove geometric shifts.
168 :param data_swirsensorgeo: input array in SWIR sensor geometry
169 :return: input array resampled to VNIR sensor geometry
170 """
171 return self._transform_sensorgeo(data_swirsensorgeo, inputgeo='swir')
173 def _transform_sensorgeo(self,
174 data2transform: np.ndarray,
175 inputgeo: str,
176 band_outputgeo: int = 0
177 ) -> np.ndarray:
178 """Transform the input array between VNIR and SWIR sensor geometry.
180 :param data2transform: input array to be transformed
181 :param inputgeo: 'vnir' if data2transform is given in VNIR sensor geometry
182 'swir' if data2transform is given in SWIR sensor geometry
183 :param band_outputgeo: band index of the band from the sensor geometry should be used to generate the output
184 """
185 # TODO: Avoid the resampling here, maybe by replacing the lon/lat arrays by image coordinates for the source
186 # geometry and by image coordinate differences for the target geometry. Maybe also the proj string for
187 # local coordinate systems helps (see SensorMapGeometryTransformer class).
189 if inputgeo not in ['vnir', 'swir']:
190 raise ValueError(inputgeo)
192 src, tgt = (self.vnir_meta, self.swir_meta) if inputgeo == 'vnir' else (self.swir_meta, self.vnir_meta)
194 # get GeoTransformer
195 # NOTE: We can only use a 3D geolayer to temporarily transform to map geometry but not back to sensor geometry
196 # of the other detector. For this, we need to select the band of which the sensor geometry should be used.
197 def ensure_2d(coord_array):
198 return coord_array if coord_array.ndim == 2 else coord_array[:, :, band_outputgeo]
200 if data2transform.ndim == 3 and src['lons'].ndim == 3 and src['lons'].shape[2] == data2transform.shape[2]:
201 src_lons, src_lats = src['lons'], src['lats']
202 tgt_lons, tgt_lats = ensure_2d(tgt['lons']), ensure_2d(tgt['lats'])
203 else:
204 src_lons, src_lats = ensure_2d(src['lons']), ensure_2d(src['lats'])
205 tgt_lons, tgt_lats = ensure_2d(tgt['lons']), ensure_2d(tgt['lats'])
207 GT_src = Geometry_Transformer(src_lons, src_lats, backend='gdal', resamp_alg=self.resamp_alg, nprocs=self.cpus)
208 GT_tgt = Geometry_Transformer(tgt_lons, tgt_lats, backend='gdal', resamp_alg=self.resamp_alg, nprocs=self.cpus)
210 # temporarily transform the input sensor geometry array to map geometry
211 gA_mapgeo = GeoArray(
212 *GT_src.to_map_geometry(
213 data2transform,
214 tgt_prj=src['prj'],
215 src_nodata=self.src_nodata,
216 tgt_nodata=self.tgt_nodata
217 ),
218 nodata=self.tgt_nodata
219 )
221 # generate the target sensor geometry array (target lons/lats define the target swath definition
222 tgt_data_sensorgeo =\
223 GT_tgt.to_sensor_geometry(
224 gA_mapgeo,
225 src_nodata=gA_mapgeo._nodata, # noqa
226 tgt_nodata=self.tgt_nodata
227 )
229 return tgt_data_sensorgeo
232def move_extent_to_coord_grid(extent_utm: Tuple[float, float, float, float],
233 tgt_xgrid: Sequence[float],
234 tgt_ygrid: Sequence[float],
235 ) -> Tuple[float, float, float, float]:
236 """Move the given coordinate extent to a coordinate grid.
238 :param extent_utm: xmin, ymin, xmax, ymax coordinates
239 :param tgt_xgrid: target X coordinate grid, e.g. [0, 30]
240 :param tgt_ygrid: target Y coordinate grid, e.g. [0, 30]
241 """
242 xmin, ymin, xmax, ymax = extent_utm
243 tgt_xmin = find_nearest(tgt_xgrid, xmin, roundAlg='off', extrapolate=True)
244 tgt_xmax = find_nearest(tgt_xgrid, xmax, roundAlg='on', extrapolate=True)
245 tgt_ymin = find_nearest(tgt_ygrid, ymin, roundAlg='off', extrapolate=True)
246 tgt_ymax = find_nearest(tgt_ygrid, ymax, roundAlg='on', extrapolate=True)
248 return tgt_xmin, tgt_ymin, tgt_xmax, tgt_ymax
251class RPC_Geolayer_Generator(object):
252 """Class for creating pixel-wise longitude/latitude arrays based on rational polynomial coefficients (RPC)."""
254 def __init__(self,
255 rpc_coeffs: dict,
256 elevation: Union[str, GeoArray, int, float],
257 enmapIm_cornerCoords: Tuple[Tuple[float, float]],
258 enmapIm_dims_sensorgeo: Tuple[int, int]):
259 """Get an instance of RPC_Geolayer_Generator.
261 :param rpc_coeffs: RPC coefficients for a single EnMAP band
262 :param elevation: digital elevation model in map geometry (file path or instance of GeoArray) OR
263 average elevation in meters as integer or float
264 :param enmapIm_cornerCoords: corner coordinates as tuple of lon/lat pairs
265 :param enmapIm_dims_sensorgeo: dimensions of the EnMAP image in sensor geometry (rows, columns)
266 """
267 self.row_off = rpc_coeffs['row_off']
268 self.col_off = rpc_coeffs['col_off']
269 self.lat_off = rpc_coeffs['lat_off']
270 self.lon_off = rpc_coeffs['long_off']
271 self.height_off = rpc_coeffs['height_off']
272 self.row_scale = rpc_coeffs['row_scale']
273 self.col_scale = rpc_coeffs['col_scale']
274 self.lat_scale = rpc_coeffs['lat_scale']
275 self.lon_scale = rpc_coeffs['long_scale']
276 self.height_scale = rpc_coeffs['height_scale']
277 self.row_num_coeffs = rpc_coeffs['row_num_coeffs']
278 self.row_den_coeffs = rpc_coeffs['row_den_coeffs']
279 self.col_num_coeffs = rpc_coeffs['col_num_coeffs']
280 self.col_den_coeffs = rpc_coeffs['col_den_coeffs']
282 self.elevation = elevation if isinstance(elevation, (int, float)) else GeoArray(elevation)
283 self.enmapIm_cornerCoords = enmapIm_cornerCoords
284 self.enmapIm_dims_sensorgeo = enmapIm_dims_sensorgeo
286 def _normalize_map_coordinates(self,
287 lon: np.ndarray,
288 lat: np.ndarray,
289 height: np.ndarray) -> (np.ndarray, np.ndarray, np.ndarray):
290 """Normalize map coordinates to [-1, +1] to improve numerical precision.
292 :param lon: longitude array
293 :param lat: latitude array
294 :param height: elevation array
295 """
296 if not lon.shape == lat.shape == height.shape:
297 raise ValueError((lon.shape, lat.shape, height.shape),
298 'Longitude, latitude and height arrays are expected to have the same dimensions.')
300 lon_norm = (lon - self.lon_off) / self.lon_scale # longitude
301 lat_norm = (lat - self.lat_off) / self.lat_scale # latitude
302 height_norm = (height - self.height_off) / self.height_scale # elevation
304 msg = 'Coordinate normalization yields significantly out-of-range values for %s. ' \
305 'Check the coordinates and RPC coefficients.'
306 # for llh, name in zip([lon_norm, lat_norm, height_norm], ['longitudes', 'latitudes', 'heights']):
307 for llh, name in zip([lon_norm, lat_norm, height_norm], ['longitudes', 'latitudes']):
308 if llh.min() < -1.1 or llh.max() > 1.1:
309 raise RuntimeError((llh.min(), llh.max()), msg % name)
311 return lon_norm, lat_norm, height_norm
313 def _compute_normalized_image_coordinates(self,
314 lon_norm: np.ndarray,
315 lat_norm: np.ndarray,
316 height_norm: np.ndarray) -> (np.ndarray, np.ndarray):
317 """Compute normalized sensor geometry coordinates for the given lon/lat/height positions.
319 :param lon_norm: normalized longitudes
320 :param lat_norm: normalized latitudes
321 :param height_norm: normalized elevation
322 :return:
323 """
324 P = lat_norm.flatten()
325 L = lon_norm.flatten()
326 H = height_norm.flatten()
328 u = np.zeros((P.size, 20))
330 u_data = (ui for ui in [
331 1,
332 L,
333 P,
334 H,
335 L * P,
336 L * H,
337 P * H,
338 L ** 2,
339 P ** 2,
340 H ** 2,
341 P * L * H,
342 L ** 3,
343 L * P ** 2,
344 L * H ** 2,
345 L ** 2 * P,
346 P ** 3,
347 P * H ** 2,
348 L ** 2 * H,
349 P ** 2 * H,
350 H ** 3
351 ])
353 for i, ud in enumerate(u_data):
354 u[:, i] = ud
356 num_row_norm = np.sum(self.row_num_coeffs * u, axis=1)
357 den_row_norm = np.sum(self.row_den_coeffs * u, axis=1)
358 num_col_norm = np.sum(self.col_num_coeffs * u, axis=1)
359 den_col_norm = np.sum(self.col_den_coeffs * u, axis=1)
361 row_norm = num_row_norm / den_row_norm
362 col_norm = num_col_norm / den_col_norm
364 return row_norm, col_norm
366 def _denormalize_image_coordinates(self,
367 row_norm: np.ndarray,
368 col_norm: np.ndarray) -> (np.ndarray, np.ndarray):
369 """De-normalize normalized sensor geometry coordinates to get valid image coordinates.
371 :param row_norm: normalized rows
372 :param col_norm: normalized columns
373 :return: de-normalized rows array, de-normalized columns array,
374 """
375 rows = row_norm * self.row_scale + self.row_off
376 cols = col_norm * self.col_scale + self.col_off
378 return rows, cols
380 def transform_LonLatHeight_to_RowCol(self,
381 lon: np.ndarray,
382 lat: np.ndarray,
383 height: np.ndarray) -> (np.ndarray, np.ndarray):
384 """Get sensor geometry image coordinates for the given 3D map coordinate positions using RPC coefficients.
386 :param lon: longitude array
387 :param lat: latitude array
388 :param height: elevation array
389 :return: rows array, columns array
390 """
391 # TODO add reshaping
393 lon_norm, lat_norm, height_norm = \
394 self._normalize_map_coordinates(lon=lon, lat=lat, height=height)
395 row_norm, col_norm = \
396 self._compute_normalized_image_coordinates(lon_norm=lon_norm, lat_norm=lat_norm, height_norm=height_norm)
397 rows, cols = \
398 self._denormalize_image_coordinates(row_norm=row_norm, col_norm=col_norm)
400 return rows, cols
402 @staticmethod
403 def _fill_nans_at_corners(arr: np.ndarray, along_axis: int = 0) -> np.ndarray:
404 if not arr.ndim == 2:
405 raise ValueError(arr.ndim, '2D numpy array expected.')
406 if along_axis not in [0, 1]:
407 raise ValueError(along_axis, "The 'axis' parameter must be set to 0 or 1")
408 arr[0, 0] = np.nan
409 arr[0, 1] = np.nan
410 arr[1, 1] = np.nan
411 nans = np.isnan(arr)
413 if along_axis == 0:
414 # extrapolate linearly in top/bottom direction
415 cols_with_nan = np.arange(arr.shape[1])[np.any(nans, axis=0)]
417 for col in cols_with_nan:
418 data = arr[:, col]
419 idx_goodvals = np.argwhere(~nans[:, col]).flatten()
420 arr[:, col] = make_interp_spline(idx_goodvals, data[idx_goodvals], k=1)(range(data.size))
421 else:
422 # extrapolate linearly in left/right direction
423 rows_with_nan = np.arange(arr.shape[0])[np.any(nans, axis=1)]
425 for row in rows_with_nan:
426 data = arr[row, :]
427 idx_goodvals = np.argwhere(~nans[row, :]).flatten()
428 arr[row, :] = make_interp_spline(idx_goodvals, data[idx_goodvals], k=1)(range(data.size))
430 return arr
432 def compute_geolayer(self) -> (np.ndarray, np.ndarray):
433 """Compute pixel-wise lon/lat arrays based on RPC coefficients, corner coordinates and image dimensions.
435 :return: (2D longitude array, 2D latitude array)
436 """
437 # transform corner coordinates of EnMAP image to UTM
438 grid_utm_epsg = get_UTMEPSG_from_LonLat(*get_center_coord(self.enmapIm_cornerCoords))
439 cornerCoordsUTM = np.array([transform_any_prj(4326, grid_utm_epsg, lon, lat)
440 for lon, lat in self.enmapIm_cornerCoords])
441 xmin, xmax = cornerCoordsUTM[:, 0].min(), cornerCoordsUTM[:, 0].max()
442 ymin, ymax = cornerCoordsUTM[:, 1].min(), cornerCoordsUTM[:, 1].max()
444 # get UTM bounding box and move it to the EnMAP grid
445 xmin, ymin, xmax, ymax = \
446 move_extent_to_coord_grid((xmin, ymin, xmax, ymax),
447 enmap_coordinate_grid_utm['x'], enmap_coordinate_grid_utm['y'])
449 # set up a regular grid of UTM points with a specific mesh width
450 meshwidth = 300 # 10 EnMAP pixels
451 y_grid_utm, x_grid_utm = np.meshgrid(np.arange(ymax, ymin - meshwidth, -meshwidth),
452 np.arange(xmin, xmax + meshwidth, meshwidth),
453 indexing='ij')
455 if not isinstance(self.elevation, (int, float)):
456 # transform UTM grid to DEM projection
457 x_grid_demPrj, y_grid_demPrj = \
458 (x_grid_utm, y_grid_utm) if prj_equal(grid_utm_epsg, self.elevation.epsg) else \
459 transform_coordArray(CRS(grid_utm_epsg).to_wkt(),
460 CRS(self.elevation.epsg).to_wkt(),
461 x_grid_utm, y_grid_utm)
463 # retrieve corresponding heights from DEM
464 # -> resample DEM to EnMAP grid?
465 xy_pairs_demPrj = np.vstack([x_grid_demPrj.flatten(), y_grid_demPrj.flatten()]).T
466 heights = self.elevation.read_pointData(xy_pairs_demPrj).flatten()
467 else:
468 heights = np.full_like(x_grid_utm.flatten(), self.elevation)
470 # transform UTM points to lon/lat
471 lon_grid, lat_grid = \
472 transform_coordArray(CRS(grid_utm_epsg).to_wkt(), CRS(4326).to_wkt(), x_grid_utm, y_grid_utm)
473 lons = lon_grid.flatten()
474 lats = lat_grid.flatten()
476 # compute floating point EnMAP image coordinates for the selected UTM points
477 rows, cols = self.transform_LonLatHeight_to_RowCol(lon=lons, lat=lats, height=heights)
479 # interpolate lon/lats to get lon/lat coordinates integer image coordinates of EnMAP image
480 rows_im, cols_im = self.enmapIm_dims_sensorgeo
481 out_rows_grid, out_cols_grid = np.meshgrid(range(rows_im), range(cols_im), indexing='ij')
482 out_xy_pairs = np.vstack([out_cols_grid.flatten(), out_rows_grid.flatten()]).T
483 in_xy_pairs = np.array((cols, rows)).T
485 # Compute the triangulation (that takes time and can be computed for all values to be interpolated at once),
486 # then run the interpolation
487 triangulation = Delaunay(in_xy_pairs)
488 lons_interp = LinearNDInterpolator(triangulation, lons)(out_xy_pairs).reshape(*out_rows_grid.shape)
489 lats_interp = LinearNDInterpolator(triangulation, lats)(out_xy_pairs).reshape(*out_rows_grid.shape)
491 # lons_interp / lats_interp may contain NaN values in case xmin, ymin, xmax, ymax has been set too small
492 # to account for RPC transformation errors
493 # => fix that by extrapolation at NaN value positions
494 # FIXME: can this be avoided by modified xmin/ymin/xmy/ymax coords?
496 lons_interp = self._fill_nans_at_corners(lons_interp, along_axis=0) # extrapolate in left/right direction
497 lats_interp = self._fill_nans_at_corners(lats_interp, along_axis=1)
499 # return a geolayer in the exact dimensions like the EnMAP detector array
500 return lons_interp, lats_interp
502 def __call__(self, *args, **kwargs):
503 return self.compute_geolayer()
506global_dem_sensorgeo: Optional[GeoArray] = None
509def _initialize_mp(elevation: Union[float, np.ndarray]):
510 """Declare global variables needed for RPC_3D_Geolayer_Generator._compute_geolayer_for_unique_coeffgroup().
512 :param elevation: elevation - either as average value (float) or as a numpy array
513 """
514 global global_dem_sensorgeo
515 global_dem_sensorgeo = elevation
518class RPC_3D_Geolayer_Generator(object):
519 """Class for creating band- AND pixel-wise longitude/latitude arrays based on rational polynomial coeff. (RPC)."""
521 def __init__(self,
522 rpc_coeffs_per_band: dict,
523 elevation: Union[str, GeoArray, int, float],
524 enmapIm_cornerCoords: Tuple[Tuple[float, float]],
525 enmapIm_dims_sensorgeo: Tuple[int, int],
526 CPUs: int = None):
527 """Get an instance of RPC_3D_Geolayer_Generator.
529 :param rpc_coeffs_per_band: dictionary of RPC coefficients for each EnMAP band
530 ({'band_1': <rpc_coeffs_dict>,
531 'band_2': <rpc_coeffs_dict>,
532 ...})
533 :param elevation: digital elevation model in MAP geometry (file path or instance of GeoArray) OR
534 average elevation in meters as integer or float
535 :param enmapIm_cornerCoords: corner coordinates as tuple of lon/lat pairs
536 :param enmapIm_dims_sensorgeo: dimensions of the EnMAP image in sensor geometry (rows, colunms)
537 :param CPUs: number of CPUs to use
538 """
539 self.rpc_coeffs_per_band = OrderedDict(natsorted(rpc_coeffs_per_band.items()))
540 self.elevation = elevation
541 self.enmapIm_cornerCoords = enmapIm_cornerCoords
542 self.enmapIm_dims_sensorgeo = enmapIm_dims_sensorgeo
543 self.CPUs = CPUs or cpu_count()
545 if not isinstance(elevation, (int, float)):
546 # get validated DEM in map geometry
547 # self.logger.debug('Verifying DEM...') # TODO
548 from ..dem_preprocessing import DEM_Processor
549 self.elevation = DEM_Processor(dem_path_geoarray=elevation,
550 enmapIm_cornerCoords=enmapIm_cornerCoords).dem
551 # TODO clip DEM to needed area
552 self.elevation.to_mem()
553 # self.elevation.reproject_to_new_grid()
555 self.bandgroups_with_unique_rpc_coeffs = self._get_bandgroups_with_unique_rpc_coeffs()
557 def _get_bandgroups_with_unique_rpc_coeffs(self) -> List[List]:
558 # combine RPC coefficients of all bands in a single numpy array
559 band_inds = list(range(len(self.rpc_coeffs_per_band)))
560 coeffs_first_band = list(self.rpc_coeffs_per_band.values())[0]
561 keys_float = [k for k in coeffs_first_band
562 if isinstance(coeffs_first_band[k], float)]
563 keys_npa = [k for k in coeffs_first_band
564 if isinstance(coeffs_first_band[k], np.ndarray)]
566 coeffs_allbands = None
567 for i, coeffdict in enumerate(self.rpc_coeffs_per_band.values()):
568 coeffs_curband = np.hstack([[coeffdict[k] for k in keys_float],
569 *(coeffdict[k] for k in keys_npa)])
571 if coeffs_allbands is None:
572 coeffs_allbands = np.zeros((len(band_inds),
573 1 + len(coeffs_curband)))
574 coeffs_allbands[:, 0] = band_inds
576 coeffs_allbands[i, 1:] = coeffs_curband
578 # get groups of band indices where bands have the same RPC coefficients
579 groups = npi.group_by(coeffs_allbands[:, 1:]).split(coeffs_allbands[:, 0])
580 groups_bandinds = [group.astype(int).tolist() for group in groups]
582 return groups_bandinds
584 @staticmethod
585 def _compute_geolayer_for_unique_coeffgroup(kwargs):
586 lons, lats = \
587 RPC_Geolayer_Generator(rpc_coeffs=kwargs['rpc_coeffs'],
588 elevation=global_dem_sensorgeo,
589 enmapIm_cornerCoords=kwargs['enmapIm_cornerCoords'],
590 enmapIm_dims_sensorgeo=kwargs['enmapIm_dims_sensorgeo']
591 ).compute_geolayer()
593 return lons, lats, kwargs['group_idx']
595 def compute_geolayer(self):
596 rows, cols = self.enmapIm_dims_sensorgeo
597 bands = len(self.rpc_coeffs_per_band)
598 lons = np.empty((rows, cols, bands), dtype=float)
599 lats = np.empty((rows, cols, bands), dtype=float)
601 rpc_coeffs_list = list(self.rpc_coeffs_per_band.values())
603 # get kwargs for each group of unique RPC coefficients
604 kwargs_list = [dict(rpc_coeffs=rpc_coeffs_list[group_bandinds[0]],
605 enmapIm_cornerCoords=self.enmapIm_cornerCoords,
606 enmapIm_dims_sensorgeo=self.enmapIm_dims_sensorgeo,
607 group_idx=gi)
608 for gi, group_bandinds in enumerate(self.bandgroups_with_unique_rpc_coeffs)]
610 # compute the geolayer ONLY FOR ONE BAND per group with unique RPC coefficients
611 global global_dem_sensorgeo
612 global_dem_sensorgeo = self.elevation
614 if len(self.bandgroups_with_unique_rpc_coeffs) == 1:
615 lons_oneband, lats_oneband = self._compute_geolayer_for_unique_coeffgroup(kwargs_list[0])[:2]
617 lons = np.repeat(lons_oneband[:, :, np.newaxis], bands, axis=2)
618 lats = np.repeat(lats_oneband[:, :, np.newaxis], bands, axis=2)
620 else:
621 if self.CPUs > 1:
622 # multiprocessing (only in case there are multiple unique sets of RPC coefficients)
624 # FIXME: pickling back large lon/lat arrays to the main process may be an issue on small machines
625 # -> results could be temporarily written to disk in that case
626 # NOTE: With the small test dataset pickling has only a small effect on processing time.
627 with Pool(self.CPUs, initializer=_initialize_mp, initargs=[self.elevation]) as pool:
628 results = list(pool.imap_unordered(self._compute_geolayer_for_unique_coeffgroup, kwargs_list))
629 pool.close() # needed for coverage to work in multiprocessing
630 pool.join()
632 else:
633 # singleprocessing
634 results = [self._compute_geolayer_for_unique_coeffgroup(kwargs_list[gi])
635 for gi, group_bandinds in enumerate(self.bandgroups_with_unique_rpc_coeffs)]
637 for res in results:
638 band_lons, band_lats, group_idx = res
639 bandinds_to_assign = self.bandgroups_with_unique_rpc_coeffs[group_idx]
640 nbands_to_assign = len(bandinds_to_assign)
642 lons[:, :, bandinds_to_assign] = np.repeat(band_lons[:, :, np.newaxis], nbands_to_assign, axis=2)
643 lats[:, :, bandinds_to_assign] = np.repeat(band_lats[:, :, np.newaxis], nbands_to_assign, axis=2)
645 return lons, lats
648def compute_mapCoords_within_sensorGeoDims(sensorgeoCoords_YX: List[Tuple[float, float]],
649 rpc_coeffs: dict,
650 elevation: Union[str, GeoArray, int, float],
651 enmapIm_cornerCoords: Tuple[Tuple[float, float]],
652 enmapIm_dims_sensorgeo: Tuple[int, int],
653 ) -> List[Tuple[float, float]]:
654 """Compute map coordinates for a given image coordinate-pair of an EnMAP image in sensor geometry.
656 :param sensorgeoCoords_YX: list of requested sensor geometry positions [(row, column), (row, column), ...]
657 :param rpc_coeffs: RPC coefficients describing the relation between sensor and map geometry
658 :param elevation: digital elevation model in MAP geometry (file path or instance of GeoArray) OR
659 average elevation in meters as integer or float
660 :param enmapIm_cornerCoords: MAP coordinates of the EnMAP image
661 :param enmapIm_dims_sensorgeo: dimensions of the sensor geometry EnMAP image (rows, columns)
662 :return:
663 """
664 # compute coordinate array
665 RPCGG = RPC_Geolayer_Generator(rpc_coeffs=rpc_coeffs,
666 elevation=elevation,
667 enmapIm_cornerCoords=enmapIm_cornerCoords, # order does not matter
668 enmapIm_dims_sensorgeo=enmapIm_dims_sensorgeo)
669 lons, lats = RPCGG.compute_geolayer()
671 # extract the new corner coordinate from the coordinate arrays computed via RPCs
672 rows, cols = enmapIm_dims_sensorgeo
674 ul, ur, ll, lr = enmapIm_cornerCoords
676 lonlats = []
677 for imYX in sensorgeoCoords_YX:
678 lonlat = \
679 ul if imYX == (0, 0) else \
680 ur if imYX == (0, cols - 1) else \
681 ll if imYX == (rows - 1, 0) else \
682 lr if imYX == (rows - 1, cols - 1) else \
683 (lons[imYX], lats[imYX])
685 lonlats.append(lonlat)
687 return lonlats
690def get_UTMEPSG_from_LonLat(lon: float, lat: float) -> int:
691 zoneNr = int(1 + (lon + 180.0) / 6.0)
692 isNorth = lat >= 0
694 return int('326' + str(zoneNr)) if isNorth else int('327' + str(zoneNr))
697def get_center_coord(cornerCoordsXY):
698 # FIXME center coord is not equal to center of bounding box
699 cornerCoordsXY = np.array(cornerCoordsXY)
700 x_center = float(np.mean([cornerCoordsXY[:, 0].min(), cornerCoordsXY[:, 0].max()]))
701 y_center = float(np.mean([cornerCoordsXY[:, 1].min(), cornerCoordsXY[:, 1].max()]))
703 return x_center, y_center
706def get_UTMEPSG_from_LonLat_cornersXY(lons: List[float], lats: List[float]):
707 return get_UTMEPSG_from_LonLat(*get_center_coord(list(zip(lons, lats))))