from matplotlib.pyplot import *
import numpy as np
from numpy import dot
from numpy.linalg import inv, solve
from copy import copy
import tables
import importlib
from numba import jit
from time import time, sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from itertools import product
import pyprind
from tqdm import tqdm
# custom imports
from ..Tools import initializer, SharedNdarray
from ..Tools import SolarIrradiance
from ..Tools import box_rspf, gauss_rspf
__author = "Niklas Bohn, Andre Hollstein"
[docs]
@jit(forceobj=True) # equivalent to deprecated nopython=False (would be better/faster to make it nopython-compatible)
def nrm(aa, bb):
cc = 0.0
nn = aa.shape[0]
for ii in range(nn):
cc += np.abs(aa[ii] - bb[ii])
return cc / nn
[docs]
@jit(nopython=True)
def int_rho(n_inp, xx_inp, yy_inp, n_int, xx_int, yy_out, jac_out):
"""
:param n_inp:
:param xx_inp:
:param yy_inp:
:param n_int:
:param xx_int:
:param yy_out:
:param jac_out:
:return:
"""
for i_int in range(n_int):
for i_inp in range(n_inp):
jac_out[i_inp, i_int] = 0.0
for i_int in range(n_int):
for i_inp in range(n_inp):
if xx_int[i_int] < xx_inp[i_inp]:
break
dx = xx_inp[i_inp] - xx_inp[i_inp - 1]
jac_out[i_inp - 1, i_int] = (xx_inp[i_inp] - xx_int[i_int]) / dx
jac_out[i_inp, i_int] = (xx_int[i_int] - xx_inp[i_inp - 1]) / dx
yy_out[i_int] = yy_inp[i_inp - 1] * jac_out[i_inp - 1, i_int] + yy_inp[i_inp] * jac_out[i_inp, i_int]
# noinspection PyShadowingNames
[docs]
@jit(nopython=True)
def apply_bounds(xii, xi, bounds_up, bounds_dn):
nn = xii.shape[0]
step = 0.5
for ii in range(nn):
if xii[ii] >= bounds_up[ii]:
xii[ii] = xi[ii] + step * (bounds_up[ii] - xi[ii])
if xii[ii] < bounds_dn[ii]:
xii[ii] = xi[ii] + step * (bounds_dn[ii] - xi[ii])
# noinspection PyShadowingNames
[docs]
def sat(wvl_inst, wvl_rsp, solar, rspf_type="gaussian", sigma=None, width=None):
"""
Calculate normalized sensor specific response function with respect to given instrument wavelengths.
:param wvl_inst: instrument wavelengths
:param wvl_rsp: response wavelengths
:param solar: solar irradiances
:param rspf_type: type of response function, either "gaussian" or "box"
:param sigma: fwhm of instrument bands (only for type "gaussian")
:param width: fwhm of instrument bands (only for type "box")
:return: dictionary containing instrument wavelengths, response wavelengths, sensor specific response function and
solar irradiances
"""
rsp = np.zeros((len(wvl_inst), len(wvl_rsp)))
sol = np.zeros(len(wvl_inst))
if sigma is not None:
try:
_ = sigma[0] # sigma is array
assert len(sigma) == len(wvl_inst)
sigmas = sigma
except TypeError:
sigmas = sigma * np.ones(len(wvl_inst))
else:
sigmas = None
if width is not None:
try:
_ = width[0] # sigma is array
assert len(width) == len(wvl_inst)
widths = width
except TypeError:
widths = width * np.ones(len(wvl_inst))
else:
widths = None
# calculate response function
if rspf_type == "gaussian":
for ii_chl, (wvl_center, sigma) in enumerate(zip(wvl_inst, sigmas)):
rsp[ii_chl, :] = gauss_rspf(wvl_center=wvl_center, wvl_sol_irr=wvl_rsp, sigma=sigma)
sol[ii_chl] = np.trapz(x=solar.wvl,
y=gauss_rspf(wvl_center=wvl_center, sigma=sigma, wvl_sol_irr=solar.wvl) * solar())
elif rspf_type == "box":
for ii_chl, (wvl_center, width) in enumerate(zip(wvl_inst, widths)):
rsp[ii_chl, :] = box_rspf(wvl_center=wvl_center, wvl_sol_irr=wvl_rsp, width=width)
sol[ii_chl] = np.trapz(x=solar.wvl,
y=box_rspf(wvl_center=wvl_center, width=width, wvl_sol_irr=solar.wvl) * solar())
else:
raise ValueError("Type is wrong.")
return {"wvl_inst": wvl_inst, "wvl_rsp": wvl_rsp, "rspf": rsp, "sol_irr": sol}
# noinspection PyShadowingNames
def __ff_jj_sc__(self, xi, sc_jj):
ff, jj = self.reflectance_toa_j(pt=xi)
jj_t_sc = sc_jj * jj.transpose() # for numpy broadcasting of sc_jj
jj_sc = jj_t_sc.transpose()
return ff, jj_sc, jj_t_sc
# noinspection PyUnresolvedReferences
@jit(forceobj=True) # equivalent to deprecated nopython=False (would be better/faster to make it nopython-compatible)
def __se_jj_sa_jit__(ssei, ssai, jj, g):
ni = ssai.shape[0]
nk = ssei.shape[0]
bb = np.zeros((ni, ni))
for ii in range(ni):
for jj in range(ni):
bb[ii, jj] = 0.0
bb[ii, ii] = bb[ii, ii] + (1.0 + g) * ssai[ii]
for ii in range(ni):
for kk in range(nk):
jj_ii_kk = jj[ii, kk] * ssei[kk]
if jj_ii_kk != 0.0:
for jj in range(ni):
bb[ii, jj] += jj_ii_kk * jj[jj, kk]
return bb
def __se_jj_sa_np__(ssei, ssai, jj, g):
jj_sc = jj
jj_t_sc = jj.transpose()
return dot(ssei * jj_sc, jj_t_sc) + (1.0 + g) * np.diag(ssai)
# noinspection PyShadowingNames,PyShadowingNames
[docs]
def opt(self, y_rfl, pt0, rho_0=[{"method": "const", "value": 0.1},
{"method": "guess", "sigma": None, "fill": 0.1},
{'method': 'prescribed'}][0],
ftol=None, ftol_rel=None, ee=1.05, n_iter=55, n_bad=5, debug=False, g=0.0,
ssei=None, ssai=None, instrument_error_model=None, compute_errors=True, sc_oe_thres=0.01,
optimization=True, se_jj_sa=__se_jj_sa_jit__, rho_fg=None
):
from scipy.ndimage import gaussian_filter1d # import here to avoid static TLS ImportError
t0 = time()
if ssei is None:
assert hasattr(instrument_error_model,
'__call__'), "instrument_error_model should be callable which converts refl. to error."
# noinspection PyTypeChecker
ssei = np.nan_to_num(1.0 / (instrument_error_model(y_rfl) ** 2.0))
else:
# noinspection PyTypeChecker
ssei = np.nan_to_num(1.0 / (ssei ** 2.0))
ssei[ssei > 1e10] = 1e10
if len(ssai) != len(self.scale_dn): # not all errors given, assume that first ones are given
bf = np.ones(len(self.scale_dn), dtype=float)
bf[:len(ssai)] = ssai[:]
ssai = bf
apriori_errors_sc = self.x_phys_to_scaled(ssai, offset_scaling=0.0)[:self.n_jac_at]
# noinspection PyTypeChecker
ssai = np.nan_to_num(1.0 / self.x_phys_to_scaled(ssai, offset_scaling=0.0) ** 2.0) # only error, no offset needed
ssai[ssai > 1e10] = 1e10
sai = np.diag(ssai)
# each method should produce rho_rfl
if rho_0["method"] == "const":
rho_rfl = rho_0["value"] * np.ones(self.n_wvl)
elif rho_0["method"] == "prescribed":
rho_rfl = np.array(rho_fg, dtype=self.dtype)
rho_rfl[rho_rfl < 0.0] = 0.0
rho_rfl[rho_rfl > 1.0] = 0.0
elif rho_0["method"] == "guess":
rho_rfl = self.reflectance_boa(pt=pt0, toa_reflectance=y_rfl)
rho_rfl[rho_rfl < 0.0] = 0.0
rho_rfl[rho_rfl > 1.0] = 0.0
if rho_0["sigma"] is not None:
rho_rfl = gaussian_filter1d(rho_rfl, sigma=rho_0["sigma"])
else:
raise ValueError("rho_0 should be dictionary with parameters:%s" % str(rho_0))
rj = self.rho_to_rj(rho_rfl)
rj0 = copy(rj)
pt = copy(pt0)
xi_ph, yi_ph = self.ptrh2x(pt, rj)
xa_ph, ya_ph = self.ptrh2x(pt0, rj0)
xa_sc = self.x_phys_to_scaled(xa_ph)
xi_sc = self.x_phys_to_scaled(xi_ph)
if ftol_rel is not None:
assert ftol is None
ftol = np.mean(y_rfl) / ftol_rel
sc_jj = (self.bounds_up - self.bounds_dn) / (self.scale_up - self.scale_dn)
xi_ph_best = copy(xi_ph)
yi_ph_best = copy(yi_ph)
pt_best = copy(pt0)
rj_best = copy(rj)
pt_s = np.zeros(len(pt))
rj_s = np.zeros(len(rj))
nrms = []
result = {}
improved_result = False # we haven't improved jet
result["ff"] = y_rfl
jj_sc_best = np.zeros((len(xi_ph), self.n_wvl))
jj_t_sc_best = np.zeros((self.n_wvl, len(xi_ph)))
if np.max(apriori_errors_sc) >= sc_oe_thres and optimization is True:
ff_best = np.zeros(self.n_wvl)
nrm_min = 1e10 # nrm(y_rfl,self.reflectance_toa(pt=pt0,rho=rho_rfl))
i_bad = 0
for ii in range(n_iter):
# evaluate actual step, compute Jacobian
ff, jj_sc, jj_t_sc = __ff_jj_sc__(self, xi=yi_ph, sc_jj=sc_jj)
nrm_akt = nrm(y_rfl, ff)
if debug and np.isnan(nrm_akt):
# import pudb; pu.db
ff, jj_sc, jj_t_sc = __ff_jj_sc__(self, xi=yi_ph, sc_jj=sc_jj)
nrm_akt = nrm(y_rfl, ff)
if debug: # print debug
print(("step %2i,akt:%7.4f,min:%7.4f,ftol:%6.4f,ib:%2i->" % (ii, nrm_akt, nrm_min, ftol, i_bad)) +
(8 * "%.2f," % tuple(yi_ph[:8]))
)
if ee * nrm_akt < nrm_min: # if actual step is better than min
nrm_min = copy(nrm_akt)
nrms.append(copy(nrm_min))
i_bad = 0 # reset i_bad counter
xi_ph_best[:] = xi_ph[:]
ff_best[:] = ff[:]
pt_best[:] = pt[:]
rj_best[:] = rj[:]
jj_sc_best[:] = jj_sc[:]
jj_t_sc_best[:] = jj_t_sc[:]
improved_result = True
else:
i_bad += 1
if i_bad > n_bad:
break
if nrm_min < ftol:
break
# make step, compute xi_ph, pt, and rj
"""
xii_sc = xi_sc + dot(inv(dot(ssei*jj_sc,jj_t_sc)+(1.0+g)*sai),
(dot(jj_sc,ssei*(y_rfl-ff))-ssai*(xi_sc-xa_sc))) # readable but slow
"""
try:
xii_sc = xi_sc + np.nan_to_num(
solve(se_jj_sa(ssei, ssai, jj_sc, g), (dot(jj_sc, ssei * (y_rfl - ff)) - ssai * (xi_sc - xa_sc))))
result["ssei"] = ssei
result["ssai"] = ssai
except np.linalg.linalg.LinAlgError as err:
print("LinalgError")
raise err
if np.isnan(np.sum(xii_sc)):
raise ValueError()
apply_bounds(xii_sc, xi_sc, self.scale_up, self.scale_dn)
xi_ph = self.x_scaled_to_phys(xii_sc)
rj[:] = xi_ph[-self.n_rho_lin:]
pt[:self.n_jac_at] = xi_ph[:self.n_jac_at]
xi_ph, yi_ph = self.ptrh2x(pt, rj)
xi_sc = self.x_phys_to_scaled(xi_ph)
result["ff"] = ff_best
result["apriori_errors_sc"] = apriori_errors_sc
result["pt"] = pt_best
result["rj"] = rj_best
if compute_errors:
while True:
try:
ss = np.sqrt((np.diagonal(inv(dot(jj_sc_best * ssei, jj_t_sc_best) + sai))))
except UnboundLocalError:
ff_best, jj_sc_best, jj_t_sc_best = __ff_jj_sc__(self, xi=yi_ph_best, sc_jj=sc_jj)
else:
break
rj_s[:] = ss[-self.n_rho_lin:]
pt_s[:self.n_jac_at] = self.x_scaled_to_phys(ss, offset_scaling=0.0)[:self.n_jac_at]
result["pt_s"] = pt_s
result["rj_s"] = rj_s
if improved_result:
result["rho_ac"] = self.reflectance_boa(pt=pt_best, toa_reflectance=y_rfl)
result["rho_ac"][result["rho_ac"] > 1.0] = 0.0
result["rho_ac"][result["rho_ac"] < 0.0] = 0.0
result["rho_rj"] = copy(self.rj_to_rho_J(rj_best)[0])
result["rho_rjp"] = copy(self.rj_to_rho_J(rj_best + rj_s)[0])
result["rho_rjm"] = copy(self.rj_to_rho_J(rj_best - rj_s)[0])
else:
result["rho_ac"] = rho_rfl
result["rho_rj"] = rho_rfl
if debug:
print("EEooFF, runtime:%.3fs" % (time() - t0,))
print(len(pt0) * "%.3f," % tuple(pt0))
print(len(pt) * "%.3f," % tuple(pt_best))
print(nrms)
return result
[docs]
class Rho2Rj(object):
def __init__(self, rho_wvl):
self.rho_wvl = rho_wvl
self.n_rho_lin = len(rho_wvl)
def __call__(self, rho):
return rho
[docs]
class Rj2RhoJ(object):
def __init__(self, rho_wvl):
self.rho_wvl = rho_wvl
self.n_rho_lin = len(rho_wvl)
def __call__(self, rj):
return rj, np.diag(rj)
[docs]
def rho_wvl_lin_int(wvl, rho_wvl=None, n_rho_max=None, extra_points=None, linear_segments=None, min_wvl_difference=1.0):
if rho_wvl is not None and n_rho_max is None: # only rho_wvl given
pass
elif n_rho_max is not None and rho_wvl is None: # only n_rho_max given
rho_wvl = np.linspace(np.floor(wvl[0]), np.ceil(wvl[-1]), n_rho_max)
else:
ValueError("Give either rho_wvl or n_rho_max.s")
if linear_segments is not None:
rho_wvl = np.unique(np.hstack((rho_wvl, np.hstack(linear_segments))))
for ex in linear_segments:
rho_wvl = (lambda x: rho_wvl[np.invert(np.logical_and(x[0] < rho_wvl, rho_wvl < x[1]))])(ex)
if extra_points is not None:
rho_wvl = np.unique(np.hstack((rho_wvl, np.array(extra_points))))
try:
while np.min(np.abs(np.diff(rho_wvl[:-1]))) < min_wvl_difference:
rho_wvl = np.delete(rho_wvl, 1 + np.argmin(np.abs(np.diff(rho_wvl[:-1]))))
except ValueError:
pass
return rho_wvl
# noinspection PyPep8Naming
[docs]
class Rho2Rj_LinInterp(object):
def __init__(self, **kwargs):
self.rho_wvl = rho_wvl_lin_int(**kwargs)
self.n_rho_lin = len(self.rho_wvl)
self.wvl = kwargs["wvl"]
if 'linear_segments' in kwargs:
self.linear_segments = kwargs["linear_segments"]
def __call__(self, rho):
return np.interp(x=self.rho_wvl, xp=self.wvl, fp=rho)
# noinspection PyPep8Naming
[docs]
class Rj2RhoJ_LinInterp(object):
def __init__(self, **kwargs):
self.wvl = kwargs["wvl"]
self.n_wvl = len(self.wvl)
if 'linear_segments' in kwargs:
self.linear_segments = kwargs["linear_segments"]
self.rho_wvl = rho_wvl_lin_int(**kwargs)
self.n_rho_lin = len(self.rho_wvl)
self.rho = np.zeros(self.n_wvl, dtype=float)
self.J_rho = np.zeros((self.n_rho_lin, self.n_wvl), dtype=float)
def __call__(self, rj):
int_rho(self.n_rho_lin, self.rho_wvl, rj, self.n_wvl, self.wvl, self.rho, self.J_rho)
return self.rho, self.J_rho
# noinspection PyPep8Naming
[docs]
class Rho2Rj_PCA(object):
def __init__(self, components, mean, rho_wvl, bounds=None):
assert len(rho_wvl) == len(mean)
assert len(rho_wvl) == components.shape[1]
self.rho_wvl = rho_wvl
self.wvl = rho_wvl
self.components = components
self.mean = mean
self.n_rho_lin = self.components.shape[0]
if bounds is None:
self.bounds_up = +1 * np.ones(self.n_rho_lin)
self.bounds_dn = -1 * np.ones(self.n_rho_lin)
else:
self.bounds_up = +1 * np.abs(bounds)
self.bounds_dn = -1 * np.abs(bounds)
def __call__(self, rho):
return np.dot(self.components, rho - self.mean)
# noinspection PyPep8Naming
[docs]
class Rj2RhoJ_PCA(Rho2Rj_PCA):
def __call__(self, rj):
rho = np.dot(rj, self.components) + self.mean
rho[rho < 0.0] = 0.0
rho[rho > 1.0] = 1.0
return rho, self.components
[docs]
class Rho2Rj_const(object):
def __init__(self, rho_wvl):
self.wvl = rho_wvl
self.n_rho_lin = len(rho_wvl)
def __call__(self, rho):
return rho * np.ones(self.n_rho_lin)
[docs]
class Rj2RhoJ_const(object):
def __init__(self, rho_wvl):
self.wvl = rho_wvl
self.n_rho_lin = len(rho_wvl)
def __call__(self, rj):
rho = rj * np.ones(self.n_rho_lin)
return rho, np.diag(rho)
# noinspection PyProtectedMember,PyShadowingNames
[docs]
class RtFo(object):
def __init__(self, atm_tables_fn,
dim_scat,
dim_atm=("spr", "coz", "cwv", "tmp"),
dim_vza="vza",
dim_sza="sza",
dim_azi="azi",
dim_pca="pca",
dim_wvl="wvl",
dim_layer="layer",
table_path="",
buffer_tables=None,
only_toa=False,
n_pcs=99,
dtype=np.float32,
slices=None,
hash_formats=None,
hash_format_default="%.3f,",
interpol_dim_range=range(1, 12),
sensors=None,
response_function_threshold=1e-5,
sensor_interpolation_reference=["sensor", "pca"][0],
sol_irr_pca=None,
default_sensor=None, **_
):
"""
1. Build Forward Operator Object including relevant data from LUT
2. Interpolation of atmospheric functions for a specific spectral subset,
returns object which can be used to compute toa reflectance, radiance and surface reflectance
:param atm_tables_fn: path to LUT
:param dim_scat: key of relevant scattering parameter (tau)
:param dim_atm: keys of relevant atmospheric parameters
:param dim_vza: key of viewing zenith angle
:param dim_sza: key of sun zenith angle
:param dim_azi: key of sun azimuth angle
:param dim_pca: key of principle components
:param dim_wvl: key of wavelengths
:param dim_layer: key of atmospheric layer
:param table_path: path to aerosol table within LUT
:param buffer_tables: AC table for specific sensors (smile)
:param only_toa: use only TOA fluxes
:param n_pcs: number of principle components
:param dtype: data type of values stored in dimensions dictionary
:param slices: part of LUT which should be considered
:param hash_formats: fixed length of the parameter values
:param hash_format_default: default length of the parameter values
:param interpol_dim_range: maximum dimension of interpolation
:param sensors: dict containing wavelength, response function and solar irradiances of input sensor
:param response_function_threshold: threshold for sensor response function
:param sensor_interpolation_reference: interpolation reference
:param sol_irr_pca: principle components of solar irradiance
:param default_sensor: default sensor
:param _:
"""
# check input parameters
if slices is None:
slices = {}
if hash_formats is None:
hash_formats = {}
assert type(dim_scat) == list, "dim_scat is not list"
assert all(isinstance(item, str) for item in dim_scat), "should be string with dim name for hdf5 file"
assert type(n_pcs) == int
assert type(table_path) == str, "table_path should be path string in hdf5 file"
assert type(only_toa) == bool
assert all(isinstance(item, int) for item in interpol_dim_range), "interpol_dim_range should only contain ints"
assert type(slices) == dict, "Slices should be dictionary with dim_name:slice as key:value pairs"
assert type(hash_formats) == dict, "hash_patterns should be dictionary"
assert type(response_function_threshold) == float, "response_function_threshold should be small float number"
# load interpolation classes
self.interp = {ii: importlib.import_module("sicor.Tools.NM.interp_spectral_n_%i" % ii) for ii in
interpol_dim_range}
# open LUT hdf5 file and group as defined in table_path
with tables.open_file(atm_tables_fn, "r") as atm_tables_h5:
self.atm_tables_h5 = atm_tables_h5
self.atm_tables = atm_tables_h5.root
self.grp = self.atm_tables.__getattr__(table_path)
# copy dimension data names to self, collect in dim_names
self.wvl_pca = self.atm_tables.__getattr__(dim_wvl).read()
self.wvl = self.wvl_pca
self.n_wvl = len(self.wvl_pca)
self.dtype = dtype
self.dim_scat = dim_scat
self.dim_atm = dim_atm
self.dim_vza = dim_vza
self.dim_sza = dim_sza
self.dim_azi = dim_azi
self.dim_pca = dim_pca
self.dim_layer = dim_layer
self.dim_names = list(self.dim_atm) + list(self.dim_scat) + [
self.dim_vza, self.dim_sza, self.dim_azi, self.dim_pca, self.dim_layer]
self.sza_reduced = None
self.vza_reduced = None
# set given slices, default is all elements
self.slices = {dim: slice(None) for dim in self.dim_names}
for dim, slc in slices.items():
self.slices[dim] = slc
# same same for hash formats
self.hash_formats = {dim: hash_format_default for dim in self.dim_names}
for dim, slc in hash_formats.items():
self.hash_formats[dim] = slc
# set number of principal components, add slice for pca dimension
self.n_pca_max = self.grp.E_DN.components.shape[0]
self.n_pcs = np.min([self.n_pca_max, n_pcs])
self.slices[self.dim_pca] = slice(0, self.n_pcs)
# set dictionary for dimension_name:scale
self.dims = {dim: np.array(self.atm_tables.__getattr__(dim)[self.slices[dim]], dtype=self.dtype) for dim in
self.dim_names}
# convert arccos to degree
self.dims[self.dim_vza] = np.rad2deg(np.arccos(self.dims[self.dim_vza]))
self.dims[self.dim_sza] = np.rad2deg(np.arccos(self.dims[self.dim_sza]))
# set sensor wavelengths, response function and solar irradiances
self.sensors = sensors
if self.sensors is not None:
self.response_function_threshold = response_function_threshold
self.sensor_interpolation_reference = sensor_interpolation_reference
self.wvl_sensors = {sensor_name: sensor["wvl_inst"] for sensor_name, sensor in self.sensors.items()}
self.sol_irr_sensors = {sensor_name: sensor["sol_irr"] / np.pi for sensor_name, sensor in
self.sensors.items() if sensor["sol_irr"] is not None}
for name, sensor in self.sensors.items():
self.__instrument_val__(sensor)
self.components = "components"
self.mean = "mean"
if sol_irr_pca is not None:
self.sol_irr_pca = sol_irr_pca / np.pi
else:
self.sol_irr_pca = None
self.sol_irr = self.sol_irr_pca
# each atmospheric function is defined as dictionary with common types as defined in __mk_table__
# serves as preparation for the atmospheric dimension interpolation
# __mk_table__ creates a spectral subset of the LUT to match the wavelengths (or subset) of the input sensor
# process atmospheric path reflection and upward transmittance only at TOA
if only_toa:
if buffer_tables is not None and "L0" in buffer_tables:
self.L0 = buffer_tables["L0"]
else:
self.L0 = self.__mk_tble__(
grp=self.grp.L0_TOA,
dims=(list(self.dim_atm) + list(self.dim_scat) +
[self.dim_vza, self.dim_sza, self.dim_azi, self.dim_pca])
)
if buffer_tables is not None and "T_UP" in buffer_tables:
self.T_UP = buffer_tables["T_UP"]
else:
self.T_UP = self.__mk_tble__(
grp=self.grp.T_UP_TOA,
dims=list(self.dim_atm) + list(self.dim_scat) + [self.dim_vza, self.dim_pca],
idx=self.L0["dims"]
)
# process atmospheric path reflection and upward transmittance including the layer dimension
else:
self.L0 = self.__mk_tble__(
grp=self.grp.L0,
dims=(list(self.dim_atm) + list(self.dim_scat) +
[self.dim_layer, self.dim_vza, self.dim_sza, self.dim_azi, self.dim_pca])
)
self.T_UP = self.__mk_tble__(
grp=self.grp.T_UP,
dims=list(self.dim_atm) + list(self.dim_scat) + [self.dim_layer, self.dim_vza, self.dim_pca],
idx=self.L0["dims"]
)
self.pt_dims = copy(self.L0["dims"])
self.pt_dims.remove(self.dim_pca)
self.jacobean_switch = {dim_name: 1 for dim_name in self.pt_dims}
if buffer_tables is not None and "E_DN" in buffer_tables:
self.E_DN = buffer_tables["E_DN"]
else:
self.E_DN = self.__mk_tble__(grp=self.grp.E_DN,
dims=list(self.dim_atm) + list(self.dim_scat) + [self.dim_sza,
self.dim_pca],
idx=self.L0["dims"]
)
if buffer_tables is not None and "SS" in buffer_tables:
self.SS = buffer_tables["SS"]
else:
self.SS = self.__mk_tble__(grp=self.grp.SS,
dims=list(self.dim_atm) + list(self.dim_scat) + [self.dim_pca],
idx=self.L0["dims"]
)
# settings for interpolation between atmospheric dimensions
self.__jacobean__ = False
self.__caching__ = False
self.__jacobean_range__ = self.SS["idx"]
self.n_jac_at = len(self.__jacobean_range__)
self.n_jac_fl = len(self.L0["idx"])
self.interpolation_settings(jacobean=self.__jacobean__,
caching=self.__caching__)
self.idx_dim_vza = self.L0["dims"].index(self.dim_vza)
self.idx_dim_sza = self.L0["dims"].index(self.dim_sza)
if default_sensor is not None:
self.set_sensor(default_sensor)
self.rho = None
self.J_rho = None
self.n_rho_lin = None
self.bounds_atm = None
self.bounds_up = None
self.bounds_dn = None
self.scale_up = None
self.scale_dn = None
self.rho_to_rj = None
self.rj_to_rho_J = None
self.LUTS = (self.L0, self.E_DN, self.T_UP, self.SS)
self.LUT_suffixs = []
self.current_sensor = ""
# noinspection PyPep8Naming
[docs]
def set_rho_lin(self, Rho2Rj, Rj2RhoJ):
"""
Assign the surface reflectance model to the forward operator.
:param Rho2Rj: Model to obtain a first guess of surface reflectance values for selected absorption feature
wavelengths.
:param Rj2RhoJ: Surface reflectance model. Following models can be chosen: Rj2RhoJ_LinInterp, Rj2RhoJ_PCA or
Rj2RhoJ_const for Sentinel-2; Rj2RhoJ_LinX1X2 for EnMAP single CH4/CWV Retrieval or Rj2RhoJ_Beer_Lambert for
EnMAP simultaneous 3 phases of water retrieval.
:return: None
"""
# test api for surface reflectance model class
assert hasattr(Rho2Rj, '__call__')
assert hasattr(Rj2RhoJ, '__call__')
assert Rho2Rj.n_rho_lin == Rj2RhoJ.n_rho_lin
self.rho_to_rj = Rho2Rj
self.rj_to_rho_J = Rj2RhoJ
# number of surface reflectance values (i.e. number of the respective absorption feature wavelengths) used by
# the chosen model
self.n_rho_lin = Rho2Rj.n_rho_lin
# Assign lower and upper bounds for the values of atmospheric state parameters
self.bounds_atm = np.array(self.L0["range"])
self.bounds_up = np.ones(self.n_jac_fl + self.n_rho_lin)
self.bounds_up[:self.n_jac_fl] = self.bounds_atm[:self.n_jac_fl, 1]
self.bounds_dn = np.zeros(self.n_jac_fl + self.n_rho_lin)
self.bounds_dn[:self.n_jac_fl] = self.bounds_atm[:self.n_jac_fl, 0]
try:
self.bounds_up[self.n_jac_fl:] = Rho2Rj.bounds_up
self.bounds_dn[self.n_jac_fl:] = Rho2Rj.bounds_dn
except AttributeError:
pass
# assign lower and upper bounds for scaling of the values of atmospheric state parameters
self.scale_up = 1.0 + np.zeros(self.bounds_up.shape[0])
self.scale_dn = 0.0 + np.zeros(self.bounds_up.shape[0])
# check if instrument wavelengths equal model walenghths
if np.sum(np.abs(self.wvl - self.rho_to_rj.wvl)) > 0.0:
print("Warning, do something!")
[docs]
def ptrh2x(self, pt, rj):
return np.hstack((pt[self.__jacobean_range__], rj)), np.hstack((pt, rj))
[docs]
def x_phys_to_scaled(self, xx, offset_scaling=1.0):
"""
Convert physical values of atmospheric state parameters to scaled values with respect to the upper bounds..
:param xx: array containing physical values of atmospheric state parameters
:param offset_scaling: offset scaling
:return: array containing scaled values of atmospheric state parameters
(ratio of the max. value as defined in LUT)
"""
aa = (self.scale_up - self.scale_dn) / (self.bounds_up - self.bounds_dn)
bb = self.scale_dn - aa * self.bounds_dn
return aa * xx + bb * offset_scaling
[docs]
def x_scaled_to_phys(self, xx, offset_scaling=1.0):
"""
Convert scaled values of atmospheric parameters to physical values.
:param xx: array containing scaled values of atmospheric state parameters
(ratio of the max. value as defined in LUT)
:param offset_scaling: offset scaling
:return: array containing physical values of atmospheric state parameters
"""
aa = (self.bounds_up - self.bounds_dn) / (self.scale_up - self.scale_dn)
bb = self.bounds_dn - aa * self.scale_dn
return aa * xx + bb * offset_scaling
[docs]
def pt(self, **kwargs):
pt = np.zeros(len(self.pt_dims))
for ii, key in enumerate(self.pt_dims):
pt[ii] = kwargs[key]
return pt
@staticmethod
def __instrument_val__(instrument):
for key in ["wvl_inst", "wvl_rsp", "rspf"]:
assert type(instrument[key]) == np.ndarray, "%s should be numpy ndarray"
assert instrument["rspf"].shape == (
len(instrument["wvl_inst"]), len(instrument["wvl_rsp"])), "Shape of rspf is wrong."
@staticmethod
def __component_key__(sensor_name):
return sensor_name + "_components"
@staticmethod
def __mean_key__(sensor_name):
return sensor_name + "_mean"
[docs]
def set_sensor(self, sensor_name=None):
self.rho_to_rj = None
self.rj_to_rho_J = None
if sensor_name is None:
self.components = "components"
self.mean = "mean"
self.wvl = self.wvl_pca
self.sol_irr = self.sol_irr_pca
self.current_sensor = sensor_name
else:
assert self.sensors is not None, "Sensors were not defined on init."
assert sensor_name in self.sensors, "Sensor %s were not defined. Defined are:%s" % (
sensor_name, str(self.sensors.keys()))
self.components = self.__component_key__(sensor_name)
self.mean = self.__mean_key__(sensor_name)
self.wvl = self.wvl_sensors[sensor_name]
self.sol_irr = self.sol_irr_sensors[sensor_name] if sensor_name in self.sol_irr_sensors else None
self.current_sensor = sensor_name
self.n_wvl = len(self.wvl)
[docs]
def interpolation_settings(self, jacobean, caching):
"""
Set Jacobean and/or caching to True or False.
:param jacobean: True or False
:param caching: True or False
:return: None
"""
assert type(jacobean) == bool
assert type(caching) == bool
for interp in [self.E_DN, self.SS, self.T_UP, self.L0]:
interp["int"].settings(jacobean, caching)
self.__jacobean__ = jacobean
self.__caching__ = caching
# noinspection PyProtectedMember,PyDictCreation
def __mk_tble__(self, grp, dims, idx=None):
"""
Build a subset of LUT for each atmospheric function to match the spectral subset of the input sensor.
:param grp: atmospheric function
:param dims: parameters of the atmospheric state the respective function depends on
:param idx: indices dims as stored in LUT
:return: spectral subset of LUT for the respective atmospheric function
"""
assert type(grp) == tables.group.Group, "grp should be hdf5 group"
assert all(isinstance(item, str) for item in dims), "dims should be list of strings with dim names"
# set up table dict
tble = {}
# noinspection PyProtectedMember
# fill dict with information from LUT and set interpolation parameters
tble["pathname"] = grp._v_pathname
tble["dims"] = dims
tble["components"] = np.array(grp.components[0:self.n_pcs, :], dtype=self.dtype)
tble["mean"] = np.array(grp.mean[:], dtype=self.dtype)
tble["range"] = [(self.dims[dim][0], self.dims[dim][-1]) for dim in tble["dims"]]
tble["n_dims"] = len(tble["dims"]) - 1 # only for interpolation, spectral dim doesn't count
# set up the jacobian with the same dimension as the atmospheric state
tble["jacobean_switch"] = np.ones((tble["n_dims"], self.n_pcs))
# load table data to memory, according to given slice
tble["dat"] = np.array(grp.pca[tuple([self.slices[dim] for dim in tble["dims"]])], dtype=self.dtype)
if idx is None:
tble["idx"] = np.arange(tble["n_dims"])
else:
tble["idx"] = np.array([idx.index(dim) for dim in tble["dims"][:-1]], dtype=int)
# instance the n-dimensional interpolation function (n = number of dimensions of atmospheric state)
tble["int"] = self.interp[tble["n_dims"]].intp(data=tble["dat"],
axes=[self.dims[dim] for dim in tble["dims"][:-1]],
hash_pattern="".join(
[self.hash_formats[dim] for dim in tble["dims"][:-1]]))
if self.sensors is not None:
# build LUT subset with respect to either the sensor response wavelengths (default) or the wavelengths
# stored in the LUT.
# for now this is super slow - but we do this only once and premature optimization might not be needed here
for sensor_name, sensor in tqdm(self.sensors.items()):
components = self.__component_key__(sensor_name)
mean = self.__mean_key__(sensor_name)
tble[components] = np.zeros((self.n_pcs, len(self.sensors[sensor_name]["wvl_inst"])))
tble[mean] = np.zeros(len(self.sensors[sensor_name]["wvl_inst"]))
if self.sensor_interpolation_reference == "sensor":
sels = [(lambda x: slice(x[0], x[1]))(
np.where(sensor["rspf"][jj_sensor, :] > self.response_function_threshold)[0][[0, -1]]) for
jj_sensor in range(len(sensor["wvl_inst"]))]
for jj_sensor, sel in zip(range(len(sensor["wvl_inst"])), sels):
smpl = np.interp(x=sensor["wvl_rsp"][sel], xp=self.wvl_pca, fp=tble["mean"], left=0.0,
right=0.0)
buf = np.trapz(x=sensor["wvl_rsp"][sel], y=sensor["rspf"][jj_sensor, sel] * smpl)
tble[mean][jj_sensor] = buf
for jj_pc in range(self.n_pcs):
for jj_sensor, sel in zip(range(len(sensor["wvl_inst"])), sels):
smpl = np.interp(x=sensor["wvl_rsp"][sel], xp=self.wvl_pca, fp=tble["components"][jj_pc, :])
buf = np.trapz(x=sensor["wvl_rsp"][sel], y=sensor["rspf"][jj_sensor, sel] * smpl)
tble[components][jj_pc, jj_sensor] = buf
elif self.sensor_interpolation_reference == "pca":
for jj_sensor in range(len(sensor["wvl_inst"])):
smpl = np.interp(x=self.wvl_pca, xp=sensor["wvl_rsp"], fp=sensor["rspf"][jj_sensor, :],
left=0.0, right=0.0)
buf = np.trapz(x=self.wvl_pca, y=tble["mean"][:] * smpl)
tble[mean][jj_sensor] = buf
for jj_pc in range(self.n_pcs):
for jj_sensor in range(len(sensor["wvl_inst"])):
smpl = np.interp(x=self.wvl_pca, xp=sensor["wvl_rsp"], fp=sensor["rspf"][jj_sensor, :],
left=0.0, right=0.0)
buf = np.trapz(x=self.wvl_pca, y=tble["components"][jj_pc, :] * smpl)
tble[components][jj_pc, jj_sensor] = buf
else:
raise ValueError("self.sensor_interpolation_reference=\"%s\" is not implemented" %
self.sensor_interpolation_reference)
return tble
def __interpolate__(self, func, pt):
"""
Interpolate within atmospheric function for a given state.
:param func: atmospheric function (E_DN, L0, SS, T_UP)
:param pt: array containing atmospheric state parameters
:return: interpolated spectrum and its jacobean of atmospheric function
"""
try:
if func["int"].__jacobean__: # we have a Jacobean
ii, jj = func["int"](pt[func["idx"]]) # e.g., sicor.Tools.NM.interp_spectral_n_3.intp(pt[0, 2, 6])
# reconstruction of the spectrum from the principle components using matrix multiplication
spectrum = np.dot(ii, func[self.components]) + func[self.mean]
jacobean = np.dot(jj * func["jacobean_switch"], func[self.components])
if self.__jacobean_range__ is None:
return spectrum, jacobean
else:
return spectrum, jacobean[self.__jacobean_range__, :]
else: # spectral scalar return case
return np.dot(func["int"](pt[func["idx"]]), func[self.components]) + func[self.mean]
except ValueError as err:
raise err
[docs]
def set_jacobean_switch(self, **kwargs):
""" update self.jacobean_switch and update function jacobean switch matrices """
for key, value in kwargs.items():
self.jacobean_switch[key] = value
for func in [self.T_UP, self.E_DN, self.SS, self.L0]:
self.__set_jacobean_switch__(func)
def __set_jacobean_switch__(self, func):
""" edit jacobean switch matrix of function according to self.jacobean_switch """
for ii, dim in enumerate(func["dims"][:-1]):
func["jacobean_switch"][ii] = self.jacobean_switch[dim]
[docs]
def ee(self, pt):
"""
Interpolate E_DN for a given atmospheric state pt.
:param pt: atmospheric state
:return: solar downward reflection at the surface
"""
return self.__interpolate__(self.E_DN, pt)
[docs]
def tt(self, pt):
"""
Interpolate T_UP for a given atmospheric state pt.
:param pt: atmospheric state
:return: upward transmittance
"""
return self.__interpolate__(self.T_UP, pt)
[docs]
def ss(self, pt):
"""
Interpolate SS for a given atmospheric state pt.
:param pt: atmospheric state
:return: spherical albedo
"""
return self.__interpolate__(self.SS, pt)
[docs]
def l0(self, pt):
"""
Interpolate L0 for a given atmospheric state pt.
:param pt: atmospheric state
:return: atmospheric path reflectance
"""
return self.__interpolate__(self.L0, pt)
[docs]
def reflectance_to_radiance(self, reflectance, pt):
"""
Convert TOA reflectance to radiance.
:param reflectance: TOA reflectance
:param pt: atmospheric state
:return: TOA radiance
"""
return reflectance * self.sol_irr * self.mu_sun(pt)
[docs]
def radiance_to_reflectance(self, radiance, pt, in_place=False):
"""
Convert TOA radiance to reflectance.
:param radiance: TOA radiance
:param pt: atmospheric state
:param in_place: if True, convert input radiance array without returning a new one
:return: if in_place=False, TOA reflectance, else None
"""
if in_place is True:
radiance_bf = radiance.reshape((-1, radiance.shape[-1])).T
radiance_bf /= self.mu_sun(pt).reshape(-1)
radiance /= self.sol_irr
else:
return radiance / (self.sol_irr * self.mu_sun(pt))
[docs]
def reflectance_toa(self, pt, rho):
"""
Calculate TOA reflectance for a given atmospheric state and for given surface reflectance
by interpolating in the LUT and applying the simplified solution of the RTE.
:param pt: atmospheric state
:param rho: surface reflectance
:return: TOA reflectance
"""
if self.__jacobean__:
self.interpolation_settings(jacobean=False, caching=self.__caching__)
ee = self.ee(pt)
tt = self.tt(pt)
ss = self.ss(pt)
l0 = self.l0(pt)
# this is the simplified solution of the RTE
return (ee * rho * tt / (-rho * ss + 1) + l0 * np.pi) / self.mu_sun(pt)
[docs]
def radiance_toa(self, pt, rho):
"""
Convert TOA reflectance, calculated by the simplified solution of the RTE, to radiance.
:param pt: atmospheric state
:param rho: surface reflectance
:return: TOA radiance
"""
return self.reflectance_to_radiance(self.reflectance_toa(pt, rho), pt)
[docs]
def reflectance_toa_j(self, pt):
"""
Calculate TOA reflectance for a given atmospheric state by interpolating in the LUT and applying
the simplified solution of the RTE. The needed surface reflectance value is derived from
the predefined surface reflectance model. Additionally, the Jacobian is calculated.
:param pt: atmospheric state
:return: TOA reflectance and the Jacobian
"""
if self.__jacobean__ is False:
self.interpolation_settings(jacobean=True, caching=self.__caching__)
assert len(pt) >= max(self.L0["idx"]) + + self.n_rho_lin
ee, j_ee = self.ee(pt)
tt, j_tt = self.tt(pt)
ss, j_ss = self.ss(pt)
l0, j_l0 = self.l0(pt)
# calculate surface reflectance and its Jacobian from the surface reflectance model
rho, j_rho = self.rj_to_rho_J(pt[-self.n_rho_lin:])
# calculate TOA reflectance
ll = ee * rho * tt / (-rho * ss + 1) + l0 * np.pi
# calculate Jacobian
jj = np.zeros((self.n_jac_at + self.n_rho_lin, self.n_wvl))
jj[0:self.n_jac_at, :] = (j_l0 * np.pi * (rho * ss - 1) ** 2 + j_ss * ee * rho ** 2 * tt - rho * (
j_ee * tt + j_tt * ee) * (rho * ss - 1)) / (rho * ss - 1) ** 2
jj[self.n_jac_at:, :] = j_rho * ee * tt / (rho * ss - 1) ** 2
return ll, jj
[docs]
def radiance_toa_j(self, pt):
"""
Convert TOA reflectance, calculated by the simplified solution of the RTE, to radiance.
Additionally, the Jacobian is calculated.
:param pt: atmospheric state
:return: TOA radiance and the Jacobian
"""
ll, jj = self.reflectance_toa_j(pt)
return (self.reflectance_to_radiance(ll, pt),
self.reflectance_to_radiance(jj, pt))
[docs]
def mu_sun(self, pt):
"""
Compute cosine of solar incidence angle for given background state pt.
:param pt: pt numpy ndarray which holds background information for data, e.g. 1-D array with
state (water vapor, observation angles, ...) or e.g. 3-D array with first two dimensions
for the 'image' and the last for the state
:return: cosine of solar angle
"""
if self.idx_dim_sza is not None:
assert isinstance(pt, np.ndarray)
return np.cos(np.deg2rad(
pt.__getitem__((pt.ndim-1)*[slice(None)] + [self.idx_dim_sza])))
else:
return np.cos(np.deg2rad(self.sza_reduced))
[docs]
def reflectance_boa(self, pt, toa_radiance=None, toa_reflectance=None):
"""
Calculate surface reflectance for a given atmospheric state and for given TOA reflectance or radiance
by interpolating in the LUT and applying the simplified solution of the RTE.
:param pt: atmospheric state
:param toa_radiance: TOA radiance
:param toa_reflectance: TOA reflectance
:return: surface reflectance
"""
if self.__jacobean__:
self.interpolation_settings(jacobean=False, caching=self.__caching__)
# if TOA radiance is given, convert to TOA reflectance
if toa_reflectance is None and toa_radiance is not None:
toa_reflectance = self.radiance_to_reflectance(toa_radiance, pt)
ll = toa_reflectance
ee = self.ee(pt)
tt = self.tt(pt)
ss = self.ss(pt)
l0 = self.l0(pt)
ld = ll - l0 * np.pi
return ld / (ee * tt + ld * ss)
[docs]
def reflectance_boa_j(self, pt, toa_radiance=None, toa_reflectance=None):
"""
Calculate surface reflectance for a given atmospheric state and for given TOA reflectance or radiance
by interpolating in the LUT and applying the simplified solution of the RTE.
Additionally, the Jacobian is calculated.
:param pt: atmospheric state
:param toa_radiance: TOA radiance
:param toa_reflectance: TOA reflectance
:return: surface reflectance and the Jacobian
"""
if self.__jacobean__ is False:
self.interpolation_settings(jacobean=True, caching=self.__caching__)
# if TOA radiance is given, convert to TOA reflectance
if toa_reflectance is None and toa_radiance is not None:
toa_reflectance = self.radiance_to_reflectance(toa_radiance, pt)
ll = toa_reflectance
ee, j_ee = self.ee(pt)
tt, j_tt = self.tt(pt)
ss, j_ss = self.ss(pt)
l0, j_l0 = self.l0(pt)
ld = ll - l0 * np.pi
# calculate surface reflectance
rr = ld / (ee * tt + ld * ss)
# calculate Jacobian
jj = -(j_l0 * np.pi * (ee * tt + ss * (ll - l0 * np.pi)) + (ll - l0 * np.pi) * (
j_ee * tt - j_l0 * np.pi * ss + j_ss * (ll - l0 * np.pi) + j_tt * ee)) / (ee * tt +
ss * (ll - l0 * np.pi)) ** 2
return rr, jj
def __reduce_LUT__(self, fkt, reduce_suffix, reduce_dims):
"""
Reduce dimension of LUT by defining fixed values for chosen state parameters.
:param fkt: table of atmospheric function
:param reduce_suffix: suffix of the name for LUT subset
:param reduce_dims: state parameters to be fixed
:return: None
"""
def get_int_dx(dims, value):
"""
dims: numpy array, monotonly rising, scale
value: scalar, within dims
returns: lower index for bin containing value, fraction of value within bin
"""
ii, dd = None, None # init count variables with None, will raise error if not found
for ii, dd in enumerate(dims):
if dd >= value:
break
return ii - 1, (dd - value) / (dd - dims[ii - 1])
principal_keys = ["range", "idx", "dims", "dat", "jacobean_switch"]
for key in principal_keys: # rename principal keys to key_orig if necessary
new_key = key + "_orig"
if new_key not in fkt:
fkt[new_key] = fkt.pop(key)
if "int_orig" not in fkt:
fkt["int_orig"] = fkt.pop("int")
dat = copy(fkt["dat_orig"])
idx = list(copy(fkt["idx_orig"]))
dms = copy(fkt["dims_orig"])
rng = copy(fkt["range_orig"])
jac = copy(fkt["jacobean_switch_orig"])
# perform reduction through linear interpolation
for reduce_dim, value in reduce_dims.items():
if reduce_dim in dms:
rdx = dms.index(reduce_dim)
rii, rpp = get_int_dx(self.dims[reduce_dim], value)
slc1 = len(dat.shape) * [slice(None)]
slc2 = copy(slc1)
slc1[rdx] = rii
slc2[rdx] = rii + 1
dat = rpp * dat[tuple(slc1)] + (1.0 - rpp) * dat[tuple(slc2)]
_ = dms.pop(rdx)
_ = idx.pop(rdx)
_ = rng.pop(rdx)
jac = np.delete(jac, rdx, axis=0)
idx = np.array(idx, dtype=int)
for key, val in [("dat", dat), ("idx", idx), ("dims", dms), ("range", rng), ("jacobean_switch", jac)]:
fkt["%s_%s" % (key, reduce_suffix)] = val
fkt["reduce_dims_%s" % reduce_suffix] = reduce_dims
fkt["int_%s" % reduce_suffix] = self.interp[len(dat.shape) - 1].intp(data=fkt["dat_%s" % reduce_suffix],
caching=True,
axes=[self.dims[dim] for dim in
fkt["dims_%s" % reduce_suffix][:-1]],
hash_pattern="".join(
[self.hash_formats[dim] for dim in
fkt["dims_%s" % reduce_suffix][:-1]]),
)
[docs]
def reduce_luts(self, reduce_suffix, reduce_dims, set_new_luts=True):
"""
Function to call for reducing dimension of LUT.
:param reduce_suffix: suffix of the name for LUT subset
:param reduce_dims: state parameters to be fixed
:param set_new_luts:
:return: None
"""
for fkt in self.LUTS:
self.__reduce_LUT__(fkt, reduce_suffix, reduce_dims)
if reduce_suffix not in self.LUT_suffixs:
self.LUT_suffixs.append(reduce_suffix)
if set_new_luts is True:
self.set_luts(reduce_suffix)
[docs]
def set_luts(self, suffix):
"""
Assigns a LUT to the forward operator.
:param suffix: suffix of the LUT's name
:return: None
"""
principal_keys = {"range", "idx", "dims", "dat", "int", "jacobean_switch"}
for fkt in self.LUTS:
for key in principal_keys:
try:
fkt[key] = fkt["%s_%s" % (key, suffix)]
except KeyError:
if suffix != "orig":
raise
self.__jacobean_range__ = np.arange(len(self.SS["idx"]))
self.n_jac_at = len(self.__jacobean_range__)
self.n_jac_fl = len(self.L0["idx"])
try:
self.idx_dim_sza = self.L0["dims"].index(self.dim_sza)
except ValueError:
self.idx_dim_sza = None
self.sza_reduced = self.L0["reduce_dims_%s" % suffix]["sza"]
# noinspection PyBroadException
try:
self.idx_dim_vza = self.L0["dims"].index(self.dim_vza)
except Exception:
self.idx_dim_vza = None
self.vza_reduced = self.L0["reduce_dims_%s" % suffix]["vza"]
[docs]
def get_wvl_idx(self, data_wvls, sensor_name=None):
"""
:param sensor_name:
:param data_wvls: numpy array with wavelength as used in the data
:return: numpy array with ints which give the closest indices to the wavelength in data_wvls
"""
if sensor_name is None:
return np.array([np.argmin(np.abs(data_wvls - wv)) for wv in self.wvl], dtype=int)
else:
return np.array([np.argmin(np.abs(data_wvls - wv)) for wv in self.wvl_sensors[sensor_name]], dtype=int)
# noinspection PyProtectedMember,PyDictCreation,PyUnresolvedReferences
def __minimize__(pt_index, p0, pt_names, data, opt_func, zoom_factor=None, opt_range="full",
opt_options=None, update_p0=True, zoom_interpolation_order=1, debug=False,
monitor=True, processes=None):
if not opt_options:
opt_options = {"maxiter": 50, "disp": False}
if processes is None:
processes = np.max([1, int(np.ceil(cpu_count() / 2) - 2)])
if zoom_factor is None:
zoom_factor = np.ones(3, dtype=float)
elif len(zoom_factor) == 1:
zoom_factor = np.array([zoom_factor, zoom_factor, 1.0], dtype=float)
elif len(zoom_factor) == 2:
zoom_factor = np.array([zoom_factor[0], zoom_factor[1], 1.0], dtype=float)
else:
raise ValueError("Wrong zoom_factor")
globs = {}
globs["__p0__"] = __zm__(p0[:, :, pt_index], zoom_factor, order=zoom_interpolation_order)
globs["__ff__"] = opt_func
globs["__opt_options__"] = opt_options
globs["__data__"] = __zm__(data, zoom_factor, order=zoom_interpolation_order)
res_shape = list(globs["__data__"].shape[:2]) + [opt_func.n_x]
globs["__res__"] = SharedNdarray(res_shape, default_value=np.NaN)
globs["__norm__"] = SharedNdarray(res_shape[:2])
globs["__meta__"] = SharedNdarray(res_shape[:2] + [data.shape[2]])
if opt_range == "full":
rng = list(product(np.arange(0, res_shape[0], 1), np.arange(0, res_shape[1], 1)))
else:
funcs = (np.ceil, np.floor, np.ceil)
opt_range_x = [int(fun(val)) for fun, val in zip(funcs, np.array(opt_range[0]) * zoom_factor[0])]
opt_range_y = [int(fun(val)) for fun, val in zip(funcs, np.array(opt_range[1]) * zoom_factor[1])]
rng = list(product(np.arange(*opt_range_x), np.arange(*opt_range_y)))
if monitor is True:
with Pool(processes=processes, initializer=initializer, initargs=(globals(), globs,)) as pl:
result = pl.map_async(__min__, rng)
p_bar = pyprind.ProgBar(result._number_left, monitor=True, width=130)
while not result.ready():
p_bar.cnt = p_bar.max_iter - result._number_left
p_bar.update()
p_bar.cnt -= 1
sleep(1)
pl.close()
pl.join()
else:
if debug is True:
initializer(globals(), globs)
for rr in rng:
__min__(rr)
else:
with Pool(processes=processes, initializer=initializer, initargs=(globals(), globs,)) as pl:
pl.map(__min__, rng)
pl.close()
pl.join()
for ii, (up, dn) in enumerate(zip(opt_func.x_bounds_up, opt_func.x_bounds_dn)):
globs["__res__"].np[globs["__res__"].np[:, :, ii] > up, ii] = up
globs["__res__"].np[globs["__res__"].np[:, :, ii] < dn, ii] = dn
res = __rezoom__(globs["__res__"].np, data.shape, order=zoom_interpolation_order)
norm = __rezoom__(globs["__norm__"].np, data.shape[:2], order=zoom_interpolation_order)
meta = __rezoom__(globs["__meta__"].np, data.shape, order=zoom_interpolation_order)
for ii, (up, dn) in enumerate(zip(opt_func.x_bounds_up, opt_func.x_bounds_dn)):
res[res[:, :, ii] > up, ii] = up
res[res[:, :, ii] < dn, ii] = dn
if update_p0 is True:
res_to_pt = [list(pt_names).index(dim) for dim in opt_func.jjsc_names if dim in pt_names]
if opt_range == "full":
p0[:, :, res_to_pt] = res[:, :, :opt_func.n_atm]
else:
slx = slice(*opt_range[0])
sly = slice(*opt_range[1])
p0[slx, sly, res_to_pt] = res[slx, sly, :opt_func.n_atm]
elif update_p0 is False:
pass
else:
res_to_pt = [list(pt_names).index(dim) for dim in globs["__ff__"].jjsc_names if
dim in pt_names and dim in update_p0]
res_id = [ii for ii, dim in enumerate(globs["__ff__"].dim_names) if dim in update_p0]
print(res_to_pt, res_id)
if opt_range == "full":
p0[:, :, res_to_pt] = res[:, :, res_id]
else:
slx = slice(*opt_range[0])
sly = slice(*opt_range[1])
p0[slx, sly, res_to_pt] = res[slx, sly, res_id]
del globs["__norm__"]
del globs["__res__"]
del globs["__meta__"]
return res, meta, norm
# noinspection PyUnresolvedReferences
def __min__(ii, debug=False, only_success=False):
"""
Core function of the optimization process. Optimization is done by looping through all pixels.
:param ii: row/column index of each enmap pixel
:param debug:
:param only_success:
:return:
"""
from scipy.optimize import minimize # import here to avoid static TLS ImportError
i1, i2 = ii
if __data__[i1, i2, 0] == 0.0:
return
if __opt_options__ is None:
__res__[i1, i2, :] = __ff__.fo.x_scaled_to_phys(__ff__.x0(p0=__p0__[i1, i2, :], data=__data__[i1, i2, :]))
else:
if np.all(np.isfinite(__data__[i1, i2, :])):
ress_full = [
minimize(fun=__ff__, x0=__ff__.x0(p0=__p0__[i1, i2, :], data=__data__[i1, i2, :], first_guess=fg),
args=(__p0__[i1, i2, :], __data__[i1, i2, :]), jac=True,
method='BFGS', options=__opt_options__)
for fg in [0.06, 0.1]]
# choose the optimization with the lowest norm value between modeled and measured TOA reflectance
# => best one
ress_full = sorted(ress_full, key=lambda k: k['fun'])
res = ress_full[0]
if debug is True:
print(__opt_options__)
print((__p0__[i1, i2, :], __data__[i1, i2, :]))
print(__ff__.x0(p0=__p0__[i1, i2, :], data=__data__[i1, i2, :]))
return res
else:
if only_success is True:
if res["success"] is True:
__res__[i1, i2, :] = __ff__.x_scaled_to_phys(res["x"])
else:
__res__[i1, i2, :] = np.NaN
else:
# create array containing the optimization result, the norms and the modeled TOA reflectance
__res__[i1, i2, :] = __ff__.x_scaled_to_phys(res["x"])
__norm__[i1, i2] = np.sqrt(res["fun"]) / np.mean(__data__[i1, i2, :])
__meta__[i1, i2, :] = __ff__(res["x"], __p0__[i1, i2, :], __data__[i1, i2, :], model_output=True)
def __rezoom__(res, data_shape, order=3):
from scipy.ndimage import zoom # import here to avoid static TLS ImportError
sh0, sh1 = np.array(res.shape, dtype=float), np.array(res.shape, dtype=float)
sh1[:2] = np.array(data_shape)[:2]
fac = sh1 / sh0
if np.mean(fac) == 1.0:
return np.copy(res)
else:
return zoom(input=res, zoom=fac, order=order)
def __zm__(data, fac, order=3):
from scipy.ndimage import zoom # import here to avoid static TLS ImportError
if fac is None:
return data
elif np.mean(fac) == 1.0:
return data
else:
try:
return zoom(data, fac, output=data.dtype, order=order)
except RuntimeError:
return zoom(np.array(data, dtype=np.float32), fac, output=np.float32, order=order)
# noinspection PyShadowingNames
[docs]
class FF(object):
def __init__(self, fo, optimize_dims_atm, weight=None, wvl_sel=slice(None)):
self.weight = weight
self.fo = fo
self.optimize_dims_atm = optimize_dims_atm
self.n_rho_lin = self.fo.n_rho_lin
self.n_atm = len(self.optimize_dims_atm)
self.n_x = self.n_rho_lin + self.n_atm
self.wvl_sel = wvl_sel
if not all([dim in self.fo.L0["dims"] for dim in self.optimize_dims_atm]):
raise ValueError("Missing:", self.optimize_dims_atm, self.fo.L0["dims"])
self.atm_idx = np.array([idx for dim, idx in zip(self.fo.L0["dims"], self.fo.L0["idx"])], dtype=int)
self.atm_opt_idx = np.array(
[idx for dim, idx in zip(self.fo.L0["dims"], self.fo.L0["idx"]) if dim in self.optimize_dims_atm],
dtype=int)
self.atm_bgr_idx = np.array(
[idx for dim, idx in zip(self.fo.L0["dims"], self.fo.L0["idx"]) if dim not in self.optimize_dims_atm],
dtype=int)
dim_names = list(optimize_dims_atm) + [self.fo.dim_names[dx] for dx in self.atm_bgr_idx]
self.dim_names = [dim for dim in self.fo.L0["dims"] if dim in dim_names] + self.n_rho_lin * ["rj"]
# not all lut parameters are in the jacobean (view directions)
jjsc = (self.fo.bounds_up - self.fo.bounds_dn) / (self.fo.scale_up - self.fo.scale_dn)
self.jjsc = [jj for jj, dim in zip(jjsc, self.dim_names) if dim == "rj" or dim in self.fo.SS["dims"]]
self.jjsc_names = [dim for jj, dim in zip(jjsc, self.dim_names) if dim == "rj" or dim in self.fo.SS["dims"]]
self.pt = np.zeros(np.max(self.fo.L0["idx"]) + 1 + # atmospheric indices
self.fo.n_rho_lin, # surface indices
dtype=float)
self.pt_bounds_up = np.zeros(len(self.pt))
self.pt_bounds_dn = np.zeros(len(self.pt))
for ii, idx in enumerate(self.fo.L0["idx"]):
self.pt_bounds_up[idx] = self.fo.bounds_up[ii]
self.pt_bounds_dn[idx] = self.fo.bounds_dn[ii]
try:
self.pt_bounds_up[-self.n_rho_lin:] = self.fo.rj_to_rho_J.bounds_up[:]
except AttributeError:
self.pt_bounds_up[-self.n_rho_lin:] = 1.0
try:
self.pt_bounds_dn[-self.n_rho_lin:] = self.fo.rj_to_rho_J.bounds_dn[:]
except AttributeError:
self.pt_bounds_dn[-self.n_rho_lin:] = 0.0
self.n_atm_all = len(self.atm_idx)
self.n_x_all = self.n_rho_lin + self.n_atm_all
self.x_bounds_up = np.zeros(self.n_x)
self.x_bounds_dn = np.zeros(self.n_x)
self.x_bounds_up[:self.n_atm] = self.pt_bounds_up[self.atm_opt_idx]
self.x_bounds_dn[:self.n_atm] = self.pt_bounds_dn[self.atm_opt_idx]
try:
self.x_bounds_up[self.n_atm:] = self.fo.rj_to_rho_J.bounds_up
except AttributeError:
self.x_bounds_up[self.n_atm:] = 1.0
try:
self.x_bounds_dn[self.n_atm:] = self.fo.rj_to_rho_J.bounds_dn
except AttributeError:
self.x_bounds_dn[self.n_atm:] = 0.0
self.jac_sel = np.array([False] * len(self.jjsc))
for ii, dim in enumerate(self.jjsc_names):
if dim in self.optimize_dims_atm or dim == "rj":
self.jac_sel[ii] = True
[docs]
@staticmethod
def apply_bounds(xii, bounds_up, bounds_dn):
nn = xii.shape[0]
for ii in range(nn):
if xii[ii] >= bounds_up[ii]:
xii[ii] = bounds_up[ii]
if xii[ii] < bounds_dn[ii]:
xii[ii] = bounds_dn[ii]
def __norm__(self, aa, bb):
if self.weight is None:
return (aa - bb) ** 2
else:
return self.weight * (aa - bb) ** 2
def __d_bb_norm__(self, aa, bb):
if self.weight is None:
return -2 * (aa - bb)
else:
return -2 * (aa - bb) * self.weight
[docs]
def x_phys_to_scaled(self, xx):
return (xx - self.x_bounds_dn) / (self.x_bounds_up - self.x_bounds_dn)
[docs]
def x_scaled_to_phys(self, xx):
return (self.x_bounds_up - self.x_bounds_dn) * xx + self.x_bounds_dn
[docs]
def x0(self, p0=None, data=None, first_guess=None):
"""
Estimation of the first guess solution x0.
x0 contains a first guess solution of parameter concentration as well as estimated reflectance values at
the two continuum levels of the chosen absorption feature.
:param p0:
:param data:
:param first_guess:
:return:
"""
xx = np.ones(self.n_x) # initial first guess
if p0 is not None and data is not None:
self.pt[self.atm_opt_idx] = p0[self.atm_opt_idx]
self.pt[self.atm_bgr_idx] = p0[self.atm_bgr_idx]
self.apply_bounds(self.pt, self.pt_bounds_up, self.pt_bounds_dn)
xx[-self.n_rho_lin:] = self.fo.rho_to_rj(
self.fo.reflectance_boa(pt=self.pt, toa_reflectance=data))
# copy first guess for atm
xx[:self.n_atm] = p0[self.atm_opt_idx]
self.apply_bounds(xx, self.x_bounds_up, self.x_bounds_dn)
# values are scaled => (value - lower bound) / (upper bound - lower bound)
xx = self.x_phys_to_scaled(xx)
if first_guess is not None:
xx[:self.n_atm] = first_guess
return xx
def __call__(self, xx, pt, dt, xx_is_scaled=True, model_output=False):
if xx_is_scaled: # scale back
xx = self.x_scaled_to_phys(xx)
self.pt[self.atm_opt_idx] = xx[:self.n_atm] # copy x to pt
self.pt[self.atm_bgr_idx] = pt[self.atm_bgr_idx] # copy bg data
self.pt[-self.n_rho_lin:] = xx[-self.fo.n_rho_lin:] # copy surface data
self.apply_bounds(self.pt, self.pt_bounds_up, self.pt_bounds_dn)
ff, jj = self.fo.reflectance_toa_j(self.pt)
if model_output is True:
return ff
else:
f = (np.sum(self.__norm__(dt[self.wvl_sel], ff[self.wvl_sel])))
j = (np.sum(jj[:, self.wvl_sel] * self.__d_bb_norm__(dt[self.wvl_sel], ff[self.wvl_sel]), axis=1))
if xx_is_scaled:
j = j * self.jjsc
n = float(len(dt))
return f / n, j[self.jac_sel] / n
# noinspection PyDictCreation
if __name__ == "__main__":
solar = SolarIrradiance(path_thuillier="./databases/solar_irradiance/Solar_irradiance_Thuillier_2002.xls",
path_fontenla="./databases/solar_irradiance/SUNp1fontenla.asc",
path_earth_sun_distance='./databases/solar_distance/Earth_Sun_distances_per_day_edited.csv'
)
instruments = {
"sat_1": sat(wvl_inst=np.arange(450, 650, 20.0), wvl_rsp=np.arange(250, 950, 0.2), sigma=5.0, solar=solar),
"sat_2": sat(wvl_inst=np.arange(1450, 1650, 2.0), wvl_rsp=np.arange(1250, 2050, 0.2), sigma=5.0, solar=solar),
"EnMap_Gauss": sat(wvl_inst=np.arange(450, 2450, 5.0), wvl_rsp=np.arange(400, 2540, 0.1), sigma=7.0,
solar=solar),
"MS": sat(wvl_inst=np.arange(450, 2450, 25.0), wvl_rsp=np.arange(400, 2540, 0.1), sigma=15.0, solar=solar),
"MS2": sat(wvl_inst=np.arange(450, 2450, 125.0), wvl_rsp=np.arange(400, 2540, 0.1), sigma=45.0, solar=solar)}
wvl_enmap, sigmas_enmap = (lambda x: (x[:, 0], x[:, 1]))(np.loadtxt("./databases/SRF_EnMap.txt"))
instruments["EnMap"] = sat(wvl_inst=wvl_enmap, wvl_rsp=np.arange(wvl_enmap[0], wvl_enmap[-1], 0.2),
sigma=sigmas_enmap, solar=solar)
atm_tables_fn = "./linear_atm_functions_ncwv_3_npre_3_ncoz_3_ntmp_4_wvl_350.0_2550.0_1.00_pca.h5"
self = RtFo(atm_tables_fn=atm_tables_fn,
n_pcs=15,
hash_formats={'spr': '%.0f,', 'coz': '%.0f,', 'cwv': '%.0f,', 'tmp': '%0f,', 'tau_a': '%.2f,',
'vza': '%.0f,'},
slices={"tmp": slice(0, 4, 1)},
dim_scat=["tau_a"],
table_path="/table_aerosol/type_1",
only_toa=False,
sensors=instruments,
sensor_interpolation_reference=["sensor", "pca", "error"][0])
fig = figure(figsize=(10, 7))
self.set_sensor()
rho_pca = -1 * (self.wvl_pca - self.wvl_pca[0]) ** 2 / 10 ** 7 + 0.5
pt = np.array([1015.0, 350., 1.5, 0.0, 0.1, 0.0, 0.0, 0.0, 0.0])
plot(self.wvl, self.reflectance_toa(pt, rho_pca), "0.7")
for sat, col, ls in zip(['EnMap_Gauss', 'EnMap'], ["r", "c", "c", "y", "b", "m"],
[".", ".", ".", "-", ".", ".", ".", "."]):
self.set_sensor(sat)
plot(self.wvl, self.reflectance_toa(pt, np.interp(x=self.wvl, xp=self.wvl_pca, fp=rho_pca)), linestyle=ls,
color=col, markersize=3, marker="<")
show()
print("EEooFF")