import luigi
from os.path import join, abspath, dirname
from glob import glob
import subprocess
from ..config import PipelineConfig
from ..utils.conda import CondaPackage
from ..utils.cap_task import CapDbTask
# Version tag that every task in this module lists in its ``dependencies()``.
# NOTE(review): this date (2020-06-01) differs from the ``2020-06-08`` prefix
# used by the hard-coded S3 paths in this module -- confirm which is intended.
DB_DATE = '2020-06-01'
# NOTE(review): the line below is a bare one-element tuple expression; it is
# evaluated and discarded, so it has no runtime effect. Presumably a leftover
# S3 key for the download flag -- confirm before removing.
'cap2/databases/2020-06-08/taxa_kraken2/db_download_flag',
class Kraken2DBDataDown(CapDbTask):
    """Luigi task that fetches the raw inputs for the Kraken2 database.

    When the pipeline is configured in build mode, the task runs
    ``kraken2-build --download-taxonomy`` followed by one
    ``kraken2-build --download-library`` call per entry in
    ``self.libraries``. In every mode it finishes by writing an empty
    ``db_download_flag`` file, which is the task's only declared output.
    """

    config_filename = luigi.Parameter()
    cores = luigi.IntParameter(default=1)
    MODULE_VERSION = 'v0.1.0'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Conda environment that provides the ``kraken2`` executable.
        self.pkg = CondaPackage(
            package="kraken2",
            executable="kraken2",
            channel="bioconda",
            env="CAP_v2_kraken2",
            config_filename=self.config_filename,
        )
        self.config = PipelineConfig(self.config_filename)
        # Sub-directory of ``config.db_dir`` that holds the database.
        self.kraken_db_dir = 'taxa_kraken2'
        # Set to False to skip the downloads while still writing the flag.
        self.download_libs = True
        # Library names passed to ``--download-library``, in download order.
        self.libraries = [
            'archaea', 'bacteria', 'plasmid', 'viral', 'fungi', 'protozoa', 'human'
        ]

    def tool_version(self):
        """Return the decoded stderr of ``kraken2 --version``."""
        # NOTE(review): reads stderr, not stdout -- confirm that is where
        # ``run_cmd`` surfaces the version banner.
        completed = self.run_cmd(f'{self.pkg.bin} --version')
        return completed.stderr.decode('utf-8')

    def requires(self):
        """Depend only on the conda package that supplies kraken2."""
        return [self.pkg]

    @classmethod
    def _module_name(cls):
        """Return the module identifier for this task."""
        return 'kraken2_taxa_db_down'

    @classmethod
    def dependencies(cls):
        """Return the dependency tags: the tool name and ``DB_DATE``."""
        return ['kraken2', DB_DATE]

    @property
    def kraken2_db(self):
        """Path of the Kraken2 database directory under ``config.db_dir``."""
        return join(self.config.db_dir, self.kraken_db_dir)

    def output(self):
        """Return ``{'flag': LocalTarget}`` for the download flag file.

        The parent directory of the flag is created as a side effect.
        """
        flag_path = join(self.kraken2_db, 'db_download_flag')
        flag_target = luigi.LocalTarget(flag_path)
        flag_target.makedirs()
        return {'flag': flag_target}

    def run(self):
        """Download the raw data when building, then write the flag file."""
        in_build_mode = self.config.db_mode == PipelineConfig.DB_MODE_BUILD
        if in_build_mode and self.download_libs:
            self.download_kraken2_db()
        # The flag is written in every mode so that downstream tasks which
        # require this task can proceed.
        flag_path = self.output()['flag'].path
        with open(flag_path, 'w'):
            pass

    def download_kraken2_db(self):
        """Run kraken2-build to fetch the taxonomy and each library."""
        self.run_cmd(
            f'{self.pkg.bin}-build --use-ftp --download-taxonomy --db {self.kraken2_db}'
        )
        for lib_name in self.libraries:
            # The directory of the kraken2 binary is appended to PATH for
            # the library downloads only.
            pieces = [
                f'PATH=$PATH:{dirname(abspath(self.pkg.bin))}',
                f'{self.pkg.bin}-build',
                '--use-ftp',
                f'--download-library {lib_name}',
                f'--db {self.kraken2_db}',
            ]
            self.run_cmd(' '.join(pieces))
class Kraken2DB(CapDbTask):
    """Luigi task that produces the Kraken2 database files.

    In build mode the database is built locally with
    ``kraken2-build --build`` from the data fetched by
    ``Kraken2DBDataDown``. In any other mode the prebuilt database files
    are downloaded with ``wget`` from a fixed S3-compatible bucket.
    The task is complete once ``hash.k2d`` exists in the database
    directory.
    """

    config_filename = luigi.Parameter()
    cores = luigi.IntParameter(default=1)
    MODULE_VERSION = 'v0.1.0'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Conda environment that provides the ``kraken2`` executable.
        self.pkg = CondaPackage(
            package="kraken2",
            executable="kraken2",
            channel="bioconda",
            env="CAP_v2_kraken2",
            config_filename=self.config_filename,
        )
        # Upstream task that fetches the raw data and owns the db path.
        self.download_task = Kraken2DBDataDown(
            config_filename=self.config_filename,
            cores=self.cores,
        )
        self.config = PipelineConfig(self.config_filename)
        # Value passed to ``--max-db-size``.
        self.db_size = 120 * (1000 ** 3)  # 120 GB

    def requires(self):
        """Depend on the conda package and on the raw-data download task."""
        return self.pkg, self.download_task

    @classmethod
    def _module_name(cls):
        """Return the module identifier for this task."""
        return 'kraken2_taxa_db'

    @classmethod
    def dependencies(cls):
        """Return the dependency tags: tool name, ``DB_DATE``, upstream task."""
        return ['kraken2', DB_DATE, Kraken2DBDataDown]

    @property
    def kraken2_db(self):
        """Path of the database directory, as defined by the download task."""
        return self.download_task.kraken2_db

    def output(self):
        """Return ``{'kraken2_db_taxa': LocalTarget}`` for ``hash.k2d``.

        The parent directory of the target is created as a side effect.
        """
        db_taxa = luigi.LocalTarget(join(self.kraken2_db, 'hash.k2d'))
        db_taxa.makedirs()
        return {'kraken2_db_taxa': db_taxa}

    def run(self):
        """Build the database in build mode; otherwise download it."""
        if self.config.db_mode == PipelineConfig.DB_MODE_BUILD:
            self.build_kraken2_db()
        else:
            self.download_kraken2_db_from_s3()

    def build_kraken2_db(self):
        """Run ``kraken2-build --build`` on the database directory."""
        cmd = (
            f'{self.pkg.bin}-build '
            '--build '
            f'--max-db-size {self.db_size} '
            f'--db {self.kraken2_db}'
        )
        self.run_cmd(cmd)

    def download_kraken2_db_from_s3(self):
        """Fetch each prebuilt database file into the database directory."""
        paths = [
            'cap2/databases/2020-06-08/taxa_kraken2/hash.k2d',
            'cap2/databases/2020-06-08/taxa_kraken2/opts.k2d',
            'cap2/databases/2020-06-08/taxa_kraken2/seqid2taxid.map',
            'cap2/databases/2020-06-08/taxa_kraken2/taxo.k2d',
            'cap2/databases/2020-06-08/taxa_kraken2/unmapped.txt',
        ]
        # output() calls makedirs() each time it is invoked, so the target
        # directory is resolved once instead of once per downloaded file.
        target_dir = dirname(self.output()["kraken2_db_taxa"].path)
        for path in paths:
            cmd = (
                'wget '
                f'--directory-prefix={target_dir} '
                f'https://s3.wasabisys.com/metasub-microbiome/{path} '
            )
            self.run_cmd(cmd)
class BrakenKraken2DB(CapDbTask):
    """Luigi task that produces Bracken k-mer distribution files.

    One ``database{N}mers.kmer_distrib`` file is expected in the Kraken2
    database directory for every read length in ``self.read_lengths``.
    In build mode the files are generated with ``bracken-build``; in any
    other mode they are downloaded with ``wget`` from a fixed
    S3-compatible bucket.
    """

    config_filename = luigi.Parameter()
    cores = luigi.IntParameter(default=1)
    MODULE_VERSION = 'v0.1.0'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Conda environment that provides the pinned ``bracken`` executable.
        self.pkg = CondaPackage(
            package="bracken==2.6.0",
            executable="bracken",
            channel="bioconda",
            env="CAP_v2_bracken_kraken2",
            config_filename=self.config_filename,
        )
        # Upstream task that produces the Kraken2 database itself.
        self.kraken2_db_task = Kraken2DB(
            config_filename=self.config_filename,
            cores=self.cores,
        )
        self.config = PipelineConfig(self.config_filename)
        # Read lengths for which a Bracken index is produced.
        self.read_lengths = [75, 100, 125, 150, 175, 200, 225, 250]

    def requires(self):
        """Depend on the conda package and on the Kraken2 database task."""
        return self.pkg, self.kraken2_db_task

    @classmethod
    def _module_name(cls):
        """Return the module identifier for this task."""
        return 'bracken_kraken2_taxa_db'

    @classmethod
    def dependencies(cls):
        """Return the dependency tags: tool name, ``DB_DATE``, upstream task."""
        return ['bracken', DB_DATE, Kraken2DB]

    @property
    def kraken2_db(self):
        """Path of the database directory, as defined by the Kraken2 task."""
        return self.kraken2_db_task.kraken2_db

    def output(self):
        """Return one ``LocalTarget`` per read length.

        Keys have the form ``bracken_kraken2_db_{read_length}``.
        """
        return {
            f'bracken_kraken2_db_{rlen}': luigi.LocalTarget(
                join(self.kraken2_db, f'database{rlen}mers.kmer_distrib')
            )
            for rlen in self.read_lengths
        }

    def get_index(self, length):
        """Pick the Bracken index to use for reads of the given length.

        Returns a ``(index_length, target)`` tuple. The chosen index is
        the largest configured read length strictly below ``length``; if
        none is below it, the smallest configured read length is used.
        """
        # NOTE(review): the comparison is strict, so a read length equal to
        # a configured index selects the next smaller index -- confirm.
        descending_lengths = sorted(self.read_lengths, reverse=True)
        for chosen_len in descending_lengths:
            if length > chosen_len:
                break
        target_key = f'bracken_kraken2_db_{chosen_len}'
        return chosen_len, self.output()[target_key]

    def run(self):
        """Build every index in build mode; otherwise download them."""
        if self.config.db_mode == PipelineConfig.DB_MODE_BUILD:
            for rlen in self.read_lengths:
                self.build_bracken_db(rlen)
            return
        self.download_bracken_db_from_s3()

    def build_bracken_db(self, read_len):
        """Run ``bracken-build`` for one read length and check its output.

        The shell command's final status comes from the trailing
        ``test -e`` on the expected ``.kmer_distrib`` file, because the two
        commands are joined with ``;``.
        """
        build_call = ' '.join([
            f'PATH=${{PATH}}:{dirname(abspath(self.pkg.bin))}',
            f'{self.pkg.bin}-build',
            f'-d {self.kraken2_db}',
            f'-t {self.cores}',
            '-k 35',
            f'-l {read_len}',
            f'-x {dirname(abspath(self.kraken2_db_task.pkg.bin))}/',
        ])
        verify_call = f'test -e {self.kraken2_db}/database{read_len}mers.kmer_distrib'
        self.run_cmd(f'{build_call} ; {verify_call}')

    def download_bracken_db_from_s3(self):
        """Fetch every prebuilt Bracken file into the database directory."""
        remote_paths = (
            'cap2/databases/2020-06-08/taxa_kraken2/database100mers.kmer_distrib',
            'cap2/databases/2020-06-08/taxa_kraken2/database100mers.kraken',
            'cap2/databases/2020-06-08/taxa_kraken2/database125mers.kmer_distrib',
            'cap2/databases/2020-06-08/taxa_kraken2/database125mers.kraken',
            'cap2/databases/2020-06-08/taxa_kraken2/database150mers.kmer_distrib',
            'cap2/databases/2020-06-08/taxa_kraken2/database150mers.kraken',
            'cap2/databases/2020-06-08/taxa_kraken2/database175mers.kmer_distrib',
            'cap2/databases/2020-06-08/taxa_kraken2/database175mers.kraken',
            'cap2/databases/2020-06-08/taxa_kraken2/database200mers.kmer_distrib',
            'cap2/databases/2020-06-08/taxa_kraken2/database200mers.kraken',
            'cap2/databases/2020-06-08/taxa_kraken2/database225mers.kmer_distrib',
            'cap2/databases/2020-06-08/taxa_kraken2/database225mers.kraken',
            'cap2/databases/2020-06-08/taxa_kraken2/database250mers.kmer_distrib',
            'cap2/databases/2020-06-08/taxa_kraken2/database250mers.kraken',
            'cap2/databases/2020-06-08/taxa_kraken2/database75mers.kmer_distrib',
            'cap2/databases/2020-06-08/taxa_kraken2/database75mers.kraken',
        )
        for remote_path in remote_paths:
            # All files land beside the 150-mer target, i.e. in the db dir.
            local_dir = dirname(self.output()["bracken_kraken2_db_150"].path)
            url = f'https://s3.wasabisys.com/metasub-microbiome/{remote_path}'
            self.run_cmd(f'wget --directory-prefix={local_dir} {url} ')