Source code for daetools.dae_simulator.daetools_ws

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""********************************************************************************
                            daetools_ws.py
                 DAE Tools: pyDAE module, www.daetools.com
                 Copyright (C) Dragan Nikolic
***********************************************************************************
DAE Tools is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License version 3 as published by the Free Software
Foundation. DAE Tools is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with the
DAE Tools software; if not, see <http://www.gnu.org/licenses/>.
********************************************************************************"""
import os, sys, json, numpy, traceback
import uuid, zipfile, importlib, hashlib, threading
from daetools.pyDAE import *
from daetools.dae_simulator.auxiliary import loadSimulation, loadTutorial
from daetools.dae_simulator.web_service import daeWebService, ServerError, BadRequest
    
# Name of the web application. The request handler below only accepts
# requests whose path is exactly '/<__WebAppName__>'.
__WebAppName__  = 'daetools_ws'

def toBool(val):
    """Convert a query-string flag to a boolean.

    Returns True when *val* (case-insensitively, ignoring surrounding
    whitespace) is one of "yes", "true", "t" or "1"; every other value
    yields False. Real ``bool`` objects are returned unchanged and other
    non-string values are converted with ``str()`` first, so the function
    no longer fails on input that is not a string.
    """
    if isinstance(val, bool):
        return val
    return str(val).strip().lower() in ("yes", "true", "t", "1")

class daeSimulationWebService(daeWebService):
    """Web service that drives DAE Tools simulations through HTTP queries.

    Every request must target the path '/<__WebAppName__>' and carry a
    'function' query argument selecting the operation. Simulations are
    created with one of the Load* functions, stored in ``self.activeObjects``
    under a generated ID and addressed in later requests through the
    'simulationID' query argument.

    ``activeObjects``, ``noHTTPRequests``, ``getQueryArguments`` and the
    ``json*`` response helpers come from the ``daeWebService`` base class,
    which is not part of this file.
    """

    def __init__(self):
        daeWebService.__init__(self)
        # {simulation name: callable returning a simulation object}; used by
        # the 'LoadSimulationByName' and 'AvailableSimulations' functions.
        self.availableSimulations = {}

    def __call__(self, environ, start_response):
        """Handle a single HTTP request.

        The request is dispatched on the value of the 'function' query
        argument. Problems caused by the request itself (wrong path, unknown
        function, missing or malformed arguments, unknown object names) are
        answered through ``jsonBadRequest``; ``ServerError`` and any other
        exception are answered through ``jsonError`` (the latter with the
        full traceback as the message).
        """
        self.noHTTPRequests += 1
        # Stays None until a simulation has been resolved/created; it is
        # passed to the error responses so that they can refer to it.
        simulationID = None

        try:
            pathInfo, args = self.getQueryArguments(environ)

            if pathInfo != ('/%s' % __WebAppName__):
                raise BadRequest('Invalid path %s requested (/%s available)' % (pathInfo, __WebAppName__))

            if 'function' not in args:
                raise BadRequest('No function argument specified in the query: %s' % pathInfo)
            function = args['function'].value

            # ---- Service management ----
            if function == 'shutdown':
                self._releaseAllSimulations()
                # Per the original comment: a value >0 means "stop the server".
                self.noHTTPRequests = 1
                return self.jsonSuccess(start_response)

            elif function == 'clear':
                self._releaseAllSimulations()
                # Per the original comment: 0 means "keep the server running".
                self.noHTTPRequests = 0
                return self.jsonSuccess(start_response)

            elif function == 'status':
                activeObjects = []
                for simulationID, simulation in self.activeObjects.items():
                    activeObjects.append({'simulationID': simulationID,
                                          'modelName': simulation.m.Name})
                return self.jsonResult({'ActiveClients': activeObjects}, start_response)

            # ---- Loading simulations ----
            elif function == 'LoadSimulation':
                # NOTE(review): the file path, the callable name and the
                # argument string come straight from the HTTP query and are
                # handed to loadSimulation, which presumably imports/executes
                # them. Confirm the service is reachable by trusted clients only.
                pythonFile = self._getArgument(args, 'pythonFile')
                loadCallable = self._getArgument(args, 'loadCallable')
                arguments = self._getArgument(args, 'arguments')

                directory, simulationFile = os.path.split(pythonFile)
                try:
                    simulation = loadSimulation(directory, simulationFile, loadCallable, arguments)
                except Exception as e:
                    raise BadRequest(str(e))

                simulationID = self._registerSimulation(simulation)
                return self.jsonResult({'simulationID': simulationID}, start_response)

            elif function == 'LoadTutorial':
                tutorialName = self._getArgument(args, 'tutorialName')

                loadCallable = 'run'
                arguments = 'initializeAndReturn=True, datareporter = daeNoOpDataReporter()'
                try:
                    simulation = loadTutorial(tutorialName, loadCallable, arguments)
                except Exception as e:
                    raise BadRequest(str(e))

                simulationID = self._registerSimulation(simulation)
                return self.jsonResult({'simulationID': simulationID}, start_response)

            elif function == 'LoadSimulationByName':
                simulationName = self._getArgument(args, 'simulationName')
                arguments = self._getArgument(args, 'arguments', json.loads)
                if not isinstance(arguments, dict):
                    raise BadRequest('The arguments must be a dictionary object')

                if simulationName not in self.availableSimulations:
                    raise BadRequest('Simulation %s does not exist in the list of available simulations' % simulationName)
                simulationCallable = self.availableSimulations[simulationName]
                if not callable(simulationCallable):
                    raise BadRequest('Simulation %s does not provide valid callable for loading' % simulationName)

                try:
                    simulation = simulationCallable(**arguments)
                except Exception as e:
                    raise BadRequest(str(e))

                simulationID = self._registerSimulation(simulation)
                return self.jsonResult({'simulationID': simulationID}, start_response)

            elif function == 'AvailableSimulations':
                simNames = list(self.availableSimulations.keys())
                return self.jsonResult({'AvailableSimulations': simNames}, start_response)

            # ---- Simulation properties ----
            elif function == 'GetModelInfo':
                simulation, simulationID = self._getSimulation(args)
                # Only the names are returned; the objects themselves stay
                # on the server.
                modelInfo = {}
                for category in ('Domains', 'Parameters', 'Variables', 'STNs'):
                    modelInfo[category] = list(simulation.__ModelInfo__[category])
                return self.jsonResult({'ModelInfo': modelInfo}, start_response)

            elif function == 'GetName':
                simulation, simulationID = self._getSimulation(args)
                return self.jsonResult({'Name': simulation.m.Name}, start_response)

            elif function == 'GetCurrentTime':
                simulation, simulationID = self._getSimulation(args)
                return self.jsonResult({'CurrentTime': simulation.CurrentTime}, start_response)

            elif function == 'GetTimeHorizon':
                simulation, simulationID = self._getSimulation(args)
                return self.jsonResult({'TimeHorizon': simulation.TimeHorizon}, start_response)

            elif function == 'SetTimeHorizon':
                timeHorizon = self._getArgument(args, 'timeHorizon', float)
                simulation, simulationID = self._getSimulation(args)
                simulation.TimeHorizon = timeHorizon
                return self.jsonSuccess(start_response)

            elif function == 'GetReportingInterval':
                simulation, simulationID = self._getSimulation(args)
                return self.jsonResult({'ReportingInterval': simulation.ReportingInterval}, start_response)

            elif function == 'SetReportingInterval':
                reportingInterval = self._getArgument(args, 'reportingInterval', float)
                simulation, simulationID = self._getSimulation(args)
                simulation.ReportingInterval = reportingInterval
                return self.jsonSuccess(start_response)

            # ---- Simulation control ----
            elif function == 'Run':
                simulation, simulationID = self._getSimulation(args)
                simulation.Run()
                return self.jsonSuccess(start_response)

            elif function == 'Finalize':
                simulation, simulationID = self._getSimulation(args)
                simulation.Finalize()
                del self.activeObjects[simulationID]
                # The server is deliberately not shut down here when the last
                # simulation is finalized; the original note says this was
                # changed to rely on a 'keepRunning' function (not visible in
                # this file).
                return self.jsonSuccess(start_response)

            elif function == 'SolveInitial':
                simulation, simulationID = self._getSimulation(args)
                simulation.SolveInitial()
                return self.jsonSuccess(start_response)

            elif function == 'Reinitialize':
                simulation, simulationID = self._getSimulation(args)
                simulation.Reinitialize()
                return self.jsonSuccess(start_response)

            elif function == 'Reset':
                simulation, simulationID = self._getSimulation(args)
                simulation.Reset()
                return self.jsonSuccess(start_response)

            elif function == 'ReportData':
                simulation, simulationID = self._getSimulation(args)
                simulation.ReportData(simulation.CurrentTime)
                return self.jsonSuccess(start_response)

            elif function == 'Integrate':
                stopMode, reportDataAroundDiscontinuities = self._getIntegrationOptions(args)
                simulation, simulationID = self._getSimulation(args)
                timeReached = simulation.Integrate(stopMode, reportDataAroundDiscontinuities)
                return self.jsonResult({'TimeReached': timeReached}, start_response)

            elif function == 'IntegrateForTimeInterval':
                timeInterval = self._getArgument(args, 'timeInterval', float)
                stopMode, reportDataAroundDiscontinuities = self._getIntegrationOptions(args)
                simulation, simulationID = self._getSimulation(args)
                timeReached = simulation.IntegrateForTimeInterval(timeInterval, stopMode, reportDataAroundDiscontinuities)
                return self.jsonResult({'TimeReached': timeReached}, start_response)

            elif function == 'IntegrateUntilTime':
                timeHorizon = self._getArgument(args, 'time', float)
                stopMode, reportDataAroundDiscontinuities = self._getIntegrationOptions(args)
                simulation, simulationID = self._getSimulation(args)
                timeReached = simulation.IntegrateUntilTime(timeHorizon, stopMode, reportDataAroundDiscontinuities)
                return self.jsonResult({'TimeReached': timeReached}, start_response)

            # ---- Reading model values ----
            elif function == 'GetParameterValue':
                parameterName = self._getArgument(args, 'name')
                simulation, simulationID = self._getSimulation(args)
                parameter = self._getModelObject(simulation, 'Parameters', parameterName)
                values = parameter.npyValues
                # Anything that is not a plain float is converted with
                # tolist() so that it can be serialised to JSON.
                if not isinstance(values, float):
                    values = values.tolist()
                return self.jsonResult({'Values': values}, start_response)

            elif function == 'GetVariableValue':
                variableName = self._getArgument(args, 'name')
                simulation, simulationID = self._getSimulation(args)
                variable = self._getModelObject(simulation, 'Variables', variableName)
                values = variable.npyValues
                if not isinstance(values, float):
                    values = values.tolist()
                return self.jsonResult({'Values': values}, start_response)

            elif function == 'GetActiveState':
                stnName = self._getArgument(args, 'stnName')
                simulation, simulationID = self._getSimulation(args)
                stn = self._getModelObject(simulation, 'STNs', stnName)
                return self.jsonResult({'ActiveState': stn.ActiveState}, start_response)

            # ---- Changing model values ----
            elif function == 'SetParameterValue':
                parameterName = self._getArgument(args, 'name')
                value = self._getArgument(args, 'value', self._parseJSON)
                simulation, simulationID = self._getSimulation(args)
                parameter = self._getModelObject(simulation, 'Parameters', parameterName)
                if self._isScalar(value):
                    parameter.SetValue(value)
                else:
                    parameter.SetValues(value)
                return self.jsonSuccess(start_response)

            elif function == 'ReAssignValue':
                variableName = self._getArgument(args, 'name')
                value = self._getArgument(args, 'value', self._parseJSON)
                simulation, simulationID = self._getSimulation(args)
                variable = self._getModelObject(simulation, 'Variables', variableName)
                if self._isScalar(value):
                    variable.ReAssignValue(value)
                else:
                    variable.ReAssignValues(numpy.array(value))
                return self.jsonSuccess(start_response)

            elif function == 'ReSetInitialCondition':
                variableName = self._getArgument(args, 'name')
                value = self._getArgument(args, 'value', self._parseJSON)
                simulation, simulationID = self._getSimulation(args)
                variable = self._getModelObject(simulation, 'Variables', variableName)
                if self._isScalar(value):
                    variable.ReSetInitialCondition(value)
                else:
                    variable.ReSetInitialConditions(numpy.array(value))
                return self.jsonSuccess(start_response)

            elif function == 'SetActiveState':
                stnName = self._getArgument(args, 'stnName')
                activeState = self._getArgument(args, 'activeState')
                simulation, simulationID = self._getSimulation(args)
                stn = self._getModelObject(simulation, 'STNs', stnName)
                stn.ActiveState = activeState
                return self.jsonSuccess(start_response)

            # ---- DAE solver ----
            elif function == 'DAESolver.GetRelativeTolerance':
                simulation, simulationID = self._getSimulation(args)
                relativeTolerance = simulation.DAESolver.RelativeTolerance
                return self.jsonResult({'RelativeTolerance': relativeTolerance}, start_response)

            elif function == 'DAESolver.SetRelativeTolerance':
                relativeTolerance = self._getArgument(args, 'relativeTolerance', float)
                simulation, simulationID = self._getSimulation(args)
                simulation.DAESolver.RelativeTolerance = relativeTolerance
                return self.jsonSuccess(start_response)

            # ---- Data reporter ----
            elif function == 'DataReporter.AllValues':
                simulation, simulationID = self._getSimulation(args)

                process = simulation.DataReporter.Process
                variables = process.dictVariables
                varValues = process.dictVariableValues

                values = {}
                for variableName, (ndarr_values, ndarr_times, l_domains, s_units) in varValues.items():
                    variable = variables[variableName]
                    domainNames = [_removeRootModel(d.Name) for d in variable.Domains]
                    # Results are keyed by the name used by the data reporter.
                    values[variableName] = {'ShortName'   : _nameFromCanonical(variableName),
                                            'Name'        : variableName,
                                            'Values'      : ndarr_values.tolist(),
                                            'Times'       : ndarr_times.tolist(),
                                            'DomainNames' : domainNames,
                                            'Domains'     : [d.tolist() for d in l_domains],
                                            'Units'       : s_units}
                return self.jsonResult({'AllValues': values}, start_response)

            elif function == 'DataReporter.Value':
                # The name must be given exactly as used by the data reporter
                # (the original comment calls it the full canonical name).
                variableName = self._getArgument(args, 'variableName')
                simulation, simulationID = self._getSimulation(args)

                process = simulation.DataReporter.Process
                variables = process.dictVariables
                varValues = process.dictVariableValues

                if variableName not in varValues:
                    raise BadRequest('Variable %s does not exist in the data reporter' % variableName)
                ndarr_values, ndarr_times, l_domains, s_units = varValues[variableName]
                variable = variables[variableName]
                domainNames = [_removeRootModel(d.Name) for d in variable.Domains]
                # NOTE(review): 'ShortName' and 'Name' are built differently
                # here than in 'DataReporter.AllValues'; kept as in the
                # original to avoid changing the response format.
                values = {'ShortName'   : _removeRootModel(variableName),
                          'Name'        : variable.Name,
                          'Values'      : ndarr_values.tolist(),
                          'Times'       : ndarr_times.tolist(),
                          'DomainNames' : domainNames,
                          'Domains'     : [d.tolist() for d in l_domains],
                          'Units'       : s_units}
                return self.jsonResult({'Value': values}, start_response)

            else:
                return self.jsonBadRequest('Invalid function specified: %s' % function, start_response, simulationID)

        except BadRequest as e:
            print(str(e))
            return self.jsonBadRequest(str(e), start_response, simulationID)

        except ServerError as e:
            print(str(e))
            return self.jsonError(str(e), start_response, simulationID)

        except Exception:
            # Anything unexpected: print the traceback on the server side and
            # send it to the client as the error message.
            traceback.print_exc()
            return self.jsonError(traceback.format_exc(), start_response, simulationID)

    def _getSimulation(self, args):
        """Return ``(simulation, simulationID)`` for the ID given in the query.

        Raises BadRequest if the 'simulationID' argument has not been sent or
        if no active simulation is stored under that ID.
        """
        if 'simulationID' not in args:
            raise BadRequest('The query arguments do not contain the simulation ID string')

        simulationID = args['simulationID'].value
        if simulationID not in self.activeObjects:
            raise BadRequest('Simulation %s does not exist or is terminated' % simulationID)

        simulation = self.activeObjects[simulationID]
        return simulation, simulationID

    @staticmethod
    def _getArgument(args, name, convert=str):
        """Return the query argument *name* passed through *convert*.

        Raises BadRequest (instead of letting KeyError/ValueError/TypeError
        escape as a server error) if the argument is missing or cannot be
        converted.
        """
        if name not in args:
            raise BadRequest('The query arguments do not contain the required argument: %s' % name)
        rawValue = args[name].value
        try:
            return convert(rawValue)
        except (TypeError, ValueError) as e:
            # json.loads failures are ValueError subclasses, so they land here too.
            raise BadRequest('Invalid value for the argument %s: %s' % (name, str(e)))

    @staticmethod
    def _parseJSON(text):
        """Decode a JSON document given as a query argument value."""
        return json.loads(str(text))

    @staticmethod
    def _isScalar(value):
        """Return True if the decoded JSON *value* is a single number.

        JSON integers decode to ``int`` and must be treated like floats;
        booleans (a subclass of int) are excluded.
        """
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _getModelObject(simulation, category, name):
        """Return the object *name* from the simulation's model info.

        *category* is one of the keys of ``simulation.__ModelInfo__``.
        Raises BadRequest if there is no object with that name.
        """
        objects = simulation.__ModelInfo__[category]
        if name not in objects:
            raise BadRequest('%s does not exist in the model %s' % (name, category))
        return objects[name]

    @staticmethod
    def _getIntegrationOptions(args):
        """Return ``(stopMode, reportDataAroundDiscontinuities)`` from the query."""
        stopAtDiscontinuity = daeSimulationWebService._getArgument(args, 'stopAtDiscontinuity', toBool)
        reportDataAroundDiscontinuities = daeSimulationWebService._getArgument(args, 'reportDataAroundDiscontinuities', toBool)
        stopMode = (eStopAtModelDiscontinuity if stopAtDiscontinuity else eDoNotStopAtDiscontinuity)
        return stopMode, reportDataAroundDiscontinuities

    def _registerSimulation(self, simulation):
        """Attach the model info to *simulation*, store it and return its new ID."""
        modelInfo = {'Domains': {}, 'Parameters': {}, 'Variables': {}, 'STNs': {}}
        simulation.__ModelInfo__ = modelInfo
        _collectModelInfo(modelInfo, simulation.m)

        # The ID is what clients send as 'simulationID' in subsequent calls.
        simulationID = str(uuid.uuid1())
        self.activeObjects[simulationID] = simulation
        return simulationID

    def _releaseAllSimulations(self):
        """Best-effort clean-up of all stored simulations; empties the storage."""
        for simulation in self.activeObjects.values():
            try:
                # NOTE(review): these are FMI-style method names while the
                # 'Finalize' function calls simulation.Finalize(); confirm the
                # stored objects actually provide them.
                simulation.fmi2Terminate()
                simulation.fmi2FreeInstance()
            except Exception:
                # Errors during clean-up are deliberately ignored.
                pass
        self.activeObjects = {}

    @staticmethod
    def runSimulationsAsWebService(availableSimulations, address = '127.0.0.1', port = 8001):
        """
        Starts the daetools web service providing the simulations via their names.
        Argument 'availableSimulations' is a dictionary: {name: callable}.
        Raises RuntimeError if it is not a dictionary or if any of its values
        is not callable.
        """
        if not isinstance(availableSimulations, dict):
            raise RuntimeError('The runSimulationsAsWebService function argument availableSimulations must be a dictionary')

        application = daeSimulationWebService()

        # Validate and copy the loaders into the application.
        for simulationName, loaderFunction in availableSimulations.items():
            if not callable(loaderFunction):
                raise RuntimeError('The loaderFunction for the simulation %s is not a callable' % simulationName)
            application.availableSimulations[simulationName] = loaderFunction

        daeWebService.startWebService(application, address, port, True)
def _relativeCanonicalName(obj):
    """Return obj.CanonicalName without its first dot-separated component."""
    return '.'.join(obj.CanonicalName.split('.')[1:])


def _storeByRelativeName(registry, objects):
    """Store every object in the *registry* dictionary under its relative canonical name."""
    for obj in objects:
        registry[_relativeCanonicalName(obj)] = obj


def _collectModelInfoFromPort(modelInfo, port):
    """Add the domains, parameters and variables of *port* to *modelInfo*.

    *modelInfo* is a dictionary with the keys 'Domains', 'Parameters' and
    'Variables', each holding a {name: object} dictionary that is updated
    in place.
    """
    _storeByRelativeName(modelInfo['Domains'], port.Domains)
    _storeByRelativeName(modelInfo['Parameters'], port.Parameters)
    _storeByRelativeName(modelInfo['Variables'], port.Variables)


def _collectModelInfo(modelInfo, model):
    """Recursively add the objects of *model* to *modelInfo*.

    Collects the model's own domains, parameters and variables, those of its
    ports, its STNs and then does the same for every child component.
    Objects are keyed by their canonical name with the first (top-level)
    component removed; *modelInfo* must provide the keys 'Domains',
    'Parameters', 'Variables' and 'STNs'.
    """
    _storeByRelativeName(modelInfo['Domains'], model.Domains)
    _storeByRelativeName(modelInfo['Parameters'], model.Parameters)
    _storeByRelativeName(modelInfo['Variables'], model.Variables)

    for port in model.Ports:
        _collectModelInfoFromPort(modelInfo, port)

    _storeByRelativeName(modelInfo['STNs'], model.STNs)

    for component in model.Components:
        _collectModelInfo(modelInfo, component)


def _removeRootModel(name):
    """Return *name*, passed through daeGetStrippedName, without its first dot-separated component."""
    lnames = daeGetStrippedName(name).split('.')
    return '.'.join(lnames[1:])


def _nameFromCanonical(canonicalName):
    """Return the last dot-separated component of *canonicalName* after daeGetStrippedName."""
    return daeGetStrippedName(canonicalName).split('.')[-1]


if __name__ == "__main__":
    address = '127.0.0.1'
    port = 8001
    application = daeSimulationWebService()
    try:
        daeWebService.startWebService(application, address, port, True)
    except Exception:
        # Do not fail silently: report why the service could not be started
        # or why it stopped.
        traceback.print_exc()