Source code for daetools.solvers.minpack

# -*- coding: utf-8 -*-
"""********************************************************************************
                             minpack.py
                 DAE Tools: pyDAE module, www.daetools.com
                 Copyright (C) Dragan Nikolic
***********************************************************************************
DAE Tools is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License version 3 as published by the Free Software
Foundation. DAE Tools is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with the
DAE Tools software; if not, see <http://www.gnu.org/licenses/>.
********************************************************************************"""

import sys, numpy
from daetools.pyDAE import *
from time import localtime, strftime
from scipy.optimize import leastsq
from numpy.linalg import cholesky
from scipy.stats import distributions

"""
Levenberg–Marquardt algorithm (LMA) provides a numerical solution to the problem of 
minimizing a function, generally nonlinear, over a space of parameters of the function.
The main properties:
 - LMA is a blend of Gradient descent and Gauss-Newton iteration
 - LMA finds only a local minimum, not a global minimum
 - It only optimises for residual errors in the dependant variable (y);
   any errors in the independent variable are zero or considered to be negligible
 - Fobj = sum[ (yi - f(xi, p))^2 ]; i = [1, m]
 - Damping parameter λ (here given as argument called 'factor')
"""

# Function to calculate either Residuals or Jacobian matrix, subject to the argument calc_values
def Calculate(p, minpack, calc_values):
    # col_deriv must be True
    values = numpy.zeros( (minpack.Nmeasured_variables, minpack.Nexperiments, minpack.Ntime_points) )
    derivs = numpy.zeros( (minpack.Nparameters, minpack.Nmeasured_variables, minpack.Nexperiments, minpack.Ntime_points) )
    
    r_values = values.reshape( (minpack.Nmeasured_variables * minpack.Nexperiments * minpack.Ntime_points) )
    r_derivs = derivs.reshape( (minpack.Nparameters, minpack.Nmeasured_variables * minpack.Nexperiments * minpack.Ntime_points) )

    try:
        # Set the reporting times:
        if minpack.simulation.m.IsModelDynamic:
            times = minpack.experimental_data[0][0]
            minpack.simulation.ReportingTimes = times
            # print minpack.simulation.ReportingTimes
        
        for e in range(0, minpack.Nexperiments):
            # Set initial conditions, initial guesses, initially active states etc
            minpack.simulation.SetUpVariables()
        
            # Assign the input data for the simulation
            for i in range(0, minpack.Ninput_variables):
                minpack.x_input_variables[i].Value = minpack.experimental_data[e][1][i]
                # print minpack.x_input_variables[i].Name, minpack.x_input_variables[i].Value
            
            # Set the parameters values
            for i in range(0, minpack.Nparameters):
                if minpack.enforce_parameters_bounds == True:
                    if p[i] < minpack.parameters[i].LowerBound:
                        minpack.parameters[i].Value = minpack.parameters[i].LowerBound
                    elif p[i] > minpack.parameters[i].UpperBound:
                        minpack.parameters[i].Value = minpack.parameters[i].UpperBound
                    else:
                        minpack.parameters[i].Value = p[i]
                else:
                    minpack.parameters[i].Value = p[i]
                # print minpack.parameters[i].Name, minpack.parameters[i].Value
            
            # Run the simulation
            minpack.simulation.Reset()
            minpack.simulation.SolveInitial()
            minpack.simulation.ReportData(minpack.simulation.CurrentTime)
            
            ct = 0
            if minpack.simulation.m.IsModelDynamic:
                while minpack.simulation.CurrentTime < minpack.simulation.TimeHorizon:
                    t = minpack.simulation.NextReportingTime
                    # print 'Integrating from {0} to {1}...'.format(minpack.simulation.CurrentTime, t)

                    # Do not stop at discontinuity and do not report data around discontinuities!!!
                    minpack.simulation.IntegrateUntilTime(t, eDoNotStopAtDiscontinuity, False)
                    minpack.simulation.ReportData(minpack.simulation.CurrentTime)

                    for o in range(0, minpack.Nmeasured_variables):
                        values[o, e, ct]    = minpack.y_measured_variables[o].Value - minpack.experimental_data[e][2][o][ct]
                        derivs[:, o, e, ct] = minpack.y_measured_variables[o].Gradients
                        #print 'v =', values[o, e, ct]
                        #print 'd =', derivs[:, o, e, ct]
                    ct += 1
                
                if ct != minpack.Ntime_points:
                    raise RuntimeError('ct = {0} while Ntime_points = {1}'.format(ct, minpack.Ntime_points))
            else:
                for o in range(0, minpack.Nmeasured_variables):
                    values[o, e, 0]    = minpack.y_measured_variables[o].Value - minpack.experimental_data[e][2][o][0]
                    derivs[:, o, e, 0] = minpack.y_measured_variables[o].Gradients
                    #print 'v =', values[o, e, 0]
                    #print 'd =', derivs[:, o, e, 0]
        
    except Exception as e:
        print('Exception in function Calculate, for parameters values: {0}'.format(p))
        print(str(e))
    
    if minpack.print_residuals_and_jacobian:
        print('Parameters:', p)
        if calc_values:
            print('  Residuals:')
            print(r_values)
        else:
            print('  Derivatives:')
            print(r_derivs)
        
    if calc_values:
        return r_values
    else:
        return r_derivs
        
# Function to calculate residuals R = f(x_data, parameters) - y_data:
#   R[0], R[1], ..., R[n] 
def Residuals(p, args):
    minpack = args[0]
    return Calculate(p, minpack, True)
    
# Function to calculate Jacobian matrix for residuals R: 
# | dR[0]/dp[0] dR[0]/dp[1] ... dR[0]/dp[n] |
# | dR[1]/dp[0] dR[1]/dp[1] ... dR[1]/dp[n] |
# | ...                                     |
# | dR[n]/dp[0] dR[n]/dp[1] ... dR[n]/dp[n] |
def Derivatives(p, args):
    minpack = args[0]
    return Calculate(p, minpack, False)
    
[docs]class daeMinpackLeastSq: def __init__(self): self.simulation = None self.daesolver = None self.datareporter = None self.log = None self.parameters = [] self.x_input_variables = [] self.y_measured_variables = [] self.p0 = [] self.experimental_data = [] self.x2 = 0.0 self.Ndof = 0 self.sigma = 0.0 self.Nparameters = 0 self.Ninput_variables = 0 self.Nmeasured_variables = 0 self.Nexperiments = 0 self.Ntime_points = 0 self.minpack_leastsq_arguments = {} self.print_residuals_and_jacobian = False self.IsInitialized = False self.enforce_parameters_bounds = False # Results self.p_estimated = None self.cov_x = None self.infodict = None self.msg = "" self.ier = -1
[docs] def Initialize(self, simulation, daesolver, datareporter, log, **kwargs): # Get the inputs self.simulation = simulation self.daesolver = daesolver self.datareporter = datareporter self.log = log if (self.simulation == None): raise RuntimeError('The simulation object cannot be None') if (self.daesolver == None): raise RuntimeError('The daesolver object cannot be None') if (self.datareporter == None): raise RuntimeError('The datareporter object cannot be None') if (self.log == None): raise RuntimeError('The log object cannot be None') self.experimental_data = kwargs.get('experimental_data', None) self.print_residuals_and_jacobian = kwargs.get('print_residuals_and_jacobian', False) self.minpack_leastsq_arguments = kwargs.get('minpack_leastsq_arguments', {}) self.enforce_parameters_bounds = kwargs.get('enforce_parameters_bounds', False) # Call simulation.Initialize (with eParameterEstimation mode) self.simulation.SimulationMode = eParameterEstimation self.simulation.Initialize(self.daesolver, self.datareporter, self.log) self.simulation.RegisterData('') # Check the inputs if (self.simulation == None): raise RuntimeError('The simulation object cannot be None') if (self.log == None): raise RuntimeError('The log object cannot be None') if ( self.experimental_data == None ): raise RuntimeError('Argument [experimental_data] has not been provided') self.parameters = self.simulation.ModelParameters self.x_input_variables = self.simulation.InputVariables self.y_measured_variables = self.simulation.MeasuredVariables self.Nparameters = len(self.parameters) self.Ninput_variables = len(self.x_input_variables) self.Nmeasured_variables = len(self.y_measured_variables) if ( self.Nparameters == 0 ): raise RuntimeError('The number of parameters is equal to zero') if ( self.Ninput_variables == 0 ): raise RuntimeError('The number of input variables is equal to zero') if ( self.Nmeasured_variables == 0 ): raise RuntimeError('The number of measured variables is equal to zero') self.Nexperiments = len(self.experimental_data) if ( len(self.experimental_data) != self.Nexperiments ): raise RuntimeError('[experimental_data] must contain Nexperiments ({0}) number of tuples: ([times], [input_variables], [measured_variables])'.format(self.Nexperiments)) if ( len(self.experimental_data[0]) != 3 ): raise RuntimeError('Each experiment in [experimental_data] must contain tuple: ([time points], [input_variables], [measured_variables])') self.Ntime_points = len(self.experimental_data[0][0]) for exp in self.experimental_data: if ( len(exp) != 3 ): raise RuntimeError('Each experiment in [experimental_data] must contain tuple: ([time points], [input_variables], [measured_variables])') times = exp[0] xin = exp[1] yout = exp[2] if ( len(times) != self.Ntime_points ): raise RuntimeError('Number of time points in each experiment ({0}) must be {1}'.format(len(times), self.Ntime_points)) if ( len(xin) != self.Ninput_variables ): raise RuntimeError('Number of input variables in each experiment must be {0}'.format(self.Ninput_variables)) if ( len(yout) != self.Nmeasured_variables ): raise RuntimeError('Number of measured variables in each experiment must be {0}'.format(self.Nmeasured_variables)) for y in yout: if ( len(y) != self.Ntime_points ): raise RuntimeError('Number of values in each measured variable must be equal to number of time intervals') self.p0 = [] for p in self.parameters: self.p0.append(p.StartingPoint) self.IsInitialized = True self.NoOfCalculateCalls = 0
[docs] def Run(self): if self.IsInitialized == False: raise RuntimeError('daeMinpackLeastSq object has not been initialized') self.minpack_leastsq_arguments['full_output'] = True self.minpack_leastsq_arguments['col_deriv'] = True print('minpack.leastsq will proceed with the following arguments:') print(' ', self.minpack_leastsq_arguments) # Call scipy.optimize.leastsq (Minpack wrapper) self.p_estimated, self.cov_x, self.infodict, self.msg, self.ier = leastsq(Residuals, self.p0, Dfun = Derivatives, args = [self], **self.minpack_leastsq_arguments) # Check the info and calculate the least square statistics if self.ier not in [1, 2, 3, 4]: raise RuntimeError('MINPACK least square method failed ({0})'.format(self.msg)) self.x2 = (self.infodict['fvec']**2).sum() self.Ndof = self.Nexperiments * self.Nmeasured_variables - self.Nparameters self.sigma = numpy.sqrt(self.x2 / self.Ndof)
[docs] def Finalize(self): self.simulation.Finalize()
[docs] def getConfidenceCoefficient(self, confidence): Np = self.Nparameters Ndof = self.Nmeasured_variables * self.Ntime_points * self.Nexperiments - self.Nparameters alpha = 1 - confidence / 100.0 k = distributions.f.ppf(1-alpha, Np, Ndof) * self.sigma * self.Nparameters # print 'F(1-a = {0}, m = {1}, n = {2}) = {3}'.format(1-alpha, Np, Ndof, k) return k
# Returns tuple: [x], [y] coordinates of the ellipse and [x], [y] coordinates of the center (optimal point) # for the given parameter indexes
[docs] def getConfidenceEllipsoid(self, x_param_index, y_param_index, **kwargs): if self.IsInitialized == False: raise RuntimeError('daeMinpackLeastSq object has not been initialized') if (self.ier not in [1, 2, 3, 4]): # or (self.p_estimated == None) or (self.cov_x == None) or (self.infodict == None): raise RuntimeError('daeMinpackLeastSq Run has not been called') if (x_param_index < 0) or (x_param_index >= self.Nparameters): raise RuntimeError('Invalid [x_param_index] argument') if (y_param_index < 0) or (y_param_index >= self.Nparameters): raise RuntimeError('Invalid [y_param_index] argument') np = kwargs.get('no_points', 100) confidence = kwargs.get('confidence', 95) if (np <= 0): raise RuntimeError('Invalid [np] argument') if (confidence <= 0) or (confidence > 100): raise RuntimeError('Invalid [confidence] argument') X = numpy.zeros((2, np)) chol = cholesky(self.cov_x) C = numpy.zeros((2,2)) C[0, 0] = chol[x_param_index, x_param_index] C[0, 1] = chol[x_param_index, y_param_index] C[1, 0] = chol[y_param_index, x_param_index] C[1, 1] = chol[y_param_index, y_param_index] fi = numpy.linspace(0, 2*numpy.pi, num = np) X[0, :] = numpy.sin(fi) X[1, :] = numpy.cos(fi) M = numpy.zeros((2, np)) M[0, :] = self.p_estimated[x_param_index] M[1, :] = self.p_estimated[y_param_index] k = self.getConfidenceCoefficient(confidence) Y = M + k * numpy.dot(C, X) return (Y[0, :], Y[1, :], [self.p_estimated[x_param_index]], [self.p_estimated[y_param_index]])
# Returns tuple: [x], [y_fit] and [y_exp] coordinates for the given input_variable and measured_variable indexes
[docs] def getFit_SS(self, input_variable_index, measured_variable_index, **kwargs): if (self.IsInitialized == False) or (self.experimental_data == None) or (self.Nmeasured_variables == 0) or (self.Nexperiments == 0) or (self.Ntime_points == 0): raise RuntimeError('daeMinpackLeastSq has not been initialized') if (self.ier not in [1, 2, 3, 4]): # or (self.p_estimated == None) or (self.cov_x == None) or (self.infodict == None): raise RuntimeError('daeMinpackLeastSq Run has not been called') if (measured_variable_index < 0) or (measured_variable_index >= self.Nmeasured_variables): raise RuntimeError('Invalid [measured_variable_index] argument') if (input_variable_index < 0) or (input_variable_index >= self.Ninput_variables): raise RuntimeError('Invalid [input_variable_index] argument') if (self.Ntime_points != 1): raise RuntimeError('In order to call getFit_SS function the number of time points must be equal to 1') values = self.infodict['fvec'].reshape( (self.Nmeasured_variables, self.Nexperiments, self.Ntime_points) ) x_axis = [] y_fit = [] y_exp = [] for e in range(0, self.Nexperiments): x_axis.append(self.experimental_data[e][1][input_variable_index]) y_fit.append(self.experimental_data[e][2][measured_variable_index] + values[measured_variable_index][e][0]) y_exp.append(self.experimental_data[e][2][measured_variable_index]) return x_axis, y_exp, y_fit
[docs] def getFit_Dyn(self, measured_variable_index, experiment_index, **kwargs): if (self.IsInitialized == False) or (self.experimental_data == None) or (self.Nmeasured_variables == 0) or (self.Nexperiments == 0) or (self.Ntime_points == 0): raise RuntimeError('daeMinpackLeastSq has not been initialized') if (self.ier not in [1, 2, 3, 4]): # or (self.p_estimated == None) or (self.cov_x == None) or (self.infodict == None): raise RuntimeError('daeMinpackLeastSq Run has not been called') if (measured_variable_index < 0) or (measured_variable_index >= self.Nmeasured_variables): raise RuntimeError('Invalid [measured_variable_index] argument') if (experiment_index < 0) or (experiment_index >= self.Nexperiments): raise RuntimeError('Invalid [experiment_index] argument') if (self.Ntime_points == 1): raise RuntimeError('In order to call getFit_Dyn function the number of time points must be greater than 1') values = self.infodict['fvec'].reshape( (self.Nmeasured_variables, self.Nexperiments, self.Ntime_points) ) # x_axis = time points x_axis = self.experimental_data[experiment_index][0] y_fit = self.experimental_data[experiment_index][2][measured_variable_index] + values[measured_variable_index, experiment_index, :] y_exp = self.experimental_data[experiment_index][2][measured_variable_index] return x_axis, y_exp, y_fit