Source code for fudge.gnds.reactions.base

# <<BEGIN-copyright>>
# <<END-copyright>>

from pqu import PQU as PQUModule

from PoPs import IDs as IDsPoPsModule
from PoPs.groups import misc as chemicalElementMiscPoPsModule

import xData.ancestry as ancestryModule

import fudge
from fudge.core.utilities import fudgeExceptions

from .. import channels as channelsModule
from ..reactionData.doubleDifferentialCrossSection import doubleDifferentialCrossSection as doubleDifferentialCrossSectionModule
from ..reactionData import crossSection as crossSectionModule
from ..reactionData import availableEnergy as availableEnergyModule
from ..reactionData import availableMomentum as availableMomentumModule

__metaclass__ = type

[docs]class base_reaction( ancestryModule.ancestry ) : """Base class for all types of reaction.""" ancestryMembers = ( 'crossSection', 'doubleDifferentialCrossSection', 'outputChannel' ) def __init__( self, outputChannel, ENDF_MT, documentation = None, label = None, process = None ) : ancestryModule.ancestry.__init__( self ) self.__label = label self.__doubleDifferentialCrossSection = doubleDifferentialCrossSectionModule.component( ) self.__doubleDifferentialCrossSection.setAncestor( self ) self.__crossSection = crossSectionModule.component( ) self.__crossSection.setAncestor( self ) if( not isinstance( outputChannel, channelsModule.channel ) ) : raise fudgeExceptions.FUDGE_Exception( 'ouputChannel not instance of channel class.' ) self.outputChannel = outputChannel self.outputChannel.setAncestor( self ) self.ENDF_MT = int( ENDF_MT ) self.__process = process self.documentation = {} if( not( documentation is None ) ) : self.addDocumentation( documentation ) @property def doubleDifferentialCrossSection( self ) : return( self.__doubleDifferentialCrossSection ) @property def crossSection( self ) : return( self.__crossSection ) @property def label( self ) : """Returns the reaction's label.""" if( self.__label is None ) : self.updateLabel( ) return( self.__label )
[docs] def updateLabel( self ) : """Sets the reaction's label from outputChannel products.""" self.__label = str( self.outputChannel ) if( self.process is not None ) : self.__label += ' [%s]' % self.process
@property def process( self ) : return( self.__process ) @process.setter def process( self, value ) : if( value is not None ) : if( not( isinstance( value, str ) ) ) : raise TypeError( 'process must be a string' ) self.__process = value
[docs] def check( self, info ): """ This method is usually not called directly. Use reactionSuite.check() instead. Checks cross section and outputChannel (and differential cross sections if available). Checks include: do Z/A balance? do Q-value and thresholds agree? Does cross section domain agree with each product distribution/multiplicity domain? Does energy balance? @:param info: dict @:return list of warnings """ from fudge.gnds import warning from . import production as productionModule from fudge.gnds.reactionData.doubleDifferentialCrossSection.chargedParticleElastic.CoulombPlusNuclearElastic \ import CoulombDepositionNotSupported warnings = [] reactionSuite = self.getRootAncestor( ) def particleZA( particleID ) : particle = reactionSuite.PoPs[particleID] if hasattr(particle, 'id') and particle.id in reactionSuite.PoPs.aliases: particle = reactionSuite.PoPs[ particle.pid ] return( chemicalElementMiscPoPsModule.ZA( particle ) ) try: # BRB6 hardwired info['Q'] = self.getQ('eV', final=False) except ValueError: pass cpcount = sum( [ ( particleZA( prod.id ) / 1000 ) > 0 for prod in self.outputChannel ] ) info['CoulombOutputChannel'] = cpcount > 1 differentialCrossSectionWarnings = self.doubleDifferentialCrossSection.check( info ) if differentialCrossSectionWarnings: warnings.append( warning.context("doubleDifferentialCrossSection:", differentialCrossSectionWarnings) ) crossSectionWarnings = self.crossSection.check( info ) if crossSectionWarnings: warnings.append( warning.context("Cross section:", crossSectionWarnings) ) if 'Q' in info: del info['Q'] del info['CoulombOutputChannel'] if info['crossSectionOnly']: return warnings # otherwise continue to check outputs # compare calculated and listed Q-values: if not isinstance(self, productionModule.production): # can't reliably compute Q for production reactions try: Q = self.getQ('eV', final=False) Qcalc = info['availableEnergy_eV'] for prod in self.outputChannel: try: Qcalc -= prod.getMass('eV/c**2') * prod.multiplicity.getConstant() except Exception: # multiplicity is not constant if( prod.id == IDsPoPsModule.photon ) : continue raise ValueError("Non-constant multiplicity") if abs(Q-Qcalc) > PQUModule.PQU(info['dQ']).getValueAs('eV'): if self.outputChannel.process != channelsModule.processes.continuum: warnings.append( warning.Q_mismatch( PQUModule.PQU(Qcalc,'eV'), PQUModule.PQU(Q,'eV'), self ) ) except ValueError: pass # this test only works if multiplicity and Q are both constant for all non-gamma products if not (isinstance( self.outputChannel, channelsModule.sumOfRemainingOutputChannels ) or self.outputChannel.isFission( ) or isinstance( self, productionModule.production ) ) : # check that ZA balances: ZAsum = 0 for product in self.outputChannel: if( product.id == IDsPoPsModule.photon ) : continue ZAsum += particleZA( product.id ) * product.multiplicity.getConstant() if ZAsum != info['compoundZA']: warnings.append( warning.ZAbalanceWarning( self ) ) if self.outputChannel.isFission(): from fudge.gnds.channelData.fissionEnergyReleased import fissionEnergyReleased if isinstance(self.outputChannel.Q.evaluated, fissionEnergyReleased): info['crossSectionDomain'] = self.crossSection.domainMin, self.crossSection.domainMax FERwarnings = self.outputChannel.Q.evaluated.check( info ) if FERwarnings: warnings.append( warning.context("fissionEnergyReleased:", FERwarnings) ) del info['crossSectionDomain'] # disabling for now: only complain if distributions are missing for transportables: """ if (not any( [product.distributions.components for product in self.outputChannel] ) and not any( [dProd.distributions.components for prod in [p for p in self.outputChannel if p.outputChannel is not None] for dProd in prod.outputChannel] ) ): # no distributions found for any reaction product or subsequent decay product warnings.append( warning.noDistributions( self ) ) return warnings """ info['crossSectionDomain'] = self.crossSection.domainMin, self.crossSection.domainMax info['isTwoBody'] = isinstance( self.outputChannel, channelsModule.twoBodyOutputChannel ) for product in self.outputChannel: productWarnings = product.check( info ) if productWarnings: warnings.append( warning.context("Product: %s" % product.label, productWarnings) ) del info['crossSectionDomain'] del info['isTwoBody'] if info['checkEnergyBalance'] and not isinstance( self, productionModule.production ): # Calculate energy deposition data for all products, and for decay products. # Then recursively check the product list for energy balance at each step of the reaction/decay # At each step, if we have energy deposition for *every* product in the list, we can rigorously check # energy balance. Otherwise, can only check that we don't exceed available energy. try: self.calculateAverageProductData( style=info['averageProductDataStyle'], **info['averageProductDataArgs'] ) except CoulombDepositionNotSupported: warnings.append( warning.SkippedCoulombElasticEnergyDeposition( self ) ) except Exception as e: warnings.append( warning.EnergyDepositionExceptionRaised( str(e), self ) ) if info['failOnException']: raise return warnings def checkProductsForEnergyBalance( products, Qs, fission = False, decay = False ): # sample usage: for the reaction n + F19 -> n + (F19_c -> F19 + gamma), this function # should be called twice, to test energy balance at each step of the reaction. # First call: products = [n, F19_c] and Qs = [Q_reaction], # second call: products = [n, F19, gamma] and Qs = [Q_reaction, Q_decay]. edepWarnings = [] averageProductDataLabel = info['averageProductDataStyle'].label energyDep = [] for prod in products: if averageProductDataLabel in prod.energyDeposition: energyDep.append( [ prod.label, prod.energyDeposition[ averageProductDataLabel ] ] ) if energyDep: totalEDep = energyDep[0][1] for idx in range(1,len(energyDep)): if( ( totalEDep.domainMin != energyDep[idx][1].domainMin ) or ( totalEDep.domainMax != energyDep[idx][1].domainMax ) ) : upperEps = 0 if totalEDep.domainMax != energyDep[idx][1].domainMax: upperEps = 1e-8 try: totalEDep, energyDep[idx][1] = totalEDep.mutualify( 1e-8,upperEps,0, energyDep[idx][1], 1e-8,upperEps,0) except Exception as e: edepWarnings.append( warning.EnergyDepositionExceptionRaised( str(e), self ) ) if info['failOnException']: raise return warnings totalEDep = totalEDep + energyDep[idx][1] else: totalEDep = [] Qsum = sum(Qs) def energyDepositedPerProduct( energyDep, ein ): """ if energy doesn't balance, determine how much is deposited in each product """ result = [] availableEnergy = ein + Qsum if availableEnergy == 0: availableEnergy = sum( [edep.evaluate(ein) for p,edep in energyDep] ) for prod, edep in energyDep: edep = edep.evaluate( ein ) if edep is None: result.append( (prod, 0) ) else: result.append( ( prod, 100.0 * edep/availableEnergy ) ) return sorted(result, key=lambda foo: foo[1])[::-1] # now we have total energy deposition for all particles, use to check energy balance. # a few special cases to consider: if fission: # fission products aren't listed (so far, anyway), so about 85% of available energy should be missing: for i,(ein,edep) in enumerate(totalEDep): if edep > abs((ein + Qsum) * info['fissionEnergyBalanceLimit']): edepWarnings.append( warning.fissionEnergyImbalance( PQUModule.PQU(ein, totalEDep.axes[0].unit), i, ein+Qsum, energyDepositedPerProduct(energyDep, ein), self ) ) elif len(products) == len(energyDep): # have full energy dep. data for all products, so we can rigorously check energy balance: for i,(ein,edep) in enumerate(totalEDep): if ( abs(edep - (ein+Qsum)) > abs((ein+Qsum) * info['dEnergyBalanceRelative']) and abs(edep - (ein+Qsum)) > PQUModule.PQU( info['dEnergyBalanceAbsolute'] ) .getValueAs(totalEDep.axes[0].unit) ): edepWarnings.append( warning.energyImbalance( PQUModule.PQU(ein, totalEDep.axes[0].unit), i, ein+Qsum, energyDepositedPerProduct(energyDep, ein), self ) ) else: # missing some products, so just check that outgoing energy doesn't exceed incoming: for i,(ein,edep) in enumerate(totalEDep): if ( (edep - (ein+Qsum)) > ((ein+Qsum) * info['dEnergyBalanceRelative']) and (edep - (ein+Qsum)) > PQUModule.PQU( info['dEnergyBalanceAbsolute'] ) .getValueAs(totalEDep.axes[0].unit) ): edepWarnings.append( warning.energyImbalance( PQUModule.PQU(ein, totalEDep.axes[0].unit), i, ein+Qsum, energyDepositedPerProduct(energyDep, ein), self ) ) if edepWarnings: context = "Energy balance" if decay: context += " (after decay)" context += " for products: " + ', '.join( [prod.id for prod in products] ) warnings.append( warning.context( context, edepWarnings ) ) # now recursively check decay products, if any: for pidx, currentProd in enumerate(products): if currentProd.outputChannel is not None: checkProductsForEnergyBalance( products[:pidx] + [p for p in currentProd.outputChannel] + products[pidx+1:], Qs + [currentProd.outputChannel.Q.getConstant()], decay = True, fission = False # FIXME what about spontaneous fission decay? ) # end of helper function checkProductsForEnergyBalance try: Q = self.outputChannel.Q.getConstant() checkProductsForEnergyBalance( products = [p1 for p1 in self.outputChannel], Qs = [Q], fission = self.outputChannel.isFission(), decay=False ) except ValueError: pass # FIXME this test currently disabled when Q non-constant return warnings
@property def domainMin( self ) : return( self.crossSection.domainMin ) @property def domainMax( self ) : return( self.crossSection.domainMax ) @property def domainUnit( self ) : return( self.crossSection.domainUnit )
[docs] def domainUnitConversionFactor( self, unitTo ) : return( self.crossSection.domainUnitConversionFactor( unitTo ) )
[docs] def convertUnits( self, unitMap ) : """See documentation for reactionSuite.convertUnits.""" from . import reaction as reactionModule self.doubleDifferentialCrossSection.convertUnits( unitMap ) self.crossSection.convertUnits( unitMap ) self.outputChannel.convertUnits( unitMap ) if( isinstance( self, reactionModule.reaction ) ) : self.availableEnergy.convertUnits( unitMap ) self.availableMomentum.convertUnits( unitMap )
[docs] def getENDL_CS_ENDF_MT( self ) : """ Returns the reaction's ENDL C, S and ENDF's MT values as integers in a python dictionary with keys 'C', 'S' and 'MT' (e.g., { 'C' : 11, 'S' : 1, 'MT' : 53 }). """ from fudge.legacy.converting import endf_endl MT = self.ENDF_MT C, S = endf_endl.getCSFromMT( MT ) return( { 'C' : C, 'S' : S, 'MT' : MT } )
[docs] def heatCrossSection( self, temperature, EMin, lowerlimit = None, upperlimit = None, interpolationAccuracy = 0.001, heatAllPoints = False, doNotThin = True, heatBelowThreshold = True, heatAllEDomain = True, setThresholdToZero = False ) : if( len( self.doubleDifferentialCrossSection ) > 0 ) : print( " Skipping doubleDifferentialCrossSection reaction " ) return crossSection = self.crossSection.heat( temperature, EMin, lowerlimit, upperlimit, interpolationAccuracy, heatAllPoints, doNotThin, heatBelowThreshold, heatAllEDomain, setThresholdToZero = setThresholdToZero, addToSuite = True ) return( crossSection )
[docs] def addDocumentation( self, documentation ) : self.documentation[documentation.name] = documentation
[docs] def thresholdQAs( self, unit, final = True ) : return( self.outputChannel.thresholdQAs( unit, final = final ) )
[docs] def getDocumentation( self, name ) : return( self.documentation[name] )
[docs] def getQ( self, unit, final = True ) : """Returns the Q-value for this reaction. Converted to float if possible, otherwise a string value is returned.""" return( self.thresholdQAs( unit, final = final ) )
[docs] def getReactionSuite( self ) : from .. import reactionSuite as reactionSuiteModule return( self.findClassInAncestry( reactionSuiteModule.reactionSuite ) )
[docs] def cullStyles( self, styleList ) : from . import reaction as reactionModule # self.__doubleDifferentialCrossSection.cullStyles( styleList ) self.__crossSection.cullStyles( styleList ) self.outputChannel.cullStyles( styleList ) if( isinstance( self, reactionModule.reaction ) ) : self.availableEnergy.cullStyles( styleList ) self.availableMomentum.cullStyles( styleList )
[docs] def calculateAverageProductData( self, style, indent = '', **kwargs ) : """ Calculate average product data. :param style: The style to use. :param indent: string; The amount to indent and verbose output. :param kwargs: string; All other parameters. :return: """ verbosity = kwargs['verbosity'] indent2 = indent + kwargs['incrementalIndent'] if( verbosity > 0 ) : print( '%s%s' % (indent, self.outputChannel.toString( simpleString=True ) ) ) kwargs['reaction'] = self kwargs['EMin'] = self.domainMin kwargs['EMax'] = self.domainMax self.outputChannel.calculateAverageProductData( style, indent2, **kwargs ) if( hasattr( self, 'availableEnergy' ) ) : axes = availableEnergyModule.defaultAxes( self.domainUnit ) QToPointwiseLinear = self.outputChannel.QToPointwiseLinear( final = True ) availableEnergy = availableEnergyModule.XYs1d( data = [ [ QToPointwiseLinear.domainMin, QToPointwiseLinear.domainMin ], [ self.domainMax, self.domainMax ] ], axes = axes ) availableEnergy += QToPointwiseLinear self.availableEnergy.add( availableEnergyModule.XYs1d( data = availableEnergy, axes = axes, label = style.label ) ) if( hasattr( self, 'availableMomentum' ) ) : massInE = kwargs['projectileMass'] availableMomentum = availableMomentumModule.calculateMomentumPoints( style, massInE, self.domainMin, self.domainMax, self.domainUnit ) self.availableMomentum.add( availableMomentum )
[docs] def processMC_cdf( self, style, tempInfo, indent = '', incrementalIndent = ' ' ) : indent2 = indent + tempInfo['incrementalIndent'] verbosity = tempInfo['verbosity'] if( verbosity > 0 ) : print( '%s%s' % (indent, self.outputChannel.toString( simpleString=True ) ) ) self.outputChannel.processMC_cdf( style, tempInfo, indent2 )
[docs] def processGriddedCrossSections( self, style, verbosity = 0, indent = '', incrementalIndent = ' ', isPhotoAtomic = False ) : self.crossSection.processGriddedCrossSections( style, verbosity = verbosity, indent = indent, incrementalIndent = incrementalIndent, isPhotoAtomic = isPhotoAtomic )
[docs] def processMultiGroup( self, style, tempInfo, indent ) : from . import reaction as reactionModule tempInfo['workFile'].append( 'r%s' % tempInfo['reactionIndex'] ) tempInfo['transferMatrixComment'] = tempInfo['reactionSuite'].inputParticlesToReactionString( suffix = " --> " ) + self.toString( ) indent2 = indent + tempInfo['incrementalIndent'] verbosity = tempInfo['verbosity'] if( verbosity > 0 ) : print( '%s%s' % (indent, self.outputChannel.toString( simpleString=True ) ) ) tempInfo['reaction'] = self norm = tempInfo['groupedFlux'] tempInfo['groupedFlux'] = None crossSection = style.findFormMatchingDerivedStyle( self.crossSection ) # BRB FIXME The next line is a kludge, see note on crossSection.resonancesWithBackground.processMultiGroup. if( isinstance( crossSection, crossSectionModule.reference ) ) : crossSection = crossSection.crossSection if( isinstance( crossSection, crossSectionModule.resonancesWithBackground ) ) : crossSection = crossSection.ancestor['recon'] if( not( isinstance( crossSection, crossSectionModule.XYs1d ) ) ) : crossSection = crossSection.toPointwise_withLinearXYs( accuracy = 1e-5, upperEps = 1e-8 ) tempInfo['crossSection'] = crossSection tempInfo['multiGroupCrossSection'] = self.crossSection.processMultiGroup( style, tempInfo, indent2 ) self.crossSection.remove( style.label ) # Not normalized by tempInfo['groupedFlux'] so remove. tempInfo['groupedFlux'] = norm tempInfo['multiGroupCrossSectionNormed'] = self.crossSection.processMultiGroup( style, tempInfo, indent2 ) # Normalized by tempInfo['groupedFlux']. if( isinstance( self, reactionModule.reaction ) ) : self.availableEnergy.processMultiGroup( style, tempInfo, indent2 ) self.availableMomentum.processMultiGroup( style, tempInfo, indent2 ) self.outputChannel.processMultiGroup( style, tempInfo, indent2 ) del tempInfo['workFile'][-1]
[docs] def toString( self, indent = '' ) : return( self.outputChannel.toString( ) )
[docs] def toXML( self, indent = '', **kwargs ) : return( '\n'.join( self.toXMLList( indent, **kwargs ) ) )
[docs] def toXMLList( self, indent = '', **kwargs ) : from . import reaction as reactionModule incrementalIndent = kwargs.get( 'incrementalIndent', ' ' ) indent2 = indent + incrementalIndent indent3 = indent2 + incrementalIndent attributeString = "" if( self.process is not None ) : attributeString += ' process="%s"' % self.process attributeString += ' ENDF_MT="%s"' % self.ENDF_MT xmlString = [ '%s<%s label="%s"' % ( indent, self.moniker, self.label ) ] fissionGenre = self.outputChannel.getFissionGenre( ) if fissionGenre is not None: attributeString += ' fissionGenre="%s"' % fissionGenre xmlString[-1] += attributeString + '>' if self.documentation: # BRB6 What is this xmlString.append( '%s<documentations>' % indent2 ) for doc in self.documentation: xmlString += self.documentation[doc].toXMLList( indent3, **kwargs ) xmlString[-1] += '</documentations>' xmlString += self.doubleDifferentialCrossSection.toXMLList( indent2, **kwargs ) xmlString += self.crossSection.toXMLList( indent2, **kwargs ) xmlString += self.outputChannel.toXMLList( indent2, **kwargs ) if( isinstance( self, reactionModule.reaction ) ) : xmlString += self.availableEnergy.toXMLList( indent2, **kwargs ) xmlString += self.availableMomentum.toXMLList( indent2, **kwargs ) xmlString[-1] += '</%s>' % self.moniker return xmlString
[docs] @classmethod def parseXMLNode( cls, element, xPath, linkData ) : """Translate a <reaction> element from xml into a reaction class instance.""" xPath.append( '%s[@label="%s"]' % ( element.tag, element.get( 'label' ) ) ) crossSectionComponent = fudge.gnds.reactionData.crossSection.parseXMLNode( element.find( 'crossSection' ), xPath, linkData ) outputChannel = fudge.gnds.channels.parseXMLNode( element.find('outputChannel'), xPath, linkData) outputChannel.fissionGenre = element.get( 'fissionGenre' ) reaction = cls( outputChannel = outputChannel, label = element.get( 'label' ), ENDF_MT = int( element.get('ENDF_MT') ) ) if element.get('process'): reaction.process = element.get('process') for crossSection in crossSectionComponent : reaction.crossSection.add( crossSection ) if( element.find( 'documentations' ) ) : for doc in element.find( 'documentations' ) : reaction.addDocumentation( fudge.gnds.documentation.documentation.parseXMLNode( doc, xPath, linkData)) if( element.find( 'doubleDifferentialCrossSection' ) ) : reaction.doubleDifferentialCrossSection.parseXMLNode( element.find( 'doubleDifferentialCrossSection' ), xPath, linkData ) if( element.find( availableEnergyModule.component.moniker ) ) : reaction.availableEnergy = availableEnergyModule.parseXMLNode( element.find( availableEnergyModule.component.moniker ), xPath, linkData ) if( element.find( availableMomentumModule.component.moniker ) ) : reaction.availableMomentum = availableMomentumModule.parseXMLNode( element.find( availableMomentumModule.component.moniker ), xPath, linkData ) xPath.pop( ) return( reaction )
[docs]def isGNDSReaction( o ) : """Returns True if o is an instance of base_reaction or of a subclass thereof. """ return isinstance(o, base_reaction)