Source code for daetools.pyDAE.hr_upwind_scheme

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""***********************************************************************************
                           hr_upwind_scheme.py
                DAE Tools: pyDAE module, www.daetools.com
                Copyright (C) Dragan Nikolic
***********************************************************************************
DAE Tools is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License version 3 as published by the Free Software
Foundation. DAE Tools is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with the
DAE Tools software; if not, see <http://www.gnu.org/licenses/>.
************************************************************************************"""
import sys, numpy, math
from daetools.pyDAE import *

# These function operate on adouble objects
MIN     = Min
MAX     = Max
FABS    = Abs
beta    = 1.5 # For Sweby and Osher (1 <= beta  <= 2)
theta   = 1.5 # For van Leer minmod (1 <= theta <= 2)

[docs]class daeHRUpwindSchemeEquation(object): supported_flux_limiters = []
[docs] def __init__(self, variable, domain, phi, r_epsilon = 1e-10, reversedFlow = False): """ """ if not callable(phi): raise RuntimeError('Invalid flux limiter function specified (must be a callable)') if not callable(variable): raise RuntimeError('Invalid variable specified (must be a daeVariable object or a callable)') if not callable(domain): raise RuntimeError('Invalid domain specified (must be a daeDomain object or a callable)') self.c = variable # daeVariable object (or a callable) self.x = domain # daeDomain object self.phi = phi # Flux limiter function (a callable) self.r_epsilon = r_epsilon # epsilon in the upwind ratio (r) expression self.reversedFlow = reversedFlow
[docs] def dc_dt(self, i, variable = None): """Accumulation term in the cell-centered finite-volume discretisation: :math:`\int_{\Omega_i} {\partial c_i \over \partial t} dx` """ x = self.x Nx = self.x.NumberOfPoints if variable: if not callable(variable): raise RuntimeError('Invalid variable specified (must be a daeVariable object or a callable)') c = variable else: c = self.c dc_dt = lambda j: dt(c(j)) if not self.reversedFlow: if i == 0: # Sto ovde imam za 0?? return (x[i+1]-x[i]) * dc_dt(i) else: return (x[i]-x[i-1]) * dc_dt(i) else: # reversible flow if i == Nx-1: # Sto ovde imam za Nx-1?? return (x[Nx-1]-x[Nx-2]) * dc_dt(Nx-1) else: return (x[i]-x[i+1]) * dc_dt(i)
[docs] def dc_dx(self, i, S = None, variable = None): """Convection term in the cell-centered finite-volume discretisation: :math:`c_{i + {1 \over 2}} - c_{i - {1 \over 2}}`. Cell-face state :math:`c_{i+{1 \over 2}}` is given as: :math:`{c}_{i + {1 \over 2}} = c_i + \phi \left( r_{i + {1 \over 2}} \\right) \left( c_i - c_{i-1} \\right)` where :math:`\phi` is the flux limiter function and :math:`r_{i + {1 \over 2}}` the upwind ratio of consecutive solution gradients: :math:`r_{i + {1 \over 2}} = {{c_{i+1} - c_{i} + \epsilon} \over {c_{i} - c_{i-1} + \epsilon}}`. If the source term integral :math:`S= {1 \over u} \int_{\Omega_i} s(x) dx` is not ``None`` then the convection term is given as: :math:`(c-S)_{i + {1 \over 2}} - (c-S)_{i - {1 \over 2}}`. """ if variable: if not callable(variable): raise RuntimeError('Invalid variable specified (must be a daeVariable object or a callable)') c = variable else: c = self.c if not self.reversedFlow: return self._c_edge_plus(i, c, S) - self._c_edge_plus(i-1, c, S) else: # reversible flow return self._c_edge_plus_rev(i, c, S) - self._c_edge_plus_rev(i+1, c, S)
[docs] def d2c_dx2(self, i, variable = None): """Diffusion term in the cell-centered finite-volume discretisation: :math:`\left( \partial c_i \over \partial x \\right)_{i + {1 \over 2}} - \left( \partial c_i \over \partial x \\right)_{i - {1 \over 2}}` """ if variable: if not callable(variable): raise RuntimeError('Invalid variable specified (must be a daeVariable object or a callable)') c = variable else: c = self.c if not self.reversedFlow: return self._dc_dx_edge_plus(i, c) - self._dc_dx_edge_plus(i-1, c) else: # reversible flow return self._dc_dx_edge_plus_rev(i, c) - self._dc_dx_edge_plus_rev(i+1, c)
[docs] def source(self, s, i): """Source term in the cell-centered finite-volume discretisation: :math:`\int_{\Omega_i} s(x) dx` """ # The cell average shouldn't depend on a flow direction. # Anyhow, everything is fine as long as the grid is uniform. return self._AverageOverCell(s,i)
######################################################################## # Implementation details ######################################################################## def _r(self, i, S = None): # Upwind ratio of consecutive solution gradients. # It may include the source term integral S(x). eps = self.r_epsilon c = self.c cs = lambda j: c(j)-S(j) if S else c(j) return (cs(i+1) - cs(i) + eps) / (cs(i) - cs(i-1) + eps) def _r_rev(self, i, S = None): # Upwind ratio of consecutive solution gradients for the reversed flow. # It may include the source term integral S(x). eps = self.r_epsilon c = self.c cs = lambda j: c(j)-S(j) if S else c(j) return (cs(i-1) - cs(i) + eps) / (cs(i) - cs(i+1) + eps) def _c_edge_plus(self, i, c, S): # c at the i+1/2 face (cell outlet) phi = self.phi r = self._r x = self.x Nx = self.x.NumberOfPoints cs = lambda j: c(j)-S(j) if S else c(j) if i == 0: # Right face of the first cell: central interpolation (k=1) return 0.5 * (cs(0) + cs(1)) elif i == Nx-1: # Right face of the last cell: one-sided upwind scheme (k=-1) return cs(i) + 0.5 * (cs(i) - cs(i-1)) elif i > 0 and i < Nx-1: # Other cells: k=1/3 return cs(i) + 0.5 * phi(r(i,S)) * (cs(i) - cs(i-1)) else: raise RuntimeError('c_edge_plus: Invalid index specified: %d (no. points is %d)' % (i, Nx)) def _c_edge_plus_rev(self, i, c, S): # c at the i+1/2 face (cell outlet) for the reversed flow phi = self.phi r = self._r_rev x = self.x Nx = self.x.NumberOfPoints cs = lambda j: c(j)-S(j) if S else c(j) if i == 0: # Left face of the last cell (first in the reversed): return cs(i) + 0.5 * (cs(i) - cs(i+1)) elif i == Nx-1: # Left face of the first cell (last in the reversed): return 0.5 * (cs(Nx-1) + cs(Nx-2)) elif i > 0 and i < Nx-1: # Other cells: k=1/3 return cs(i) + 0.5 * phi(r(i,S)) * (cs(i) - cs(i+1)) else: raise RuntimeError('c_edge_plus_rev: Invalid index specified: %d (no. points is %d)' % (i, Nx)) def _dc_dx_edge_plus(self, i, c): # Diffusion at the i+1/2 face (cell outlet) x = self.x Nx = self.x.NumberOfPoints if i == 0: # Right face of the first cell: biased central-difference return (-8*c(i) + 9*c(i+1) + c(i+2)) / (3 * (x[i+1] - x[i])) elif i == Nx-1: # Right face of the last cell: biased central-difference return (8*c(i) - 9*c(i-1) + c(i-2)) / (3 * (x[i] - x[i-1])) elif i > 0 and i < Nx-1: # Other cells: central-difference O(h^2) return (c(i+1) - c(i)) / (x[i+1] - x[i]) else: raise RuntimeError('dc_dx_edge_plus: Invalid index specified: %d (no. points is %d)' % (i, Nx)) def _dc_dx_edge_plus_rev(self, i, c): # Diffusion at the i+1/2 face (cell outlet) for the reversed flow x = self.x Nx = self.x.NumberOfPoints if i == 0: # Left face of the last cell: biased central-difference return (8*c(i) - 9*c(i+1) + c(i+2)) / (3 * (x[i+1] - x[i])) elif i == Nx-1: # Left face of the first cell: biased central-difference return (-8*c(i) + 9*c(i-1) + c(i-2)) / (3 * (x[i] - x[i-1])) elif i > 0 and i < Nx-1: # Other cells: central-difference O(h^2) #return (c(i) - c(i+1)) / (x[i+1] - x[i]) return (c(i-1) - c(i)) / (x[i] - x[i-1]) else: raise RuntimeError('dc_dx_edge_plus_rev: Invalid index specified: %d (no. points is %d)' % (i, Nx)) def _AverageOverCell(self, f, i): """ i+1/2 Integral over a cell i: S = integral (f(x)*dx) i-1/2 We could use a simple trapezoidal rule: (x - x ) * [f(x ) + f(x )] / 2 i+1/2 i-1/2 i+1/2 i-1/2 or to assume the source everywhere in the cell is equal to the value at the plus edge: (x - x ) * f(x ) i+1/2 i-1/2 i+1/2 The latter gives better results (double check why). """ x = self.x xp = self.x.Points Nx = self.x.NumberOfPoints if i == 0: # The first cell return f(i) * (xp[i+1] - xp[i]) elif i == Nx-1: # The last cell return f(i) * (xp[i] - xp[i-1]) elif i > 0 and i < Nx-1: # Other cells return f(i) * (xp[i] - xp[i-1]) else: raise RuntimeError('VolumeAverage: Invalid index specified: %d (no. points is %d)' % (i, Nx)) @staticmethod def Phi_CHARM(r): """CHARM""" return r * (3*r+1) / ((r+1)**2) @staticmethod def Phi_HCUS(r): """HCUS""" return 1.5 * (r + FABS(r)) / (r + 2) @staticmethod def Phi_HQUICK(r): """HQUICK""" return 2.0 * (r + FABS(r)) / (r + 3) @staticmethod def Phi_Koren(r): """Koren""" #r = 1.0/r return MAX(0.0, MIN(2.0*r, MIN(1.0/3.0 + 2.0*r/3, 2.0))) @staticmethod def Phi_minmod(r): """minmod""" return MAX(0.0, MIN(1.0, r)) @staticmethod def Phi_monotinized_central(r): """MC""" return MAX(0.0, MIN(2.0*r, MIN(0.5*(1+r), 2.0))) @staticmethod def Phi_Osher(r): """Osher""" # 1 <= beta <= 2 return MAX(0.0, MIN(r, beta)) @staticmethod def Phi_ospre(r): """ospre""" return 1.5 * (r*r + r) / (r*r + r + 1.0) @staticmethod def Phi_smart(r): """smart""" return MAX(0.0, MIN(2.0*r, MIN(0.25+0.75*r, 4.0))) @staticmethod def Phi_superbee(r): """superbee""" return MAX(0.0, MAX(MIN(2.0*r, 1.0), MIN(r, 2.0))) @staticmethod def Phi_Sweby(r): """Sweby""" # 1 <= beta <= 2 return MAX(0.0, MAX(MIN(beta*r, 1.0), MIN(r, beta))) @staticmethod def Phi_UMIST(r): """UMIST""" return MAX(0.0, MIN(2.0*r, MIN(0.25+0.75*r, MIN(0.75+0.25*r, 2.0)))) @staticmethod def Phi_vanAlbada1(r): """vanAlbada1""" return (r*r + r) / (r*r + 1.0) @staticmethod def Phi_vanAlbada2(r): """vanAlbada2""" return (2.0*r) / (r*r + 1.0) @staticmethod def Phi_vanLeer(r): """vanLeer""" return (r + FABS(r)) / (1.0 + FABS(r)) @staticmethod def Phi_vanLeer_minmod(r): """vanLeerMinmod""" # 1 <= theta <= 2 return MAX(0.0, MIN(theta*r, MIN(0.5*(1.0+r), theta)))
daeHRUpwindSchemeEquation.supported_flux_limiters = [daeHRUpwindSchemeEquation.Phi_HCUS, daeHRUpwindSchemeEquation.Phi_HQUICK, daeHRUpwindSchemeEquation.Phi_Koren, daeHRUpwindSchemeEquation.Phi_monotinized_central, daeHRUpwindSchemeEquation.Phi_minmod, daeHRUpwindSchemeEquation.Phi_Osher, daeHRUpwindSchemeEquation.Phi_ospre, daeHRUpwindSchemeEquation.Phi_smart, daeHRUpwindSchemeEquation.Phi_superbee, daeHRUpwindSchemeEquation.Phi_Sweby, daeHRUpwindSchemeEquation.Phi_UMIST, daeHRUpwindSchemeEquation.Phi_vanAlbada1, daeHRUpwindSchemeEquation.Phi_vanAlbada2, daeHRUpwindSchemeEquation.Phi_vanLeer, daeHRUpwindSchemeEquation.Phi_vanLeer_minmod] def plot_flux_limiters(): import matplotlib.pyplot n = 30 figure = matplotlib.pyplot.figure(figsize = (9,9), dpi=100, facecolor='white') figure.canvas.setWindowTitle('Flux Limiters') tvdx = numpy.zeros(n) tvdy1 = numpy.zeros(n) tvdy2 = numpy.zeros(n) tvdx[0:5] = numpy.linspace(1e-10, 0.5, 5) tvdx[5:10] = numpy.linspace(0.5, 1.0, 5) tvdx[10:20] = numpy.linspace(1.0, 2.0, 10) tvdx[20:30] = numpy.linspace(2.0, 3.0, 10) tvdy1[0:5] = numpy.linspace(0.0, 1.0, 5) tvdy1[5:10] = numpy.linspace(1.0, 1.0, 5) tvdy1[10:20] = numpy.linspace(1.0, 2.0, 10) tvdy1[20:30] = numpy.linspace(2.0, 2.0, 10) tvdy2[0:5] = numpy.linspace(0.0, 0.5, 5) tvdy2[5:10] = numpy.linspace(0.5, 1.0, 5) tvdy2[10:20] = numpy.linspace(1.0, 1.0, 10) tvdy2[20:30] = numpy.linspace(1.0, 1.0, 10) rs = numpy.linspace(1e-10, 3.0, n) rs_koren = rs phis = numpy.zeros(n) counter = 1 for fl in daeHRUpwindScheme.supported_flux_limiters: if fl == daeHRUpwindScheme.Phi_Koren: phis[:] = [fl(r) for r in rs_koren] else: phis[:] = [fl(r) for r in rs] axes = matplotlib.pyplot.subplot(4, 4, counter) fp9 = matplotlib.font_manager.FontProperties(family='Cantarell', style='normal', variant='normal', weight='normal', size=9) fp11 = matplotlib.font_manager.FontProperties(family='Cantarell', style='normal', variant='normal', weight='normal', size=12) axes.plot(rs, phis, 'r-') axes.fill_between(tvdx, tvdy1, tvdy2, alpha = 0.3) axes.set_title(fl.__doc__, fontproperties=fp11) axes.set_xlim((0.0, 3.0)) axes.set_ylim((0.0, 3.0)) axes.set_xticks([0, 1, 2, 3]) axes.set_yticks([0, 1, 2, 3]) axes.set_ylabel('$\phi( r )$', fontproperties=fp11) axes.set_xlabel('$r$', fontproperties=fp11) axes.grid(True) counter += 1 for xlabel in axes.get_xticklabels(): xlabel.set_fontproperties(fp9) for ylabel in axes.get_yticklabels(): ylabel.set_fontproperties(fp9) matplotlib.pyplot.tight_layout() matplotlib.pyplot.show() if __name__ == "__main__": # Change the functions definitions to work with floats MIN = min MAX = max FABS = math.fabs beta = 1.5 # For Sweby and Osher (1 <= beta <= 2) theta = 1.5 # For van Leer minmod (1 <= theta <= 2) plot_flux_limiters()