# -*- coding: utf-8 -*-
# EnPT, EnMAP Processing Tool - A Python package for pre-processing of EnMAP Level-1B data
#
# Copyright (C) 2018-2024 Karl Segl (GFZ Potsdam, segl@gfz-potsdam.de), Daniel Scheffler
# (GFZ Potsdam, danschef@gfz-potsdam.de), Niklas Bohn (GFZ Potsdam, nbohn@gfz-potsdam.de),
# Stéphane Guillaso (GFZ Potsdam, stephane.guillaso@gfz-potsdam.de)
#
# This software was developed within the context of the EnMAP project supported
# by the DLR Space Administration with funds of the German Federal Ministry of
# Economic Affairs and Energy (on the basis of a decision by the German Bundestag:
# 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version. Please note the following exception: `EnPT` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <https://www.gnu.org/licenses/>.
"""EnPT module 'spatial transform', containing everything related to spatial transformations."""
from typing import Union, Tuple, List, Optional, Sequence # noqa: F401
from multiprocessing import Pool, cpu_count
from collections import OrderedDict
import numpy as np
from scipy.interpolate import LinearNDInterpolator, make_interp_spline
from scipy.spatial import Delaunay
from geoarray import GeoArray
from natsort import natsorted
import numpy_indexed as npi
from pyproj import CRS
from sensormapgeo import Transformer
from sensormapgeo.pyresample_backend.transformer_2d import AreaDefinition
from py_tools_ds.geo.projection import prj_equal
from py_tools_ds.geo.coord_grid import find_nearest
from py_tools_ds.geo.coord_trafo import transform_any_prj, transform_coordArray
from ...options.config import enmap_coordinate_grid_utm
__author__ = 'Daniel Scheffler'
[docs]
def move_extent_to_coord_grid(extent_utm: Tuple[float, float, float, float],
tgt_xgrid: Sequence[float],
tgt_ygrid: Sequence[float],
) -> Tuple[float, float, float, float]:
"""Move the given coordinate extent to a coordinate grid.
:param extent_utm: xmin, ymin, xmax, ymax coordinates
:param tgt_xgrid: target X coordinate grid, e.g. [0, 30]
:param tgt_ygrid: target Y coordinate grid, e.g. [0, 30]
"""
xmin, ymin, xmax, ymax = extent_utm
tgt_xmin = find_nearest(tgt_xgrid, xmin, roundAlg='off', extrapolate=True)
tgt_xmax = find_nearest(tgt_xgrid, xmax, roundAlg='on', extrapolate=True)
tgt_ymin = find_nearest(tgt_ygrid, ymin, roundAlg='off', extrapolate=True)
tgt_ymax = find_nearest(tgt_ygrid, ymax, roundAlg='on', extrapolate=True)
return tgt_xmin, tgt_ymin, tgt_xmax, tgt_ymax
[docs]
class RPC_Geolayer_Generator(object):
"""Class for creating pixel-wise longitude/latitude arrays based on rational polynomial coefficients (RPC)."""
def __init__(self,
rpc_coeffs: dict,
elevation: Union[str, GeoArray, int, float],
enmapIm_cornerCoords: Tuple[Tuple[float, float]],
enmapIm_dims_sensorgeo: Tuple[int, int]):
"""Get an instance of RPC_Geolayer_Generator.
:param rpc_coeffs: RPC coefficients for a single EnMAP band
:param elevation: digital elevation model in map geometry (file path or instance of GeoArray) OR
average elevation in meters as integer or float
:param enmapIm_cornerCoords: corner coordinates as tuple of lon/lat pairs
:param enmapIm_dims_sensorgeo: dimensions of the EnMAP image in sensor geometry (rows, columns)
"""
self.row_off = rpc_coeffs['row_off']
self.col_off = rpc_coeffs['col_off']
self.lat_off = rpc_coeffs['lat_off']
self.lon_off = rpc_coeffs['long_off']
self.height_off = rpc_coeffs['height_off']
self.row_scale = rpc_coeffs['row_scale']
self.col_scale = rpc_coeffs['col_scale']
self.lat_scale = rpc_coeffs['lat_scale']
self.lon_scale = rpc_coeffs['long_scale']
self.height_scale = rpc_coeffs['height_scale']
self.row_num_coeffs = rpc_coeffs['row_num_coeffs']
self.row_den_coeffs = rpc_coeffs['row_den_coeffs']
self.col_num_coeffs = rpc_coeffs['col_num_coeffs']
self.col_den_coeffs = rpc_coeffs['col_den_coeffs']
self.elevation = elevation if isinstance(elevation, (int, float)) else GeoArray(elevation)
self.enmapIm_cornerCoords = enmapIm_cornerCoords
self.enmapIm_dims_sensorgeo = enmapIm_dims_sensorgeo
[docs]
def _normalize_map_coordinates(self,
lon: np.ndarray,
lat: np.ndarray,
height: np.ndarray) -> (np.ndarray, np.ndarray, np.ndarray):
"""Normalize map coordinates to [-1, +1] to improve numerical precision.
:param lon: longitude array
:param lat: latitude array
:param height: elevation array
"""
if not lon.shape == lat.shape == height.shape:
raise ValueError((lon.shape, lat.shape, height.shape),
'Longitude, latitude and height arrays are expected to have the same dimensions.')
lon_norm = (lon - self.lon_off) / self.lon_scale # longitude
lat_norm = (lat - self.lat_off) / self.lat_scale # latitude
height_norm = (height - self.height_off) / self.height_scale # elevation
msg = 'Coordinate normalization yields significantly out-of-range values for %s. ' \
'Check the coordinates and RPC coefficients.'
# for llh, name in zip([lon_norm, lat_norm, height_norm], ['longitudes', 'latitudes', 'heights']):
for llh, name in zip([lon_norm, lat_norm, height_norm], ['longitudes', 'latitudes']):
if llh.min() < -1.1 or llh.max() > 1.1:
raise RuntimeError((llh.min(), llh.max()), msg % name)
return lon_norm, lat_norm, height_norm
[docs]
def _compute_normalized_image_coordinates(self,
lon_norm: np.ndarray,
lat_norm: np.ndarray,
height_norm: np.ndarray) -> (np.ndarray, np.ndarray):
"""Compute normalized sensor geometry coordinates for the given lon/lat/height positions.
:param lon_norm: normalized longitudes
:param lat_norm: normalized latitudes
:param height_norm: normalized elevation
:return:
"""
P = lat_norm.flatten()
L = lon_norm.flatten()
H = height_norm.flatten()
u = np.zeros((P.size, 20))
u_data = (ui for ui in [
1,
L,
P,
H,
L * P,
L * H,
P * H,
L ** 2,
P ** 2,
H ** 2,
P * L * H,
L ** 3,
L * P ** 2,
L * H ** 2,
L ** 2 * P,
P ** 3,
P * H ** 2,
L ** 2 * H,
P ** 2 * H,
H ** 3
])
for i, ud in enumerate(u_data):
u[:, i] = ud
num_row_norm = np.sum(self.row_num_coeffs * u, axis=1)
den_row_norm = np.sum(self.row_den_coeffs * u, axis=1)
num_col_norm = np.sum(self.col_num_coeffs * u, axis=1)
den_col_norm = np.sum(self.col_den_coeffs * u, axis=1)
row_norm = num_row_norm / den_row_norm
col_norm = num_col_norm / den_col_norm
return row_norm, col_norm
[docs]
def _denormalize_image_coordinates(self,
row_norm: np.ndarray,
col_norm: np.ndarray) -> (np.ndarray, np.ndarray):
"""De-normalize normalized sensor geometry coordinates to get valid image coordinates.
:param row_norm: normalized rows
:param col_norm: normalized columns
:return: de-normalized rows array, de-normalized columns array,
"""
rows = row_norm * self.row_scale + self.row_off
cols = col_norm * self.col_scale + self.col_off
return rows, cols
[docs]
@staticmethod
def _fill_nans_at_corners(arr: np.ndarray, along_axis: int = 0) -> np.ndarray:
if not arr.ndim == 2:
raise ValueError(arr.ndim, '2D numpy array expected.')
if along_axis not in [0, 1]:
raise ValueError(along_axis, "The 'axis' parameter must be set to 0 or 1")
arr[0, 0] = np.nan
arr[0, 1] = np.nan
arr[1, 1] = np.nan
nans = np.isnan(arr)
if along_axis == 0:
# extrapolate linearly in top/bottom direction
cols_with_nan = np.arange(arr.shape[1])[np.any(nans, axis=0)]
for col in cols_with_nan:
data = arr[:, col]
idx_goodvals = np.argwhere(~nans[:, col]).flatten()
arr[:, col] = make_interp_spline(idx_goodvals, data[idx_goodvals], k=1)(range(data.size))
else:
# extrapolate linearly in left/right direction
rows_with_nan = np.arange(arr.shape[0])[np.any(nans, axis=1)]
for row in rows_with_nan:
data = arr[row, :]
idx_goodvals = np.argwhere(~nans[row, :]).flatten()
arr[row, :] = make_interp_spline(idx_goodvals, data[idx_goodvals], k=1)(range(data.size))
return arr
[docs]
def compute_geolayer(self) -> (np.ndarray, np.ndarray):
"""Compute pixel-wise lon/lat arrays based on RPC coefficients, corner coordinates and image dimensions.
:return: (2D longitude array, 2D latitude array)
"""
# transform corner coordinates of EnMAP image to UTM
grid_utm_epsg = get_UTMEPSG_from_LonLat(*get_center_coord(self.enmapIm_cornerCoords))
cornerCoordsUTM = np.array([transform_any_prj(4326, grid_utm_epsg, lon, lat)
for lon, lat in self.enmapIm_cornerCoords])
xmin, xmax = cornerCoordsUTM[:, 0].min(), cornerCoordsUTM[:, 0].max()
ymin, ymax = cornerCoordsUTM[:, 1].min(), cornerCoordsUTM[:, 1].max()
# get UTM bounding box and move it to the EnMAP grid
xmin, ymin, xmax, ymax = \
move_extent_to_coord_grid((xmin, ymin, xmax, ymax),
enmap_coordinate_grid_utm['x'], enmap_coordinate_grid_utm['y'])
# set up a regular grid of UTM points with a specific mesh width
meshwidth = 300 # 10 EnMAP pixels
y_grid_utm, x_grid_utm = np.meshgrid(np.arange(ymax, ymin - meshwidth, -meshwidth),
np.arange(xmin, xmax + meshwidth, meshwidth),
indexing='ij')
if not isinstance(self.elevation, (int, float)):
# transform UTM grid to DEM projection
x_grid_demPrj, y_grid_demPrj = \
(x_grid_utm, y_grid_utm) if prj_equal(grid_utm_epsg, self.elevation.epsg) else \
transform_coordArray(CRS(grid_utm_epsg).to_wkt(),
CRS(self.elevation.epsg).to_wkt(),
x_grid_utm, y_grid_utm)
# retrieve corresponding heights from DEM
# -> resample DEM to EnMAP grid?
xy_pairs_demPrj = np.vstack([x_grid_demPrj.flatten(), y_grid_demPrj.flatten()]).T
heights = self.elevation.read_pointData(xy_pairs_demPrj).flatten()
else:
heights = np.full_like(x_grid_utm.flatten(), self.elevation)
# transform UTM points to lon/lat
lon_grid, lat_grid = \
transform_coordArray(CRS(grid_utm_epsg).to_wkt(), CRS(4326).to_wkt(), x_grid_utm, y_grid_utm)
lons = lon_grid.flatten()
lats = lat_grid.flatten()
# compute floating point EnMAP image coordinates for the selected UTM points
rows, cols = self.transform_LonLatHeight_to_RowCol(lon=lons, lat=lats, height=heights)
# interpolate lon/lats to get lon/lat coordinates integer image coordinates of EnMAP image
rows_im, cols_im = self.enmapIm_dims_sensorgeo
out_rows_grid, out_cols_grid = np.meshgrid(range(rows_im), range(cols_im), indexing='ij')
out_xy_pairs = np.vstack([out_cols_grid.flatten(), out_rows_grid.flatten()]).T
in_xy_pairs = np.array((cols, rows)).T
# Compute the triangulation (that takes time and can be computed for all values to be interpolated at once),
# then run the interpolation
triangulation = Delaunay(in_xy_pairs)
lons_interp = LinearNDInterpolator(triangulation, lons)(out_xy_pairs).reshape(*out_rows_grid.shape)
lats_interp = LinearNDInterpolator(triangulation, lats)(out_xy_pairs).reshape(*out_rows_grid.shape)
# lons_interp / lats_interp may contain NaN values in case xmin, ymin, xmax, ymax has been set too small
# to account for RPC transformation errors
# => fix that by extrapolation at NaN value positions
# FIXME: can this be avoided by modified xmin/ymin/xmy/ymax coords?
lons_interp = self._fill_nans_at_corners(lons_interp, along_axis=0) # extrapolate in left/right direction
lats_interp = self._fill_nans_at_corners(lats_interp, along_axis=1)
# return a geolayer in the exact dimensions like the EnMAP detector array
return lons_interp, lats_interp
def __call__(self, *args, **kwargs):
return self.compute_geolayer()
global_dem_sensorgeo: Optional[GeoArray] = None
[docs]
def _initialize_mp(elevation: Union[float, np.ndarray]):
"""Declare global variables needed for RPC_3D_Geolayer_Generator._compute_geolayer_for_unique_coeffgroup().
:param elevation: elevation - either as average value (float) or as a numpy array
"""
global global_dem_sensorgeo
global_dem_sensorgeo = elevation
[docs]
class RPC_3D_Geolayer_Generator(object):
"""Class for creating band- AND pixel-wise longitude/latitude arrays based on rational polynomial coeff. (RPC)."""
def __init__(self,
rpc_coeffs_per_band: dict,
elevation: Union[str, GeoArray, int, float],
enmapIm_cornerCoords: Tuple[Tuple[float, float]],
enmapIm_dims_sensorgeo: Tuple[int, int],
CPUs: int = None):
"""Get an instance of RPC_3D_Geolayer_Generator.
:param rpc_coeffs_per_band: dictionary of RPC coefficients for each EnMAP band
({'band_1': <rpc_coeffs_dict>,
'band_2': <rpc_coeffs_dict>,
...})
:param elevation: digital elevation model in MAP geometry (file path or instance of GeoArray) OR
average elevation in meters as integer or float
:param enmapIm_cornerCoords: corner coordinates as tuple of lon/lat pairs
:param enmapIm_dims_sensorgeo: dimensions of the EnMAP image in sensor geometry (rows, colunms)
:param CPUs: number of CPUs to use
"""
self.rpc_coeffs_per_band = OrderedDict(natsorted(rpc_coeffs_per_band.items()))
self.elevation = elevation
self.enmapIm_cornerCoords = enmapIm_cornerCoords
self.enmapIm_dims_sensorgeo = enmapIm_dims_sensorgeo
self.CPUs = CPUs or cpu_count()
if not isinstance(elevation, (int, float)):
# get validated DEM in map geometry
# self.logger.debug('Verifying DEM...') # TODO
from ..dem_preprocessing import DEM_Processor
self.elevation = DEM_Processor(dem_path_geoarray=elevation,
enmapIm_cornerCoords=enmapIm_cornerCoords).dem
# TODO clip DEM to needed area
self.elevation.to_mem()
# self.elevation.reproject_to_new_grid()
self.bandgroups_with_unique_rpc_coeffs = self._get_bandgroups_with_unique_rpc_coeffs()
[docs]
def _get_bandgroups_with_unique_rpc_coeffs(self) -> List[List]:
# combine RPC coefficients of all bands in a single numpy array
band_inds = list(range(len(self.rpc_coeffs_per_band)))
coeffs_first_band = list(self.rpc_coeffs_per_band.values())[0]
keys_float = [k for k in coeffs_first_band
if isinstance(coeffs_first_band[k], float)]
keys_npa = [k for k in coeffs_first_band
if isinstance(coeffs_first_band[k], np.ndarray)]
coeffs_allbands = None
for i, coeffdict in enumerate(self.rpc_coeffs_per_band.values()):
coeffs_curband = np.hstack([[coeffdict[k] for k in keys_float],
*(coeffdict[k] for k in keys_npa)])
if coeffs_allbands is None:
coeffs_allbands = np.zeros((len(band_inds),
1 + len(coeffs_curband)))
coeffs_allbands[:, 0] = band_inds
coeffs_allbands[i, 1:] = coeffs_curband
# get groups of band indices where bands have the same RPC coefficients
groups = npi.group_by(coeffs_allbands[:, 1:]).split(coeffs_allbands[:, 0])
groups_bandinds = [group.astype(int).tolist() for group in groups]
return groups_bandinds
[docs]
@staticmethod
def _compute_geolayer_for_unique_coeffgroup(kwargs):
lons, lats = \
RPC_Geolayer_Generator(rpc_coeffs=kwargs['rpc_coeffs'],
elevation=global_dem_sensorgeo,
enmapIm_cornerCoords=kwargs['enmapIm_cornerCoords'],
enmapIm_dims_sensorgeo=kwargs['enmapIm_dims_sensorgeo']
).compute_geolayer()
return lons, lats, kwargs['group_idx']
[docs]
def compute_geolayer(self):
rows, cols = self.enmapIm_dims_sensorgeo
bands = len(self.rpc_coeffs_per_band)
lons = np.empty((rows, cols, bands), dtype=float)
lats = np.empty((rows, cols, bands), dtype=float)
rpc_coeffs_list = list(self.rpc_coeffs_per_band.values())
# get kwargs for each group of unique RPC coefficients
kwargs_list = [dict(rpc_coeffs=rpc_coeffs_list[group_bandinds[0]],
enmapIm_cornerCoords=self.enmapIm_cornerCoords,
enmapIm_dims_sensorgeo=self.enmapIm_dims_sensorgeo,
group_idx=gi)
for gi, group_bandinds in enumerate(self.bandgroups_with_unique_rpc_coeffs)]
# compute the geolayer ONLY FOR ONE BAND per group with unique RPC coefficients
global global_dem_sensorgeo
global_dem_sensorgeo = self.elevation
if len(self.bandgroups_with_unique_rpc_coeffs) == 1:
lons_oneband, lats_oneband = self._compute_geolayer_for_unique_coeffgroup(kwargs_list[0])[:2]
lons = np.repeat(lons_oneband[:, :, np.newaxis], bands, axis=2)
lats = np.repeat(lats_oneband[:, :, np.newaxis], bands, axis=2)
else:
if self.CPUs > 1:
# multiprocessing (only in case there are multiple unique sets of RPC coefficients)
# FIXME: pickling back large lon/lat arrays to the main process may be an issue on small machines
# -> results could be temporarily written to disk in that case
# NOTE: With the small test dataset pickling has only a small effect on processing time.
with Pool(self.CPUs, initializer=_initialize_mp, initargs=[self.elevation]) as pool:
results = list(pool.imap_unordered(self._compute_geolayer_for_unique_coeffgroup, kwargs_list))
pool.close() # needed for coverage to work in multiprocessing
pool.join()
else:
# singleprocessing
results = [self._compute_geolayer_for_unique_coeffgroup(kwargs_list[gi])
for gi, group_bandinds in enumerate(self.bandgroups_with_unique_rpc_coeffs)]
for res in results:
band_lons, band_lats, group_idx = res
bandinds_to_assign = self.bandgroups_with_unique_rpc_coeffs[group_idx]
nbands_to_assign = len(bandinds_to_assign)
lons[:, :, bandinds_to_assign] = np.repeat(band_lons[:, :, np.newaxis], nbands_to_assign, axis=2)
lats[:, :, bandinds_to_assign] = np.repeat(band_lats[:, :, np.newaxis], nbands_to_assign, axis=2)
return lons, lats
[docs]
def compute_mapCoords_within_sensorGeoDims(sensorgeoCoords_YX: List[Tuple[float, float]],
rpc_coeffs: dict,
elevation: Union[str, GeoArray, int, float],
enmapIm_cornerCoords: Tuple[Tuple[float, float]],
enmapIm_dims_sensorgeo: Tuple[int, int],
) -> List[Tuple[float, float]]:
"""Compute map coordinates for a given image coordinate-pair of an EnMAP image in sensor geometry.
:param sensorgeoCoords_YX: list of requested sensor geometry positions [(row, column), (row, column), ...]
:param rpc_coeffs: RPC coefficients describing the relation between sensor and map geometry
:param elevation: digital elevation model in MAP geometry (file path or instance of GeoArray) OR
average elevation in meters as integer or float
:param enmapIm_cornerCoords: MAP coordinates of the EnMAP image
:param enmapIm_dims_sensorgeo: dimensions of the sensor geometry EnMAP image (rows, columns)
:return:
"""
# compute coordinate array
RPCGG = RPC_Geolayer_Generator(rpc_coeffs=rpc_coeffs,
elevation=elevation,
enmapIm_cornerCoords=enmapIm_cornerCoords, # order does not matter
enmapIm_dims_sensorgeo=enmapIm_dims_sensorgeo)
lons, lats = RPCGG.compute_geolayer()
# extract the new corner coordinate from the coordinate arrays computed via RPCs
rows, cols = enmapIm_dims_sensorgeo
ul, ur, ll, lr = enmapIm_cornerCoords
lonlats = []
for imYX in sensorgeoCoords_YX:
lonlat = \
ul if imYX == (0, 0) else \
ur if imYX == (0, cols - 1) else \
ll if imYX == (rows - 1, 0) else \
lr if imYX == (rows - 1, cols - 1) else \
(lons[imYX], lats[imYX])
lonlats.append(lonlat)
return lonlats
[docs]
def get_UTMEPSG_from_LonLat(lon: float, lat: float) -> int:
zoneNr = int(1 + (lon + 180.0) / 6.0)
isNorth = lat >= 0
return int('326' + str(zoneNr)) if isNorth else int('327' + str(zoneNr))
[docs]
def get_center_coord(cornerCoordsXY):
# FIXME center coord is not equal to center of bounding box
cornerCoordsXY = np.array(cornerCoordsXY)
x_center = float(np.mean([cornerCoordsXY[:, 0].min(), cornerCoordsXY[:, 0].max()]))
y_center = float(np.mean([cornerCoordsXY[:, 1].min(), cornerCoordsXY[:, 1].max()]))
return x_center, y_center
[docs]
def get_UTMEPSG_from_LonLat_cornersXY(lons: List[float], lats: List[float]):
return get_UTMEPSG_from_LonLat(*get_center_coord(list(zip(lons, lats))))