Source code for cap2.pipeline.preprocessing.error_correct_reads
import luigi
import subprocess
from os.path import join, dirname, basename
from yaml import load
from shutil import rmtree
from ..config import PipelineConfig
from ..utils.conda import CondaPackage
from ..utils.cap_task import CapTask
from .map_to_human import RemoveHumanReads
[docs]class ErrorCorrectReads(CapTask):
module_description = """
This module error corrects reads using BayesHammer.
Motivation: reads can contain base errors introduced
during sequencing. Running error correction reduces
the total number of errors. This can improve the
classification rate and decrease the misclassification
rate.
Negatives: error correction can remove SNPs present in
secondary strains, as such error corrected reads should
not be used to call SNPs.
"""
MODULE_VERSION = 'v0.2.2'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pkg = CondaPackage(
package="spades",
executable="spades.py",
channel="bioconda",
config_filename=self.config_filename,
)
self.config = PipelineConfig(self.config_filename)
self.out_dir = self.config.out_dir
self.nonhuman_reads = RemoveHumanReads.from_cap_task(self)
def requires(self):
return self.pkg, self.nonhuman_reads
def tool_version(self):
return self.run_cmd(f'{self.pkg.bin} --version').stderr.decode('utf-8')
@classmethod
def dependencies(cls):
return ["spades", RemoveHumanReads]
@classmethod
def _module_name(cls):
return 'error_corrected_reads'
def output(self):
out = {
'error_corrected_reads_1': self.get_target('R1', 'fastq.gz'),
}
if self.paired:
out['error_corrected_reads_2'] = self.get_target('R2', 'fastq.gz')
return out
def _run(self):
if self.paired:
return self._run_paired()
return self._run_single()
def _run_single(self):
r1 = self.nonhuman_reads.output()['nonhuman_reads_1']
cmd = self.pkg.bin
cmd += f' --only-error-correction --meta -U {r1.path} '
outdir = f'{self.sample_name}.error_correction_out'
cmd += f' -t {self.cores} -o {outdir} '
if self.max_ram > 0:
cmd += f' -m {self.max_ram} '
self.run_cmd(cmd) # runs error correction but leaves output in a dir
config_path = f'{self.sample_name}.error_correction_out/corrected/corrected.yaml'
spades_out = load(open(config_path).read())
ec_r1 = spades_out[0]['left reads']
assert len(ec_r1) == 1
paths = self.output()['error_corrected_reads_1']
cmd = f'mv {ec_r1[0]} {paths[0].path} '
self.run_cmd(cmd)
rmtree(outdir)
def _run_paired(self):
r1 = self.nonhuman_reads.output()['nonhuman_reads_1']
r2 = self.nonhuman_reads.output()['nonhuman_reads_2']
cmd = self.pkg.bin
cmd += f' --only-error-correction --meta -1 {r1.path} -2 {r2.path}'
outdir = f'{self.sample_name}.error_correction_out'
cmd += f' -t {self.cores} -o {outdir}'
self.run_cmd(cmd) # runs error correction but leaves output in a dir
config_path = f'{self.sample_name}.error_correction_out/corrected/corrected.yaml'
spades_out = load(open(config_path).read())
ec_r1 = spades_out[0]['left reads']
assert len(ec_r1) == 1
ec_r2 = spades_out[0]['right reads']
assert len(ec_r2) == 1
paths = self.output()['error_corrected_reads_1'], self.output()['error_corrected_reads_2']
cmd = f'mv {ec_r1[0]} {paths[0].path} && mv {ec_r2[0]} {paths[1].path}'
self.run_cmd(cmd)
rmtree(outdir)