"""
***********************************************************************************
fmi.py
DAE Tools: pyDAE module, www.daetools.com
Copyright (C) Dragan Nikolic
***********************************************************************************
DAE Tools is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License version 3 as published by the Free Software
Foundation. DAE Tools is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with the
DAE Tools software; if not, see <http://www.gnu.org/licenses/>.
************************************************************************************
"""
import os, shutil, sys, numpy, math, traceback, uuid, zipfile, tempfile, json, time, glob
import daetools
from daetools.pyDAE import *
from .c99 import daeCodeGenerator_c99
from .fmi_xml_support import *
[docs]class daeCodeGenerator_FMI(fmiModelDescription):
def __init__(self, xml_stylesheet = None):
fmiModelDescription.__init__(self)
self.useWebService = False
[docs] def generateSimulation(self, simulation,
directory,
py_simulation_file,
callable_object_name,
arguments,
additional_files = [],
localsAsOutputs = True,
add_xml_stylesheet = False,
useWebService = False):
try:
tmp_folder = ''
if not simulation:
raise RuntimeError('Invalid simulation object')
if not os.path.isdir(directory):
os.makedirs(directory)
if not isinstance(additional_files, list):
raise RuntimeError('Additional python files must be a list of tuples')
if not callable_object_name:
raise RuntimeError('No python callable object name specified for FMU')
self.useWebService = useWebService
self.localsAsOutputs = localsAsOutputs
self.wrapperInstanceName = simulation.m.Name
modelIdentifier = simulation.m.GetStrippedName()
fmu_directory = directory
tmp_folder = tempfile.mkdtemp(prefix = 'daetools-fmu-')
xml_description_filename = os.path.join(tmp_folder, 'modelDescription.xml')
sources_dir = os.path.join(tmp_folder, 'sources')
resources_dir = os.path.join(tmp_folder, 'resources')
binaries_dir = os.path.join(tmp_folder, 'binaries')
fmu_filename = os.path.join(fmu_directory, simulation.m.Name + '.fmu')
folder = tmp_folder
if not os.path.exists(folder):
os.makedirs(folder)
folder = os.path.join(tmp_folder, 'documentation')
if not os.path.exists(folder):
os.makedirs(folder)
folder = os.path.join(tmp_folder, 'sources')
if not os.path.exists(folder):
os.makedirs(folder)
folder = os.path.join(tmp_folder, 'resources')
if not os.path.exists(folder):
os.makedirs(folder)
folder = os.path.join(tmp_folder, 'binaries')
if not os.path.exists(folder):
os.makedirs(folder)
folder = os.path.join(tmp_folder, 'binaries/win32')
if not os.path.exists(folder):
os.makedirs(folder)
folder = os.path.join(tmp_folder, 'binaries/win64')
if not os.path.exists(folder):
os.makedirs(folder)
folder = os.path.join(tmp_folder, 'binaries/linux32')
if not os.path.exists(folder):
os.makedirs(folder)
folder = os.path.join(tmp_folder, 'binaries/linux64')
if not os.path.exists(folder):
os.makedirs(folder)
folder = os.path.join(tmp_folder, 'binaries/darwin32')
if not os.path.exists(folder):
os.makedirs(folder)
folder = os.path.join(tmp_folder, 'binaries/darwin64')
if not os.path.exists(folder):
os.makedirs(folder)
# Copy the python file with the simulation class and all additional files to the 'resources' folder
# py_simulation_file argument is often specified using the __file__ attribute.
# In Python 2.7 __file__ points to 'file.pyc' - correct it to 'file.py'
if py_simulation_file.endswith('.pyc'):
py_simulation_file = py_simulation_file[:-1]
py_path, py_filename = os.path.split(str(py_simulation_file))
shutil.copy2(py_simulation_file, os.path.join(resources_dir, py_filename))
if add_xml_stylesheet:
fmi_py_dir = os.path.dirname(os.path.abspath(__file__))
#xsl_name = os.path.basename(self.xml_stylesheet)
#xsl_name = os.path.splitext(xsl_name)[0]
self.xml_stylesheet = 'resources/daetools-fmi.xsl'
shutil.copy2(os.path.join(fmi_py_dir, 'daetools-fmi.xsl'), os.path.join(resources_dir, 'daetools-fmi.xsl'))
shutil.copy2(os.path.join(fmi_py_dir, 'daetools-fmi.css'), os.path.join(resources_dir, 'daetools-fmi.css'))
# Copy the additional files to the locations relative to the 'resources' folder
# Additional files are a list of tuples: [('file_path', 'resources_dir_relative_path'), ...]
for py_file, relative_path in additional_files:
destination_file = os.path.join(resources_dir, relative_path)
py_path, py_filename = os.path.split(destination_file)
if not os.path.isdir(py_path):
os.makedirs(py_path)
shutil.copy2(py_file, destination_file)
# Copy all available libcdaeFMU_CS-pyXY.[so/dll/dynlib] to the 'binaries/platform[32/64]' folder
self._copy_solib('Linux', 'x86_64', modelIdentifier, binaries_dir)
self._copy_solib('Linux', 'i386', modelIdentifier, binaries_dir)
self._copy_solib('Windows', 'win32', modelIdentifier, binaries_dir)
self._copy_solib('Windows', 'win64', modelIdentifier, binaries_dir)
self._copy_solib('Darwin', 'x86_64', modelIdentifier, binaries_dir)
self._copy_solib('Darwin', 'i386', modelIdentifier, binaries_dir)
# Generate settings.json file
f = open(os.path.join(resources_dir, 'settings.json'), "w")
settings = {}
settings['simulationFile'] = os.path.basename(py_simulation_file)
settings['callableObjectName'] = callable_object_name
settings['arguments'] = arguments
f.write(json.dumps(settings, indent = 4, sort_keys = True))
f.close()
# Generate initialization.json file
# daeSimulationExplorer.saveJSONSettings(os.path.join(resources_dir, 'init.json'), simulation, callable_object_name)
# Fill in the xml data
self.modelName = modelIdentifier #*
self.guid = uuid.uuid1() #*
self.description = simulation.m.Description
self.author = ''
self.version = ''
self.copyright = ''
self.license = ''
self.generationTool = 'DAE Tools v%s' % daeVersion(True)
self.generationDateAndTime = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.localtime())
self.variableNamingConvention = fmiModelDescription.variableNamingConventionStructured
self.numberOfEventIndicators = 0
self.CoSimulation = fmiCoSimulation()
self.CoSimulation.modelIdentifier = modelIdentifier #*
self.CoSimulation.needsExecutionTool = True
self.CoSimulation.canHandleVariableCommunicationStepSize = True
self.CoSimulation.canHandleEvents = True
self.CoSimulation.canInterpolateInputs = False
self.CoSimulation.maxOutputDerivativeOrder = 0
self.CoSimulation.canRunAsynchronuously = False
self.CoSimulation.canSignalEvents = True
self.CoSimulation.canBeInstantiatedOnlyOncePerProcess = False
self.CoSimulation.canNotUseMemoryManagementFunctions = True
self.CoSimulation.canGetAndSetFMUstate = False
self.CoSimulation.canSerializeFMUstate = False
self.CoSimulation.providesPartialDerivativesOf_DerivativeFunction_wrt_States = False
self.CoSimulation.providesPartialDerivativesOf_DerivativeFunction_wrt_Inputs = False
self.CoSimulation.providesPartialDerivativesOf_OutputFunction_wrt_States = False
self.CoSimulation.providesPartialDerivativesOf_OutputFunction_wrt_Inputs = False
self.ModelStructure = fmiModelStructure()
self.ModelVariables = []
self.UnitDefinitions = []
self.TypeDefinitions = []
self.VendorAnnotations = []
self.DefaultExperiment = fmiDefaultExperiment()
self.LogCategories = []
# Setup a default experiment
self.DefaultExperiment.startTime = 0.0
self.DefaultExperiment.stopTime = simulation.TimeHorizon
self.DefaultExperiment.tolerance = simulation.DAESolver.RelativeTolerance
# Add model variables
fmi_interface = simulation.m.GetFMIInterface()
#print fmi_interface
variableTypesUsed = {}
unitsUsed = {}
for ref, f in sorted(fmi_interface.items()):
if f.type == 'Input':
self._addInput(f)
if not f.variable.VariableType.Name in variableTypesUsed:
variableTypesUsed[f.variable.VariableType.Name] = f.variable.VariableType
if not str(f.variable.VariableType.Units) in unitsUsed:
unitsUsed[str(f.variable.VariableType.Units)] = f.variable.VariableType.Units
elif f.type == 'Output':
self._addOutput(f)
if not f.variable.VariableType.Name in variableTypesUsed:
variableTypesUsed[f.variable.VariableType.Name] = f.variable.VariableType
if not str(f.variable.VariableType.Units) in unitsUsed:
unitsUsed[str(f.variable.VariableType.Units)] = f.variable.VariableType.Units
elif f.type == 'Local':
# If localsAsOutputs is true treat all locals as outputs.
if self.localsAsOutputs:
self._addOutput(f)
else:
self._addLocal(f)
if not f.variable.VariableType.Name in variableTypesUsed:
variableTypesUsed[f.variable.VariableType.Name] = f.variable.VariableType
if not str(f.variable.VariableType.Units) in unitsUsed:
unitsUsed[str(f.variable.VariableType.Units)] = f.variable.VariableType.Units
elif f.type == 'Parameter':
self._addParameter(f)
if not str(f.parameter.Units) in unitsUsed:
unitsUsed[str(f.parameter.Units)] = f.parameter.Units
elif f.type == 'STN':
self._addSTN(f)
else:
raise RuntimeError('Invalid variable reference type')
# Add unit definitions
for unit_name, u in unitsUsed.items():
self._addUnitDefinition(u)
# Add variable types
for vartype_name, var_type in variableTypesUsed.items():
self._addTypeDefinition(var_type)
# Add log categories
cat = fmiLogCategory()
cat.name = 'logAll'
self.LogCategories.append(cat)
# Save model description xml file
self.to_xml(xml_description_filename)
files_to_zip = []
for root, dirs, files in os.walk(tmp_folder):
for file in files:
filename = os.path.join(root, file)
relative_path = os.path.relpath(filename, tmp_folder)
files_to_zip.append( (filename, relative_path) )
zip = zipfile.ZipFile(fmu_filename, "w")
for filename, relative_path in files_to_zip:
zip.write(filename, relative_path)
zip.close()
finally:
# Remove temporary directory
if os.path.isdir(tmp_folder):
#shutil.rmtree(tmp_folder)
pass
def _formatUnits(self, units):
# Format: m.kg2/s-2 meaning m * kg**2 / s**2
positive = []
negative = []
for u, exp in list(units.toDict().items()):
if exp >= 0:
if exp == 1:
positive.append('{0}'.format(u))
elif int(exp) == exp:
positive.append('{0}^{1}'.format(u, int(exp)))
else:
positive.append('{0}^{1}'.format(u, exp))
for u, exp in list(units.toDict().items()):
if exp < 0:
if exp == -1:
negative.append('{0}'.format(u))
elif int(exp) == exp:
negative.append('{0}^{1}'.format(u, int(math.fabs(exp))))
else:
negative.append('{0}^{1}'.format(u, math.fabs(exp)))
if len(positive) == 0:
sPositive = 'rad'
else:
sPositive = '.'.join(positive)
if len(negative) == 0:
sNegative = ''
elif len(negative) == 1:
sNegative = '/' + '.'.join(negative)
else:
sNegative = '/(' + '.'.join(negative) + ')'
return sPositive + sNegative
def _copy_solib(self, platform_system, platform_machine, modelIdentifier, binaries_dir):
# Copy libcdaeFMU_CS-pyXY.[so/dll/dynlib] and libcdaeSimulationLoader-pyXY.[so/dll/dynlib]
# to the 'binaries/platform[32/64]' folder
if platform_system == 'Linux':
so_ext = 'so'
so_ext_pattern = 'so.*'
shared_lib_prefix = 'lib'
shared_lib_postfix = ''
if platform_machine == 'x86_64':
platform_binaries_dir = os.path.join(binaries_dir, 'linux64')
else:
platform_binaries_dir = os.path.join(binaries_dir, 'linux32')
elif platform_system == 'Windows':
so_ext = 'dll'
so_ext_pattern = 'dll'
shared_lib_prefix = ''
shared_lib_postfix = '1'
if platform_machine == 'win64':
platform_binaries_dir = os.path.join(binaries_dir, 'win64')
else:
platform_binaries_dir = os.path.join(binaries_dir, 'win32')
elif platform_system == 'Darwin':
so_ext = 'dylib'
so_ext_pattern = 'dylib'
shared_lib_prefix = 'lib'
shared_lib_postfix = ''
if platform_machine == 'x86_64':
platform_binaries_dir = os.path.join(binaries_dir, 'darwin64')
else:
platform_binaries_dir = os.path.join(binaries_dir, 'darwin32')
else:
raise RuntimeError('Unsupported platform: %s' % platform_system)
solibs_dir = os.path.join(daetools.daetools_dir, 'solibs', '%s_%s' % (platform_system, platform_machine), 'lib')
daetools_fmu_solib = '%s.%s' % (modelIdentifier, so_ext)
if self.useWebService:
daetools_fmi_cs = '%scdaeFMU_CS_WS%s.%s' % (shared_lib_prefix,
shared_lib_postfix,
so_ext)
else:
daetools_fmi_cs = '%scdaeFMU_CS-py%s%s%s.%s' % (shared_lib_prefix,
daetools.python_version_major,
daetools.python_version_minor,
shared_lib_postfix,
so_ext)
daetools_simulation_loader = '%scdaeSimulationLoader-py%s%s%s.%s' % (shared_lib_prefix,
daetools.python_version_major,
daetools.python_version_minor,
shared_lib_postfix,
so_ext)
boost_files = glob.iglob(os.path.join(solibs_dir, "*boost_*-daetools-py%s%s.%s" % (daetools.python_version_major,
daetools.python_version_minor,
so_ext_pattern)))
try:
# Copy FMU_CS
_source = os.path.join(solibs_dir, daetools_fmi_cs)
_target = os.path.join(platform_binaries_dir, daetools_fmu_solib)
#print('copy %s %s' % (_source, _target))
shutil.copy2(_source, _target)
except Exception as e:
# Ignore exceptions, since some of binaries are certainly not available
pass
if not self.useWebService:
try:
# Copy SimulationLoader
_source = os.path.join(solibs_dir, daetools_simulation_loader)
_target = os.path.join(platform_binaries_dir)
#print('copy %s %s' % (_source, _target))
shutil.copy2(_source, _target)
except Exception as e:
# Ignore exceptions, since some of binaries are certainly not available
pass
try:
# Copy boost libs
_target = os.path.join(platform_binaries_dir)
for _source in boost_files:
if os.path.isfile(_source):
#print('copy %s %s' % (_source, _target))
shutil.copy2(_source, _target)
except Exception as e:
# Ignore exceptions, since some of binaries are certainly not available
pass
def _addInput(self, fmi_obj):
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description)
sv.causality = fmiScalarVariable.causalityInput
sv.variability = fmiScalarVariable.variabilityContinuous # Set it to continuous (page 49 FMI-v2.0.pdf)
sv.initial = None # It is not allowed to provide a value for initial if causality = "input" or "independent"
sv.type = fmiReal()
sv.type.declaredType = fmi_obj.variable.VariableType.Name
sv.type.start = fmi_obj.variable.GetValue(list(fmi_obj.indexes))
self.ModelVariables.append(sv)
def _addOutput(self, fmi_obj):
# In general, the index is not equal to he reference but to the index in the ModelVariables list.
# But, the references in daetools start at 1 and increase and they are added in the sorted order.
# So it should be fine to use reference as an index.
# Anyway, the index in the ModelStructure.Outputs is used.
# The indexes in FMI start at 1 (not at zero).
var_index = len(self.ModelVariables) + 1
unknown = fmiVariableDependency()
unknown.index = var_index #*
self.ModelStructure.Outputs.append(unknown)
unknown = fmiVariableDependency()
unknown.index = var_index #*
self.ModelStructure.InitialUnknowns.append(unknown)
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description)
sv.causality = fmiScalarVariable.causalityOutput
sv.variability = fmiScalarVariable.variabilityContinuous
sv.initial = fmiScalarVariable.initialCalculated
sv.type = fmiReal()
sv.type.declaredType = fmi_obj.variable.VariableType.Name
self.ModelVariables.append(sv)
def _addLocal(self, fmi_obj):
# Here, do not add anything in the ModelStructure
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description)
sv.causality = fmiScalarVariable.causalityLocal
sv.variability = fmiScalarVariable.variabilityContinuous
sv.initial = fmiScalarVariable.initialCalculated
sv.type = fmiReal()
sv.type.declaredType = fmi_obj.variable.VariableType.Name
self.ModelVariables.append(sv)
def _addParameter(self, fmi_obj):
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description)
sv.causality = fmiScalarVariable.causalityParameter
sv.variability = fmiScalarVariable.variabilityTunable # If it is tunable it can be changed during the simulation
sv.initial = fmiScalarVariable.initialExact
sv.type = fmiReal()
sv.type.unit = self._formatUnits(fmi_obj.parameter.Units)
sv.type.start = fmi_obj.parameter.GetValue(list(fmi_obj.indexes))
self.ModelVariables.append(sv)
def _addSTN(self, fmi_obj):
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description)
sv.causality = fmiScalarVariable.causalityParameter # If it is input then it can be changed by the simulator
sv.variability = fmiScalarVariable.variabilityTunable
sv.initial = fmiScalarVariable.initialExact
sv.type = fmiString()
sv.type.start = fmi_obj.stn.ActiveState
self.ModelVariables.append(sv)
"""
def _addNumberOfPointsInDomain(self, fmi_obj):
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description) + ' [%s]' % fmi_obj.units
sv.causality = fmiScalarVariable.causalityLocal
sv.variability = fmiScalarVariable.variabilityConstant
sv.initial = fmiScalarVariable.initialExact
self.ModelVariables.append(sv)
def _addDomainPoints(self, fmi_obj):
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description) + ' [%s]' % fmi_obj.units
sv.causality = fmiScalarVariable.causalityParameter
sv.variability = fmiScalarVariable.variabilityTunable
sv.initial = fmiScalarVariable.initialExact
self.ModelVariables.append(sv)
def _addAssignedVariable(self, fmi_obj):
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description) + ' [%s]' % fmi_obj.units
sv.causality = fmiScalarVariable.causalityParameter
sv.variability = fmiScalarVariable.variabilityTunable
sv.initial = fmiScalarVariable.initialExact
self.ModelVariables.append(sv)
def _addAlgebraicVariable(self, fmi_obj):
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description) + ' [%s]' % fmi_obj.units
sv.causality = fmiScalarVariable.causalityLocal
sv.variability = fmiScalarVariable.variabilityContinuous
sv.initial = fmiScalarVariable.initialCalculated
self.ModelVariables.append(sv)
def _addDifferentialVariable(self, fmi_obj):
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description) + ' [%s]' % fmi_obj.units
sv.causality = fmiScalarVariable.causalityLocal
sv.variability = fmiScalarVariable.variabilityContinuous
sv.initial = fmiScalarVariable.initialCalculated
self.ModelVariables.append(sv)
def _addInletPortVariable(self, fmi_obj):
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description) + ' [%s]' % fmi_obj.units
sv.causality = fmiScalarVariable.causalityInput
sv.variability = fmiScalarVariable.variabilityContinuous
sv.initial = fmiScalarVariable.initialExact
self.ModelVariables.append(sv)
def _addOutletPortVariable(self, fmi_obj):
sv = fmiScalarVariable()
sv.name = str(fmi_obj.name) #*
sv.valueReference = int(fmi_obj.reference) #*
sv.description = str(fmi_obj.description) + ' [%s]' % fmi_obj.units
sv.causality = fmiScalarVariable.causalityOutput
sv.variability = fmiScalarVariable.variabilityContinuous
sv.initial = fmiScalarVariable.initialCalculated
self.ModelVariables.append(sv)
"""
def _addUnitDefinition(self, dae_unit):
dae_bu = dae_unit.baseUnit
unit = fmiUnit()
unit.name = self._formatUnits(dae_unit) # *
unit.baseUnit = fmiBaseUnit()
unit.baseUnit.factor = dae_bu.multiplier
unit.baseUnit.offset = 0.0
if dae_bu.L != 0:
unit.baseUnit.m = int(dae_bu.L)
if dae_bu.M != 0:
unit.baseUnit.kg = int(dae_bu.M)
if dae_bu.T != 0:
unit.baseUnit.s = int(dae_bu.T)
if dae_bu.C != 0:
unit.baseUnit.cd = int(dae_bu.C)
if dae_bu.I != 0:
unit.baseUnit.A = int(dae_bu.I)
if dae_bu.O != 0:
unit.baseUnit.K = int(dae_bu.O)
if dae_bu.N != 0:
unit.baseUnit.mol = int(dae_bu.N)
# If all are 0, set rad = 1
if dae_bu.L == 0 and dae_bu.M == 0 and dae_bu.T == 0 and dae_bu.C == 0 and dae_bu.I == 0 and dae_bu.O == 0 and dae_bu.N == 0:
unit.baseUnit.rad = 1
self.UnitDefinitions.append(unit)
def _addTypeDefinition(self, var_type):
st = fmiSimpleType()
st.name = var_type.Name #*
st.type = fmiReal()
st.type.unit = self._formatUnits(var_type.Units)
st.type.min = var_type.LowerBound
st.type.max = var_type.UpperBound
st.type.nominal = var_type.InitialGuess
self.TypeDefinitions.append(st)