Only exchange cov matrix and DE buffer when needed
Prior commits guarantee that parallel chains can only be executing the same step at any given time. That being the case, exchange messages for the covariance matrix and DE buffer only on the prescribed update frequency, instead of on every step. The reduced communication overhead should be a performance boost at scale.
chimaerase committed Oct 12, 2022
1 parent c3fa0f0 commit 4eac542
Showing 1 changed file with 39 additions and 34 deletions.
73 changes: 39 additions & 34 deletions PTMCMCSampler/PTMCMCSampler.py
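The change hinges on one property: because every rank evaluates the same step condition, the collective comm.bcast can be moved inside the conditional without any rank deadlocking. Below is a minimal sketch of that gated-broadcast pattern using mpi4py directly, with a hypothetical cov_update frequency and a toy covariance matrix; it mirrors the structure of the diff that follows but is not the PTMCMCSampler code itself.

# gated_bcast_sketch.py -- illustrative only; run with: mpiexec -n 4 python gated_bcast_sketch.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

cov_update = 100          # hypothetical update frequency (stands in for self.covUpdate)
cov = np.eye(5)           # toy covariance matrix owned by rank 0

for iter in range(1, 1001):
    # Every rank computes the same condition, so all ranks either enter the
    # broadcast together or skip it together -- no rank blocks waiting for a
    # message that is never sent, and no traffic is generated on other steps.
    cov_update_step = (iter - 1) % cov_update == 0 and (iter - 1) != 0
    if cov_update_step:
        payload = cov if rank == 0 else None
        payload = comm.bcast(payload, root=0)   # collective call on the update step only
        if rank != 0:
            cov = payload

The same idea gates the DE-buffer broadcast in this hunk and the swap messages in the PTswap hunk further down.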
@@ -487,50 +487,55 @@ def PTMCMCOneStep(self, p0, lnlike0, lnprob0, iter):
         @return lnprob0: next value of posterior after one MCMC step
         """
         # update covariance matrix
-        cov = None
-        if self.MPIrank == 0:
-            if (iter - 1) % self.covUpdate == 0 and (iter - 1) != 0:
-                # Update covariance matrix
+        # Synchronous checks of runComplete in sample() guarantee that chains
+        # are executing the same step in parallel.
+        cov_update_step = (iter - 1) % self.covUpdate == 0 and (iter - 1) != 0
+        if cov_update_step:
+            cov = None
+            if self.MPIrank == 0:
                 self._updateRecursive(iter - 1, self.covUpdate)
                 cov = self.cov
-        cov = self.comm.bcast(cov, root=0)
-
-        if self.MPIrank != 0 and cov is not None:
-            self.cov[:, :] = cov
-            for ct, group in enumerate(self.groups):
-                covgroup = np.zeros((len(group), len(group)))
-                for ii in range(len(group)):
-                    for jj in range(len(group)):
-                        covgroup[ii, jj] = self.cov[group[ii], group[jj]]
+            cov = self.comm.bcast(cov, root=0)
+
+            if self.MPIrank != 0 and cov is not None:
+                self.cov[:, :] = cov
+                for ct, group in enumerate(self.groups):
+                    covgroup = np.zeros((len(group), len(group)))
+                    for ii in range(len(group)):
+                        for jj in range(len(group)):
+                            covgroup[ii, jj] = self.cov[group[ii], group[jj]]
 
-                self.U[ct], self.S[ct], v = np.linalg.svd(covgroup)
+                    self.U[ct], self.S[ct], v = np.linalg.svd(covgroup)
 
         # update DE buffer
-        buffer = None
-        if self.MPIrank == 0:
-            if (iter - 1) % self.burn == 0 and (iter - 1) != 0:
+        # Synchronous checks of runComplete in sample() guarantee that chains
+        # are executing the same step in parallel.
+        burn_step = (iter - 1) % self.burn == 0 and (iter - 1) != 0
+        if burn_step:
+            buffer = None
+            if self.MPIrank == 0:
                 self._updateDEbuffer(iter - 1, self.burn)
                 buffer = self._DEbuffer
 
-        # broadcast to other chains
-        buffer = self.comm.bcast(buffer, root=0)
+            # broadcast to other chains
+            buffer = self.comm.bcast(buffer, root=0)
 
-        if self.MPIrank > 0 and buffer is not None:
-            self._DEbuffer = buffer
+            if self.MPIrank > 0:
+                self._DEbuffer = buffer
 
-            # randomize cycle
-            if self.DEJump not in self.propCycle:
-                self.addProposalToCycle(self.DEJump, self.DEweight)
-                self.randomizeProposalCycle()
+                # randomize cycle
+                if self.DEJump not in self.propCycle:
+                    self.addProposalToCycle(self.DEJump, self.DEweight)
+                    self.randomizeProposalCycle()
 
-        # after burn in, add DE jumps;
-        if self.MPIrank == 0 and (iter - 1) == self.burn:
-            if self.verbose:
-                print("Adding DE jump with weight {0}".format(self.DEweight))
-            self.addProposalToCycle(self.DEJump, self.DEweight)
+            # after burn in, add DE jumps;
+            if self.MPIrank == 0 and (iter - 1) == self.burn:
+                if self.verbose:
+                    print("Adding DE jump with weight {0}".format(self.DEweight))
+                self.addProposalToCycle(self.DEJump, self.DEweight)
 
-            # randomize cycle
-            self.randomizeProposalCycle()
+                # randomize cycle
+                self.randomizeProposalCycle()
 
         # jump proposal ###
 
@@ -602,8 +607,8 @@ def PTswap(self, p0, lnlike0, lnprob0, iter):
         swapProposed = (iter % self.Tskip) == 0 and iter != 0
 
         # Only exchange swap messages when a swap will be proposed.
-        # Synchronous checks for runComplete guarantee that chains
-        # are executing the same step in parallel,
+        # Synchronous checks of runComplete in sample() guarantee that chains
+        # are executing the same step in parallel.
         if swapProposed:
             if self.MPIrank < self.nchain - 1:
                 hotter_chain = self.MPIrank + 1
