from __future__ import absolute_import, division, print_function, unicode_literals
import warnings
import six
try:
from itertools import imap
except ImportError:
imap = map
from tqdm import tqdm
import numpy as np
try:
from xvfbwrapper import Xvfb
prerequisites = True
except ImportError:
prerequisites = False
from ..data import Data
from ..utils.utility import lengths, contains_nan
from ..utils.logger import get_logger
from .base import ParameterBase
from .parallel import Parallel
class RunModel(ParameterBase):
    """
    Calculate model and feature results for a series of different model
    parameters, and store them in a Data object.

    Parameters
    ----------
    model : {None, Model or Model subclass instance, model function}, optional
        Model to perform uncertainty quantification on. For requirements see
        Model.run. Default is None.
    parameters : {dict {name: parameter_object}, dict of {name: value or Chaospy distribution}, list of Parameter instances, list [[name, value or Chaospy distribution], ...], list [[name, value, Chaospy distribution or callable that returns a Chaospy distribution], ...]}
        List or dictionary of the parameters that should be created.
        On the form ``parameters =``

        * ``{name_1: parameter_object_1, name: parameter_object_2, ...}``
        * ``{name_1: value_1 or Chaospy distribution, name_2: value_2 or Chaospy distribution, ...}``
        * ``[parameter_object_1, parameter_object_2, ...]``
        * ``[[name_1, value_1 or Chaospy distribution], ...]``
        * ``[[name_1, value_1, Chaospy distribution or callable that returns a Chaospy distribution], ...]``
    features : {None, Features or Features subclass instance, list of feature functions}, optional
        Features to calculate from the model result.
        If None, no features are calculated.
        If list of feature functions, all will be calculated.
        Default is None.
    logger_level : {"info", "debug", "warning", "error", "critical", None}, optional
        Set the threshold for the logging level. Logging messages less severe
        than this level are ignored. If None, no logging to file is performed.
        Default logger level is "info".
    CPUs : {int, None, "max"}, optional
        The number of CPUs to use when calculating the model and features.
        If None, no multiprocessing is used.
        If "max", the maximum number of CPUs on the computer
        (multiprocess.cpu_count()) is used.
        Default is "max".

    Attributes
    ----------
    model : uncertainpy.Model or subclass of uncertainpy.Model
        The model to perform uncertainty quantification on.
    parameters : uncertainpy.Parameters
        The uncertain parameters.
    features : uncertainpy.Features or subclass of uncertainpy.Features
        The features of the model to perform uncertainty quantification on.
    CPUs : int
        The number of CPUs used when calculating the model and features.

    See Also
    --------
    uncertainpy.features.Features
    uncertainpy.Parameter
    uncertainpy.Parameters
    uncertainpy.models.Model
    uncertainpy.models.Model.run : Requirements for the model run function.
    """
    def __init__(self,
                 model,
                 parameters,
                 features=None,
                 logger_level="info",
                 CPUs="max"):
        # Resolve "max" to the number of CPUs available on this machine.
        if CPUs == "max":
            import multiprocess

            CPUs = multiprocess.cpu_count()

        # Parallel evaluates the model and features for one parameter set;
        # it is created before super().__init__ so the model/features setters
        # (which forward to self._parallel) find it already in place.
        self._parallel = Parallel(model=model,
                                  features=features,
                                  logger_level=logger_level)

        super(RunModel, self).__init__(model=model,
                                       parameters=parameters,
                                       features=features,
                                       logger_level=logger_level)

        self.CPUs = CPUs
@ParameterBase.features.setter
def features(self, new_features):
ParameterBase.features.fset(self, new_features)
self._parallel.features = self.features
@ParameterBase.model.setter
def model(self, new_model):
ParameterBase.model.fset(self, new_model)
self._parallel.model = self.model
[docs] def apply_interpolation(self, results, feature):
"""
Perform interpolation of one model/feature using the interpolation
objects created by Parallel.
Parameters
----------
results : list
A list where each element is a result dictionary for each set
of model evaluations.
An example:
.. code-block:: Python
result = {self.model.name: {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature1d": {"values": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature0d": {"values": 1,
"time": np.nan},
"feature2d": {"values": array([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature_adaptive": {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"interpolation": scipy interpolation object},
"feature_invalid": {"values": np.nan,
"time": np.nan}}
results = [result 1, result 2, ..., result N]
feature: str
Name of a feature or the model.
Returns
-------
time : array_like
The time array with the greatest number of time steps.
interpolated_results : list
A list containing all interpolated model/features results.
Interpolated at the points of the time results with the greatest
number of time steps.
Notes
-----
Chooses the time array with the highest number of time points and use
this time array to interpolate the model/feature results in each of
those points. If an interpolation is None, gives numpy.nan instead.
"""
logger = get_logger(self)
time_lengths = []
for result in results:
time_lengths.append(len(result[feature]["time"]))
index_max_len = np.argmax(time_lengths)
time = results[index_max_len][feature]["time"]
interpolated_results = []
for result in results:
interpolation = result[feature]["interpolation"]
if interpolation is None:
interpolated_results.append(np.nan)
logger.error("{}: Unknown error while creating the interpolation".format(feature))
elif isinstance(interpolation, six.string_types):
interpolated_results.append(np.nan)
logger.error(interpolation)
else:
interpolated_results.append(interpolation(time))
return time, interpolated_results
[docs] def results_to_data(self, results):
"""
Store `results` in a Data object.
Stores the time and (interpolated) results for the model and each
feature in a Data object. Performs the interpolation calculated in
Parallel, if the result is irregular.
Parameters
----------
results : list
A list where each element is a result dictionary for each set
of model evaluations.
An example:
.. code-block:: Python
result = {self.model.name: {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature1d": {"values": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature0d": {"values": 1,
"time": np.nan},
"feature2d": {"values": array([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature_adaptive": {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"interpolation": scipy interpolation object},
"feature_invalid": {"values": np.nan,
"time": np.nan}}
results = [result 1, result 2, ..., result N]
Returns
-------
data : Data object
A Data object with time and (interpolated) results for the model and
each feature.
Notes
-----
Sets the following in data, if applicable:
1. ``data["model/features"].evaluations``, which contains all ``values``
2. ``data["model/features"].time``
3. ``data["model/features"].labels``
4. ``data.model_name``
See Also
--------
uncertainpy.Data
"""
logger = get_logger(self)
data = Data(logger_level=self._logger_level)
# Add features and labels
for feature in results[0]:
data.add_features(feature)
if feature == self.model.name:
data[feature]["labels"] = self.model.labels
elif feature in self.features.labels:
data[feature]["labels"] = self.features.labels[feature]
data.model_name = self.model.name
data.model_ignore = self.model.ignore
def add_results(results, data, feature):
data[feature].time = []
data[feature].evaluations = []
for result in results:
data[feature].evaluations.append(result[feature]["values"])
data[feature].time.append(result[feature]["time"])
# results = self.regularize_nan_results(results)
# Check if features are irregular without being specified as a interpolate
# TODO if the feature is irregular, perform the complete interpolation here instead
# for feature in data:
# if (feature == self.model.name and not (self.model.ignore or self.model.interpolate)) \
# or (feature != self.model.name and feature not in self.features.interpolate):
# if not self.is_regular(results, feature):
# data.error.append(feature)
# data[feature].time = []
# data[feature].evaluations = []
# for result in results:
# data[feature].evaluations.append(result[feature]["values"])
# data[feature].time.append(result[feature]["time"])
# raise ValueError("{}: The number of points varies between evaluations.".format(feature)
# + " Try setting interpolate".format(feature))
# Store all results in data, interpolate as needed
# TODO: save raw result instead of interpolated result?
for feature in data:
# Interpolate the data if it is irregular, and ignore the model if required
if feature in self.features.interpolate or \
(feature == self.model.name and self.model.interpolate and not self.model.ignore):
# TODO implement interpolation of >= 2d data, part2
if np.ndim(results[0][feature]["values"]) >= 2:
# raise NotImplementedError("Feature: {feature},".format(feature=feature)
# + " no support for >= 2D interpolation")
logger.error("{feature}:".format(feature=feature)
+ " no support for >= 2D interpolation implemented")
add_results(results, data, feature)
elif np.ndim(results[0][feature]["values"]) == 1:
data[feature].time, data[feature].evaluations = self.apply_interpolation(results, feature)
# Interpolating a 0D result makes no sense, so if a 0D feature
# is supposed to be interpolated store it as normal
elif np.ndim(results[0][feature]["values"]) == 0:
logger.warning("{feature}: ".format(feature=feature) +
"returns a 0D result. No interpolation is performed.")
data[feature].time = results[0][feature]["time"]
data[feature].evaluations = []
for result in results:
data[feature].evaluations.append(result[feature]["values"])
elif feature == self.model.name and self.model.ignore:
add_results(results, data, feature)
else:
# Check if features are irregular without being specified as a interpolate
# TODO if the feature is irregular, perform the complete interpolation here instead
if not self.is_regular(results, feature):
data.error.append(feature)
add_results(results, data, feature)
if feature == self.model.name:
msg = "{}: The number of points varies between evaluations. ".format(feature) + \
"Make sure {} returns the same number of points with different parameters, ".format(feature) + \
"implement Model.postprocess, or try to set interpolate=True."
else:
msg = "{}: The number of points varies between evaluations. ".format(feature) + \
"Make sure {} returns the same number of points, ".format(feature) + \
"or try add {} to interpolate=[].".format(feature)
logger.error(msg)
# raise ValueError("{}: The number of points varies between evaluations.".format(feature)
# + " Try setting interpolate".format(feature))
else:
# Store data from results in a Data object
data[feature].time = results[0][feature]["time"]
data[feature].evaluations = []
for result in results:
data[feature].evaluations.append(result[feature]["values"])
return data
[docs] def evaluate_nodes(self, nodes, uncertain_parameters):
"""
Evaluate the the model and calculate the features
for the nodes (values) for the uncertain parameters.
Parameters
----------
nodes : array
The values for the uncertain parameters
to evaluate the model and features for.
uncertain_parameters : list
A list of the names of all uncertain parameters.
Returns
-------
results : list
A list where each element is a result dictionary for each set
of model evaluations.
An example:
.. code-block:: Python
result = {self.model.name: {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature1d": {"values": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature0d": {"values": 1,
"time": np.nan},
"feature2d": {"values": array([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature_adaptive": {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"interpolation": scipy interpolation object},
"feature_invalid": {"values": np.nan,
"time": np.nan}}
results = [result 1, result 2, ..., result N]
Raises
------
ImportError
If xvfbwrapper is not installed.
"""
if self.model.suppress_graphics:
if not prerequisites:
raise ImportError("Running with suppress_graphics require: xvfbwrapper")
vdisplay = Xvfb()
vdisplay.start()
results = []
model_parameters = self.create_model_parameters(nodes, uncertain_parameters)
if self.CPUs:
import multiprocess as mp
pool = mp.Pool(processes=self.CPUs)
# pool.map(self._parallel.run, model_parameters)
# chunksize = int(np.ceil(len(model_parameters)/self.CPUs))
chunksize = 1
for result in tqdm(pool.imap(self._parallel.run, model_parameters, chunksize),
desc="Running model",
total=len(nodes.T)):
results.append(result)
pool.close()
else:
for result in tqdm(imap(self._parallel.run, model_parameters),
desc="Running model",
total=len(nodes.T)):
results.append(result)
if self.model.suppress_graphics:
vdisplay.stop()
return results
[docs] def create_model_parameters(self, nodes, uncertain_parameters):
"""
Combine nodes (values) with the uncertain parameter names to create a
list of dictionaries corresponding to the model values for each
model evaluation.
Parameters
----------
nodes : array
A series of different set of parameters. The model and each feature is
evaluated for each set of parameters in the series.
uncertain_parameters : list
A list of names of the uncertain parameters.
Returns
-------
model_parameters : list
A list where each element is a dictionary with the model parameters
for a single evaluation.
An example:
.. code-block:: Python
model_parameter = {"parameter 1": value 1, "parameter 2": value 2, ...}
model_parameters = [model_parameter 1, model_parameter 2, ...]
"""
model_parameters = []
for node in nodes.T:
if node.ndim == 0:
node = [node]
# New set parameters
parameters = {}
for j, parameter in enumerate(uncertain_parameters):
parameters[parameter] = node[j]
for parameter in self.parameters:
if parameter.name not in parameters:
parameters[parameter.name] = parameter.value
model_parameters.append(parameters)
return model_parameters
[docs] def is_regular(self, results, feature):
"""
Test if `feature` in `results` is regular or not, meaning it has a
varying number of values for each evaluation. Ignores results that
contains numpy.nan.
Parameters
----------
results : list
A list where each element is a result dictionary for each set
of model evaluations.
An example:
.. code-block:: Python
result = {self.model.name: {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature1d": {"values": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature0d": {"values": 1,
"time": np.nan},
"feature2d": {"values": array([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature_adaptive": {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"interpolation": scipy interpolation object},
"feature_invalid": {"values": np.nan,
"time": np.nan}}
results = [result 1, result 2, ..., result N]
feature: str
Name of a feature or the model.
Returns
-------
bool
True if the feature is regular or False if the feature is irregular.
"""
i = 0
for result in results:
i += 1
if not contains_nan(result[feature]["values"]):
values_prev = result[feature]["values"]
# If object array it is not regular
if isinstance(values_prev, np.ndarray):
tmp_array = values_prev
else:
tmp_array = np.array(values_prev)
if tmp_array.dtype is np.dtype("object"):
return False
break
for result in results[i:]:
values = result[feature]["values"]
# If object array it is not regular
if isinstance(values, np.ndarray):
tmp_array = values
else:
tmp_array = np.array(values)
if tmp_array.dtype is np.dtype("object"):
return False
if not contains_nan(values):
try:
if np.shape(values_prev) != np.shape(values):
return False
except ValueError:
if lengths(values_prev) != lengths(values):
return False
values_prev = values
return True
[docs] def run(self, nodes, uncertain_parameters):
"""
Evaluate the the model and calculate the features
for the nodes (values) for the uncertain parameters.
The results are interpolated as necessary.
Parameters
----------
nodes : array
A series of different set of parameters. The model and each feature is
evaluated for each set of parameters in the series.
uncertain_parameters : list
A list of names of the uncertain parameters.
Returns
-------
data : Data object
A Data object with time and (interpolated) results for
the model and each feature.
See Also
--------
uncertainpy.Data
"""
if isinstance(uncertain_parameters, six.string_types):
uncertain_parameters = [uncertain_parameters]
results = self.evaluate_nodes(nodes, uncertain_parameters)
data = self.results_to_data(results)
data.uncertain_parameters = uncertain_parameters
return data
# Currently not needed
[docs] def regularize_nan_results(self, results):
"""
Regularize arrays with that only contain numpy.nan values.
Make each result for each feature have the same the same shape, if they
only contain numpy.nan values.
Parameters
----------
results : list
A list where each element is a result dictionary for each set
of model evaluations.
An example:
.. code-block:: Python
result = {self.model.name: {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature1d": {"values": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature0d": {"values": 1,
"time": np.nan},
"feature2d": {"values": array([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature_adaptive": {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"interpolation": scipy interpolation object},
"feature_invalid": {"values": np.nan,
"time": np.nan}}
results = [result 1, result 2, ..., result N]
Returns
-------
results : list
A list with where the only nan results have been regularized.
On the form:
.. code-block:: Python
result = {self.model.name: {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature1d": {"values": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature0d": {"values": 1,
"time": np.nan},
"feature2d": {"values": array([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])},
"feature_adaptive": {"values": array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
"time": array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
"interpolation": scipy interpolation object},
"feature_invalid": {"values": np.nan,
"time": np.nan}}
results = [result 1, result 2, ..., result N]
"""
warnings.warn(
"regularize_nan_results is no longer used as nan results no longer are required to be regular.",
DeprecationWarning
)
def regularize(results, data):
"""
Parameters
---------
data : {"values", "time"}
Which data to regularize, either time or values
"""
features = results[0].keys()
for feature in features:
# Find shape of the first result that is not only nan values
shape = np.shape(results[0][feature][data])
for i in range(len(results)):
values = results[i][feature][data]
if not np.all(np.isnan(values)):
shape = np.shape(values)
break
# Find all results that is only nan, and change their shape if
# the shape is wrong
for i in range(len(results)):
values = results[i][feature][data]
if np.all(np.isnan(values)) and np.shape(values) != shape:
results[i][feature][data] = np.full(shape, np.nan, dtype=float)
return results
results = regularize(results, "values")
results = regularize(results, "time")
return results