Source code for sccross.data

r"""
Auxiliary functions for :class:`anndata.AnnData` objects
that are not covered in :mod:`scanpy`.
"""

import os
from collections import defaultdict
from itertools import chain
from typing import Callable, List, Mapping, Optional

import anndata
import networkx as nx
import numpy as np
import pandas as pd
import scanpy as sc
import episcanpy as epi
import scipy.sparse
import scipy.stats
import sklearn.cluster
import sklearn.decomposition
import sklearn.feature_extraction.text
import sklearn.linear_model
import sklearn.neighbors
import sklearn.utils.extmath
from anndata import AnnData
from networkx.algorithms.bipartite import biadjacency_matrix
from sklearn.preprocessing import normalize


from . import utils
from .utils import logged, smart_tqdm, Kws


[docs]def lsi( adata: AnnData, n_components: int = 20, use_highly_variable: Optional[bool] = None, **kwargs ) -> None: r""" LSI analysis (following the Seurat v3 approach) Parameters ---------- adata Input dataset n_components Number of dimensions to use use_highly_variable Whether to use highly variable features only, stored in ``adata.var['highly_variable']``. By default uses them if they have been determined beforehand. **kwargs Additional keyword arguments are passed to :func:`sklearn.utils.extmath.randomized_svd` """ if use_highly_variable is None: use_highly_variable = "highly_variable" in adata.var adata_use = adata[:, adata.var["highly_variable"]] if use_highly_variable else adata X = utils.tfidf(adata_use.X) X_norm = normalize(X, norm="l1") X_norm = np.log1p(X_norm * 1e4) X_lsi = sklearn.utils.extmath.randomized_svd(X_norm, n_components, **kwargs)[0] X_lsi -= X_lsi.mean(axis=1, keepdims=True) X_lsi /= X_lsi.std(axis=1, ddof=1, keepdims=True) adata.obsm["X_lsi"] = X_lsi
[docs]def mnn_prior( adatas: [AnnData] ) -> None: r""" MNN prior generation Parameters ---------- adatas Input dataset """ adatas_modify = adatas.copy() for i in range(len(adatas_modify)): if adatas_modify[i].obs['domain'][0] == 'scATAC-seq': adatas_modify[i] = adatas_modify[i].uns['gene'] common_genes = set() for i in range(len(adatas_modify)): adata_i = adatas_modify[i].var.index.values atac_i_1 = [] for g in adata_i: g = g.split('-')[0] atac_i_1.append(g) adatas_modify[i].var.index = atac_i_1 adatas_modify[i].var_names_make_unique() if len(common_genes) == 0: common_genes = set(adatas_modify[i].var.index.values) else: common_genes &= set(adatas_modify[i].var.index.values) for i in range(len(adatas)): adatas_modify[i] = adatas_modify[i][:, list(common_genes)] adatas_mnn = sc.external.pp.mnn_correct(*adatas_modify, k=20) adatas_mnn = adatas_mnn[0] sc.tl.pca(adatas_mnn, n_comps=50) for i in range(len(adatas)): adata_temp = adatas_mnn[adatas_mnn.obs['batch'].isin([str(i)])] if adatas[i].obs['domain'][0] == 'scATAC-seq': adatas[i].obsm['X_lsi'] = np.concatenate((adatas[i].obsm['X_lsi'], adata_temp.obsm['X_pca']), axis=1) else: adatas[i].obsm['X_pca'] = np.concatenate((adatas[i].obsm['X_pca'], adata_temp.obsm['X_pca']), axis=1)
[docs]def geneActivity( adata: AnnData,gtf_file = './reference/gencode.vM30.annotation.gtf', upstream=2000, feature_type='transcript', annotation='HAVANA', raw=False ) -> AnnData: r""" Gene activity score calculation Parameters ---------- adata Input dataset gtf_file GTF reference path upstream upstream finding feature_type feature type annotation annotation type raw raw data or not Returns ------- AnnData Gene activity score data with shape :math:`n_{cell} \times n_{gene}` """ geneAct = epi.tl.geneactivity(adata, gtf_file=gtf_file, key_added='gene', upstream=upstream, feature_type=feature_type, annotation=annotation, raw=raw) sc.pp.normalize_total(geneAct) sc.pp.log1p(geneAct) sc.pp.scale(geneAct) return geneAct
[docs]def aggregate_obs( adata: AnnData, by: str, X_agg: Optional[str] = "sum", obs_agg: Optional[Mapping[str, str]] = None, obsm_agg: Optional[Mapping[str, str]] = None, layers_agg: Optional[Mapping[str, str]] = None ) -> AnnData: r""" Aggregate obs in a given dataset by certain categories Parameters ---------- adata Dataset to be aggregated by Specify a column in ``adata.obs`` used for aggregation, must be discrete. X_agg Aggregation function for ``adata.X``, must be one of ``{"sum", "mean", ``None``}``. Setting to ``None`` discards the ``adata.X`` matrix. obs_agg Aggregation methods for ``adata.obs``, indexed by obs columns, must be one of ``{"sum", "mean", "majority"}``, where ``"sum"`` and ``"mean"`` are for continuous data, and ``"majority"`` is for discrete data. Fields not specified will be discarded. obsm_agg Aggregation methods for ``adata.obsm``, indexed by obsm keys, must be one of ``{"sum", "mean"}``. Fields not specified will be discarded. layers_agg Aggregation methods for ``adata.layers``, indexed by layer keys, must be one of ``{"sum", "mean"}``. Fields not specified will be discarded. Returns ------- aggregated Aggregated dataset """ obs_agg = obs_agg or {} obsm_agg = obsm_agg or {} layers_agg = layers_agg or {} by = adata.obs[by] agg_idx = pd.Index(by.cat.categories) \ if pd.api.types.is_categorical_dtype(by) \ else pd.Index(np.unique(by)) agg_sum = scipy.sparse.coo_matrix(( np.ones(adata.shape[0]), ( agg_idx.get_indexer(by), np.arange(adata.shape[0]) ) )).tocsr() agg_mean = agg_sum.multiply(1 / agg_sum.sum(axis=1)) agg_method = { "sum": lambda x: agg_sum @ x, "mean": lambda x: agg_mean @ x, "majority": lambda x: pd.crosstab(by, x).idxmax(axis=1).loc[agg_idx].to_numpy() } X = agg_method[X_agg](adata.X) if X_agg and adata.X is not None else None obs = pd.DataFrame({ k: agg_method[v](adata.obs[k]) for k, v in obs_agg.items() }, index=agg_idx.astype(str)) obsm = { k: agg_method[v](adata.obsm[k]) for k, v in obsm_agg.items() } layers = { k: agg_method[v](adata.layers[k]) for k, v in layers_agg.items() } for c in obs: if pd.api.types.is_categorical_dtype(adata.obs[c]): obs[c] = pd.Categorical(obs[c], categories=adata.obs[c].cat.categories) return AnnData( X=X, obs=obs, var=adata.var, obsm=obsm, varm=adata.varm, layers=layers )
[docs]def extract_rank_genes_groups( adata: AnnData, groups: Optional[List[str]] = None, filter_by: str = "pvals_adj < 0.01", sort_by: str = "scores", ascending: str = False ) -> pd.DataFrame: r""" Extract result of :func:`scanpy.tl.rank_genes_groups` in the form of marker gene data frame for specific cell groups Parameters ---------- adata Input dataset groups Target groups for which markers should be extracted, by default extract all groups. filter_by Marker filtering criteria (passed to :meth:`pandas.DataFrame.query`) sort_by Column used for sorting markers ascending Whether to sort in ascending order Returns ------- marker_df Extracted marker data frame Note ---- Markers shared by multiple groups will be assign to the group with highest score. """ if "rank_genes_groups" not in adata.uns: raise ValueError("Please call `sc.tl.rank_genes_groups` first!") if groups is None: groups = adata.uns["rank_genes_groups"][sort_by].dtype.names df = pd.concat([ pd.DataFrame({ k: np.asarray(v[g]) for k, v in adata.uns["rank_genes_groups"].items() if k != "params" }).assign(group=g) for g in groups ]) df["group"] = pd.Categorical(df["group"], categories=groups) df = df.sort_values( sort_by, ascending=ascending ).drop_duplicates( subset=["names"], keep="first" ).sort_values( ["group", sort_by], ascending=[True, ascending] ).query(filter_by) df = df.reset_index(drop=True) return df
[docs]@logged def get_metacells( *adatas: AnnData, use_rep: str = None, n_meta: int = None, common: bool = True, seed: int = 0, agg_kwargs: Optional[List[Kws]] = None ) -> List[AnnData]: r""" Aggregate datasets into metacells Parameters ---------- *adatas Datasets to be correlated use_rep Data representation based on which to cluster meta-cells n_meta Number of metacells to use common Whether to return only metacells common to all datasets seed Random seed for k-Means clustering agg_kwargs Keyword arguments per dataset passed to :func:`aggregate_obs` Returns ------- adatas A list of AnnData objects containing the metacells Note ---- When a single dataset is provided, the metacells are clustered with the dataset itself. When multiple datasets are provided, the metacells are clustered jointly with all datasets. """ if use_rep is None: raise ValueError("Missing required argument `use_rep`!") if n_meta is None: raise ValueError("Missing required argument `n_meta`!") adatas = [ AnnData( X=adata.X, obs=adata.obs.set_index(adata.obs_names + f"-{i}"), var=adata.var, obsm=adata.obsm, varm=adata.varm, layers=adata.layers ) for i, adata in enumerate(adatas) ] # Avoid unwanted updates to the input objects get_metacells.logger.info("Clustering metacells...") combined = anndata.concat(adatas) try: import faiss kmeans = faiss.Kmeans( combined.obsm[use_rep].shape[1], n_meta, gpu=False, seed=seed ) kmeans.train(combined.obsm[use_rep]) _, combined.obs["metacell"] = kmeans.index.search(combined.obsm[use_rep], 1) except ImportError: get_metacells.logger.warning( "`faiss` is not installed, using `sklearn` instead... " "This might be slow with a large number of cells. " "Consider installing `faiss` following the guide from " "https://github.com/facebookresearch/faiss/blob/main/INSTALL.md" ) kmeans = sklearn.cluster.KMeans(n_clusters=n_meta, random_state=seed) combined.obs["metacell"] = kmeans.fit_predict(combined.obsm[use_rep]) for adata in adatas: adata.obs["metacell"] = combined[adata.obs_names].obs["metacell"] get_metacells.logger.info("Aggregating metacells...") agg_kwargs = agg_kwargs or [{}] * len(adatas) if not len(agg_kwargs) == len(adatas): raise ValueError("Length of `agg_kwargs` must match the number of datasets!") adatas = [ aggregate_obs(adata, "metacell", **kwargs) for adata, kwargs in zip(adatas, agg_kwargs) ] if common: common_metacells = list(set.intersection(*( set(adata.obs_names) for adata in adatas ))) if len(common_metacells) == 0: raise RuntimeError("No common metacells found!") return [adata[common_metacells].copy() for adata in adatas] return adatas
def _metacell_corr( *adatas: AnnData, skeleton: nx.Graph = None, method: str = "spr" ) -> nx.Graph: if skeleton is None: raise ValueError("Missing required argument `skeleton`!") for adata in adatas: sc.pp.normalize_total(adata) if set.intersection(*(set(adata.var_names) for adata in adatas)): raise ValueError("Overlapping features are currently not supported!") adata = anndata.concat(adatas, axis=1) edgelist = nx.to_pandas_edgelist(skeleton) source = adata.var_names.get_indexer(edgelist["source"]) target = adata.var_names.get_indexer(edgelist["target"]) if method == "pcc": sc.pp.log1p(adata) X = utils.densify(adata.X.T) elif method == "spr": X = utils.densify(adata.X.T) X = np.array([scipy.stats.rankdata(x) for x in X]) else: raise ValueError(f"Unrecognized method: {method}!") mean = X.mean(axis=1) meansq = np.square(X).mean(axis=1) std = np.sqrt(meansq - np.square(mean)) edgelist["corr"] = np.array([ ((X[s] * X[t]).mean() - mean[s] * mean[t]) / (std[s] * std[t]) for s, t in zip(source, target) ]) return nx.from_pandas_edgelist(edgelist, edge_attr=True, create_using=type(skeleton))
[docs]@logged def metacell_corr( *adatas: AnnData, use_rep: str = None, n_meta: int = None, skeleton: nx.Graph = None, method: str = "spr" ) -> nx.Graph: r""" Metacell based correlation Parameters ---------- *adatas Datasets to be correlated, where ``.X`` are raw counts (indexed by domain name) use_rep Data representation based on which to cluster meta-cells n_meta Number of metacells to use skeleton Skeleton graph determining which pair of features to correlate method Correlation method, must be one of {"pcc", "spr"} Returns ------- corr A skeleton-based graph containing correlation as edge attribute "corr" """ for adata in adatas: if not utils.all_counts(adata.X): raise ValueError("``.X`` must contain raw counts!") adatas = get_metacells(*adatas, use_rep=use_rep, n_meta=n_meta, common=True) metacell_corr.logger.info( "Computing correlation on %d common metacells...", adatas[0].shape[0] ) return _metacell_corr(*adatas, skeleton=skeleton, method=method)
def _metacell_regr( *adatas: AnnData, skeleton: nx.DiGraph = None, model: str = "Lasso", **kwargs ) -> nx.DiGraph: if skeleton is None: raise ValueError("Missing required argument `skeleton`!") for adata in adatas: sc.pp.normalize_total(adata) sc.pp.log1p(adata) if set.intersection(*(set(adata.var_names) for adata in adatas)): raise ValueError("Overlapping features are currently not supported!") adata = anndata.concat(adatas, axis=1) targets = [node for node, in_degree in skeleton.in_degree() if in_degree] biadj = biadjacency_matrix( skeleton, adata.var_names, targets, weight=None ).astype(bool).T.tocsr() X = utils.densify(adata.X) Y = utils.densify(adata[:, targets].X.T) coef = [] model = getattr(sklearn.linear_model, model) for target, y, mask in smart_tqdm(zip(targets, Y, biadj), total=len(targets)): X_ = X[:, mask.indices] lm = model(**kwargs).fit(X_, y) coef.append(pd.DataFrame({ "source": adata.var_names[mask.indices], "target": target, "regr": lm.coef_ })) coef = pd.concat(coef) return nx.from_pandas_edgelist(coef, edge_attr=True, create_using=type(skeleton))
[docs]@logged def metacell_regr( *adatas: AnnData, use_rep: str = None, n_meta: int = None, skeleton: nx.DiGraph = None, model: str = "Lasso", **kwargs ) -> nx.DiGraph: r""" Metacell-based regression Parameters ---------- *adatas Datasets to be correlated, where ``.X`` are raw counts (indexed by domain name) use_rep Data representation based on which to cluster meta-cells n_meta Number of metacells to use skeleton Skeleton graph determining which pair of features to correlate model Regression model (should be a class name under :mod:`sklearn.linear_model`) **kwargs Additional keyword arguments are passed to the regression model Returns ------- regr A skeleton-based graph containing regression weights as edge attribute "regr" """ for adata in adatas: if not utils.all_counts(adata.X): raise ValueError("``.X`` must contain raw counts!") adatas = get_metacells(*adatas, use_rep=use_rep, n_meta=n_meta, common=True) metacell_regr.logger.info( "Computing regression on %d common metacells...", adatas[0].shape[0] ) return _metacell_regr(*adatas, skeleton=skeleton, model=model, **kwargs)