Source code for cap2.capalyzer.table_builder.parsers

import csv
import logging

import pandas as pd

logger = logging.getLogger(__name__)  # Same name as calling module


def parse_pileup(local_path, sparse=1):
    """Return a pandas dataframe with info from a gzip-compressed pileup file.

    The file is read as headerless, tab-separated text with the columns
    `seq`, `pos`, `ref_base`, `read_count`, `read_results` and `quality`.

    `sparse` is an int >= 1. If `sparse` is > 1 the numeric columns are
    averaged over a centered rolling window of `sparse` rows and only rows
    whose `pos` is a multiple of `sparse` are kept, making the table
    smaller. In that case the non-numeric columns are not part of the
    result (they cannot be averaged).
    """
    tbl = pd.read_csv(
        local_path,
        sep='\t',
        names=['seq', 'pos', 'ref_base', 'read_count', 'read_results', 'quality'],
        compression='gzip',
        # The base/quality strings may legitimately contain '"'; with the
        # default quoting such a field would be read as the start of a
        # quoted string and swallow the following tabs/newlines.
        quoting=csv.QUOTE_NONE,
    )
    if sparse > 1:
        # Only numeric columns can be averaged. Selecting them explicitly
        # matches what older pandas did implicitly (it dropped the string
        # columns) and avoids the error newer pandas raises instead.
        numeric = tbl.set_index(['seq', 'pos']).select_dtypes(include='number')
        # NOTE(review): the window runs over rows in file order and is not
        # grouped by `seq`, so windows can span two sequences -- confirm
        # this is intended.
        tbl = numeric.rolling(sparse, center=True).mean()
        # Rows without a full window (the edges) come out as NaN.
        tbl = tbl.dropna()
        tbl = tbl.reset_index()
        tbl = tbl.query('pos % @sparse == 0')
    return tbl
def parse_taxa_report(local_path, **kwargs):
    """Parse the taxa report at `local_path` and return the parsed result.

    All keyword arguments are forwarded unchanged to `_parse_taxa_report`.
    If parsing raises, the failing path is logged at debug level and the
    original exception is re-raised to the caller.
    """
    try:
        report = _parse_taxa_report(local_path, **kwargs)
    except Exception:
        # Record which file failed, then let the caller handle the error.
        logger.debug(f'[ParseTaxaReport] failed to parse {local_path}')
        raise
    return report
def _parse_taxa_report(local_path, **kwargs): """Return a dict of taxa_name to read_counts.""" out, abundance_sum = {}, 0 with open(local_path) as taxa_file: for line_num, line in enumerate(taxa_file): line = line.strip() tkns = line.split('\t') if not line or len(tkns) < 2: continue if len(tkns) == 2: taxon = tkns[0] taxon = taxon.split('|')[-1] abundance = float(tkns[1]) elif len(tkns) == 6: taxon = tkns[5].strip() taxon_rank = tkns[3].strip().lower() if len(taxon_rank) > 1: continue taxon = f'{taxon_rank}__{taxon}' abundance = float(tkns[1]) else: if line_num == 0: continue taxon = tkns[1] abundance = float(tkns[3]) if (not kwargs.get('species_only', False)) or ('s__' in taxon): out[taxon] = abundance abundance_sum += abundance if kwargs.get('normalize', False): out = {k: v / abundance_sum for k, v in out.items()} if kwargs.get('minimum_abundance', 0): out = {k: v for k, v in out.items() if v >= kwargs['minimum_abundance']} return out