import itertools
import numpy as np
from scipy.stats import entropy
from scipy.sparse import csc_matrix
from scipy.special import logsumexp, digamma, betaln
from .vireo_base import normalize, loglik_amplify, beta_entropy, get_binom_coeff
class BinomMixtureVB():
"""Binomial mixture model with variational inference
The prior can be set via set_prior() before fitting the model.
Key properties
--------------
beta_mu: numpy array (n_var, n_donor)
Beta mean parameter of theta's posterior
beta_sum: numpy array (n_var, n_donor)
        Beta concentration parameter of theta's posterior
ID_prob: numpy array (n_cell, n_donor)
Posterior cell assignment probability to each donor
"""
    def __init__(self, n_cell, n_var, n_donor, fix_beta_sum=False,
beta_mu_init=None, beta_sum_init=None, ID_prob_init=None):
"""Initialise Vireo model
        Note, multiple initializations are highly recommended to avoid local
        optima.
Parameters
----------
n_cell : int.
Number of cells
n_var : int.
Number of variants
n_donor : int.
Number of donors
        fix_beta_sum: bool.
            Whether to fix the concentration parameter of theta's posterior
beta_mu_init: numpy array (n_var, n_donor)
Initial value of beta_mu, the mean parameter of theta
beta_sum_init: numpy array (n_var, n_donor)
            Initial value of beta_sum, the concentration parameter of theta
ID_prob_init: numpy array (n_cell, n_donor)
Initial value of ID_prob, cell assignment probability to each donor
"""
self.n_var = n_var
self.n_cell = n_cell
self.n_donor = n_donor
self.fix_beta_sum = fix_beta_sum
self.ID_prob_init = ID_prob_init
self.beta_mu_init = beta_mu_init
self.beta_sum_init = beta_sum_init
        # set priors; they can be re-set by calling set_prior() again
self.set_prior()
        # initialize key parameters
self.set_initial(
self.beta_mu_init, self.beta_sum_init, self.ID_prob_init
)
    def set_initial(self, beta_mu_init=None, beta_sum_init=None,
ID_prob_init=None):
"""Random initialization
"""
        # initialize key parameters
if beta_mu_init is not None:
self.beta_mu = beta_mu_init
else:
self.beta_mu = np.ones((self.n_var, self.n_donor)) * 0.5
if beta_sum_init is not None:
self.beta_sum = beta_sum_init
else:
self.beta_sum = np.ones(self.beta_mu.shape) * 30
if ID_prob_init is not None:
self.ID_prob = normalize(ID_prob_init, axis=1)
else:
self.ID_prob = normalize(np.random.rand(self.n_cell, self.n_donor))
self.ELBO_iters = np.array([])
    def set_prior(self, ID_prior=None, beta_mu_prior=None,
beta_sum_prior=None):
"""Set prior for key variables: theta and ID_prob.
The priors are in the same shape as its according variables.
"""
if beta_mu_prior is None:
beta_mu_prior = np.ones((self.n_var, self.n_donor)) * 0.5
if beta_sum_prior is None:
beta_sum_prior = np.ones(beta_mu_prior.shape) * 2.0
self.theta_s1_prior = beta_mu_prior * beta_sum_prior
self.theta_s2_prior = (1 - beta_mu_prior) * beta_sum_prior
if ID_prior is not None:
            if ID_prior.ndim == 1:
ID_prior = np.expand_dims(ID_prior, axis=0)
self.ID_prior = ID_prior
else:
self.ID_prior = normalize(np.ones((self.n_cell, self.n_donor)))
@property
def theta_s1(self):
"""Beta concetration1 parameter for theta posterior"""
return self.beta_mu * self.beta_sum
@property
def theta_s2(self):
"""Beta concetration2 parameter for theta posterior"""
return (1 - self.beta_mu) * self.beta_sum
def get_E_logLik(self, AD, DP):
"""Get the expecation of logLikelihood
E_theta [P(AD|DP, theta, Z)]
"""
BD = DP - AD
# shape: (n_cell, n_donor)
_E_logLik_mat = (
AD.T @ digamma(self.theta_s1) +
BD.T @ digamma(self.theta_s2) -
DP.T @ digamma(self.theta_s1 + self.theta_s2)
)
return _E_logLik_mat
def update_theta_size(self, AD, DP):
"""Coordinate ascent for updating theta posterior parameters
"""
BD = DP - AD
_theta_s1 = AD @ self.ID_prob #(n_var, n_donor)
_theta_s2 = BD @ self.ID_prob #(n_var, n_donor)
_theta_s1 += self.theta_s1_prior
_theta_s2 += self.theta_s2_prior
self.beta_mu = _theta_s1 / (_theta_s1 + _theta_s2)
        if not self.fix_beta_sum:
            self.beta_sum = _theta_s1 + _theta_s2
def update_ID_prob(self, AD=None, DP=None, logLik_ID=None):
"""Coordinate ascent for updating assignment probability
"""
if logLik_ID is None:
logLik_ID = self.get_E_logLik(AD, DP)
self.ID_prob = normalize(np.exp(loglik_amplify(
logLik_ID + np.log(self.ID_prior))))
def get_ELBO(self, AD=None, DP=None, logLik_ID=None):
"""Calculating variational evidence lower bound with current parameters
logLik_ID: numpy array (n_cell, n_donor), the output from update_ID_prob
"""
        if logLik_ID is None:
            logLik_ID = self.get_E_logLik(AD, DP)
LB_p = np.sum(logLik_ID * self.ID_prob)
KL_ID = np.sum(entropy(self.ID_prob, self.ID_prior, axis=-1))
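        # Stack (s1, s2) along axis 1, giving shape (n_var, 2, n_donor) for
        # both the posterior and the prior, as expected by beta_entropy.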
KL_theta = beta_entropy(
np.append(
np.expand_dims(self.theta_s1, 1),
np.expand_dims(self.theta_s2, 1), axis = 1),
np.append(
np.expand_dims(self.theta_s1_prior, 1),
np.expand_dims(self.theta_s2_prior, 1), axis = 1))
return LB_p - KL_ID - KL_theta
def _fit_BV(self, AD, DP, max_iter=200, min_iter=20, epsilon_conv=1e-2,
verbose=True):
"""Fit Vireo model with coordinate ascent
"""
ELBO = np.zeros(max_iter)
for it in range(max_iter):
self.update_theta_size(AD, DP)
_logLik_ID = self.get_E_logLik(AD, DP)
            self.update_ID_prob(logLik_ID=_logLik_ID)
            ELBO[it] = self.get_ELBO(logLik_ID=_logLik_ID)
if it > min_iter:
if ELBO[it] - ELBO[it - 1] < -1e-6:
if verbose:
print("Warning: ELBO decreases %.8f to %.8f!\n"
%(ELBO[it - 1], ELBO[it]))
elif it == max_iter - 1:
if verbose:
print("Warning: VB did not converge!\n")
elif ELBO[it] - ELBO[it - 1] < epsilon_conv:
break
        self.ELBO_iters = np.append(self.ELBO_iters, ELBO[:it + 1])
    def fit(self, AD, DP, n_init=10, max_iter=200, max_iter_pre=100,
random_seed=None, **kwargs):
"""Fit VB with multiple initializations
Parameters
----------
AD : scipy.sparse.csc_matrix (n_var, n_cell)
Sparse count matrix for alternative allele
DP : scipy.sparse.csc_matrix (n_var, n_cell)
            Sparse count matrix for depths, alternative + reference alleles
        n_init : int
Number of random initialisations to use
        max_iter : int
            Maximum number of iterations for _fit_BV() on the best
            initialization
        max_iter_pre : int
            Maximum number of iterations for _fit_BV() during the multiple
            initializations
        min_iter : int
            Minimum number of iterations for _fit_BV()
epsilon_conv : float
Threshold for detecting convergence for _fit_BV()
        verbose : bool
            Whether to print log info for _fit_BV()
random_seed : None or int
Random seed in numpy.random for multiple initializations
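
        Examples
        --------
        A minimal sketch on simulated counts; all values below are
        illustrative only::

            import numpy as np
            from scipy.sparse import csc_matrix

            np.random.seed(1)
            n_var, n_cell, n_donor = 50, 200, 3
            # per-donor allele ratios and hidden true assignments (made up)
            theta = np.random.beta(0.3, 0.3, size=(n_var, n_donor))
            Z = np.random.choice(n_donor, size=n_cell)
            DP = np.random.poisson(5, size=(n_var, n_cell))
            AD = np.random.binomial(DP, theta[:, Z])

            model = BinomMixtureVB(n_cell=n_cell, n_var=n_var, n_donor=n_donor)
            model.fit(csc_matrix(AD), csc_matrix(DP), n_init=5)
            donor_ids = model.ID_prob.argmax(axis=1)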
"""
if random_seed is not None:
np.random.seed(random_seed)
        if isinstance(DP, np.ndarray) and np.mean(DP > 0) < 0.3:
            print("Warning: input matrices are %.1f%% sparse; "
                  %(100 - np.mean(DP > 0) * 100) +
                  "converting to scipy.sparse.csc_matrix.")
            AD = csc_matrix(AD)
            DP = csc_matrix(DP)
        _binom_coeff = np.sum(get_binom_coeff(AD, DP, is_log=True))
self.ELBO_inits = []
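        # Run several short fits from random starts, keep the best one by its
        # final ELBO, then continue that fit to full convergence below.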
for i in range(n_init):
self.set_initial(
self.beta_mu_init, self.beta_sum_init, self.ID_prob_init
)
self._fit_BV(AD, DP, max_iter=max_iter_pre, **kwargs)
self.ELBO_inits.append(self.ELBO_iters[-1])
            ## first or better initialization
            if i == 0 or (self.ELBO_iters[-1] > np.max(self.ELBO_inits[:-1])):
                _ID_prob_best = self.ID_prob.copy()
                _beta_mu_best = self.beta_mu.copy()
                _beta_sum_best = self.beta_sum.copy()
                _ELBO_iters_best = self.ELBO_iters.copy()
## Re-fit with best parameters
self.set_initial(_beta_mu_best, _beta_sum_best, _ID_prob_best)
self.ELBO_iters = _ELBO_iters_best
self._fit_BV(AD, DP, max_iter=max_iter, **kwargs)
        ## add the binomial coefficient constants, which are independent of
        ## the parameters and hence omitted during optimization
self.ELBO_iters = self.ELBO_iters + _binom_coeff
self.ELBO_inits = np.array(self.ELBO_inits) + _binom_coeff