"""***********************************************************************************
simulator.py
DAE Tools: pyDAE module, www.daetools.com
Copyright (C) Dragan Nikolic
***********************************************************************************
DAE Tools is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License version 3 as published by the Free Software
Foundation. DAE Tools is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with the
DAE Tools software; if not, see <http://www.gnu.org/licenses/>.
***********************************************************************************"""
import os, platform, sys, tempfile, traceback, webbrowser, math
from os.path import join, realpath, dirname
from daetools.pyDAE import *
from time import ctime, time, localtime, strftime, struct_time
from PyQt5 import QtCore, QtGui, QtWidgets
from .simulator_ui import Ui_SimulatorDialog
from . import auxiliary
from . import simulation_explorer
# Components of the running interpreter's version; the major and minor numbers
# are shown in the simulator window title.
python_major, python_minor, python_build = sys.version_info[:3]

# The 'images' directory located next to this module (holds the window icon).
images_dir = join(dirname(__file__), 'images')
class daeTextEditLog(daeBaseLog):
    """A daeBaseLog that reports into Qt widgets.

    Messages are appended to a text-edit widget; progress updates are written
    to a progress bar and a label.
    """

    def __init__(self, TextEdit, ProgressBar, ProgressLabel, App):
        """Remember the widgets to update and the application object.

        TextEdit      -- widget whose append() receives every message.
        ProgressBar   -- widget whose setValue() receives the progress.
        ProgressLabel -- widget whose setText() receives the ETA text.
        App           -- object whose processEvents() is called from Message().
        """
        daeBaseLog.__init__(self)
        self.TextEdit, self.ProgressBar = TextEdit, ProgressBar
        self.ProgressLabel, self.App = ProgressLabel, App
        # Timestamp taken when the log object is created.
        self.time = time()

    @property
    def Name(self):
        """Fixed name of this log: 'daeTextEditLog'."""
        return 'daeTextEditLog'

    def SetProgress(self, progress):
        """Forward *progress* to the base class, then refresh bar and label."""
        daeBaseLog.SetProgress(self, progress)
        bar, label = self.ProgressBar, self.ProgressLabel
        bar.setValue(int(self.Progress))
        label.setText(self.ETA)
        # The event loop is not pumped here (the call is disabled):
        #self.App.processEvents()

    def Message(self, message, severity):
        """Append *message*, prefixed by the current indent, to the text edit.

        *severity* is accepted but not used by this log.
        """
        line = self.IndentString + message
        self.TextEdit.append(line)
        if self.TextEdit.isVisible():
            self.TextEdit.update()
        # NOTE(review): events are pumped on every message, whether or not the
        # text edit is visible - confirm this nesting against the original file.
        self.App.processEvents()
[docs]class daeSimulator(QtWidgets.QDialog):
def __init__(self, app, **kwargs):
    """Build the simulator dialog and configure it from the keyword arguments.

    app    -- the Qt application object. When None, the already running
              application instance is used; a new QApplication is created
              only if there is no instance at all.
    kwargs -- optional settings, each defaulting to None when absent:
              simulation (required), optimization, datareporter, log,
              daesolver, relativeTolerance, lasolver, nlpsolver,
              nlpsolver_setoptions_fn, lasolver_setoptions_fn,
              generate_code_fn, run_after_simulation_init_fn,
              run_before_simulation_fn, run_after_simulation_fn.

    Raises RuntimeError when no simulation object is supplied.
    """
    QtWidgets.QDialog.__init__(self)
    self.ui = Ui_SimulatorDialog()
    self.ui.setupUi(self)

    self.setAttribute(QtCore.Qt.WA_DeleteOnClose)
    self.setWindowIcon(QtGui.QIcon(join(images_dir, 'daetools-48x48.png')))

    # Font of the log window: family and point size depend on the platform.
    font_by_platform = {
        'Linux':  ("Monospace", 9),
        'Darwin': ("Monaco", 10),
    }
    family, point_size = font_by_platform.get(platform.system(), ("Courier New", 9))
    font = QtGui.QFont()
    font.setFamily(family)
    font.setPointSize(point_size)
    self.ui.textEdit.setFont(font)

    self.setWindowTitle("DAE Tools Simulator v%s [py%d.%d]" % (daeVersion(True), python_major, python_minor))

    self.available_la_solvers = auxiliary.getAvailableLASolvers()
    self.available_nlp_solvers = auxiliary.getAvailableNLPSolvers()

    self.ui.DAESolverComboBox.addItem("Sundials IDAS")
    # Every solver entry is indexed as [0] label, [1] user data, [2] tooltip.
    for i, la in enumerate(self.available_la_solvers):
        self.ui.LASolverComboBox.addItem(la[0], userData = la[1])
        self.ui.LASolverComboBox.setItemData(i, la[2], QtCore.Qt.ToolTipRole)
    for i, nlp in enumerate(self.available_nlp_solvers):
        self.ui.MINLPSolverComboBox.addItem(nlp[0], userData = nlp[1])
        self.ui.MINLPSolverComboBox.setItemData(i, nlp[2], QtCore.Qt.ToolTipRole)

    # The Run button opens a menu with the two ways of starting a run.
    menuRun = QtWidgets.QMenu()
    actionRun = QtWidgets.QAction('Run', self)
    actionShowExplorerAndRun = QtWidgets.QAction('Show simulation explorer and run', self)
    menuRun.addAction(actionRun)
    menuRun.addAction(actionShowExplorerAndRun)
    self.ui.RunButton.setMenu(menuRun)

    self.ui.AboutButton.clicked.connect(self.slotAbout)
    self.ui.ResumeButton.clicked.connect(self.slotResume)
    self.ui.PauseButton.clicked.connect(self.slotPause)
    self.ui.MatrixButton.clicked.connect(self.slotOpenSparseMatrixImage)
    self.ui.ExportButton.clicked.connect(self.slotExportSparseMatrixAsMatrixMarketFormat)
    actionRun.triggered.connect(self.slotRun)
    actionShowExplorerAndRun.triggered.connect(self.slotShowExplorerAndRun)

    self.app = app
    self.simulation = kwargs.get('simulation', None)
    self.optimization = kwargs.get('optimization', None)
    self.datareporter = kwargs.get('datareporter', None)
    self.log = kwargs.get('log', None)
    self.daesolver = kwargs.get('daesolver', None)
    self.relativeTolerance = kwargs.get('relativeTolerance', None)
    self.lasolver = kwargs.get('lasolver', None)
    self.nlpsolver = kwargs.get('nlpsolver', None)
    self.nlpsolver_setoptions_fn = kwargs.get('nlpsolver_setoptions_fn', None)
    self.lasolver_setoptions_fn = kwargs.get('lasolver_setoptions_fn', None)
    self.generate_code_fn = kwargs.get('generate_code_fn', None)
    self.run_after_simulation_init_fn = kwargs.get('run_after_simulation_init_fn', None)
    self.run_before_simulation_fn = kwargs.get('run_before_simulation_fn', None)
    self.run_after_simulation_fn = kwargs.get('run_after_simulation_fn', None)

    if self.app is None:
        # Fall back to the running application so that self.app is never left
        # as None: it is used later to call processEvents(). Previously an
        # existing instance was detected but not stored.
        self.app = QtCore.QCoreApplication.instance()
        if self.app is None:
            self.app = QtWidgets.QApplication(sys.argv)

    if self.simulation is None:
        raise RuntimeError('daeSimulator: simulation object must not be None')

    self.ui.DAESolverComboBox.setEnabled(False)

    if self.lasolver is None:
        self.ui.LASolverComboBox.setEnabled(True)
    else:
        # If LA solver has been sent then clear and disable LASolver combo box
        self.ui.LASolverComboBox.clear()
        self.ui.LASolverComboBox.addItem(self.lasolver.Name)
        self.ui.LASolverComboBox.setEnabled(False)

    if self.optimization is None:
        # If we are simulating then clear and disable MINLPSolver combo box
        self.ui.simulationLabel.setText('Simulation')
        self.ui.MINLPSolverComboBox.clear()
        self.ui.MINLPSolverComboBox.setEnabled(False)
    else:
        self.ui.simulationLabel.setText('Optimization')
        if self.nlpsolver is None:
            self.ui.MINLPSolverComboBox.setEnabled(True)
        else:
            # If nlpsolver has been sent then clear and disable MINLPSolver combo box
            self.ui.MINLPSolverComboBox.clear()
            self.ui.MINLPSolverComboBox.addItem(self.nlpsolver.Name)
            self.ui.MINLPSolverComboBox.setEnabled(False)

    # Pre-fill the editable fields from the simulation object.
    self.ui.SimulationLineEdit.insert(self.simulation.m.Name)
    self.ui.ReportingIntervalDoubleSpinBox.setValue(self.simulation.ReportingInterval)
    self.ui.TimeHorizonDoubleSpinBox.setValue(self.simulation.TimeHorizon)

    # Data receiver address "host:port" read from the configuration, with
    # 127.0.0.1 and 50000 as the defaults.
    cfg = daeGetConfig()
    tcpip = cfg.GetString("daetools.datareporting.tcpipDataReceiverAddress", "127.0.0.1")
    port = cfg.GetInteger("daetools.datareporting.tcpipDataReceiverPort", 50000)
    self.ui.DataReporterTCPIPAddressLineEdit.setText( tcpip + ':' + str(port) )
def done(self, status):
    """Pause the simulation (if one is set) and close the dialog with *status*.

    Returns the result of QDialog.done().
    """
    #print('daeSimulator.done = {0}'.format(status))
    simulation = self.simulation
    if simulation:
        simulation.Pause()
    # Nothing is done here for an optimization.
    return QtWidgets.QDialog.done(self, status)
"""
def __del__(self):
# Calling Finalize is not mandatory since it will be called
# in a simulation/optimization destructor
if self.simulation:
self.simulation.Finalize()
elif self.optimization:
self.optimization.Finalize()
"""
#@QtCore.pyqtSlot()
def slotAbout(self):
    """Create the DAE Tools about dialog and run it with exec_()."""
    # The dialog class is imported at call time, not at module import.
    from daetools.dae_plotter.about import daeAboutDialog
    about = daeAboutDialog()
    about.exec_()
#@QtCore.pyqtSlot()
def slotPause(self):
    """Slot of the Pause button: call Pause() on the simulation."""
    simulation = self.simulation
    simulation.Pause()
#@QtCore.pyqtSlot()
def slotResume(self):
    """Slot of the Resume button.

    Calls Resume() on the simulation and afterwards performs the end-of-run
    handling (_AfterRunCalls).
    """
    simulation = self.simulation
    simulation.Resume()
    self._AfterRunCalls()
#@QtCore.pyqtSlot()
def slotRun(self):
    """Slot of the 'Run' action.

    Starts the run without the simulation explorer and afterwards performs
    the end-of-run handling (_AfterRunCalls).
    """
    show_explorer = False
    self.run(show_explorer)
    self._AfterRunCalls()
#@QtCore.pyqtSlot()
def slotShowExplorerAndRun(self):
    """Slot of the 'Show simulation explorer and run' action.

    Starts the run with the simulation explorer enabled and afterwards
    performs the same end-of-run handling as slotRun(). Without that call
    the run_after_simulation_fn callback and Finalize() were skipped for runs
    started through this action.
    """
    self.run(True)
    self._AfterRunCalls()
[docs] def run(self, showExplorer):
# Read the settings from the dialog, create every data reporter / log / solver
# object that was not supplied to the constructor, then initialize and run the
# simulation (or the optimization when self.optimization is set).
#
# showExplorer -- when true, a daeSimulationExplorer dialog is executed before
#                 simulation.Run() is called.
#
# Exceptions are handled by the 'except Exception' clause at the end of this
# method: the traceback and the message are appended to the text edit.
#
# NOTE(review): this copy of the file has lost its leading indentation, so the
# nesting implied by the comments below is inferred from the statement order -
# confirm against the original source.
try:
self.ui.textEdit.clear()
# Take the current values of the editable fields.
tcpipaddress = str(self.ui.DataReporterTCPIPAddressLineEdit.text())
self.simulation.ReportingInterval = float(self.ui.ReportingIntervalDoubleSpinBox.value())
self.simulation.TimeHorizon = float(self.ui.TimeHorizonDoubleSpinBox.value())
# No data reporter supplied: create a TCP/IP one and connect it to the address
# from the dialog; a failed connection is reported in a message box and raised.
if self.datareporter == None:
self.datareporter = daeTCPIPDataReporter()
simName = self.simulation.m.Name + strftime(" [%d.%m.%Y %H:%M:%S]", localtime())
if(self.datareporter.Connect(str(tcpipaddress), simName) == False):
self.showMessage("Cannot connect data reporter!\nDid you forget to start DAE Tools Plotter?")
self.datareporter = None
raise RuntimeError("Cannot connect daeTCPIPDataReporter")
# No log supplied: log to the text edit only. Otherwise the supplied log and a
# text-edit log are both added to a daeDelegateLog.
if self.log == None:
self.log = daeTextEditLog(self.ui.textEdit, self.ui.progressBar, self.ui.progressLabel, self.app)
else:
log1 = self.log
log2 = daeTextEditLog(self.ui.textEdit, self.ui.progressBar, self.ui.progressLabel, self.app)
self.log = daeDelegateLog()
self.log.AddLog(log1)
self.log.AddLog(log2)
# Both logs are also stored on the instance (presumably to keep them alive
# while the delegate log uses them - TODO confirm).
self._logs = [log1, log2]
# No DAE solver supplied: use daeIDAS.
if self.daesolver == None:
self.daesolver = daeIDAS()
# A falsy relativeTolerance (None or 0) leaves the solver's tolerance untouched.
if self.relativeTolerance:
self.daesolver.RelativeTolerance = self.relativeTolerance
# Both indexes stay at -1 unless a non-negative value is read from the combo
# boxes below.
lasolverIndex = -1
minlpsolverIndex = -1
# If nlpsolver is not sent then choose it from the selection
if self.nlpsolver == None and len(self.available_nlp_solvers) > 0:
_index = self.ui.MINLPSolverComboBox.itemData(self.ui.MINLPSolverComboBox.currentIndex())
# NOTE(review): toInt() looks like the PyQt4 QVariant API - verify that this
# branch still works (or is ever taken) under PyQt5.
if isinstance(_index, QtCore.QVariant):
minlpsolverIndex = _index.toInt()[0]
else:
if _index != None and _index >= 0:
minlpsolverIndex = int(_index)
# If lasolver is not sent then create it based on the selection
if self.lasolver == None and len(self.available_la_solvers) > 0:
_index = self.ui.LASolverComboBox.itemData(self.ui.LASolverComboBox.currentIndex())
if isinstance(_index, QtCore.QVariant):
lasolverIndex = _index.toInt()[0]
else:
if _index != None and _index >= 0:
lasolverIndex = int(_index)
self.lasolver = auxiliary.createLASolver(lasolverIndex)
# Disable the controls; nothing in this method enables them again except the
# two matrix buttons below.
self.ui.RunButton.setEnabled(False)
self.ui.MINLPSolverComboBox.setEnabled(False)
self.ui.DAESolverComboBox.setEnabled(False)
self.ui.LASolverComboBox.setEnabled(False)
self.ui.DataReporterTCPIPAddressLineEdit.setEnabled(False)
self.ui.ReportingIntervalDoubleSpinBox.setEnabled(False)
self.ui.TimeHorizonDoubleSpinBox.setEnabled(False)
self.ui.MatrixButton.setEnabled(False)
self.ui.ExportButton.setEnabled(False)
# The matrix image/export buttons are enabled only when one of the linear
# solvers listed here was selected in the combo box.
if lasolverIndex in [auxiliary.laAmesos_Klu,
auxiliary.laAmesos_Superlu,
auxiliary.laAmesos_Umfpack,
auxiliary.laAztecOO,
auxiliary.laIntelPardiso,
auxiliary.laSuperLU,
auxiliary.laSuperLU_MT,
auxiliary.laSuperLU_CUDA,
auxiliary.laCUSP]:
self.ui.MatrixButton.setEnabled(True)
self.ui.ExportButton.setEnabled(True)
# Attach the linear solver to the DAE solver. A tuple is unpacked as
# (solver type, preconditioner); anything else is passed to SetLASolver as is.
if self.lasolver:
if isinstance(self.lasolver, tuple):
if len(self.lasolver) != 2:
raise RuntimeError('Invalid linear solver specified: %s' % self.lasolver)
lasolverType = self.lasolver[0]
preconditioner = self.lasolver[1]
self.daesolver.SetLASolver(lasolverType, preconditioner)
else:
self.daesolver.SetLASolver(self.lasolver)
# Simulation: initialize, invoke the optional user callbacks in the order
# written below, solve the initial conditions and run.
if self.optimization == None:
self.simulation.Initialize(self.daesolver, self.datareporter, self.log)
if self.run_after_simulation_init_fn:
self.run_after_simulation_init_fn(self.simulation, self.log)
if(self.lasolver_setoptions_fn):
self.lasolver_setoptions_fn(self.lasolver)
self.simulation.SolveInitial()
if not self.simulation.IsSolveInitial:
raise RuntimeError('The initial conditions have not been calculated')
if self.run_before_simulation_fn:
self.run_before_simulation_fn(self.simulation, self.log)
if self.generate_code_fn:
self.generate_code_fn(self.simulation, self.log)
if showExplorer:
explorer = simulation_explorer.daeSimulationExplorer(self.app, simulation = self.simulation)
explorer.exec_()
self.simulation.Run()
# Do not clean up or call Finalize here: the user may have paused the simulation/optimisation
else:
# If nlpsolver is not sent then create it based on the selection
if(self.nlpsolver == None):
self.nlpsolver = auxiliary.createNLPSolver(minlpsolverIndex)
self.optimization.Initialize(self.simulation, self.nlpsolver, self.daesolver, self.datareporter, self.log)
if(self.nlpsolver_setoptions_fn):
self.nlpsolver_setoptions_fn(self.nlpsolver)
self.optimization.Run()
# Do not clean up or call Finalize here: the user may have paused the simulation/optimisation
except Exception as e:
# Show the traceback followed by the exception message in the text edit; the
# exception is not re-raised.
exc_type, exc_value, exc_traceback = sys.exc_info()
messages = traceback.format_tb(exc_traceback)
self.ui.textEdit.append('\n'.join(messages))
self.ui.textEdit.append(str(e))
if self.ui.textEdit.isVisible() == True:
self.ui.textEdit.update()
self.app.processEvents()
def _AfterRunCalls(self):
    """Perform the end-of-run handling once the time horizon has been reached.

    Nothing happens while the simulation's CurrentTime differs from its
    TimeHorizon (for instance after a pause). Otherwise the optional
    run_after_simulation_fn(simulation, log) callback is invoked and then
    Finalize() is called on the optimization if one is set, or on the
    simulation if not.
    """
    simulation = self.simulation

    # Exact comparison, unchanged from the previous implementation.
    # NOTE(review): this relies on CurrentTime being exactly equal to
    # TimeHorizon at the end of a run - confirm the solver guarantees that.
    if simulation.CurrentTime != simulation.TimeHorizon:
        return

    if self.run_after_simulation_fn:
        self.run_after_simulation_fn(simulation, self.log)

    finalize_target = simulation if self.optimization is None else self.optimization
    finalize_target.Finalize()
def showMessage(self, msg):
    """Show str(msg) in a warning message box titled "daeSimulator"."""
    text = str(msg)
    QtWidgets.QMessageBox.warning(self, "daeSimulator", text)
#@QtCore.pyqtSlot()
def slotOpenSparseMatrixImage(self):
    """Save the linear solver's matrix as an XPM image and display it.

    Does nothing when no linear solver is set. Otherwise the image is written
    to "<temp dir>/<model name>.xpm" through lasolver.SaveAsXPM() and opened
    in a daeWebView dialog. If that fails for any reason, the file is opened
    in a new tab of the default web browser instead.
    """
    if self.lasolver is None:
        return

    matName = os.path.join(tempfile.gettempdir(), self.simulation.m.Name + ".xpm")
    self.lasolver.SaveAsXPM(matName)

    # A filesystem path has to be converted with fromLocalFile(): QUrl(path)
    # would produce a URL with no scheme (or take a Windows drive letter as
    # the scheme) instead of a file:// URL.
    url = QtCore.QUrl.fromLocalFile(matName)
    try:
        from daetools.pyDAE.web_view_dialog import daeWebView
        wv = daeWebView(url)
        wv.resize(400, 400)
        wv.setWindowTitle("Sparse matrix: " + matName)
        wv.exec_()
    except Exception:
        # Deliberate best-effort fallback when the web view cannot be used.
        webbrowser.open_new_tab(matName)
#@QtCore.pyqtSlot()