from __future__ import division, print_function
import abc, math
from PDSim.misc.datatypes import arraym
import numpy as np
import matplotlib.pyplot as plt
[docs]
class AbstractODEIntegrator(object):
__metaclass__ = abc.ABCMeta
[docs]
@abc.abstractmethod
def get_initial_array(self):
pass
[docs]
@abc.abstractmethod
def pre_step_callback(self):
pass
[docs]
@abc.abstractmethod
def post_deriv_callback(self):
pass
[docs]
@abc.abstractmethod
def post_step_callback(self):
pass
[docs]
@abc.abstractmethod
def derivs(self):
pass
[docs]
@abc.abstractmethod
def do_integration(self):
pass
[docs]
class AbstractSimpleEulerODEIntegrator(AbstractODEIntegrator):
__metaclass__ = abc.ABCMeta
[docs]
def do_integration(self,N,tmin,tmax):
"""
The simple Euler ODE integrator
Parameters
----------
N : integer
Number of steps taken (not including the initial step).
tmin : float
Starting value of the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
tmax : float
Ending value for the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
Returns
-------
abort_flag
If an abort has been requested (by returning a value other than ``False`` from ``premature_termination``), return value from ``premature_termination``. ``None`` otherwise
"""
# Step variables
self.t0 = tmin
self.h = (tmax-tmin)/(N)
# Get the initial value
self.xold = self.get_initial_array()
for self.Itheta in range(N):
# Check for termination
abort = self.premature_termination()
if abort != False: return abort
# Call the pre-step callback
self.pre_step_callback()
# Derivatives evaluated at old values of t = t0
self.f1 = self.derivs(self.t0, self.xold)
# Call post derivative callback after the first derivative evaluation (which might cache values)
self.post_deriv_callback()
# Calculate the new values
self.xnew = self.xold + self.h*self.f1
# Everything from this step is finished, now update for the next
# step coming
self.t0 += self.h
self.xold = self.xnew
# Call the post-step callback
self.post_step_callback()
# Bump up once more
self.Itheta += 1
# Make sure we end up at the right place
assert((self.t0 - tmax) < 1e-10)
# No termination was requested
return False
[docs]
class AbstractHeunODEIntegrator(AbstractODEIntegrator):
__metaclass__ = abc.ABCMeta
[docs]
def do_integration(self, N, tmin, tmax):
"""
The Heun system of ODE integrator
Parameters
----------
N : integer
Number of steps taken. There will be N+1 entries in the state matrices
tmin : float
Starting value of the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
tmax : float
Ending value for the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
Returns
-------
abort_flag
If an abort has been requested (by returning a value other than ``False`` from ``premature_termination``), return value from ``premature_termination``. ``None`` otherwise
"""
# Step variables
self.t0 = tmin
self.h = (tmax-tmin)/(N)
# Get the initial value
self.xold = self.get_initial_array()
for self.Itheta in range(N):
# Check for termination
abort = self.premature_termination()
if abort != False: return abort
# Call the pre-step callback
self.pre_step_callback()
# Step 1: derivatives evaluated at old values
self.f1 = self.derivs(self.t0, self.xold)
# Call post derivative callback after the first derivative evaluation (which might cache values)
self.post_deriv_callback()
# Predicted values based on extrapolation using initial derivatives
self.xtemp = self.xold + self.h*self.f1
# Step 2: Evaluated at predicted step
self.f2 = self.derivs(self.t0 + self.h, self.xtemp)
# Get corrected values
self.xnew = self.xold + self.h/2.0*(self.f1 + self.f2)
# Everything from this step is finished, now update for the next
# step coming
self.t0 += self.h
self.xold = self.xnew
# Call the post-step callback
self.post_step_callback()
# Bump up once more
self.Itheta += 1
# No termination was requested
return False
[docs]
class AbstractRK45ODEIntegrator(AbstractODEIntegrator):
__metaclass__ = abc.ABCMeta
[docs]
def do_integration(self,
tmin=0,
tmax=2.0*math.pi,
hmin=1e-4,
eps_allowed=1e-10,
step_relax=0.9,
valves_callback = None,
UseCashKarp=True,
**kwargs):
"""
This function implements an adaptive Runge-Kutta-Feldberg 4th/5th order
solver for the system of equations
Parameters
----------
hmin : float
Minimum step size, something like 1e-5 usually is good. Don't make this too big or you may not be able to get a stable solution
tmin : float
Starting value of the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
tmax : float
Ending value for the independent variable. ``t`` is in the closed range [``tmin``, ``tmax``]
eps_allowed : float
Maximum absolute error of any CV per step allowed. Don't make this parameter too big or you may not be able to get a stable solution. Also don't make it too small because then you are going to run into truncation error.
step_relax : float, optional
The relaxation factor that is used in the step resizing algorithm. Should be less than 1.0; you can play with this parameter to improve the adaptive resizing, but should not be necessary.
Returns
-------
abort_flag
If an abort has been requested (by returning a value other than ``False`` from ``premature_termination``), return value from ``premature_termination``. ``None`` otherwise
Notes
-----
Mathematically the adaptive solver can be expressed as::
k1=h*dy(xn ,t)
k2=h*dy(xn+1.0/4.0*k1 ,t+1.0/4.0*h)
k3=h*dy(xn+3.0/32.0*k1+9.0/32.0*k2 ,t+3.0/8.0*h)
k4=h*dy(xn+1932.0/2197.0*k1-7200.0/2197.0*k2+7296.0/2197.0*k3 ,t+12.0/13.0*h)
k5=h*dy(xn+439.0/216.0*k1-8.0*k2+3680.0/513.0*k3-845.0/4104.0*k4 ,t+h)
k6=h*dy(xn-8.0/27.0*k1+2.0*k2-3544.0/2565.0*k3+1859.0/4104.0*k4-11.0/40.0*k5 ,t+1.0/2.0*h)
where the function dy(y,t) returns a vector of the ODE expressions.
The new value is calculated from::
xnplus=xn+gamma1*k1+gamma2*k2+gamma3*k3+gamma4*k4+gamma5*k5+gamma6*k6
In the adaptive solver, the errors for a given step can be calculated from::
error=1.0/360.0*k1-128.0/4275.0*k3-2197.0/75240.0*k4+1.0/50.0*k5+2.0/55.0*k6
If the maximum absolute error is above allowed error, the step size is decreased and the step is
tried again until the error is below tolerance. If the error is better than required, the step
size is increased to minimize the number of steps required.
Before the step is run, a callback the ``step_callback`` method of this class is called. In the ``step_callback`` callback function you can do anything you want, but you must return
"""
# Get the starting array of variables
self.xold = self.get_initial_array()
# Start at an index of 0
self.Itheta = 0
self.t0 = tmin
self.tmax = tmax
self.h = hmin
# gamma1=16.0/135.0
# gamma2=0.0
# gamma3=6656.0/12825.0
# gamma4=28561.0/56430.0
# gamma5=-9.0/50.0
# gamma6=2.0/55.0
#t is the independent variable here, where t takes on values in the bounded range [tmin,tmax]
while (self.t0 < self.tmax - 1e-10):
# Check for termination
abort = self.premature_termination()
if abort != False: return abort
self.stepAccepted = False
while not self.stepAccepted:
# reset the flag
self.disableAdaptive = False
if self.t0 + self.h > self.tmax:
self.disableAdaptive = True
self.h = self.tmax - self.t0
#
self.pre_step_callback()
# We check stepAccepted again because if the derived class
# sets the variable stepAccepted, we should not actually do the evaluation
if not self.stepAccepted:
if self.h < hmin and not self.disableAdaptive:
# Step is too small, just use the minimum step size
self.h = 1.0*hmin
self.disableAdaptive = True
# Step 1: derivatives evaluated at old values
self.f1 = self.derivs(self.t0, self.xold)
# Call post derivative callback after the first derivative evaluation (which might cache values)
self.post_deriv_callback()
self.xnew1 = self.xold+self.h*(1.0/5.0)*self.f1
self.f2 = self.derivs(self.t0+1.0/5.0*self.h, self.xnew1)
self.xnew2 = self.xold+self.h*(+3.0/40.0*self.f1+9.0/40.0*self.f2)
self.f3 = self.derivs(self.t0+3.0/10.0*self.h, self.xnew2)
self.xnew3 = self.xold+self.h*(3.0/10.0*self.f1-9.0/10.0*self.f2+6.0/5.0*self.f3)
self.f4 = self.derivs(self.t0+3.0/5.0*self.h, self.xnew3)
self.xnew4 = self.xold+self.h*(-11.0/54.0*self.f1+5.0/2.0*self.f2-70/27.0*self.f3+35.0/27.0*self.f4)
self.f5 = self.derivs(self.t0+self.h, self.xnew4)
self.xnew5 = self.xold+self.h*(1631.0/55296*self.f1+175.0/512.0*self.f2+575.0/13824.0*self.f3+44275.0/110592.0*self.f4+253.0/4096.0*self.f5)
self.f6 = self.derivs(self.t0+7/8*self.h, self.xnew5)
# Updated values at the next step using 5-th order
self.xnew = self.xold + self.h*(37/378*self.f1 + 250/621*self.f3 + 125/594*self.f4 + 512/1771*self.f6)
error = self.h*(-277/64512*self.f1+6925/370944*self.f3-6925/202752*self.f4-277.0/14336*self.f5+277/7084*self.f6)
max_error = np.sqrt(np.sum(np.power(error, 2)))
# If the error is too large, make the step size smaller and try
# the step again
if (max_error > eps_allowed):
if not self.disableAdaptive:
# Take a smaller step next time, try again on this step
# But only if adaptive mode is on
self.h *= step_relax*(eps_allowed/max_error)**(0.3)
self.stepAccepted=False
else:
# Accept the step regardless of whether the error
# is too large or not
self.stepAccepted = True
else:
self.stepAccepted = True
else:
pass
# print('accepted')
self.t0 += self.h
self.Itheta += 1
self.xold = self.xnew
self.post_step_callback()
# The error is already below the threshold
if (max_error < eps_allowed and self.disableAdaptive == False and max_error > 0):
# Take a bigger step next time, since eps_allowed>max_error
self.h *= step_relax*(eps_allowed/max_error)**(0.2)
if not (self.t0 - tmax) < 1e-3:
raise AssertionError('(self.t0 - tmax) [' + str(self.t0 - tmax) + '] > 1e-3')
# No termination was requested
return False
if __name__ == '__main__':
class TestIntegrator(object):
"""
Implements the functions needed to satisfy the ABC requirements
This is the problem from wikipedia, where y' = y, with the explicit solution y = exp(t)
"""
def __init__(self):
self.x, self.y = [], []
def post_deriv_callback(self): pass
def premature_termination(self): return False
def get_initial_array(self):
return arraym([1.0])
def pre_step_callback(self):
if self.Itheta == 0:
self.x.append(self.t0)
self.y.append(self.xold[0])
def post_step_callback(self):
self.x.append(self.t0)
self.y.append(self.xold[0])
def derivs(self, t0, xold):
return arraym([xold[0]])
class TestEulerIntegrator(TestIntegrator, AbstractSimpleEulerODEIntegrator):
""" Mixin class using the functions defined in TestIntegrator """
pass
class TestHeunIntegrator(TestIntegrator, AbstractHeunODEIntegrator):
""" Mixin class using the functions defined in TestIntegrator """
pass
class TestRK45Integrator(TestIntegrator, AbstractRK45ODEIntegrator):
""" Mixin class using the functions defined in TestIntegrator """
pass
for N in [4, 11, 21]:
TEI = TestEulerIntegrator()
TEI.do_integration(N, 0.0, 4.0)
plt.plot(TEI.x, TEI.y, 'o-', label = 'Euler: ' + str(N))
for N in [4, 11]:
THI = TestHeunIntegrator()
THI.do_integration(N, 0.0, 4.0)
plt.plot(THI.x, THI.y, '^-', label = 'Heun: ' + str(N))
TRKI = TestRK45Integrator()
TRKI.do_integration(0.0, 4.0, eps_allowed = 1e-5)
plt.plot(TRKI.x, TRKI.y, '^-', label = 'RK45')
t = np.linspace(0, 4, 500)
plt.plot(t, np.exp(t), 'k', lw = 1)
plt.legend(loc='best')
plt.show()