Source code for PDSim.core.core

from __future__ import division, absolute_import, print_function

import math
from math import pi
from timeit import default_timer
import inspect
import six

##--  Package imports  --
from PDSim.flow import flow,flow_models
from .containers import STATE_VARS_TM, CVArrays, ControlVolumeCollection,TubeCollection
from PDSim.flow.flow import FlowPathCollection
from . import integrators
from PDSim.misc.datatypes import arraym, empty_arraym
import PDSim.core.callbacks
from PDSim.misc.error_bar import error_ascii_bar

##-- Non-package imports  --
import numpy as np

# If scipy is available, use its optimization function, otherwise, 
# use our implementation (for packaging purposes)
try:
    from scipy.integrate import trapz
except ImportError:
    from PDSim.misc.scipylike import trapz
    
import scipy.optimize

import h5py

# An empty class for storage
[docs] class struct(object): pass
[docs] class IntegratorMixin(object): """ This class contains the methods that will be merged with one of the system of ODE integrators and includes the methods that are specific to PDSim """ def __init__(self, sim, x_state): self.sim = sim self.x_state = x_state
[docs] def get_initial_array(self): # Get the beginning of the cycle configured # Put a copy of the values into the matrices xold = self.x_state.copy() self.sim._put_to_matrices(xold, 0) return xold
[docs] def premature_termination(self): # Once every 100 steps check if you are supposed to abort if self.sim._check_cycle_abort(self.Itheta): return 'abort' else: return False
[docs] def pre_step_callback(self): # Call the step callback if provided if self.sim.callbacks.step_callback is not None: self.h = self.sim.callbacks.step_callback(self.t0, self.h, self.Itheta) disable = self.sim.callbacks.step_callback.disable_adaptive # If we don't want to actually do the step, rather just copy the values # (for instance at the merging angle for scroll machines), we set # stepAccepted to True and move on if disable == 'no_integrate': self.stepAccepted = True # Retrieve the array of values based on the values set in step_callback x = self.sim._get_from_matrices(self.Itheta) # Updates the state, calculates the volumes, prepares all the things needed for derivatives # The crank angle must be evaluated after theta_d in order to ensure that the new volumes are used # The newly calculated values are used self.sim.core.properties_and_volumes(self.sim.CVs.exists_CV, self.t0+self.h+1e-10, STATE_VARS_TM, x) self.xold = x.copy() self.xnew = x.copy() self.__store_values() x = self.sim._get_from_matrices(self.Itheta) if disable != False and x.all_finite(): self.xold = self.sim._get_from_matrices(self.Itheta) # The integrator only cares whether disable is true or not, convert to true or false if disable != False: self.disableAdaptive = True else: self.disableAdaptive = False
def __store_values(self): """ Private method that stores the values in the internal data structure """ self.sim.t[self.Itheta] = self.t0 self.sim._put_to_matrices(self.xold, self.Itheta) flows = self.sim.Flows.get_deepcopy() # If the index we want to fill is beyond the length of FlowStorage, we append it, # otherwise, we replace that entry in the list if self.Itheta > len(self.sim.FlowStorage)-1: self.sim.FlowStorage.append(flows) else: self.sim.FlowStorage[self.Itheta] = flows
[docs] def post_deriv_callback(self): self.__store_values()
[docs] def post_step_callback(self): pass
[docs] def derivs(self, t, x): return self.sim.derivs(t, x)
[docs] def post_integration(self): """ Run this at the end """ # Cache the values self.sim.derivs(self.t0, self.xold) self.__store_values() V, dV = self.sim.CVs.volumes(self.t0) Nexist = self.sim.CVs.Nexist if sorted(self.sim.stateVariables) == ['D','T']: self.sim.CVs.updateStates('T',self.xnew[0:Nexist],'D',self.xnew[Nexist:2*Nexist]) elif sorted(self.sim.stateVariables) == ['M','T']: self.sim.CVs.updateStates('T',self.xnew[0:Nexist],'D',self.xnew[Nexist:2*Nexist]/V) else: raise NotImplementedError
[docs] class EulerIntegrator(IntegratorMixin, integrators.AbstractSimpleEulerODEIntegrator): """ Mixin class using the functions defined in IntegratorMixin and the generalized simple ODE """ def __init__(self, sim, x_state): IntegratorMixin.__init__(self, sim, x_state)
[docs] class HeunIntegrator(IntegratorMixin, integrators.AbstractHeunODEIntegrator): """ Mixin class using the functions defined in IntegratorMixin and the generalized Heun ODE integrator """ def __init__(self, sim, x_state): IntegratorMixin.__init__(self, sim, x_state)
[docs] class RK45Integrator(IntegratorMixin, integrators.AbstractRK45ODEIntegrator): """ Mixin class using the functions defined in IntegratorMixin and the generalized RK45 integrator """ def __init__(self, sim, x_state): IntegratorMixin.__init__(self, sim, x_state)
[docs] class PDSimCore(object): """ This is the main driver class for the model This class is not intended to be run on its own. It must be subclassed and extended to provide functions for mass flow, etc. The basic order of steps that should be followed can be summarized as #. Instantiate the subclass of PDSimCore #. Add each of the control volumes #. Add each of the tubes #. Add all the flow models between CV and tubes #. Add valves (if applicable) #. Connect the callbacks for heat transfer, step, etc. #. Run the model """ def __init__(self,stateVariables=None): """ Initialization of the PDSimCore Parameters ---------- stateVariables : mutable object [list or tuple], optional list of keys for the state variables to be used. Current options are 'T','D' or 'T','M'. Default state variables are 'T','M' """ #Initialize the containers to be empty #: The Valves container class self.Valves = [] #: The :class:`ControlVolumeCollection <PDSim.core.containers.ControlVolumeCollection>` instance #: that contains all the control volumes in the machine self.CVs = ControlVolumeCollection() #: The :class:`FlowPathCollection <PDSim.flow.flow.FlowPathCollection>` #: instance self.Flows = FlowPathCollection() #: A :class:`list` that contains copies of the #: :class:`FlowPathCollection <PDSim.flow.flow.FlowPathCollection>` #: at each crank angle self.FlowStorage = [] self.Tubes = TubeCollection() self.Tlumps = np.zeros((1,1)) self.steps = [] self.__hasValves__ = False # A storage of the initial state vector self.xstate_init = None # A storage of the initial valves vector if isinstance(stateVariables,(list,tuple)): self.stateVariables=list(stateVariables) else: self.stateVariables=['T','M'] self._want_abort = False # Build a structure to hold all the callbacks self.callbacks = PDSim.core.callbacks.CallbackContainer() # Build a dummy class to hold information from the solvers class dummy: pass self.solvers = dummy() self.solvers.lump_eb_history = [] self.solvers.hdisc_history = [] self.solvers.initial_states_history = [] self.verbosity = 0 self.summary = dummy() def _check(self): """ Do some checking before we start the run. Here we check: * Inlet state viscosity and conductivity must be greater than zero """ if self.inlet_state.get_visc() < 0: raise ValueError('Your inlet state viscosity is less than zero. Invalid fluid: ' +self.inlet_state.Fluid) if self.inlet_state.get_cond() < 0: raise ValueError('Your inlet state conductivity is less than zero. Invalid fluid: '+self.inlet_state.Fluid) def _get_from_matrices(self,i): """ Get values back from the matrices and reconstruct the state variable list """ if self.__hasLiquid__==True: raise NotImplementedError else: ValList = [] exists_indices = np.array(self.CVs.exists_indices) for s in self.stateVariables: if s=='T': ValList += self.T[exists_indices,i].tolist() elif s=='D': ValList += self.rho[exists_indices,i].tolist() elif s=='M': ValList += self.m[exists_indices,i].tolist() else: raise KeyError if self.__hasValves__: # Also store the valve values ValList += self.xValves[:,i].tolist() return arraym(ValList) def _statevars_to_dict(self,x): d={} for iS,s in enumerate(self.stateVariables): x_=(x[iS*self.CVs.Nexist:self.CVs.Nexist*(iS+1)]) if s=='T': d['T']=x_ elif s=='D': d['D']=x_ elif s=='M': d['M']=x_ return d def _put_to_matrices(self,x,i): """ Take a state variable list and put back in numpy matrices """ exists_indices=self.CVs.exists_indices Nexist = self.CVs.Nexist Ns = len(self.stateVariables) assert(len(x) == len(self.Valves)*2+Ns*Nexist) if self.__hasLiquid__==True: raise NotImplementedError # self.T[:,i]=x[0:self.NCV] # self.m[:,i]=x[self.NCV:2*self.NCV] else: # self.__hasLiquid__==False for iS, s in enumerate(self.stateVariables): if s=='T': self.T[exists_indices, i] = x[iS*self.CVs.Nexist:self.CVs.Nexist*(iS+1)] elif s=='D': self.rho[exists_indices, i] = x[iS*self.CVs.Nexist:self.CVs.Nexist*(iS+1)] elif s=='M': self.m[exists_indices, i] = x[iS*self.CVs.Nexist:self.CVs.Nexist*(iS+1)] # Left over terms are for the valves if self.__hasValves__: self.xValves[0:len(self.Valves)*2, i] = arraym(x[Ns*Nexist:len(x)]) # In the first iteration, self.core has not been filled, so do not # overwrite with the values in self.core.m and self.core.rho if self.core.m[0] > 0.0 : self.m[exists_indices, i] = self.core.m if self.core.rho[0] > 0.0 : self.rho[exists_indices, i] = self.core.rho self.V[exists_indices, i] = self.core.V self.dV[exists_indices, i] = self.core.dV self.p[exists_indices, i] = self.core.p self.h[exists_indices, i] = self.core.h self.Q[exists_indices,i] = self.core.Q def _postprocess_flows(self): """ In this private method, the flows from each of the flow nodes are summed for each step of the revolution, and then averaged flow rates are calculated. """ def sum_flows(key,Flows): """ Sum all the terms for a given flow key. Flows "into" the node are positive, flows out of the node are negative Use the code in the Cython module """ return flow.sumterms_given_CV(key, Flows) def collect_keys(Tubes,Flows): """ Get all the keys for a given collection of flow elements """ keys=[] for Tube in Tubes: if Tube.key1 not in keys: keys.append(Tube.key1) if Tube.key2 not in keys: keys.append(Tube.key2) for Flow in Flows: if Flow.key1 not in keys: keys.append(Flow.key1) if Flow.key2 not in keys: keys.append(Flow.key2) return keys # Get all the nodes that can exist for tubes and CVs keys=collect_keys(self.Tubes,self.Flows) # Get the instantaneous net flow through each node # and the averaged mass flow rate through each node self.FlowsProcessed=struct() self.FlowsProcessed.summed_mdot={} self.FlowsProcessed.summed_mdoth={} self.FlowsProcessed.mean_mdot={} self.FlowsProcessed.integrated_mdoth={} self.FlowsProcessed.integrated_mdot={} self.FlowsProcessed.t=self.t[0:self.Ntheta] for key in keys: # Empty container numpy arrays self.FlowsProcessed.summed_mdot[key]=np.zeros((self.Ntheta,)) self.FlowsProcessed.summed_mdoth[key]=np.zeros((self.Ntheta,)) assert self.Ntheta == len(self.FlowStorage) for i in range(self.Ntheta): mdot,mdoth=sum_flows(key,self.FlowStorage[i]) self.FlowsProcessed.summed_mdot[key][i]=mdot self.FlowsProcessed.summed_mdoth[key][i]=mdoth # All the calculations here should be done in the time domain, # rather than crank angle. So convert angle to time by dividing # by omega, the rotational speed in rad/s. trange = self.t[self.Ntheta-1]-self.t[0] # integrated_mdoth has units of kJ/rev * f [Hz] --> kJ/s or kW self.FlowsProcessed.integrated_mdoth[key]=trapz(self.FlowsProcessed.summed_mdoth[key], self.t[0:self.Ntheta]/self.omega)*self.omega/trange # integrated_mdot has units of kg/rev * f [Hz] --> kg/s self.FlowsProcessed.integrated_mdot[key]=trapz(self.FlowsProcessed.summed_mdot[key], self.t[0:self.Ntheta]/self.omega)*self.omega/trange self.FlowsProcessed.mean_mdot[key]=np.mean(self.FlowsProcessed.integrated_mdot[key]) # Special-case the tubes. Only one of the nodes can have flow. # The other one is invariant because it is quasi-steady. for Tube in self.Tubes: mdot1 = self.FlowsProcessed.mean_mdot[Tube.key1] mdot2 = self.FlowsProcessed.mean_mdot[Tube.key2] mdot_i1 = self.FlowsProcessed.integrated_mdot[Tube.key1] mdot_i2 = self.FlowsProcessed.integrated_mdot[Tube.key2] mdoth_i1 = self.FlowsProcessed.integrated_mdoth[Tube.key1] mdoth_i2 = self.FlowsProcessed.integrated_mdoth[Tube.key2] #Swap the sign so the sum of the mass flow rates is zero self.FlowsProcessed.mean_mdot[Tube.key1] -= mdot2 self.FlowsProcessed.mean_mdot[Tube.key2] -= mdot1 self.FlowsProcessed.integrated_mdot[Tube.key1] -= mdot_i2 self.FlowsProcessed.integrated_mdot[Tube.key2] -= mdot_i1 self.FlowsProcessed.integrated_mdoth[Tube.key1] -= mdoth_i2 self.FlowsProcessed.integrated_mdoth[Tube.key2] -= mdoth_i1 #For each tube, update the flow going through it #Tube.mdot is always a positive value Tube.mdot = max(abs(mdot1), abs(mdot2)) self.mdot = self.FlowsProcessed.mean_mdot[self.key_inlet] self.FlowsProcessed.collected_data = [] for i, Flow in enumerate(self.Flows): mdot = np.array([Flows[i].mdot for Flows in self.FlowStorage]) edot = np.array([Flows[i].edot for Flows in self.FlowStorage]) data = dict(key1 = Flow.key1, key2 = Flow.key2, fcn = Flow.MdotFcn_str, mdot = mdot, edot = edot, mdot_average = np.trapezoid(mdot, self.t[0:self.Ntheta])/(self.t[self.Ntheta-1]-self.t[0]), Edot_average = np.trapezoid(edot, self.t[0:self.Ntheta])/(self.t[self.Ntheta-1]-self.t[0]) ) self.FlowsProcessed.collected_data.append(data) def _postprocess_HT(self): """ Postprocess the heat transfer terms Here we calculate the mean heat transfer rate over the course of the cycle """ self.HTProcessed=struct() r = list(range(self.Ntheta)) #Remove all the NAN placeholders and replace them with zero values self.Q[np.isnan(self.Q)] = 0.0 #Sum at each step of the revolution self.HTProcessed.summed_Q = np.sum(self.Q, axis = 0) #kW #Get the mean heat transfer rate self.HTProcessed.mean_Q = trapz(self.HTProcessed.summed_Q[r], self.t[r])/(self.t[self.Ntheta-1]-self.t[0])
[docs] def guess_outlet_temp(self, inlet_state, p_outlet, eta_a=0.7): """ Function to guess outlet temperature Using a guess value for the adiabatic efficiency, calculate the guessed outlet temperature. In compressor mode, the adiabatic efficiency is defined by .. math:: \\eta_a = \\frac{h_{2s}-h_1}{h_2-h_1} and in expander mode it is defined by .. math:: \\eta_a = \\frac{h_2-h_1}{h_{2s}-h_1} This function can also be overloaded by the subclass in order to implement a different guess method """ try: # For an adiabatic process in which the ratio of heat capacities # gamma is approximately constant, you can calculate the # relation between T and p along the isentropic path from: # p1^(1-gamma)*T1^gamma = p2^(1-gamma)*T2^gamma h1 = inlet_state.h s1 = inlet_state.s out_state = inlet_state.copy() # See: https://en.wikipedia.org/wiki/Table_of_thermodynamic_equations#Thermodynamic_processes gamma = inlet_state.cp/inlet_state.cv T2s = (inlet_state.p**(1-gamma)*inlet_state.T**gamma/p_outlet**(1-gamma))**(1/gamma) # Solver to correct for non-constant gamma to get isentropic enthalpy h2s, # should be a small correction def objective_h2s(T): out_state.update(dict(T=T, P=p_outlet)) return out_state.s - s1 T2s = scipy.optimize.newton(objective_h2s, T2s) h2s = out_state.h if p_outlet > inlet_state.p: # Compressor Mode h2 = h1 + (h2s-h1)/eta_a else: # Expander Mode h2 = h1 + (h2s-h1)*eta_a # Iterate with Newton's method to get temperature corresponding # to specified h2 def objective_h2(T): out_state.update(dict(T=T, P=p_outlet)) return out_state.h - h2 return scipy.optimize.newton(objective_h2, T2s) except BaseException as be: h1 = inlet_state.h out_state = inlet_state.copy() out_state.update(dict(S = inlet_state.s, P = p_outlet)) h2s = out_state.h if p_outlet > inlet_state.p: # Compressor Mode h2 = h1 + (h2s-h1)/eta_a else: # Expander Mode h2 = h1 + (h2s-h1)*eta_a out_state.update(dict(H = h2, P = p_outlet)) return out_state.T
[docs] def reset_initial_state(self): """ Reset the initial state of the core class, typically after doing a preconditioning run """ for k,CV in zip(self.CVs.keys,self.CVs.CVs): if k in self.exists_CV_init: CV.exists = True else: CV.exists = False #Update the existence of each of the CV self.update_existence() #Only the State variables, not the valves self.x_state = self.xstate_init #Make a copy x = self.xstate_init.copy() #Add the values from the valves if self.__hasValves__: x.extend(empty_arraym(2*len(self.Valves))) self._put_to_matrices(x, 0) #Reset the temporary variables self.xstate_init = None self.exists_CV_init = None
[docs] def update_existence(self): """ Update existence flags for Tubes and control volumes This function is required to be called when the existence of any control volume or tube changes in order to ensure that internal flags are set properly """ # Update the existence flags in all the control volumes self.CVs.rebuild_exists() # Update the array of enthalpies in the tubes self.Tubes.update_existence(self.CVs.Nexist) # Update the existence of each of the flows self.Flows.update_existence(self) # Update the sizes of the internal arrays in self.core self.core.update_size(self.CVs.Nexist)
[docs] def add_flow(self,FlowPath): """ Add a flow path to the model Parameters ---------- FlowPath : :class:`FlowPath <PDSim.flow.flow.FlowPath>` instance An initialized flow path """ #Add FlowPath instance to the list of flow paths self.Flows.append(FlowPath)
[docs] def add_CV(self,CV): """ Add a control volume to the model Parameters ---------- CV : :class:`ControlVolume <PDSim.core.containers.ControlVolume>` instance An initialized control volume """ if CV.key in self.CVs.keys: raise KeyError('Sorry but the key for your Control Volume ['+CV.key+'] is already in use') #Add the CV to the collection self.CVs.add(CV) self.CVs.rebuild_exists()
[docs] def add_tube(self,Tube): """ Add a tube to the model. Parameters ---------- Tube : :class:`Tube <PDSim.core.containers.Tube>` instance An initialized tube. """ #Add it to the list self.Tubes.append(Tube) self.Tubes.update()
[docs] def add_valve(self,Valve): """ Add a valve to the model. Parameters ---------- Valve : :class:`ValveModel <PDSim.flow.flow_models.ValveModel>` instance An initialized valve. """ #Add it to the list self.Valves.append(Valve) self.__hasValves__=True
[docs] def pre_run(self, N = 40000): """ This function gets called before the run begins. It builds large matrices to store values, and does other initialization. """ # Build the full numpy arrays for temperature, volume, etc. self.t=np.zeros((N,)) self.T=np.zeros((self.CVs.N,N)) self.T.fill(np.nan) self.p=self.T.copy() self.h = self.T.copy() self.m = self.T.copy() self.V = self.T.copy() self.dV = self.T.copy() self.rho = self.T.copy() self.Q = self.T.copy() self.xValves = np.zeros((2*len(self.Valves),N)) # Initialize the core class that contains the arrays and the derivs self.core = CVArrays(0) # Update the existence of all the control volumes self.update_existence() # Set a flag about liquid flooding self.__hasLiquid__ = False
[docs] def pre_cycle(self, x0 = None): """ This runs before the cycle is run but after pre_run has been called Parameters ---------- x0 : :class:`arraym <PDSim.misc.datatypes.arraym>` instance """ self.t.fill(np.nan) self.T.fill(np.nan) self.p.fill(np.nan) self.m.fill(np.nan) self.V.fill(np.nan) self.dV.fill(np.nan) self.rho.fill(np.nan) self.Q.fill(np.nan) self.FlowStorage=[] #Get the volumes at theta=0 #Note: needs to occur in this function because V needed to calculate mass a few lines below VdV=[CV.V_dV(0.0,**CV.V_dV_kwargs) for CV in self.CVs.exists_CV] V,dV = zip(*VdV) self.t[0]=0 # If x0 is provided, use its values to initialize the chamber states if x0 is None: # self.CVs.exists_indices is a list of indices of the CV with the same order of entries # as the entries in self.CVs.T self.T[self.CVs.exists_indices, 0] = self.CVs.T self.p[self.CVs.exists_indices, 0] = self.CVs.p self.rho[self.CVs.exists_indices, 0] = self.CVs.rho self.m[self.CVs.exists_indices, 0] = self.CVs.rho*arraym(V) else: #x0 is provided, but need to pad it out to include valve values x0_ = x0.copy() # If x0 is longer than the product of the number of state variables # and CV in existence, the valve data is already included and must not be # added to the array of independent variables if self.__hasValves__ and len(x0) == self.CVs.Nexist*len(self.stateVariables): #Load up the rest of the array with zeros since the valves start closed and at rest x0_.extend(empty_arraym(len(self.Valves)*2)) self._put_to_matrices(x0_, 0) # Assume all the valves to be fully closed and stationary at the beginning of cycle self.xValves[:,0]=0 self.Tubes_hdict={} for Tube in self.Tubes: self.Tubes_hdict[Tube.key1]=Tube.State1.get_h() self.Tubes_hdict[Tube.key2]=Tube.State2.get_h()
[docs] def calc_boundary_work(self): """ This method calculates the boundary work rate using a trapezoidal integration of .. math:: \\dot W_{pv} = -\\int p\\frac{dV}{d\\theta}\\frac{\\omega}{2\\pi} d\\theta for all the control volumes and sets the parameter ``self.Wdot_pv`` with the result. The units of the boundary work are kW. """ def Wdot_one_CV(CVindex): """ calculate the p-v work for one CV """ x0_raw = self.t[0:self.Ntheta] y0_raw = self.p[CVindex, 0:self.Ntheta]*self.dV[CVindex, 0:self.Ntheta] # Convert into chunks that are delimited by nan, if any isnotnan_indices = np.flatnonzero(~np.isnan(y0_raw)) breaks = np.flatnonzero(np.diff(isnotnan_indices) > 1) if len(breaks) != 0: chunks = np.split(isnotnan_indices, np.array(breaks)+1) else: chunks = [isnotnan_indices] return -sum([trapz(y0_raw[ii], x0_raw[ii]) for ii in chunks])*self.omega/(2*pi) self.Wdot_pv = 0.0 for CVindex in range(self.p.shape[0]): self.Wdot_pv+=Wdot_one_CV(CVindex)
[docs] def post_cycle(self): """ This stuff all happens at the end of the cycle. It is a private method not meant to be called externally The following things are done: #. The boundary work is calculated #. The flows are post-processed #. The heat transfer is post-processed #. The mass flow rate is calculated #. The volumetric efficiency is calculated #. The adiabatic efficiency is calculated #. The isentropic power is calculated #. The power input is calculated """ self.calc_boundary_work() self._postprocess_flows() self._postprocess_HT() # Calculate the lumped mass energy balance if self.callbacks.lumps_energy_balance_callback is not None: self.lumps_resid = self.callbacks.lumps_energy_balance_callback() # Convert to an arraym if needed if not isinstance(self.lumps_resid, arraym): self.lumps_resid = arraym(self.lumps_resid) else: raise ValueError('lumps_energy_balance_callback cannot be None') if not hasattr(self,'Qamb'): self.Qamb = 0 # The total mass flow rate self.mdot = self.FlowsProcessed.mean_mdot[self.key_inlet] for key, State in six.iteritems(self.Tubes.Nodes): if key == self.key_inlet: inletState = State if key == self.key_outlet: outletState = State try: Vdisp = self.Vdisp except: Vdisp = self.Vdisp() self.eta_v = self.mdot / (self.omega/(2*pi)*Vdisp*inletState.rho) h1 = inletState.h h2 = outletState.h s1 = inletState.s # Can't use intermediate temperature because the state might be two-phase # for some conditions and you are better off just calculating the enthalpy # directly temp = outletState.copy() def objective(T): temp.update(dict(P=outletState.p, T=T)) return temp.s - s1 scipy.optimize.newton(objective, outletState.T) h2s = temp.h if outletState.p > inletState.p: # Compressor Mode self.eta_a = (h2s-h1)/(h2-h1) self.Wdot_i = self.mdot*(h2s-h1) else: # Expander Mode self.eta_a = (h1-h2)/(h1-h2s) self.Wdot_i = self.mdot*(h1-h2s) # self.Qamb is positive if heat is being added to the lumped mass self.Wdot = self.mdot*(h2-h1)-self.Qamb
def _check_cycle_abort(self, index, I = 100): """ This function will check whether an abort has been requested every ``I`` steps of the solver throughout the rotation Meant for calling by cycle_RK45, cycle_SimpleEuler, cycle_Heun, etc. Primarily this is useful for use with the GUI, where the GUI can pass an abort command to the model Parameters ---------- index : int The index of the step I : int, optional Check abort at this interval """ # % is the symbol for modulus in python if index % I == 0 and self.Abort(): self._want_abort = True return True
[docs] def check_abort(self): """ A callback for use with the graphical user interface to force solver to quit It will check the Scroll.pipe_abort pipe for a ``True`` value, and if it finds one, it will set the Scroll._want_abort value to ``True`` which will be read by the main execution thread Once ``self._want_abort`` is ``True``, it will stay latched ``True`` until the run is terminated """ # If you received an abort request, set a flag in the simulation if self.pipe_abort.poll() and self.pipe_abort.recv(): print('received an abort request') self._want_abort = True # If the run has timed out, quit if default_timer() - self.start_time > self.timeout: print('run timed out') self._want_abort = True return self._want_abort
[docs] def precond_solve(self,**kwargs): """ This function is deprecated and will be removed in a future version """ import warnings msg = 'precond_solve is deprecated and will be removed in a future version. Please use solve instead' warnings.warn(msg, DeprecationWarning) self.solve(**kwargs)
[docs] def connect_callbacks(self, step_callback=None, heat_transfer_callback=None, lumps_energy_balance_callback=None, endcycle_callback=None ): """ Connect up the callbacks for the simulation The callbacks must either be unbound methods or methods of a class derived from PDSimCore No keyword arguments are supported to be passed to the callbacks. The callback is probably a bound method of a PDSimCore instance, in which case you have access to all the data in the class anyway Parameters ---------- step_callback : function, or :class:`StepCallback <PDSim.core.callbacks.StepCallback>` subclass If a function is provided, it must have the call signature:: disable_adaptive,h = step_callback(double t, double h, int i) where ``h`` is the step size that the adaptive solver wants to use, ``t`` is the current value of the independent variable, and ``i`` is the index in the container variables. The return value ``disableAdaptive`` is a boolean value that describes whether the adaptive method should be turned off for this step ( ``False`` : use the adaptive method), and ``h`` is the step size you want to use. If you don't want to disable the adaptive method and use the given step size, just:: return False,h in your code. heat_transfer_callback : function, or :class:`HeatTransferCallback <PDSim.core.callbacks.HeatTransferCallback>` subclass If a function is provided, the heat_transfer_callback function must have the call signature:: Q = heat_transfer_callback(double t) It should return an :class:`arraym <PDSim.misc.datatypes.arraym>` instance with the same length as the number of CV in existence. The entry in the :class:`arraym <PDSim.misc.datatypes.arraym>` is positive if the heat transfer is TO the fluid in the CV in order to maintain the sign convention that energy (or mass) input is positive. lumps_energy_balance_callback : function, or :class:`LumpsEnergyBalanceCallback <PDSim.core.callbacks.LumpsEnergyBalanceCallback>` subclass If a function is provided, the lumps_energy_balance_callback function must have the call signature:: r = lumps_energy_balance_callback() It should return an :class:`arraym <PDSim.misc.datatypes.arraym>` instance with the same length as the number of lumps. The entry in ``r`` is the value of the energy balance. It will be driven to zero by the solver """ if step_callback is None: #No callback is provided, don't do anything pass elif isinstance(step_callback, PDSim.core.callbacks.StepCallback): #If the cythonized step callback is provided, hold onto it self.callbacks.step_callback = step_callback #Otherwise, wrap the desired callback if it has the right signature else: #Check the functional call callargs = inspect.getcallargs(step_callback, 0.0, 1e-10, 0) # Either a non-bound method is provided, or bound method is provided, in which case you get self,t,h,i as the values # t is a subclass of float, h is a subclass of float, is a subclass of int, and self is subclass of PDSimCore if not all([isinstance(arg,(float,int,PDSimCore)) for arg in callargs.values()]): sig_ok = False else: if len(callargs) in [3,4]: sig_ok = True else: sig_ok = False if step_callback is not None and sig_ok: self.callbacks.step_callback = PDSim.core.callbacks.WrappedStepCallback(self, step_callback) else: raise ValueError("step_callback is not possible to be wrapped - neither a subclass of StepCallback nor acceptable function signature") if heat_transfer_callback is None: #No callback is provided, don't do anything pass elif isinstance(heat_transfer_callback, PDSim.core.callbacks.HeatTransferCallback): #If the cythonized heat transfer callback is provided, hold a pointer to it self.callbacks.heat_transfer_callback = heat_transfer_callback else: callargs = inspect.getcallargs(heat_transfer_callback, 0.0) # Either a non-bound method is provided, or bound method is provided, in which case you get self,t as the values # t is a subclass of float, and self is subclass of PDSimCore if not all([isinstance(arg,(float,int,PDSimCore)) for arg in callargs.values()]): sig_ok = False else: if len(callargs) in [1,2]: sig_ok = True else: sig_ok = False #Otherwise, wrap the desired callback if it has the right signature if heat_transfer_callback is not None and sig_ok: self.callbacks.heat_transfer_callback = PDSim.core.callbacks.WrappedHeatTransferCallback(self, heat_transfer_callback) else: raise ValueError("heat_transfer_callback is not possible to be wrapped - neither a subclass of HeatTransferCallback nor an acceptable function") if lumps_energy_balance_callback is None: #No callback is provided, don't do anything pass elif isinstance(lumps_energy_balance_callback, PDSim.core.callbacks.LumpsEnergyBalanceCallback): #If the cythonized lump energy balance callback is provided, hold onto it self.callbacks.lumps_energy_balance_callback = lumps_energy_balance_callback #Otherwise, wrap the desired callback if it has the right signature else: callargs = inspect.getcallargs(lumps_energy_balance_callback) # Either a non-bound method is provided, or bound method is provided, in which case you get self,t as the values # t is a subclass of float, and self is subclass of PDSimCore sig_ok = len(callargs) == 0 or (len(callargs) == 1 and isinstance(list(callargs.values())[0],PDSimCore)) if lumps_energy_balance_callback is not None and sig_ok: #Do functional introspection here where the ``True`` is self.callbacks.lumps_energy_balance_callback = PDSim.core.callbacks.WrappedLumpsEnergyBalanceCallback(self, lumps_energy_balance_callback) else: raise ValueError("lump_energy_balance_callback is not possible to be wrapped - neither a subclass of LumpsEnergyBalanceCallback nor an acceptable function") self.callbacks.endcycle_callback = endcycle_callback
[docs] def one_cycle(self, X, cycle_integrator = 'RK45', cycle_integrator_options = None): """ Only run one cycle Parameters ---------- cycle_integrator : str One of 'RK45','Euler','Heun' cycle_integrator_options : dict options to be passed to the solver function (RK45, Euler, etc.) """ # Make cycle_integrator_options an empty dictionary if not provided if cycle_integrator_options is None: cycle_integrator_options = {} tmin = 0.0 tmax = 2*math.pi else: tmin = cycle_integrator_options['tmin'] tmax = cycle_integrator_options['tmax'] X = arraym(X) # (1). First, run all the tubes for tube in self.Tubes: tube.TubeFcn(tube) # Call update_existence to save the enthalpies for the tubes self.update_existence() try: t1 = default_timer() # Run the pre-cycle code self.pre_cycle() if cycle_integrator == 'Euler': # Default to 7000 steps if not provided N = getattr(self,'EulerN', 7000) integrator = EulerIntegrator(self, X) aborted = integrator.do_integration(N, tmin, tmax) elif cycle_integrator == 'Heun': # Default to 7000 steps if not provided N = getattr(self,'HeunN', 7000) integrator = HeunIntegrator(self, X) aborted = integrator.do_integration(N, tmin, tmax) elif cycle_integrator == 'RK45': # Default to tolerance of 1e-8 if not provided eps_allowed = getattr(self,'RK45_eps', 1e-8) integrator = RK45Integrator(self, X) aborted = integrator.do_integration(tmin, tmax, eps_allowed=eps_allowed) else: raise AttributeError('solver_method should be one of RK45, Euler, or Heun') if aborted == False: integrator.post_integration() self.Itheta = integrator.Itheta self.Ntheta = self.Itheta + 1 # Make sure we got the right number of things assert self.Ntheta == len(self.FlowStorage) self.post_cycle() except ValueError: # debug_plots(self) raise if aborted is None: aborted = False # Quit if you have aborted in one of the cycle solvers if aborted == 'abort': return None t2 = default_timer() print('Elapsed time for cycle is {0:g} s'.format(t2-t1)) mdot_out = self.FlowsProcessed.mean_mdot[self.key_outlet] mdot_in = self.FlowsProcessed.mean_mdot[self.key_inlet] if hasattr(self, 'additional_inlet_keys'): for key in self.additional_inlet_keys: mdot_in += self.FlowsProcessed.mean_mdot[key] if hasattr(self, 'additional_outlet_keys'): for key in self.additional_outlet_keys: mdot_out += self.FlowsProcessed.mean_mdot[key] # We need to find the key at the inlet to the outlet tube. Tube = self.Tubes[self.key_outlet] if Tube.key1 == self.key_outlet: key_outtube_inlet = Tube.key2 elif Tube.key2 == self.key_outlet: key_outtube_inlet = Tube.key1 # This is the so-called hd' state at the outlet of the pump set self.h_outlet_pump_set = (self.FlowsProcessed.integrated_mdoth[key_outtube_inlet] /self.FlowsProcessed.integrated_mdot[key_outtube_inlet]) # It should be equal to the enthalpy of the fluid at the inlet # to the outlet tube at the current Td value h_outlet_Tube = self.Tubes.Nodes[key_outtube_inlet].h # Residual is the difference of these two terms # We put it in kW by multiplying by flow rate self.resid_Td = 0.1*(h_outlet_Tube - self.h_outlet_pump_set)
[docs] def OBJECTIVE_CYCLE(self, Td_Tlumps0, X, epsilon_cycle = 0.003, epsilon_energy_balance = 0.003, cycle_integrator = 'RK45', OneCycle = False, cycle_integrator_options = None, plot_every_cycle = False): """ The Objective function for the energy balance solver Parameters ---------- Td_Tlumps0 : list Discharge temperature and lump temperatures X : :class:`arraym <PDSim.misc.datatypes.arraym>` instance Contains the state variables for all the control volumes in existence, as well as any other integration variables epsilon : float Convergence criterion applied to all of the solvers (DEPRECATED!) epsilon_cycle : float Cycle-cycle convergence criterion epsilon_energy_balance : float Energy balance convergence criterion cycle_integrator : string, one of 'RK45','Euler','Heun' Which solver is to be used to integrate the steps OneCycle : bool If ``True``, stop after one cycle plot_every_cycle : bool If ``True``, make the debug plots at every cycle cycle_integrator_options : dict Options to be passed to cycle integrator """ # Consume the first element as the discharge temp self.Td = float(Td_Tlumps0.pop(0)) # The rest are the lumps in order self.Tlumps = Td_Tlumps0 # The first time this function is run, save the state variables if self.xstate_init is None: self.xstate_init = X self.exists_CV_init = self.CVs.exists_keys i = 0 while True: # Actually run the cycle, runs post_cycle at the end, # sets the parameter lumps_resid in this class # Also sets resid_Td self.one_cycle(X, cycle_integrator = cycle_integrator, cycle_integrator_options = cycle_integrator_options) if self.Abort(): return errors, X = self.callbacks.endcycle_callback() error_metric = np.sqrt(np.sum(np.power(errors, 2))) ### ----------------------------------- ### The lump temperatures ### ----------------------------------- self.solvers.lump_eb_history.append([self.Tlumps, self.lumps_resid]) if len(self.Tlumps) > 1: print("Running multi-lump analysis") if self.OEB_solver == 'MDNR': # Use Multi Dim. Newton Raphson step for multi-lump temperatures w = 1.0 dx = 0.5 x = np.array(self.Tlumps,dtype=np.float) J = np.zeros((len(x),len(x))) error = 999 # If a float is passed in for dx, convert to a numpy-like list the same shape # as x if isinstance(dx,int) or isinstance(dx,float): dx=dx*np.ones_like(x) r0 = np.array(self.lumps_resid)*1000 # Build the Jacobian matrix by columns for jj in range(len(self.Tlumps)): delta = np.zeros_like(x) delta[jj] = dx[jj] self.Tlumps = self.Tlumps + delta ri = self.callbacks.lumps_energy_balance_callback() #print('ri:',ri) ri = np.array(ri) J[:,jj] = (ri-r0)/delta[jj] v = np.linalg.solve(J,-r0) # Calculate new Tlumps Tnew = x + w*v self.Tlumps = Tnew elif self.OEB_solver == 'Broyden': # Use Broyden Method raise('Broyden not implemented yet') else: # Use Relaxed Secant Method for single lump temperature print("Running single-lump analysis") if len(self.solvers.lump_eb_history) == 1: T, EB = self.solvers.lump_eb_history[-1] # T and EB are one-element lists, get floats _T, _EB = T[0], EB[0] # Use the thermal mass to make the step # Here is the logic: # Instantaneous energy balance given by # dU/dt = m*c*(dT/dt) = sum(Qdot) # and if dt = one cycle period (seconds/rev) Deltat = 2*pi/omega # DELTAT = sum(Qdot)*Deltat/(m*c) thermal_capacitance = 0.49*0.001 # [kJ/K] Deltat = (2*np.pi)/self.omega # [s] # Update the lump temperatures Tnew = np.array([_T + _EB*Deltat/thermal_capacitance]) else: # Get the values from the history Tn1, EBn1 = self.solvers.lump_eb_history[-1] Tn2, EBn2 = self.solvers.lump_eb_history[-2] # Convert to numpy arrays Tn1, EBn1, Tn2, EBn2 = [np.array(l) for l in [Tn1, EBn1, Tn2, EBn2]] # Use the relaxed secant method to find the solution Tnew = Tn1 - 0.7*EBn1*(Tn1-Tn2)/(EBn1-EBn2) # Update the lump temperatures self.Tlumps = Tnew.tolist() ### ----------------------------------- ### The discharge enthalpy ### ----------------------------------- # The outlet tube outlet_tube = self.Tubes[self.key_outlet] # Get the keys for the elements of the outlet tube if outlet_tube.key1 == self.key_outlet: key_outtube_inlet = outlet_tube.key2 key_outtube_outlet = outlet_tube.key1 elif outlet_tube.key2 == self.key_outlet: key_outtube_inlet = outlet_tube.key1 if error_metric < 0.1*epsilon_cycle and np.max(np.abs(self.lumps_resid)) < epsilon_energy_balance: # Each time that we get here and we are significantly below the threshold, store the values # Get the current value for the outlet enthalpy of the machine h_outlet = self.Tubes[self.key_outlet].State2.get_h() # Store the values in the list of values self.solvers.hdisc_history.append([h_outlet,self.resid_Td]) if len(self.solvers.hdisc_history) == 1: # The first time we get here, perturb the discharge enthalpy h_target = h_outlet + 5 else: # Get the values from the history hdn1, EBn1 = self.solvers.hdisc_history[-1] hdn2, EBn2 = self.solvers.hdisc_history[-2] # Use the relaxed secant method to find the solution hdnew = hdn1 - 0.75*EBn1*(hdn1-hdn2)/(EBn1-EBn2) # Reset the outlet enthalpy of the outlet tube based on our new # value for it h_target = hdnew if hasattr(self.Tubes.Nodes[self.key_outlet],'pAS') and len(self.Tubes.Nodes[self.key_outlet].pAS.fluid_names()) == 1: self.Tubes.Nodes[self.key_outlet].update_ph(self.Tubes.Nodes[self.key_outlet].p, h_target) else: # Iterate to solve the H, P flash calculation def objective(T): self.Tubes.Nodes[self.key_outlet].update(dict(T=T, P=self.Tubes.Nodes[self.key_outlet].p)) residual = self.Tubes.Nodes[self.key_outlet].h - h_target print(T, residual) return residual scipy.optimize.newton(objective, self.Tubes.Nodes[self.key_outlet].T) print(self.solvers.hdisc_history) print('New outlet T:', self.Tubes.Nodes[self.key_outlet].T, 'K') # Store a copy of the initial temperatures of the chambers self.solvers.initial_states_history.append(self.T[:,0].copy()) if OneCycle: print('Quitting due to OneCycle being set to True') return if plot_every_cycle: from PDSim.plot.plots import debug_plots debug_plots(self) if self.Abort(): print('Quitting because Abort flag hit') return # Reset the flag for the fixed side of the outlet tube #outlet_tube.fixed = old_fixed mdot_out = abs(self.FlowsProcessed.mean_mdot[self.key_outlet]) mdot_in = abs(self.FlowsProcessed.mean_mdot[self.key_inlet]) if hasattr(self, 'additional_inlet_keys'): for key in self.additional_inlet_keys: print('Additional inlet flow:', key, self.FlowsProcessed.mean_mdot[key]*1000, 'g/s') mdot_in += self.FlowsProcessed.mean_mdot[key] if hasattr(self, 'additional_outlet_keys'): for key in self.additional_outlet_keys: print('Additional outlet flow:', key, self.FlowsProcessed.mean_mdot[key]*1000, 'g/s') mdot_out += self.FlowsProcessed.mean_mdot[key] mdot_error = (mdot_out/mdot_in-1)*100 print('===========') print('|| # {i:03d} ||'.format(i=i)) print('===========') print(error_ascii_bar(abs(self.lumps_resid[0]), epsilon_energy_balance), 'energy balance kW ', self.lumps_resid, ' Tlumps: ',self.Tlumps,'K') print(error_ascii_bar(abs(self.resid_Td), epsilon_energy_balance), 'discharge state', self.resid_Td, 'h_pump_set: ', self.h_outlet_pump_set,'kJ/kg', self.Tubes.Nodes[key_outtube_inlet].h, 'kJ/kg') print(error_ascii_bar(error_metric, epsilon_cycle), 'cycle-cycle ', error_metric) print(error_ascii_bar(abs(mdot_error), 1), 'mdot [%]', mdot_error, '|| in:', mdot_in*1000, 'g/s || out:', mdot_out*1000, 'g/s ') # Check all the stopping conditions within_tolerance = [ np.max(np.abs(self.lumps_resid)) < epsilon_energy_balance, abs(self.resid_Td) < epsilon_energy_balance, np.sqrt(np.sum(np.power(errors, 2))) < epsilon_cycle ] # Stop if all conditions are met if all(within_tolerance): break i += 1 # If the abort function returns true, quit this loop if self.Abort(): print('Quitting OBJECTIVE_CYCLE loop in core.solve') return None # Stop else: if len(self.solvers.hdisc_history) == 0: # Store the values in the list of values self.solvers.hdisc_history.append([self.Tubes[self.key_outlet].State2.get_h(),self.resid_Td])
[docs] def solve(self, key_inlet = None, key_outlet = None, solver_method = 'Euler', OneCycle = False, Abort = None, pipe_abort = None, UseNR = False, alpha = 0.5, plot_every_cycle = False, x0 = None, reset_initial_state = False, timeout = 3600, eps_cycle = 0.001, eps_energy_balance = 0.01, cycle_integrator_options = None, max_number_of_steps = 40000, **kwargs): """ This is the driving function for the PDSim model. It can be extended through the use of the callback functions It is highly recommended to call this function using keyword arguments like:: solve(key_inlet = 'inlet.1', key_outlet = 'outlet.1', ....) Parameters ---------- key_inlet : str The key for the flow node that represents the upstream quasi-steady point key_outlet : str The key for the flow node that represents the upstream quasi-steady point solver_method : str OneCycle : bool If ``True``, stop after just one rotation. Useful primarily for debugging purposes Abort : function A function that may be called to determine whether to stop running. If calling Abort() returns ``True``, stop running pipe_abort : UseNR : bool If ``True``, use a multi-dimensional solver to determine the initial state of the state variables for each control volume alpha : float Use a range of ``(1-alpha)*dx, (1+alpha)*dx`` for line search if needed plot_every_cycle : bool If ``True``, make the plots after every cycle (primarily for debug purposes) x0 : arraym The starting values for the solver that modifies the discharge temperature and lump temperatures reset_initial_state : bool If ``True``, use the stored initial state from the previous call to ``solve`` as the starting value for the thermodynamic values for the control volumes timeout : float Number of seconds before the run times out eps_cycle : float Cycle-cycle convergence criterion eps_energy_balance : float Energy balance convergence criterion cycle_integrator_options : dict A dictionary of options to be passed to the cycle integrator max_number_of_steps : int Maximum number of steps allowed per rotation Notes ----- The callbacks ``step_callback`` and ``endcycle_callback`` and ``heat_transfer_callback`` and ``lump_energy_balance_callback`` and ``valves_callback`` should now be passed to the connect_callbacks() function before running precond_solve() or solve() """ if any(cb in kwargs for cb in ['step_callback','endcycle_callback','heat_transfer_callback','lump_energy_balance_callback','valves_callback']): raise NotImplementedError('callback functions are no longer passed to solve() function, rather they are passed to connect_callbacks() function prior to calling solve()') # Save copies of the inlet and outlet states at the root of the HDF5 file # for ease of retrieval self.inlet_state = self.Tubes.Nodes[key_inlet] self.outlet_state = self.Tubes.Nodes[key_outlet] # Carry out some pre-run checks self._check() self.start_time = default_timer() self.timeout = timeout #Connect functions that have been serialized by saving the function name as a string self.connect_flow_functions() #Both inlet and outlet keys must be connected to invariant nodes - # that is they must be part of the tubes which are all quasi-steady if not key_inlet == None and not key_inlet in self.Tubes.Nodes: raise KeyError('key_inlet must be a Tube node') if not key_outlet == None and not key_outlet in self.Tubes.Nodes: raise KeyError('key_outlet must be a Tube node') self.key_inlet = key_inlet self.key_outlet = key_outlet t1=default_timer() # Set up a pipe for accepting a value of True which will abort the run # Used from the GUI to kill process from the top-level thread self.pipe_abort = pipe_abort if len(self.CVs) < 1: raise ValueError('At least one control volume must be added using the add_CV function') if len(self.Flows) <= 1: raise ValueError('At least two flows must be added using the add_flow function') # If a function called pre_solve is provided, call it with no input arguments if hasattr(self,'pre_solve'): self.pre_solve() # This runs before the model starts at all self.pre_run(N = max_number_of_steps) # Check which method is used to do aborting if Abort is None and pipe_abort is not None: # Use the pipe_abort pipe to look at the abort pipe to see whether # to quit self.Abort = self.check_abort elif Abort is None and pipe_abort is None: #Disable the ability to abort, always don't abort self.Abort = lambda : False elif Abort is not None and pipe_abort is None: self.Abort = Abort else: raise ValueError('Only one of Abort and pipe_abort may be provided') # If you want to reset the initial state, use the values that were # cached in the xstate_init array if reset_initial_state is not None and reset_initial_state: self.reset_initial_state() self.pre_cycle(self.xstate_init) else: # (2) Run a cycle with the given values for the temperatures self.pre_cycle() self.x_state = self._get_from_matrices(0).copy() if x0 is None: x0 = [self.Tubes.Nodes[key_outlet].T, self.Tubes.Nodes[key_outlet].T] # Actually run the solver self.OBJECTIVE_CYCLE(x0, self.x_state, cycle_integrator = solver_method, OneCycle = OneCycle, epsilon_energy_balance = eps_energy_balance, epsilon_cycle = eps_cycle, cycle_integrator_options = cycle_integrator_options, plot_every_cycle = plot_every_cycle ) if not self.Abort() and not OneCycle: self.post_solve() if hasattr(self,'resid_Td'): del self.resid_Td # Save the elapsed time for simulation self.elapsed_time = default_timer() - t1
[docs] def get_prune_keys(self): """ Remove some elements when the simulation finishes that are not very useful and/or are very large when stored to file Returns ------- prune_key_list: list A list of HDF5 keys that are to be removed from the HDF5 file. """ return ['/CVs/CVs', '/CVs/Nodes', '/CVs/T', '/CVs/cp', '/CVs/cv', '/CVs/dpdT', '/CVs/exists_CV', '/CVs/exists_indices', '/CVs/exists_keys', '/CVs/h', '/CVs/p', '/CVs/rho', '/callbacks', '/core', '/steps', '/theta', '/Flows' ]
[docs] def attach_HDF5_annotations(self, fName): """ In this function, annotations can be attached to each HDF5 field Parameters ---------- fName : str The file name for the HDF5 file that is to be used """ attrs_dict = { '/t':'The array of the independent variable in the solution, either time or crank angle [rad or s]', '/m':'The NCV x Nt matrix with the mass in each control volume [kg]', '/T':'The NCV x Nt matrix with the temperature in each control volume [K]', '/V':'The NCV x Nt matrix with the volume in each control volume [m^3]', '/dV':'The NCV x Nt matrix with the derivative of volume w.r.t. crank angle in each control volume [m^3/radian]', '/h':'The NCV x Nt matrix with the enthalpy in each control volume [kJ/kg]', '/p':'The NCV x Nt matrix with the pressure in each control volume [kPa]', '/rho':'The NCV x Nt matrix with the density in each control volume [kg/m^3]', '/Q':'The NCV x Nt matrix with the heat transfer TO the gas in each control volume [kW]', '/xL':'The NCV x Nt matrix with the oil mass fraction in each control volume [-]', '/A_shell':'The shell area of the machine [m^2]', '/h_shell':'The heat transfer coefficient between the shell and the ambient [kW/m^2/K]', '/key_inlet':'The key for the inlet node', '/key_outlet':'The key for the outlet node', '/elapsed_time':'The elapsed time for the simulation run [s]', '/eta_a': 'Adiabatic efficiency [-]', '/eta_oi':'Overall isentropic efficiency [-]', '/eta_v':'Volumetric efficiency [-]', '/Qamb':'Rate of heat transfer from the machine TO the ambient [kW]', '/RK45_eps':'Step error tolerance for Runge-Kutta method [varied]', '/Tamb':'Ambient temperature [K]', '/Wdot_pv':'Mechanical power calculated as the integral of -pdV [kW]', '/Wdot_electrical':'Electrical power of the machine [kW]', '/Wdot_forces':'Mechanical power calculated from the mechanical analysis [kW]', '/mdot':'Mass flow rate [kg/s]', '/motor/eta_motor':'Motor efficiency [-]', '/motor/losses':'Losses generated in the motor [kW]', '/motor/suction_fraction':'Fraction of the motor losses that are added to the suction gas [-]', '/motor/type':'The model used to simulate the motor', '/omega':'Rotational speed [rad/s]', '/run_index':'A unique identifier for runs in a batch' } hf = h5py.File(fName,'a') for k, v in attrs_dict.items(): dataset = hf.get(k) if dataset is None: print('bad key',k) else: dataset.attrs['note'] = v hf.close()
[docs] def post_solve(self): """ Do some post-processing to calculate flow rates, efficiencies, etc. """ #Resize all the matrices to keep only the real data print('Ntheta is', self.Ntheta) self.t = self.t[ 0:self.Ntheta] self.T = self.T[:,0:self.Ntheta] self.p = self.p[:,0:self.Ntheta] self.Q = self.Q[:,0:self.Ntheta] self.m = self.m[:,0:self.Ntheta] self.rho = self.rho[:,0:self.Ntheta] self.V = self.V[:,0:self.Ntheta] self.dV = self.dV[:,0:self.Ntheta] self.h = self.h[:,0:self.Ntheta] self.xValves = self.xValves[:,0:self.Ntheta] print('mdot*(h2-h1),P-v,Qamb', self.Wdot, self.Wdot_pv, self.Qamb) print('Mass flow rate is',self.mdot*1000,'g/s') print('Volumetric efficiency is',self.eta_v*100,'%') print('Adiabatic efficiency is',self.eta_a*100,'%') # Restructure the history for easier writing to file and more clear description of what the things are hdisc_history = list(zip(*self.solvers.hdisc_history)) self.solvers.hdisc_history = dict(hd = np.array(hdisc_history[0]), hd_error = np.array(hdisc_history[1])) lump_eb_history = list(zip(*self.solvers.lump_eb_history)) self.solvers.lump_eb_history = dict(Tlumps = np.array(lump_eb_history[0]), lump_eb_error = np.array(lump_eb_history[1])) self.solvers.initial_states_history = np.array(zip(*self.solvers.initial_states_history)).T
[docs] def derivs(self, theta, x): """ Evaluate the derivatives of the state variables derivs() is an internal function that should (probably) not actually be called by any user-provided code, but is documented here for completeness. Parameters ---------- theta : float The value of the independent variable x : :class:`arraym <PDSim.misc.datatypes.arraym>` instance The array of the independent variables (state variables plus valve parameters) Returns ------- dfdt : :class:`arraym <PDSim.misc.datatypes.arraym>` instance """ # Updates the state, calculates the volumes, prepares all the things needed for derivatives self.core.properties_and_volumes(self.CVs.exists_CV, theta, STATE_VARS_TM, x) # Calculate the flows and sum up all the terms self.core.calculate_flows(self.Flows) # Calculate the heat transfer terms if provided if self.callbacks.heat_transfer_callback is not None: self.core.Q = arraym(self.callbacks.heat_transfer_callback(theta)) if not len(self.core.Q) == self.CVs.Nexist: raise ValueError('Length of Q is not equal to length of number of CV in existence') else: self.core.Q = empty_arraym(self.CVs.Nexist) # Calculate the derivative terms and set the derivative of the state vector self.core.calculate_derivs(self.omega, False) # Add the derivatives for the valves if self.__hasValves__: # offset = len(self.stateVariables)*self.CVs.Nexist for i, valve in enumerate(self.Valves): if x[offset+2+i*2-1] < -10: x[offset+2+i*2-1]=0.0 if x[offset+i*2] > valve.x_stopper and x[offset+2+i*2-1] > 0.0 : x[offset+2+i*2-1]=0.0 # Get the values from the input array for this valve xvalve = x[offset+i*2:offset+2+i*2] # Set the values in the valve class valve.set_xv(xvalve) # Get the derivatives of position and derivative of velocity self.core.property_derivs.extend(valve.derivs(self)) return self.core.property_derivs
[docs] def valves_callback(self): """ This is the default valves_callback function that builds the list of derivatives of position and velocity with respect to the crank angle It returns a :class:`list` instance with the valve return values in order """ #Run each valve model in turn to calculate the derivatives of the valve parameters # for each valve f=[] for Valve in self.Valves: f+=Valve.derivs(self) return f
[docs] def IsentropicNozzleFM(self,FlowPath,A,**kwargs): """ A generic isentropic nozzle flow model wrapper Parameters ---------- FlowPath : :class:`FlowPath <PDSim.flow.flow.FlowPath>` instance A fully-instantiated flow path model A : float throat area for isentropic nozzle model [:math:`m^2`] Returns ------- mdot : float The mass flow through the flow path [kg/s] """ try: mdot = flow_models.IsentropicNozzle(A, FlowPath.State_up, FlowPath.State_down) return mdot except ZeroDivisionError: return 0.0
[docs] def IsentropicNozzleFMSafe(self,FlowPath,A,DP_floor,**kwargs): """ A generic isentropic nozzle flow model wrapper with the added consideration that if the pressure drop is below the floor value, there is no flow. This was added to handle the case of the injection line where there is no flow out of the injection which greatly increases the numerical stiffness Parameters ---------- FlowPath : :class:`FlowPath <PDSim.flow.flow.FlowPath>` A fully-instantiated flow path model A : float throat area for isentropic nozzle model [:math:`m^2`] DP_floor: float The minimum pressure drop [kPa] Returns ------- mdot : float The mass flow through the flow path [kg/s] """ try: if FlowPath.State_up.p-FlowPath.State_down.p > DP_floor: mdot = flow_models.IsentropicNozzle(A, FlowPath.State_up, FlowPath.State_down) return mdot else: return 0.0 except ZeroDivisionError: return 0.0
[docs] def step_callback(self,t,h,i): """ The default step_callback which does nothing Parameters ---------- t : float The current value of the independent variable h : float The current step size i : int The current index """ return False,h
[docs] def endcycle_callback(self, eps_wrap_allowed=0.0001): """ This function can be called at the end of the cycle if so desired. Its primary use is to determine whether the cycle has converged for a given set of discharge temperatures and lump temperatures. Parameters ---------- eps_wrap_allowed : float Maximum error allowed, in absolute value Returns ------- redo : bool ``True`` if cycle should be run again with updated inputs, ``False`` otherwise. A return value of ``True`` means that convergence of the cycle has been achieved """ assert self.Ntheta - 1 == self.Itheta #old and new CV keys LHS,RHS=[],[] errorT,error_rho,error_mass,newT,new_rho,new_mass,oldT,old_rho,old_mass={},{},{},{},{},{},{},{},{} for key in self.CVs.exists_keys: # Get the 'becomes' field. If a list, parse each fork of list. If a single key convert # into a list so you can use the same code below if not isinstance(self.CVs[key].becomes, list): becomes = [self.CVs[key].becomes] else: becomes = self.CVs[key].becomes Iold = self.CVs.index(key) for newkey in becomes: # If newkey is 'none', the control volume will die at the end # of the cycle, so just keep going if newkey == 'none': continue Inew = self.CVs.index(newkey) newCV = self.CVs[newkey] # There can't be any overlap between keys if newkey in newT: raise KeyError('newkey [' + newkey + '] is already in newT; becomes keys overlap but should not') #What the state variables were at the start of the rotation oldT[newkey]=self.T[Inew, 0] old_rho[newkey]=self.rho[Inew, 0] #What they are at the end of the rotation newT[newkey]=self.T[Iold,self.Itheta] new_rho[newkey]=self.rho[Iold,self.Itheta] errorT[newkey]=(oldT[newkey]-newT[newkey])/newT[newkey] error_rho[newkey]=(old_rho[newkey]-new_rho[newkey])/new_rho[newkey] #Update the list of keys for setting the exist flags LHS.append(key) RHS.append(newkey) error_T_list = [errorT[key] for key in self.CVs.keys if key in newT] error_rho_list = [error_rho[key] for key in self.CVs.keys if key in new_rho] new_T_list = [newT[key] for key in self.CVs.keys if key in newT] new_rho_list = [new_rho[key] for key in self.CVs.keys if key in new_rho] #Reset the exist flags for the CV - this should handle all the possibilities #Turn off the LHS CV for key in LHS: self.CVs[key].exists=False #Turn on the RHS CV for key in RHS: self.CVs[key].exists=True self.update_existence() # Error values are based on density and temperature independent of # selection of state variables error_list = [] for var in ['T','D']: if var == 'T': error_list += error_T_list elif var == 'D': error_list += error_rho_list elif var == 'M': error_list += error_mass_list else: raise KeyError # Calculate the volumes at the beginning of the next rotation self.core.just_volumes(self.CVs.exists_CV, 0) V = {key:V for key,V in zip(self.CVs.exists_keys,self.core.V)} new_mass_list = [new_rho[key]*V[key] for key in self.CVs.exists_keys] new_list = [] for var in self.stateVariables: if var == 'T': new_list += new_T_list elif var == 'D': new_list += new_rho_list elif var == 'M': new_list += new_mass_list else: raise KeyError # Add values for valves for valve in self.Valves: new_list += list(valve.get_xv()) return arraym(error_list), arraym(new_list)
[docs] def connect_flow_functions(self): """ Reconnect function pointers For pickling purposes, it can sometimes be useful to just give the name of the function relative to the PDSimCore (or derived class). If the function is just a string, reconnect it to the function in the PDSimCore instance """ for Flow in self.Flows: if hasattr(Flow.MdotFcn, 'Function'): if isinstance(Flow.MdotFcn.Function, six.string_types): if hasattr(self,Flow.MdotFcn.Function): Flow.MdotFcn.Function = getattr(self, Flow.MdotFcn.Function) else: raise AttributeError('The name of the function ['+Flow.MdotFcn.Function+']is not found in the PDSimCore derived class instance')
if __name__=='__main__': PC = PDSimCore() PC.attach_HDF5_annotations('runa.h5') print('This is the base class that is inherited by other compressor types. Running this file doesn\'t do anything')