Source code for vireoSNP.utils.vireo_bulk

# Identification of donor abundance in bulk sample
import numpy as np

__docformat__ = "restructuredtext en"

__all__ = ['VireoBulk']

[docs] class VireoBulk(): """ Estimate of donor abundance in a multipexed bulk sample Varibale to infer ----------------- psi: numpy.array (n_donor, ) The fractional abundance of each donor in the mixture theta: numpy.array (n_GT, ) The alternative allele rate in each genotype category Parameters ---------- n_GT: int, number of genotype categories n_donor: int, number of donors in the mixture """
[docs] def __init__(self, n_donor, n_GT=3, psi_init=None, theta_init=[0.01, 0.5, 0.99]): self.n_GT = n_GT self.n_donor = n_donor self.psi = np.random.dirichlet([1] * n_donor) self.theta = np.random.rand(n_GT) if psi_init is not None: if n_donor != len(psi_init): print("Warning: n_donor != len(psi_init)") else: self.psi = np.random.dirichlet([1] * n_donor) if theta_init is not None: if n_GT != len(theta_init): print("Warning: n_GT != len(theta_init)") else: self.theta = theta_init
[docs] def fit(self, AD, DP, GT_prob, max_iter=200, min_iter=5, epsilon_conv=1e-3, learn_theta=True, delay_fit_theta=0, model="EM", verbose=False): """Fit the unknown variable psi and theta with EM algorithm Parameters ---------- AD: numpy.array, (n_variant, ), int The count vector for alternative allele in all variants DP: numpy.array (n_variant, ), int The count vector for depths in all variants (i.e., two alleles) GT_prob: numpy.array, (n_variants, n_donor, n_GT) The probability tensor for each genotype in each donor learn_theta: bool Whether learn theta, otherwise use theta_init delay_fit_theta: int The number of steps to delay in updating theta max_iter : int Maximum number of iterations min_iter : Minimum number of iterations epsilon_conv : float Threshold for detecting convergence model: string The algorithm used to fit the model. Only "EM" is supported for Expectation-Maximumization algorithm verbose : bool Whether print out log info """ BD = DP - AD logLik = np.zeros(max_iter) for it in range(max_iter): # E step: expectation of count assignment probability theta_mat = np.tensordot(GT_prob, self.theta, axes=(2, 0)) #np.dot Z1 = theta_mat * np.expand_dims(self.psi, 0) Z1 = Z1 / np.sum(Z1, axis=1, keepdims=True) Z0 = (1 - theta_mat) * np.expand_dims(self.psi, 0) Z0 = Z0 / np.sum(Z0, axis=1, keepdims=True) # M step: maximize logLikehood over psi and theta psi_raw = np.dot(AD, Z1) + np.dot(BD, Z0) self.psi = psi_raw / np.sum(psi_raw) if learn_theta and it >= delay_fit_theta: theta_s1 = np.dot(AD, np.sum(GT_prob * np.expand_dims(Z1, 2), axis = 1)) theta_s2 = np.dot(BD, np.sum(GT_prob * np.expand_dims(Z0, 2), axis = 1)) self.theta = theta_s1 / (theta_s1 + theta_s2) # Likelihood and check convergence theta_vct = np.dot(np.dot(GT_prob, self.theta), self.psi) logLik[it] = np.sum( AD * np.log(theta_vct) + BD * np.log(1 - theta_vct)) if it > min_iter: if logLik[it] < logLik[it - 1]: if verbose: print("Warning: logLikelihood decreases!\n") elif it == max_iter - 1: if verbose: print("Warning: VB did not converge!\n") elif logLik[it] - logLik[it - 1] < epsilon_conv: break self.logLik = logLik[it] self.logLik_all = logLik[:it]
[docs] def LR_test(self, **kwargs): """Likelihood ratio test for psi vector in a null hypothesis. Use **kwargs for psi_null, AD, DP, GT_prob, log in vireoSNP.LikRatio_test() function. Note, AD, DP, GT_prob the same as the self.fit() function. """ return LikRatio_test(psi=self.psi, theta=self.theta, **kwargs)
[docs] def LikRatio_test(psi, psi_null, AD, DP, GT_prob, theta, log=False): """Likelihood ratio test for psi vector in a null hypothesis. Please use the same AD, DP, and GT_prob as the fit() function. Parameters ---------- psi: numpy.array (n_donor, ) The fractional abundance of each donor in the mixture for alternative hypothesis psi_null: numpy.array (n_donor, ) The psi vector in a null hypothesis AD: numpy.array, (n_variant, ), int The count vector for alternative allele in all variants DP: numpy.array (n_variant, ), int The count vector for depths in all variants (i.e., two alleles) GT_prob: numpy.array, (n_variants, n_donor, n_GT) The probability tensor for each genotype in each donor theta: numpy.array (n_GT, ) The alternative allele rate in each genotype category log: bool If return p value in logarithm scale Return ------ statistic: float The calculated chi2-statistic. pvalue: float The single-tailed p-value. """ from scipy.stats import chi2 BD = DP - AD theta_vct_alt = np.dot(np.dot(GT_prob, theta), psi) logLik_alt = np.sum( AD * np.log(theta_vct_alt) + BD * np.log(1 - theta_vct_alt)) theta_vct_null = np.dot(np.dot(GT_prob, theta), psi_null) logLik_null = np.sum( AD * np.log(theta_vct_null) + BD * np.log(1 - theta_vct_null)) LR = 2 * (logLik_alt - logLik_null) df = len(psi_null) - 1 if log: pval = chi2.logsf(LR, df) else: pval = chi2.sf(LR, df) return LR, pval