"""
.. module:: reporters
:platform: Unix, Windows
:synopsis: a module for defining OpenMM reporter classes.
.. moduleauthor:: Charlles R. A. Abreu <abreu@eq.ufrj.br>
.. _pandas.DataFrame: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html
.. _StateDataReporter: http://docs.openmm.org/latest/api-python/generated/simtk.openmm.app.statedatareporter.StateDataReporter.html
.. _CustomIntegrator: http://docs.openmm.org/latest/api-python/generated/simtk.openmm.openmm.CustomIntegrator.html
.. _CustomCVForce: docs.openmm.org/latest/api-python/generated/simtk.openmm.openmm.CustomCVForce.html
"""
import sys
import numpy as np
import pandas as pd
from simtk import openmm
from simtk import unit
from simtk.openmm import app
from .computers import PressureComputer
from .computers import _MoleculeTotalizer
from .utils import InputError
class _MultiStream:
def __init__(self, outputs):
self._outputs = list()
for output in outputs:
self._outputs.append(open(output, 'w') if isinstance(output, str) else output)
def __del__(self):
for output in self._outputs:
if output != sys.stdout and output != sys.stderr:
output.close()
def write(self, message):
for output in self._outputs:
output.write(message)
def flush(self):
for output in self._outputs:
output.flush()
class _AtomsMM_Reporter():
"""
Base class for reporters.
"""
def __init__(self, file, reportInterval, **kwargs):
self._reportInterval = reportInterval
self._requiresInitialization = True
self._needsPositions = False
self._needsVelocities = False
self._needsForces = False
self._needEnergy = False
extraFile = kwargs.pop('extraFile', None)
if extraFile is None:
self._out = open(file, 'w') if isinstance(file, str) else file
else:
self._out = _MultiStream([file, extraFile])
self._separator = kwargs.pop('separator', ',')
def _initialize(self, simulation, state):
pass
def _generateReport(self, simulation, state):
pass
def describeNextReport(self, simulation):
"""
Get information about the next report this object will generate.
Parameters
----------
simulation : Simulation
The Simulation to generate a report for
Returns
-------
tuple
A five element tuple. The first element is the number of steps
until the next report. The remaining elements specify whether
that report will require positions, velocities, forces, and
energies respectively.
"""
steps = self._reportInterval - simulation.currentStep % self._reportInterval
return (steps, self._needsPositions, self._needsVelocities, self._needsForces, self._needEnergy)
def report(self, simulation, state):
"""
Generate a report.
Parameters
----------
simulation : Simulation
The Simulation to generate a report for
state : State
The current state of the simulation
"""
if self._requiresInitialization:
self._initialize(simulation, state)
self._requiresInitialization = False
self._generateReport(simulation, state)
[docs]class ExtendedStateDataReporter(app.StateDataReporter):
"""
An extension of OpenMM's StateDataReporter_ class, which outputs information about a simulation,
such as energy and temperature, to a file.
All original functionalities of StateDataReporter_ are preserved and the following ones are
included:
1. Report the Coulomb contribution of the potential energy (keyword: `coulombEnergy`):
This contribution includes both real- and reciprocal-space terms.
2. Report the atomic virial of a fully-flexible system (keyword: `atomicVirial`):
Considering full scaling of atomic coordinates in a box volume change (i.e. without any
distance constraints), the internal virial of the system is given by
.. math::
W = -\\sum_{i,j} r_{ij} E^\\prime(r_{ij}),
where :math:`E^\\prime(r)` is the derivative of the pairwise interaction potential as a
function of the distance between to atoms. Such interaction includes van der Waals, Coulomb,
and bond-stretching contributions. Bond-bending and dihedral angles are not considered
because they are invariant to full volume-scaling of atomic coordinates.
3. Report the nonbonded contribution of the atomic virial (keyword: `nonbondedVirial`):
The nonbonded virial is given by
.. math::
W_\\mathrm{nb} = -\\sum_{i,j} r_{ij} E_\\mathrm{nb}^\\prime(r_{ij}),
where :math:`E_\\mathrm{nb}^\\prime(r)` is the derivative of the nonbonded pairwise
potential, which comprises van der Waals and Coulomb interactions only.
4. Report the atomic pressure of a fully-flexible system (keyword: `atomicPressure`):
.. math::
P = \\frac{2 K + W}{3 V},
where :math:`K` is the kinetic energy sum for all atoms in the system. If keyword
`bathTemperature` is employed (see below), the instantaneous kinetic energy is substituted
by its equipartition-theorem average
:math:`\\left\\langle K \\right\\rangle = 3 N_\\mathrm{atoms} k_B T/2`,
where :math:`T` is the heat-bath temperature.
5. Report the molecular virial of a system (keyword: `molecularVirial`):
To compute the molecular virial, only the center-of-mass coordinates of the molecules are
considered to scale in a box volume change, while the internal molecular structure is kept
unaltered. The molecular virial is computed from the nonbonded part of the atomic virial by
using the formulation of Ref. :cite:`Hunenberger_2002`:
.. math::
W_\\mathrm{mol} = W - \\sum_{i} (\\mathbf{r}_i - \\mathbf{r}_i^\\mathrm{cm}) \\cdot \\mathbf{F}_i,
where :math:`\\mathbf{r}_i` is the coordinate of atom i, :math:`\\mathbf{F}_i` is the
resultant pairwise force acting on it (excluding bond-bending and dihedral angles), and
:math:`\\mathbf{r}_i^\\mathrm{cm}` is the center-of-mass coordinate of its containing
molecule.
6. Report the molecular pressure of a system (keyword: `molecularPressure`):
.. math::
P = \\frac{2 K_\\mathrm{mol} + W_\\mathrm{mol}}{3 V},
where :math:`K_\\mathrm{mol}` is the center-of-mass kinetic energy summed for all molecules
in the system. If keyword `bathTemperature` is employed (see below), the instantaneous
kinetic energy is substituted by its equipartition-theorem average
:math:`\\left\\langle K_\\mathrm{mol} \\right\\rangle = 3 N_\\mathrm{mols} k_B T/2`,
where :math:`T` is the heat-bath temperature.
7. Report the center-of-mass kinetic energy (keyword: `molecularKineticEnergy`):
.. math::
K_\\mathrm{mol} = \\frac{1}{2} \\sum_{i=1}^{N_\\mathrm{mol}} M_i v_{\\mathrm{cm}, i}^2,
where :math:`N_\\mathrm{mol}` is the number of molecules in the system, :math:`M_i` is the
total mass of molecule `i`, and :math:`v_{\\mathrm{cm}, i}` is the center-of-mass velocity
of molecule `i`.
8. Report potential energies at multiple global parameter states (keyword: `globalParameterStates`):
Computes and reports the potential energy of the system at a number of provided global
parameter states.
9. Report global parameter values (keyword: `globalParameters`):
Reports the values of specified global parameters.
10. Report derivatives of energy with respect to global parameters (keyword: `energyDerivatives`):
Computes and reports derivatives of the potential energy of the system at the current
state with respect to specified global parameters.
11. Report values of collective variables (keyword: `collectiveVariables`)
Report the values of a set of collective variables.
12. Allow specification of an extra file for reporting (keyword: `extraFile`).
This can be used for replicating a report simultaneously to `sys.stdout` and to a file
using a unique reporter.
Keyword Args
------------
coulombEnergy : bool, optional, default=False
Whether to write the Coulomb contribution of the potential energy to the file.
atomicVirial : bool, optional, default=False
Whether to write the total atomic virial to the file.
nonbondedVirial : bool, optional, default=False
Whether to write the nonbonded contribution to the atomic virial to the file.
atomicPressure : bool, optional, default=False
Whether to write the internal atomic pressure to the file.
molecularVirial : bool, optional, default=False
Whether to write the molecular virial to the file.
molecularPressure : bool, optional, default=False
Whether to write the internal molecular pressure to the file.
molecularKineticEnergy : bool, optional, default=False
Whether to write the molecular center-of-mass kinetic energy to the file.
globalParameterStates : pandas.DataFrame_, optional, default=None
A DataFrame containing context global parameters (column names) and sets of values
thereof. If it is provided, then the potential energy will be reported for every state
these parameters define.
globalParameters : list(str), optional, default=None
A list of global parameter names. If it is provided, then the values of these parameters
will be reported.
energyDerivatives : list(str), optional, default=None
A list of global parameter names. If it is provided, then the derivatives of the
total potential energy with respect to these parameters will be reported. It is
necessary that the calculation of these derivatives has been activated beforehand
(see, for instance, CustomIntegrator_).
collectiveVariables : list(openmm.CustomCVForce), optional, default=None
A list of CustomCVForce_ objects. If it is provided, then the values of all collective
variables associated with these objects will be reported.
pressureComputer : :class:`~atomsmm.computers.PressureComputer`, optional, default=None
A computer designed to determine pressures and virials. This is mandatory if any keyword
related to virial or pressure is set as `True`.
extraFile : str or file, optional, default=None
Extra file to write to, specified as a file name or a file object.
"""
def __init__(self, file, reportInterval, **kwargs):
self._coulombEnergy = kwargs.pop('coulombEnergy', False)
self._atomicVirial = kwargs.pop('atomicVirial', False)
self._nonbondedVirial = kwargs.pop('nonbondedVirial', False)
self._atomicPressure = kwargs.pop('atomicPressure', False)
self._molecularVirial = kwargs.pop('molecularVirial', False)
self._molecularPressure = kwargs.pop('molecularPressure', False)
self._molecularKineticEnergy = kwargs.pop('molecularKineticEnergy', False)
self._globalParameterStates = kwargs.pop('globalParameterStates', None)
self._globalParameters = kwargs.pop('globalParameters', None)
self._energyDerivatives = kwargs.pop('energyDerivatives', None)
self._collectiveVariables = kwargs.pop('collectiveVariables', None)
self._pressureComputer = kwargs.pop('pressureComputer', None)
extra = kwargs.pop('extraFile', None)
if extra is None:
super().__init__(file, reportInterval, **kwargs)
else:
super().__init__(_MultiStream([file, extra]), reportInterval, **kwargs)
self._computing = any([self._coulombEnergy,
self._atomicVirial,
self._nonbondedVirial,
self._atomicPressure,
self._molecularVirial,
self._molecularPressure,
self._molecularKineticEnergy])
if self._computing:
if self._pressureComputer is not None and not isinstance(self._pressureComputer, PressureComputer):
raise InputError('keyword "pressureComputer" requires a PressureComputer instance')
self._needsPositions = True
self._needsForces = any([self._needsForces,
self._molecularVirial,
self._molecularPressure])
self._needsVelocities = any([self._needsVelocities,
self._molecularPressure,
self._atomicPressure,
self._molecularKineticEnergy])
self._backSteps = -sum([self._speed, self._elapsedTime, self._remainingTime])
def _add_item(self, lst, item):
if self._backSteps == 0:
lst.append(item)
else:
lst.insert(self._backSteps, item)
def _constructHeaders(self):
headers = super()._constructHeaders()
if self._coulombEnergy:
self._add_item(headers, 'Coulomb Energy (kJ/mole)')
if self._atomicVirial:
self._add_item(headers, 'Atomic Virial (kJ/mole)')
if self._nonbondedVirial:
self._add_item(headers, 'Nonbonded Virial (kJ/mole)')
if self._atomicPressure:
self._add_item(headers, 'Atomic Pressure (atm)')
if self._molecularVirial:
self._add_item(headers, 'Molecular Virial (kJ/mole)')
if self._molecularPressure:
self._add_item(headers, 'Molecular Pressure (atm)')
if self._molecularKineticEnergy:
self._add_item(headers, 'Molecular Kinetic Energy (kJ/mole)')
if self._globalParameterStates is not None:
for index in self._globalParameterStates.index:
self._add_item(headers, 'Energy[{}] (kJ/mole)'.format(index))
if self._globalParameters is not None:
for name in self._globalParameters:
self._add_item(headers, name)
if self._energyDerivatives is not None:
for name in self._energyDerivatives:
self._add_item(headers, 'diff(E,{})'.format(name))
if self._collectiveVariables is not None:
for force in self._collectiveVariables:
for index in range(force.getNumCollectiveVariables()):
name = force.getCollectiveVariableName(index)
self._add_item(headers, name)
return headers
def _constructReportValues(self, simulation, state):
values = super()._constructReportValues(simulation, state)
if self._computing:
computer = self._pressureComputer
computer.import_configuration(state)
atomicVirial = computer.get_atomic_virial().value_in_unit(unit.kilojoules_per_mole)
if self._coulombEnergy:
coulombVirial = computer.get_coulomb_virial()
self._add_item(values, coulombVirial.value_in_unit(unit.kilojoules_per_mole))
if self._atomicVirial:
self._add_item(values, atomicVirial)
if self._nonbondedVirial:
nonbondedVirial = computer.get_dispersion_virial() + computer.get_coulomb_virial()
self._add_item(values, nonbondedVirial.value_in_unit(unit.kilojoules_per_mole))
if self._atomicPressure:
atomicPressure = computer.get_atomic_pressure()
self._add_item(values, atomicPressure.value_in_unit(unit.atmospheres))
if self._molecularVirial or self._molecularPressure:
forces = state.getForces(asNumpy=True)
if self._molecularVirial:
molecularVirial = computer.get_molecular_virial(forces)
self._add_item(values, molecularVirial.value_in_unit(unit.kilojoules_per_mole))
if self._molecularPressure:
molecularPressure = computer.get_molecular_pressure(forces)
self._add_item(values, molecularPressure.value_in_unit(unit.atmospheres))
if self._molecularKineticEnergy:
molKinEng = computer.get_molecular_kinetic_energy()
self._add_item(values, molKinEng.value_in_unit(unit.kilojoules_per_mole))
if self._globalParameterStates is not None:
original = dict()
for name in self._globalParameterStates.columns:
original[name] = simulation.context.getParameter(name)
latest = original.copy()
for index, row in self._globalParameterStates.iterrows():
for name, value in row.items():
if value != latest[name]:
simulation.context.setParameter(name, value)
latest[name] = value
energy = simulation.context.getState(getEnergy=True).getPotentialEnergy()
self._add_item(values, energy.value_in_unit(unit.kilojoules_per_mole))
for name, value in original.items():
if value != latest[name]:
simulation.context.setParameter(name, value)
if self._globalParameters is not None:
for name in self._globalParameters:
self._add_item(values, simulation.context.getParameter(name))
if self._energyDerivatives is not None:
mystate = simulation.context.getState(getParameterDerivatives=True)
derivative = mystate.getEnergyParameterDerivatives()
for name in self._energyDerivatives:
self._add_item(values, derivative[name])
if self._collectiveVariables is not None:
for force in self._collectiveVariables:
for cv in force.getCollectiveVariableValues(simulation.context):
self._add_item(values, cv)
return values
[docs]class XYZReporter(_AtomsMM_Reporter):
"""
Outputs to an XYZ-format file a series of frames containing the coordinates, velocities,
momenta, or forces on all atoms in a Simulation.
.. note::
Coordinates are expressed in nanometers, velocities in nanometer/picosecond, momenta in
dalton*nanometer/picosecond, and forces in dalton*nanometer/picosecond^2.
To use this reporter, create an XYZReporter object and append it to the Simulation's list of
reporters.
Keyword Args
------------
output : str, default='positions'
Which kind of info to report. Valid options are 'positions', 'velocities', 'momenta' and
'forces'.
groups : set(int), default=None
Which force groups to consider in the force calculations. If this is `None`, then all
force groups will be evaluated.
"""
def __init__(self, file, reportInterval, **kwargs):
self._output = kwargs.get('output', 'positions')
self._groups = kwargs.get('groups', None)
if self._output == 'positions':
self._unit = unit.angstroms
elif self._output == 'velocities':
self._unit = unit.angstroms/unit.picoseconds
elif self._output == 'momenta':
self._unit = unit.dalton*unit.angstroms/unit.picoseconds
elif self._output == 'forces':
self._unit = unit.dalton*unit.angstroms/unit.picoseconds**2
else:
raise InputError('Unrecognizable keyword value')
super().__init__(file, reportInterval, **kwargs)
self._needsPositions = self._output == 'positions'
self._needsVelocities = self._output in ['velocities', 'momenta']
self._needsForces = self._output == 'forces'
def _initialize(self, simulation, state):
self._symbols = [atom.element.symbol for atom in simulation.topology.atoms()]
sys = simulation.system
self._N = sys.getNumParticles()
if self._output == 'momenta':
mass = [sys.getParticleMass(i).value_in_unit(unit.dalton) for i in range(self._N)]
self._mass = np.vstack([mass, mass, mass]).transpose()*unit.dalton
def _get_values(self, simulation, state):
if self._output == 'positions':
values = state.getPositions(asNumpy=True)
elif self._output == 'velocities':
values = state.getVelocities(asNumpy=True)
elif self._output == 'momenta':
values = self._mass*state.getVelocities(asNumpy=True)
elif self._groups is None:
values = state.getForces(asNumpy=True)
else:
new_state = simulation.context.getState(getForces=True, groups=self._groups)
values = new_state.getForces(asNumpy=True)
return values.value_in_unit(self._unit)
def _write(self, step, N, names, values):
print(N, file=self._out)
pd.DataFrame(index=names, data=values).to_csv(
self._out,
sep='\t',
header=[f'{self._output} in {self._unit} at time step {step}', '', ''],
)
def _generateReport(self, simulation, state):
values = self._get_values(simulation, state)
self._write(simulation.currentStep, self._N, self._symbols, values)
[docs]class CenterOfMassReporter(XYZReporter):
"""
Outputs to an XYZ-format file a series of frames containing the center-of-mass coordinates,
center-of-mass velocities, total momenta, or resultant forces on all molecules in a Simulation.
.. note::
Coordinates are expressed in nanometers, velocities in nanometer/picosecond, momenta in
dalton*nanometer/picosecond, and forces in dalton*nanometer/picosecond^2.
To use this reporter, create an CenterOfMassReporter object and append it to the Simulation's
list of reporters.
Keyword Args
------------
output : str, default='positions'
Which kind of info to report. Valid options are 'positions', 'velocities', 'momenta' and
'forces'.
groups : set(int), default=None
Which force groups to consider in the force calculations. If this is `None`, then all
force groups will be evaluated.
"""
def _initialize(self, simulation, state):
super()._initialize(simulation, state)
self._mols = _MoleculeTotalizer(simulation.context, simulation.topology)
def _generateReport(self, simulation, state):
values = self._get_values(simulation, state)
if self._output in ['positions', 'velocities']:
cm_values = self._mols.massFrac.dot(values)
else:
cm_values = self._mols.selection.dot(values)
self._write(simulation.currentStep, self._mols.nmols, self._mols.residues, cm_values)
[docs]class CustomIntegratorReporter(_AtomsMM_Reporter):
"""
Outputs global and per-DoF variables of a CustomIntegrator instance.
Keyword Args
------------
describeOnly : bool, optional, default=True
Whether to output only descriptive statistics that summarize the activated per-Dof
variables.
"""
def __init__(self, file, reportInterval, **kwargs):
super().__init__(file, reportInterval, **kwargs)
self._describeOnly = kwargs.pop('describeOnly', True)
self._variables = []
for key, value in kwargs.items():
if value is True:
self._variables.append(key)
if not self._variables:
raise InputError("No global or perDof variables have been passed")
def _initialize(self, simulation, state):
integrator = self._integrator = simulation.integrator
if not isinstance(integrator, openmm.CustomIntegrator):
raise Exception("simulation.integrator is not a CustomIntegrator")
self._globals = {}
for index in range(integrator.getNumGlobalVariables()):
variable = integrator.getGlobalVariableName(index)
if variable in self._variables:
self._globals[variable] = index
self._perDof = {}
for index in range(integrator.getNumPerDofVariables()):
variable = integrator.getPerDofVariableName(index)
if variable in self._variables:
self._perDof[variable] = index
if set(self._variables) != set(self._globals) | set(self._perDof):
raise InputError("Unknown variables have been passed")
def _generateReport(self, simulation, state):
for variable, index in self._globals.items():
value = self._integrator.getGlobalVariable(index)
print('{}\n{}'.format(variable, value), file=self._out)
for variable, index in self._perDof.items():
values = self._integrator.getPerDofVariable(index)
titles = ['{}.{}'.format(variable, dir) for dir in ['x', 'y', 'z']]
df = pd.DataFrame(data=np.array(values), columns=titles)
if self._describeOnly:
print(df.describe(), file=self._out)
else:
df.to_csv(self._out, sep='\t')
[docs]class ExpandedEnsembleReporter(_AtomsMM_Reporter):
"""
Performs an Expanded Ensemble simulation and reports the energies of multiple states.
Parameters
----------
states : pandas.DataFrame_
A DataFrame containing context global parameters (column names) and sets of values
thereof. The potential energy will be reported for every state these parameters define.
If one of the variables is named as `weight`, then its set of values will be assigned
to every state as an importance sampling weight. Otherwise, all states will have
identical weights. States which are supposed to only have their energies reported, with
no actual visits, can have their weights set up to `-inf`.
temperature : unit.Quantity
The system temperature.
Keyword Args
------------
reportsPerExchange : int, optional, default=1
The number of reports between attempts to exchange the global parameter state, that is,
the exchange interval measured in units of report intervals.
"""
def __init__(self, file, reportInterval, states, temperature, **kwargs):
self._parameter_states = states.copy()
self._nstates = len(states.index)
self._reports_per_exchange = kwargs.pop('reportsPerExchange', 1)
super().__init__(file, reportInterval, **kwargs)
if 'weight' in states:
self._weights = self._parameter_states.pop('weight').values
finite = np.where(np.isfinite(self._weights))[0]
self._first_state = finite[0]
self._last_state = finite[-1]
else:
self._weights = np.zeros(self._nstates)
self._first_state = 0
self._last_state = self._nstates - 1
kT = (unit.MOLAR_GAS_CONSTANT_R*temperature).value_in_unit(unit.kilojoules_per_mole)
self._beta = 1.0/kT
self._nreports = 0
self._overall_visits = np.zeros(self._nstates, dtype=int)
self._downhill_visits = np.zeros(self._nstates, dtype=int)
self._probability_accumulators = np.zeros(self._nstates)
self._downhill = False
self._counting_started = False
self._regime_change = []
def _initialize(self, simulation, state):
headers = ['step', 'state']
for index in self._parameter_states.index:
headers.append('Energy[{}] (kJ/mole)'.format(index))
print(*headers, sep=self._separator, file=self._out)
def _register_visit(self, state):
if self._downhill:
if state == self._first_state:
self._downhill = False
self._regime_change.append(self._nreports)
elif state == self._last_state:
self._downhill = True
self._regime_change.append(self._nreports)
if self._counting_started:
self._overall_visits[state] += 1
if self._downhill:
self._downhill_visits[state] += 1
else:
self._counting_started = self._downhill is True
def _generateReport(self, simulation, state):
energies = np.zeros(self._nstates)
original = dict()
for name in self._parameter_states.columns:
original[name] = simulation.context.getParameter(name)
latest = original.copy()
for i, (index, row) in enumerate(self._parameter_states.iterrows()):
for name, value in row.items():
if value != latest[name]:
simulation.context.setParameter(name, value)
latest[name] = value
energy = simulation.context.getState(getEnergy=True).getPotentialEnergy()
energies[i] = energy.value_in_unit(unit.kilojoules_per_mole)
self._nreports += 1
exponents = self._weights - self._beta*energies
probabilities = np.exp(exponents - np.amax(exponents))
probabilities /= np.sum(probabilities)
self._probability_accumulators += probabilities
if self._nreports % self._reports_per_exchange == 0:
state = np.random.choice(self._nstates, p=probabilities)
for name, value in self._parameter_states.iloc[state].items():
if value != latest[name]:
simulation.context.setParameter(name, value)
self._register_visit(state)
print(simulation.currentStep, state, *energies, sep=self._separator, file=self._out)
def _isochronal_delta(self, f, n):
N = len(f)
b = 3/(n*(n+1)*(2*n+1))
seq = np.arange(1, n+1)
a = (b/2)*np.array([n*(n+1)-k*(k-1) for k in seq])
ind = np.argsort(f)
fa = f[ind]
delta = np.empty(N)
delta[0] = -fa[0]/2 + np.sum(a*fa[1:n+1])
for i in range(1, N-1):
delta[i] = b*np.sum([k*(fa[min(i+k, N-1)] - fa[max(i-k, 0)]) for k in seq])
delta[N-1] = fa[N-1]/2 - np.sum(np.flip(a)*fa[N-n-1:N-1])
delta[ind] = delta
return delta
def read_csv(self, file, **kwargs):
comment = kwargs.pop('comment', '#')
separator = kwargs.pop('sep', self._separator)
df = pd.read_csv(file, comment=comment, sep=separator, **kwargs)
energies = np.zeros(self._nstates)
for index, row in df.iterrows():
state = int(row['state'])
for i in self._parameter_states.index:
energies[i] = row['Energy[{}] (kJ/mole)'.format(i)]
self._nreports += 1
exponents = self._weights - self._beta*energies
probabilities = np.exp(exponents - np.amax(exponents))
probabilities /= np.sum(probabilities)
self._probability_accumulators += probabilities
if self._nreports % self._reports_per_exchange == 0:
self._register_visit(state)
[docs] def state_sampling_analysis(self, staging_variable=None, to_file=True, isochronal_n=2):
"""
Build histograms of states visited during the overall process as well as during downhill
walks.
Returns
-------
pandas.DataFrame_
"""
mask = self._overall_visits > 0
frame = pd.DataFrame(self._parameter_states)[mask]
histogram = self._overall_visits[mask]
downhill_fraction = self._downhill_visits[mask]/histogram
weight = self._weights[mask]
frame['weight'] = weight
frame['histogram'] = histogram/np.sum(histogram)
frame['downhill_fraction'] = downhill_fraction
if self._counting_started:
probability = self._probability_accumulators[mask]/self._nreports
free_energy = weight - np.log(probability)
free_energy -= free_energy[0]
delta = self._isochronal_delta(downhill_fraction, isochronal_n)
isochronal_weight = weight + 0.5*np.log(delta/probability)
frame['free_energy'] = free_energy
frame['isochronal_histogram'] = np.sqrt(delta*probability)
frame['isochronal_weight'] = isochronal_weight - isochronal_weight[0]
if staging_variable is not None:
x = frame[staging_variable].values
f = downhill_fraction
n = len(x)
optimal_pdf = np.sqrt(np.diff(f)/np.diff(x)) # Stepwise optimal PDF
area = optimal_pdf*np.diff(x) # Integral in each interval
optimal_cdf = np.cumsum(area)/np.sum(area) # Piecewise linear optimal CDF
optimal_x = np.interp(np.linspace(0, 1, n), np.insert(optimal_cdf, 0, 0), x)
frame['staging_{}'.format(staging_variable)] = optimal_x
frame['staging_weight'] = np.interp(optimal_x, x, free_energy)
if to_file:
print('# {0} State Sampling Analysis {0}'.format('-'*40), file=self._out)
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print('# ' + frame.to_string(index=False).replace('\n', '\n# '), file=self._out)
return frame
def walking_time_analysis(self, history=False, to_file=True):
times = np.diff(np.array(self._regime_change))
downhill = self._reportInterval*times[0::2]
uphill = self._reportInterval*times[1::2]
if history:
df = pd.DataFrame({'downhill': pd.Series(downhill),
'uphill': pd.Series(uphill)})
print('# {0} Walking Time History {0}'.format('-'*10), file=self._out)
print('# ' + df.to_string().replace('\n', '\n# '), file=self._out)
df = pd.DataFrame(index=['count', 'mean time'],
columns=['downhill', 'uphill'],
data=[[downhill.size, uphill.size],
[downhill.mean(), uphill.mean()]],
dtype='object')
if to_file:
print('# {0} Walking Time Analysis {0}'.format('-'*10), file=self._out)
print('# ' + df.to_string().replace('\n', '\n# '), file=self._out)
return df