Source code for daetools.code_generators.analyzer

"""
***********************************************************************************
                            analyzer.py
                DAE Tools: pyDAE module, www.daetools.com
                Copyright (C) Dragan Nikolic
***********************************************************************************
DAE Tools is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License version 3 as published by the Free Software
Foundation. DAE Tools is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with the
DAE Tools software; if not, see <http://www.gnu.org/licenses/>.
************************************************************************************
"""
import sys, numpy, traceback
from daetools.pyDAE import *

[docs]class daeCodeGeneratorAnalyzer(object): def __init__(self): self._simulation = None self.ports = [] self.dictPorts = {} self.models = [] self.dictModels = {} self.runtimeInformation = {}
[docs] def analyzeSimulation(self, simulation, unaryFlops = {}, binaryFlops = {}): if not simulation: raise RuntimeError('Invalid simulation object') self._simulation = simulation self.ports = [] self.dictPorts = {} self.models = [] self.dictModels = {} self.runtimeInformation = { 'Domains' : [], 'Parameters' : [], 'Variables' : [], 'Equations' : [], 'STNs' : [], 'PortConnections' : [] } self.unaryFlops = unaryFlops self.binaryFlops = binaryFlops self._collectObjects(self._simulation.m) #print self.ports #print self.models self.models.reverse() for port_class, dict_port in self.ports: data = self.analyzePort(dict_port['port']) # Update dict_port dict_port['data'] = data for model_class, dict_model in self.models: data = self.analyzeModel(dict_model['model']) # Update dict_model dict_model['data'] = data self.runtimeInformation['ModelType'] = self._simulation.m.ModelType self.runtimeInformation['IsModelDynamic'] = self._simulation.m.IsModelDynamic self.runtimeInformation['TotalNumberOfVariables'] = self._simulation.TotalNumberOfVariables self.runtimeInformation['NumberOfEquations'] = self._simulation.NumberOfEquations self.runtimeInformation['IDs'] = self._simulation.VariableTypes self.runtimeInformation['DOFs'] = self._simulation.DOFs self.runtimeInformation['Values'] = self._simulation.Values self.runtimeInformation['TimeDerivatives'] = self._simulation.TimeDerivatives self.runtimeInformation['IndexMappings'] = self._simulation.IndexMappings self.runtimeInformation['RelativeTolerance'] = self._simulation.RelativeTolerance self.runtimeInformation['AbsoluteTolerances'] = self._simulation.AbsoluteTolerances self.runtimeInformation['TimeHorizon'] = self._simulation.TimeHorizon self.runtimeInformation['ReportingInterval'] = self._simulation.ReportingInterval self.runtimeInformation['QuasiSteadyState'] = True if self._simulation.InitialConditionMode == eQuasiSteadyState else False self._collectRuntimeInformationFromModel(self._simulation.m)
[docs] def analyzePort(self, port): result = { 'Class' : '', 'Name' : '', 'Parameters' : [], 'Domains' : [], 'Variables' : [], 'VariableTypes' : {} } result['Class'] = port.__class__.__name__ result['Name'] = port.Name #if len(port.Domains) > 0: # raise RuntimeError('Ports cannot contain domains') #if len(port.Parameters) > 0: # raise RuntimeError('Ports cannot contain parameters') # Domains for domain in port.Domains: data = {} data['Name'] = domain.Name data['Type'] = str(domain.Type) data['Units'] = domain.Units data['NumberOfIntervals'] = domain.NumberOfIntervals data['NumberOfPoints'] = domain.NumberOfPoints #data['DiscretizationMethod'] = str(domain.DiscretizationMethod) #data['DiscretizationOrder'] = domain.DiscretizationOrder data['Description'] = domain.Description result['Domains'].append(data) # Parameters for parameter in port.Parameters: data = {} data['Name'] = parameter.Name data['Domains'] = [daeGetRelativeName(port, domain) for domain in parameter.Domains] data['Units'] = parameter.Units data['Description'] = parameter.Description result['Parameters'].append(data) for variable in port.Variables: #if len(variable.Domains) > 0: # raise RuntimeError('Ports cannot contain distributed variables') data = {} data['Name'] = variable.Name data['Domains'] = [daeGetRelativeName(port, domain) for domain in variable.Domains] data['Type'] = variable.VariableType.Name data['Description'] = variable.Description if variable.NumberOfPoints == 1: IDs = [variable.npyIDs] else: IDs = variable.npyIDs if cnDifferential in IDs: data['RuntimeHint'] = 'differential' elif cnAssigned in IDs: data['RuntimeHint'] = 'assigned' else: data['RuntimeHint'] = 'algebraic' if not variable.VariableType.Name in result['VariableTypes']: vt_data = {} vt_data['Name'] = variable.VariableType.Name vt_data['Units'] = variable.VariableType.Units vt_data['LowerBound'] = variable.VariableType.LowerBound vt_data['UpperBound'] = variable.VariableType.UpperBound vt_data['InitialGuess'] = variable.VariableType.InitialGuess vt_data['AbsoluteTolerance'] = variable.VariableType.AbsoluteTolerance if not variable.VariableType.Name in result['VariableTypes']: result['VariableTypes'][variable.VariableType.Name] = vt_data result['Variables'].append(data) return result
[docs] def analyzeModel(self, model): result = { 'Class' : '', 'Name' : '', 'CanonicalName' : '', 'VariableTypes' : {}, 'Parameters' : [], 'Domains' : [], 'Variables' : [], 'Ports' : [], 'EventPorts' : [], 'Components' : [], 'Equations' : [], 'PortConnections' : [], 'EventPortConnections' : [], 'STNs' : [], 'OnConditionActions' : [], 'OnEventActions' : [], 'PortArrays' : [], # ?? 'ComponentArrays' : [] # ?? } result['Class'] = model.__class__.__name__ result['Name'] = model.Name result['CanonicalName'] = model.CanonicalName # Domains for domain in model.Domains: data = {} data['Name'] = domain.Name data['Type'] = str(domain.Type) data['Units'] = domain.Units data['NumberOfIntervals'] = domain.NumberOfIntervals data['NumberOfPoints'] = domain.NumberOfPoints #data['DiscretizationMethod'] = str(domain.DiscretizationMethod) #data['DiscretizationOrder'] = domain.DiscretizationOrder data['Description'] = domain.Description result['Domains'].append(data) # Parameters for parameter in model.Parameters: data = {} data['Name'] = parameter.Name data['Domains'] = [daeGetRelativeName(model, domain) for domain in parameter.Domains] data['Units'] = parameter.Units data['Description'] = parameter.Description result['Parameters'].append(data) # Variables for variable in model.Variables: data = {} data['Name'] = variable.Name data['Domains'] = [daeGetRelativeName(model, domain) for domain in variable.Domains] data['Type'] = variable.VariableType.Name data['Description'] = variable.Description if variable.NumberOfPoints == 1: IDs = [variable.npyIDs] else: IDs = variable.npyIDs if cnDifferential in IDs: data['RuntimeHint'] = 'differential' elif cnAssigned in IDs: data['RuntimeHint'] = 'assigned' else: data['RuntimeHint'] = 'algebraic' if not variable.VariableType.Name in result['VariableTypes']: vt_data = {} vt_data['Name'] = variable.VariableType.Name vt_data['Units'] = variable.VariableType.Units vt_data['LowerBound'] = variable.VariableType.LowerBound vt_data['UpperBound'] = variable.VariableType.UpperBound vt_data['InitialGuess'] = variable.VariableType.InitialGuess vt_data['AbsoluteTolerance'] = variable.VariableType.AbsoluteTolerance if not variable.VariableType.Name in result['VariableTypes']: result['VariableTypes'][variable.VariableType.Name] = vt_data result['Variables'].append(data) # PortConnections for port_connection in model.PortConnections: data = {} data['PortFrom'] = daeGetRelativeName(model, port_connection.PortFrom) data['PortTo'] = daeGetRelativeName(model, port_connection.PortTo) data['Equations'] = self._processEquations(port_connection.Equations, model) result['PortConnections'].append(data) # EventPortConnections for event_port_connection in model.EventPortConnections: data = {} data['PortFrom'] = daeGetRelativeName(model, event_port_connection.PortFrom) data['PortTo'] = daeGetRelativeName(model, event_port_connection.PortTo) result['EventPortConnections'].append(data) # Ports for port in model.Ports: data = {} data['Name'] = port.Name data['Class'] = port.__class__.__name__ data['Type'] = str(port.Type) data['Description'] = port.Description result['Ports'].append(data) # EventPorts for event_port in model.EventPorts: data = {} data['Name'] = event_port.Name data['Class'] = event_port.__class__.__name__ data['Type'] = str(event_port.Type) data['Description'] = event_port.Description result['EventPorts'].append(data) # Equations result['Equations'] = self._processEquations(model.Equations, model) # StateTransitionNetworks result['STNs'] = self._processSTNs(model.STNs, model) # OnConditionActions result['OnConditionActions'] = self._processOnConditionActions(model.OnConditionActions, model) # OnEventActions result['OnEventActions'] = self._processOnEventActions(model.OnEventActions, model) # Components for component in model.Components: data = {} data['Name'] = component.Name data['Class'] = component.__class__.__name__ data['Description'] = component.Description result['Components'].append(data) # PortArrays if len(model.PortArrays) > 0: raise RuntimeError('PortArrays are not supported by the code analyzer') # ComponentArrays if len(model.ComponentArrays) > 0: raise RuntimeError('ComponentArrays are not supported by the code analyzer') return result
def _collectObjects(self, model): if not model.__class__.__name__ in self.dictModels: self.dictModels[model.__class__.__name__] = None self.models.append( (model.__class__.__name__, {'model' : model, 'data' : None}) ) for port in model.Ports: if not port.__class__.__name__ in self.dictPorts: self.dictPorts[port.__class__.__name__] = None self.ports.append( (port.__class__.__name__, {'port' : port, 'data' : None}) ) for model in model.Components: self._collectObjects(model) def _processEquations(self, equations, model): eqns = [] for equation in equations: data = {} data['Name'] = equation.Name data['Scaling'] = equation.Scaling data['Residual'] = equation.Residual data['Description'] = equation.Description data['EquationType'] = equation.EquationType data['DistributedEquationDomainInfos'] = [] for dedi in equation.DistributedEquationDomainInfos: dedi_data = {} # Get a name relative to the equation's parent model dedi_data['Domain'] = daeGetRelativeName(equation.Model, dedi.Domain) dedi_data['DomainBounds'] = str(dedi.DomainBounds) dedi_data['DomainPoints'] = dedi.DomainPoints data['DistributedEquationDomainInfos'].append(dedi_data) data['EquationExecutionInfos'] = [] for eeinfo in equation.EquationExecutionInfos: eedata = {} eedata['ResidualRuntimeNode'] = eeinfo.Node eedata['VariableIndexes'] = eeinfo.VariableIndexes # GetComputeStackInfo() returns tuple: CS.size, CS.flops eedata['ComputeStackInfo'] = eeinfo.GetComputeStackInfo(self.unaryFlops, self.binaryFlops) eedata['EquationType'] = str(eeinfo.EquationType) data['EquationExecutionInfos'].append(eedata) eqns.append(data) return eqns def _processSTNs(self, STNs, model): stns = [] for stn in STNs: data = {} if isinstance(stn, daeIF): data['Class'] = 'daeIF' else: data['Class'] = 'daeSTN' data['Name'] = stn.Name data['CanonicalName'] = stn.CanonicalName data['Description'] = stn.Description data['ActiveState'] = stn.ActiveState stateMap = {} for i, state in enumerate(stn.States): stateMap[state.Name] = i data['StateMap'] = stateMap data['States'] = [] for i, state in enumerate(stn.States): state_data = {} state_data['Name'] = state.Name state_data['Equations'] = self._processEquations(state.Equations, model) state_data['NestedSTNs'] = self._processSTNs(state.NestedSTNs, model) state_data['OnConditionActions'] = self._processOnConditionActions(state.OnConditionActions, model) state_data['OnEventActions'] = self._processOnEventActions(state.OnEventActions, model) data['States'].append(state_data) stns.append(data) return stns def _processOnConditionActions(self, on_condition_actions, model): sOnConditionActions = [] for on_condition_action in on_condition_actions: oca_data = {} oca_data['ConditionSetupNode'] = on_condition_action.Condition.SetupNode oca_data['ConditionRuntimeNode'] = on_condition_action.Condition.RuntimeNode oca_data['EventTolerance'] = on_condition_action.Condition.EventTolerance oca_data['Expressions'] = on_condition_action.Condition.Expressions oca_data['Actions'] = self._processActions(on_condition_action.Actions, model) if len(on_condition_action.UserDefinedActions) > 0: raise RuntimeError('UserDefinedActions cannot be exported') sOnConditionActions.append(oca_data) return sOnConditionActions def _processOnEventActions(self, on_event_actions, model): sOnEventActions = [] for on_event_action in on_event_actions: oea_data = {} oea_data['EventPort'] = daeGetRelativeName(model, on_event_action.EventPort) oea_data['Actions'] = self._processActions(on_event_action.Actions, model) if len(on_event_action.UserDefinedActions) > 0: raise RuntimeError('UserDefinedActions cannot be exported') sOnEventActions.append(oea_data) return sOnEventActions def _processActions(self, actions, model): sActions = [] for action in actions: data = {} data['Type'] = str(action.Type) if action.Type == eChangeState: data['STN'] = action.STN.Name data['STNCanonicalName'] = action.STN.CanonicalName data['StateTo'] = action.StateTo.Name elif action.Type == eSendEvent: data['SendEventPort'] = daeGetRelativeName(model, action.SendEventPort) data['VariableWrapper'] = action.VariableWrapper elif action.Type == eReAssignOrReInitializeVariable: data['VariableWrapper'] = action.VariableWrapper data['SetupNode'] = action.SetupNode data['RuntimeNode'] = action.RuntimeNode data['DomainIndexes'] = action.VariableWrapper.DomainIndexes data['VariableCanonicalName'] = action.VariableWrapper.Variable.CanonicalName data['OverallIndex'] = action.VariableWrapper.OverallIndex data['ID'] = action.VariableWrapper.VariableType elif action.Type == eUserDefinedAction: raise RuntimeError('User defined actions are not supported') sActions.append(data) return sActions def _collectRuntimeInformationFromModel(self, model): for domain in model.Domains: data = {} data['CanonicalName'] = domain.CanonicalName data['Description'] = domain.Description data['Type'] = str(domain.Type) data['NumberOfIntervals'] = domain.NumberOfIntervals data['NumberOfPoints'] = domain.NumberOfPoints #data['DiscretizationMethod'] = str(domain.DiscretizationMethod) #data['DiscretizationOrder'] = domain.DiscretizationOrder if domain.Type == eUnstructuredGrid: data['Points'] = numpy.array([i for i in range(domain.NumberOfPoints)]) else: data['Points'] = domain.Points data['Units'] = domain.Units self.runtimeInformation['Domains'].append(data) for parameter in model.Parameters: data = {} data['CanonicalName'] = parameter.CanonicalName data['Description'] = parameter.Description data['Domains'] = [domain.NumberOfPoints for domain in parameter.Domains] data['DomainNames'] = [domain.CanonicalName for domain in parameter.Domains] data['NumberOfPoints'] = parameter.NumberOfPoints data['Values'] = parameter.npyValues # nd_array[d1][d2]...[dn] or float if no domains data['DomainsIndexesMap'] = parameter.GetDomainsIndexesMap(0) # {index : [domains indexes]} data['ReportingOn'] = parameter.ReportingOn data['Units'] = parameter.Units self.runtimeInformation['Parameters'].append(data) for variable in model.Variables: data = {} data['CanonicalName'] = variable.CanonicalName data['Description'] = variable.Description data['Domains'] = [domain.NumberOfPoints for domain in variable.Domains] data['DomainNames'] = [domain.CanonicalName for domain in variable.Domains] data['NumberOfPoints'] = variable.NumberOfPoints data['OverallIndex'] = variable.OverallIndex data['Values'] = variable.npyValues # nd_array[d1][d2]...[dn] or float if no domains data['IDs'] = variable.npyIDs # nd_array[d1][d2]...[dn] or float if no domains data['DomainsIndexesMap'] = variable.GetDomainsIndexesMap(0) # {index : [domains indexes]} data['ReportingOn'] = variable.ReportingOn data['Units'] = variable.VariableType.Units data['VariableType'] = variable.VariableType self.runtimeInformation['Variables'].append(data) for port in model.Ports: self._collectRuntimeInformationFromPort(port) for port_connection in model.PortConnections: data = {} data['PortFrom'] = port_connection.PortFrom.CanonicalName data['PortTo'] = port_connection.PortTo.CanonicalName data['Equations'] = self._processEquations(port_connection.Equations, model) self.runtimeInformation['PortConnections'].append(data) # equations self.runtimeInformation['Equations'].extend( self._processEquations(model.Equations, model) ) # StateTransitionNetworks self.runtimeInformation['STNs'].extend( self._processSTNs(model.STNs, model) ) for component in model.Components: self._collectRuntimeInformationFromModel(component) def _collectRuntimeInformationFromPort(self, port): for domain in port.Domains: data = {} data['CanonicalName'] = domain.CanonicalName data['Description'] = domain.Description data['Type'] = str(domain.Type) data['NumberOfIntervals'] = domain.NumberOfIntervals data['NumberOfPoints'] = domain.NumberOfPoints #data['DiscretizationMethod'] = str(domain.DiscretizationMethod) #data['DiscretizationOrder'] = domain.DiscretizationOrder if domain.Type == eUnstructuredGrid: data['Points'] = numpy.array([i for i in range(domain.NumberOfPoints)]) else: data['Points'] = domain.npyPoints data['Units'] = domain.Units self.runtimeInformation['Domains'].append(data) for parameter in port.Parameters: data = {} data['CanonicalName'] = parameter.CanonicalName data['Description'] = parameter.Description data['Domains'] = [domain.NumberOfPoints for domain in parameter.Domains] data['DomainNames'] = [domain.CanonicalName for domain in parameter.Domains] data['NumberOfPoints'] = parameter.NumberOfPoints data['Values'] = parameter.npyValues # nd_array[d1][d2]...[dn] or float if no domains data['DomainsIndexesMap'] = parameter.GetDomainsIndexesMap(0) # {index : [domains indexes]} data['ReportingOn'] = parameter.ReportingOn data['Units'] = parameter.Units self.runtimeInformation['Parameters'].append(data) for variable in port.Variables: data = {} data['CanonicalName'] = variable.CanonicalName data['Description'] = variable.Description data['Domains'] = [domain.NumberOfPoints for domain in variable.Domains] data['DomainNames'] = [domain.CanonicalName for domain in variable.Domains] data['NumberOfPoints'] = variable.NumberOfPoints data['OverallIndex'] = variable.OverallIndex data['Values'] = variable.npyValues # nd_array[d1][d2]...[dn] or float if no domains data['IDs'] = variable.npyIDs # nd_array[d1][d2]...[dn] or float if no domains data['DomainsIndexesMap'] = variable.GetDomainsIndexesMap(0) # {index : [domains indexes]} data['ReportingOn'] = variable.ReportingOn data['Units'] = variable.VariableType.Units data['VariableType'] = variable.VariableType self.runtimeInformation['Variables'].append(data)