Source code for sicor.Tools.EnMAP.multiprocessing

#!/usr/bin/env python
# coding: utf-8

# SICOR is a freely available, platform-independent software designed to process hyperspectral remote sensing data,
# and particularly developed to handle data from the EnMAP sensor.

# This file contains multiprocessing tools.

# Copyright (C) 2018  Niklas Bohn (GFZ, <nbohn@gfz-potsdam.de>),
# German Research Centre for Geosciences (GFZ, <https://www.gfz-potsdam.de>)

# This software was developed within the context of the EnMAP project supported by the DLR Space Administration with
# funds of the German Federal Ministry of Economic Affairs and Energy (on the basis of a decision by the German
# Bundestag: 50 EE 1529) and contributions from DLR, GFZ and OHB System AG.

# This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
# version.

# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

# You should have received a copy of the GNU General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.


import numpy as np
from multiprocessing import sharedctypes
from time import sleep


[docs] class SharedNdarray(object): """Wrapper class, which collects all necessary instances to make a numpy ndarray accessible as shared memory when using multiprocessing, it exposes the numpy array via three different views which can be used to access it globally. __init__ provides the mechanism to make this array available in each worker, best used using the provided __initializer__.""" def __init__(self, dims, typecode="f", default_value=None): """Instance of SharedNdarray object. C Type Python Type Minimum size in bytes 'b' signed char int 1 'B' unsigned char int 1 'u' Py_UNICODE Unicode character 2 (1) 'h' signed short int 2 'H' unsigned short int 2 'i' signed int int 2 'I' unsigned int int 2 'l' signed long int 4 'L' unsigned long int 4 'q' signed long long int 8 (2) 'Q' unsigned long long int 8 (2) 'f' float float 4 'd' double float 8 :param dims: tuple of dimensions which is used to instantiate a ndarray using np.zero :param typecode: typecode of values :param default_value: default fill value of array """ self.dims = tuple([int(dim) for dim in dims]) self.typecode = typecode self.sh = sharedctypes.Array(self.typecode, int(np.prod(self.dims)), lock=False) self.np = np.ctypeslib.as_array(self.sh).reshape(self.dims) if default_value is not None: self.np[:] = default_value
[docs] def _initializer(self, globals_dict, name): """This adds to globals while using the ctypes library view of [SharedNdarray instance].sh to make the numpy view of [SharedNdarray instance] globally available. :param globals_dict: dictionary of global variables :param name: name of variable """ globals_dict[name] = np.ctypeslib.as_array(self.sh).reshape(self.dims)
[docs] def initializer(globals_dict, add_to_globals): """Globs shall be dict with name:value pairs, when executed value will be added to globals under the name name, if value provides a _initializer attribute this one is called instead. This makes most sense when called as initializer in a multiprocessing pool, e.g. Pool(initializer=__initializer__, initargs=(globs,)). :param globals_dict: dictionary of global variables :param add_to_globals: name of variable dict to be added to globs """ for name, value in add_to_globals.items(): try: # noinspection PyProtectedMember value._initializer(globals_dict, name) except AttributeError: globals_dict[name] = value
[docs] def mp_progress_bar(iter_list, results, bar): """Print progress bar for multiprocessing tasks. :param iter_list: multiprocessing iterable :param results: results object :param bar: progress bar object """ sleep(.1) # noinspection PyProtectedMember numberDone = len(iter_list) - results._number_left bar.print_progress(percent=numberDone / len(iter_list) * 100)