#!/usr/bin/env python3
'''
Created on Mar 4, 2014

@author: dlemmer
'''
__author__ = "Darrin Lemmer"
__version__ = "1.0.0"
__email__ = "dlemmer@tgen.org"
import logging
def _parse_args():
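    """Parse command-line arguments; --config gives the path to the run configuration XML."""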
import argparse
parser = argparse.ArgumentParser(description="Meant to be called from the pipeline automatically.")
parser.add_argument("--config", required=True, help="Path to the configuration xml file.")
return parser.parse_args()
def _submit_job(job_submitter, command, job_parms, waitfor_id=None, hold=False, notify=False):
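    """
    Submit a command to the configured job manager (PBS, SLURM, or SGE), or
    run it locally in a background process when no known manager is named.

    Args:
        job_submitter (str): "PBS", "SLURM", "SGE", or anything else for local execution.
        command (str): shell command(s) to run; newlines separate sub-commands.
        job_parms (dict): expects 'name', 'work_dir', 'num_cpus',
            'mem_requested' (GB), 'walltime' (hours), 'queue', and 'args'.
        waitfor_id (tuple): job id(s) joined by ':', optionally followed by a
            dependency type (defaults to 'afterok').
        hold (bool): submit the job in a held state.
        notify (bool): request email notification when the job ends.

    Returns:
        str: id of the submitted job (or local process), or None if submission failed.
    """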
import subprocess
import re
import os
# TODO(jtravis): remove unused output variable
output = jobid = None
logging.info("command = %s" % command)
if job_submitter == "PBS":
waitfor = ""
if waitfor_id:
dependency_string = waitfor_id[1] if len(waitfor_id) > 1 else 'afterok'
waitfor = "-W depend=%s:%s" % (dependency_string, waitfor_id[0])
queue = ""
if job_parms["queue"]:
queue = "-q %s" % job_parms["queue"]
args = job_parms["args"]
if hold:
args += " -h"
if notify:
args += " -m e"
submit_command = "qsub -V -d \'%s\' -w \'%s\' -l ncpus=%s,mem=%sgb,walltime=%s:00:00 -m a -N \'%s\' %s %s %s" % (
job_parms["work_dir"], job_parms["work_dir"], job_parms['num_cpus'], job_parms['mem_requested'],
job_parms['walltime'], job_parms['name'], waitfor, queue, args)
logging.debug("submit_command = %s", submit_command)
output = subprocess.getoutput("echo \"%s\" | %s - " % (command, submit_command))
logging.debug("output = %s" % output)
job_match = re.search('^(\d+)\..*$', output)
if job_match:
jobid = job_match.group(1)
else:
logging.warning("Job not submitted!!")
print("WARNING: Job not submitted: %s" % output)
elif job_submitter == "SLURM":
waitfor = ""
if waitfor_id:
dependency_string = waitfor_id[1] if len(waitfor_id) > 1 else 'afterok'
waitfor = "-d %s:%s" % (dependency_string, waitfor_id[0])
queue = ""
if job_parms["queue"]:
queue = "-p %s" % job_parms["queue"]
args = job_parms["args"]
if hold:
args += " -H"
if notify:
args += " --mail-type=END"
submit_command = "sbatch -D \'%s\' -c%s --mem=%s000 --time=%s:00:00 --mail-type=FAIL -J \'%s\' %s %s %s" % (
job_parms["work_dir"], job_parms['num_cpus'], job_parms['mem_requested'], job_parms['walltime'],
job_parms['name'], waitfor, queue, args)
logging.debug("submit_command = %s" % submit_command)
output = subprocess.getoutput("%s --wrap=\"%s\"" % (submit_command, command))
logging.debug("output = %s" % output)
job_match = re.search('^Submitted batch job (\d+)$', output)
if job_match:
jobid = job_match.group(1)
else:
logging.warning("Job not submitted!!")
print("WARNING: Job not submitted: %s" % output)
elif job_submitter == "SGE":
waitfor = ""
if waitfor_id:
waitfor = "-hold_jid %s" % (re.sub(":", ",", waitfor_id[0]))
queue = ""
if job_parms["queue"]:
queue = "-q %s" % job_parms["queue"]
args = job_parms["args"]
if hold:
args += " -h"
if notify:
args += " -m e"
mem_needed = float(job_parms['mem_requested']) * 1024 * 1024
        # In SGE the number of processors a job uses is controlled by the queue it runs
        # on, so there appears to be no way to request a specific number of CPUs.
submit_command = "qsub -V -wd \'%s\' -l h_data=%s,h_rt=%s:00:00 -m a -N \'%s\' %s %s %s" % (
job_parms["work_dir"], mem_needed, job_parms['walltime'], job_parms['name'], waitfor,
queue, args)
logging.debug("submit_command = %s", submit_command)
output = subprocess.getoutput("echo \"%s\" | %s" % (command, submit_command))
logging.debug("output = %s" % output)
job_match = re.search('^\D*(\d+)\s.*$', output)
if job_match:
jobid = job_match.group(1)
else:
logging.warning("Job not submitted!!")
print("WARNING: Job not submitted: %s" % output)
else:
command = re.sub('\n', '; ', command)
work_dir = job_parms['work_dir']
dependency_check = ""
if waitfor_id:
if re.search(":", waitfor_id[0]):
                pid_filename = os.path.join(work_dir, "%s_dependent_pids" % job_parms['name'])
                pids = waitfor_id[0].split(":")
                with open(pid_filename, 'w') as pid_file:
                    pid_file.write("\n".join(pids))
dependency_check = "while [ -s %s ]; do sleep 600; for pid in `cat %s`; do kill -0 \"$pid\" 2>/dev/null || sed -i \"/^$pid$/d\" %s; done; done; rm %s; " % (
pid_filename, pid_filename, pid_filename, pid_filename)
else:
pid = waitfor_id[0]
dependency_check = "while kill -0 %s; do sleep 300; done; " % pid
# cpu_check = "grep 'cpu ' /proc/stat | awk '{usage=($2+$4)/($2+$4+$5)} END {print 1-usage}'"
total_mem = subprocess.getoutput("free -g | grep Mem: | awk '{ print $2 }'")
        mem_requested = job_parms['mem_requested']
        # free reports whole gigabytes; compare numerically rather than lexically.
        if float(mem_requested) > float(total_mem):
            mem_requested = total_mem
            print("total_mem = %s" % total_mem)
        mem_needed = float(mem_requested) * 750
mem_check = "while [ `free -m | grep cache: | awk '{ print $4 }'` -lt %d ]; do sleep 300; done; " % mem_needed
submit_command = "%s%s%s" % (dependency_check, mem_check, command)
logging.debug("submit_command = %s" % submit_command)
output_log = os.open(os.path.join(work_dir, "%s.out" % job_parms['name']), os.O_WRONLY | os.O_CREAT)
proc = subprocess.Popen(submit_command, stderr=subprocess.STDOUT, stdout=output_log, shell=True, cwd=work_dir)
jobid = str(proc.pid)
logging.info("jobid = %s" % jobid)
return jobid
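
# A minimal usage sketch for _submit_job (hypothetical values; the job_parms
# keys mirror those the function reads above):
#   parms = {'name': 'nasp_demo', 'work_dir': '/tmp', 'num_cpus': 1,
#            'mem_requested': 2, 'walltime': 1, 'queue': '', 'args': ''}
#   job_id = _submit_job("SLURM", "echo hello", parms)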
def _release_hold(job_submitter, job_id):
import subprocess
if job_submitter == "PBS" or job_submitter == "SGE":
command = "qrls %s" % job_id
elif job_submitter == "SLURM":
command = "scontrol release %s" % job_id
else:
return
logging.info("command = %s", command)
output = subprocess.getoutput(command)
logging.debug("output = %s", output)
def _index_reference(configuration):
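    """
    Copy the reference into <output_folder>/reference as reference.fasta and
    submit a single held job that builds every index the configured aligners
    (plus a sequence dictionary and faidx index when GATK is in use) require.

    Returns:
        tuple: index job id, path to the formatted reference fasta
    """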
import os
import re
output_folder = configuration["output_folder"]
job_parms = configuration["index"][3]
ref_path = configuration["reference"][1]
ref_folder = os.path.join(output_folder, "reference")
if not os.path.exists(ref_folder):
os.makedirs(ref_folder)
# Copy the reference as $output_folder/reference/reference.fasta, verifying its format first. Replace it if it already exists.
reference = os.path.join(ref_folder, "reference.fasta")
if os.path.exists(reference):
os.remove(reference)
index_commands = ["format_fasta --inputfasta %s --outputfasta %s" % (ref_path, reference)]
# Gather all of the index commands that need to be run
bwa_done = False
for aligner in configuration["aligners"]:
(name, path) = aligner[0:2]
if re.search('bwa', name, re.IGNORECASE):
if not bwa_done:
index_commands.append("%s index %s" % (path, reference))
bwa_done = True
elif re.search('b(ow)?t(ie)?2', name, re.IGNORECASE):
bt2path = os.path.split(path)[0]
bt2_build_path = os.path.join(bt2path, "bowtie2-build")
index_commands.append("%s %s reference" % (bt2_build_path, reference))
elif re.search('novo', name, re.IGNORECASE):
novopath = os.path.split(path)[0]
novoindex_path = os.path.join(novopath, "novoindex")
index_commands.append("%s %s.idx %s" % (novoindex_path, reference, reference))
elif re.search('snap', name, re.IGNORECASE):
index_commands.append("%s index %s %s" % (path, reference, os.path.join(ref_folder, "snap")))
else:
print("Unknown aligner \'%s\' found, don't know how to index the reference for it. Skipping..." % name)
    # If we are using GATK, we also need to create a sequence dictionary and a samtools index of the reference
    if next((caller for caller in configuration["snpcallers"] if re.search('gatk', caller[0], re.IGNORECASE)), None):
#picard_path = configuration["picard"][1] or ""
picard_path = configuration["picard"][1]
picard_memory = 2
if configuration["picard"][3]:
picard_memory = configuration["picard"][3]['mem_requested'] or 2
samtools_path = configuration["samtools"][1] or "samtools"
out_file = os.path.join(ref_folder, "reference.dict")
index_commands.append("java -Xmx%sG -jar %s CreateSequenceDictionary R=%s O=%s" % (picard_memory, picard_path, reference, out_file))
index_commands.append("%s faidx %s" % (samtools_path, reference))
command = "\n".join(index_commands)
job_parms['work_dir'] = ref_folder
job_id = _submit_job(configuration["job_submitter"], command, job_parms, hold=True)
return job_id, reference
def _run_bwa(read_tuple, aligner, samtools, job_submitter, index_job_id, reference, output_folder):
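    """
    Align one read set with BWA ('mem' or the classic aln/sampe|samse
    workflow, chosen by the aligner name), then convert, sort, and index the
    output with samtools.

    Returns:
        tuple: bam nickname, job id, path to bam file
    """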
import re
import os
(name, read1) = read_tuple[0:2]
read2 = read_tuple[2] if len(read_tuple) >= 3 else ""
bam_string = "\'@RG\\tID:%s\\tSM:%s\'" % (name, name)
sampath = samtools[1]
(aligner_name, path, args, job_parms) = aligner
ncpus = job_parms['num_cpus']
aligner_command = ""
if re.search('mem', aligner_name, re.IGNORECASE):
aligner_name = "bwamem"
aligner_command = "%s mem -R %s %s -t %s %s %s %s" % (path, bam_string, args, ncpus, reference, read1, read2)
else:
aligner_name = "bwa"
        # Use 'bwa aln -I' (Illumina 1.3+ quality encoding) when the reads use
        # the old _[12]_sequence.txt naming convention.
        old_format_string = "-I" if re.search(r'(?:.*\/)?[^\/]+?_[12]_sequence\.txt(?:\.gz)?$', read1,
                                              re.IGNORECASE) else ""
work_dir = os.path.join(output_folder, aligner_name)
command_parts = []
if read2:
output_file_1 = os.path.join(work_dir, "%s-R1.sai" % name)
output_file_2 = os.path.join(work_dir, "%s-R2.sai" % name)
command_parts.append("%s aln %s %s %s -t %s -f %s %s" % (
path, old_format_string, reference, read1, ncpus, output_file_1, args))
command_parts.append("%s aln %s %s %s -t %s -f %s %s" % (
path, old_format_string, reference, read2, ncpus, output_file_2, args))
command_parts.append("%s sampe -r %s %s %s %s %s %s %s" % (
path, bam_string, reference, output_file_1, output_file_2, read1, read2, args))
else:
output_file = os.path.join(work_dir, "%s.sai" % name)
command_parts.append("%s aln %s %s %s -t %s -f %s %s" % (
path, old_format_string, reference, read1, ncpus, output_file, args))
command_parts.append(
"%s samse -r %s %s %s %s %s" % (path, bam_string, reference, output_file, read1, args))
aligner_command = "\n".join(command_parts)
bam_nickname = "%s-%s" % (name, aligner_name)
samview_command = "%s view -S -b -h -" % sampath
samsort_command = "%s sort - %s" % (sampath, bam_nickname)
samindex_command = "%s index %s.bam" % (sampath, bam_nickname)
command = "%s | %s | %s \n %s" % (aligner_command, samview_command, samsort_command, samindex_command)
work_dir = os.path.join(output_folder, aligner_name)
if not os.path.exists(work_dir):
os.makedirs(work_dir)
final_file = os.path.join(work_dir, "%s.bam" % bam_nickname)
job_parms['name'] = "nasp_%s_%s" % (aligner_name, name)
job_parms['work_dir'] = work_dir
job_id = _submit_job(job_submitter, command, job_parms, (index_job_id,))
return bam_nickname, job_id, final_file
def _run_bowtie2(read_tuple, aligner, samtools, job_submitter, index_job_id, reference, output_folder):
"""
Args:
read_tuple:
aligner (list): name, path, args, job parameters
samtools:
job_submitter (str):
index_job_id (tuple): (jobid, action)
reference:
output_folder:
Returns:
tuple: bam nickname, job id, path to bam file
"""
import os
(name, read1) = read_tuple[0:2]
read2 = read_tuple[2] if len(read_tuple) >= 3 else ""
read_string = "-1 %s -2 %s" % (read1, read2) if read2 else "-U %s" % read1
ref_string = os.path.splitext(reference)[0]
bam_string = "--rg-id \'%s\' --rg \'SM:%s\'" % (name, name)
sampath = samtools[1]
(path, args, job_parms) = aligner[1:4]
aligner_name = "bowtie2"
ncpus = job_parms['num_cpus']
aligner_command = "%s %s -p %s %s -x %s %s" % (path, args, ncpus, bam_string, ref_string, read_string)
bam_nickname = "%s-%s" % (name, aligner_name)
samview_command = "%s view -S -b -h -" % sampath
samsort_command = "%s sort - %s" % (sampath, bam_nickname)
samindex_command = "%s index %s.bam" % (sampath, bam_nickname)
command = "%s | %s | %s \n %s" % (aligner_command, samview_command, samsort_command, samindex_command)
work_dir = os.path.join(output_folder, aligner_name)
if not os.path.exists(work_dir):
os.makedirs(work_dir)
final_file = os.path.join(work_dir, "%s.bam" % bam_nickname)
job_parms['name'] = "nasp_%s_%s" % (aligner_name, name)
job_parms['work_dir'] = work_dir
job_id = _submit_job(job_submitter, command, job_parms, (index_job_id,))
return bam_nickname, job_id, final_file
def _run_novoalign(read_tuple, aligner, samtools, job_submitter, index_job_id, reference, output_folder):
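    """
    Align one read set with Novoalign, then convert, sort, and index the
    output with samtools.

    Returns:
        tuple: bam nickname, job id, path to bam file
    """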
import os
(name, read1) = read_tuple[0:2]
read2 = read_tuple[2] if len(read_tuple) >= 3 else ""
paired_string = "-i PE 500,100" if read2 else ""
bam_string = "\'@RG\\tID:%s\\tSM:%s\'" % (name, name)
sampath = samtools[1]
(path, args, job_parms) = aligner[1:4]
aligner_name = "novo"
ncpus = job_parms['num_cpus']
aligner_command = "%s -f %s %s %s -c %s -o SAM %s -d %s.idx %s" % (
path, read1, read2, paired_string, ncpus, bam_string, reference, args)
bam_nickname = "%s-%s" % (name, aligner_name)
samview_command = "%s view -S -b -h -" % sampath
samsort_command = "%s sort - %s" % (sampath, bam_nickname)
samindex_command = "%s index %s.bam" % (sampath, bam_nickname)
command = "%s | %s | %s \n %s" % (aligner_command, samview_command, samsort_command, samindex_command)
work_dir = os.path.join(output_folder, aligner_name)
if not os.path.exists(work_dir):
os.makedirs(work_dir)
final_file = os.path.join(work_dir, "%s.bam" % bam_nickname)
job_parms['name'] = "nasp_%s_%s" % (aligner_name, name)
job_parms['work_dir'] = work_dir
job_id = _submit_job(job_submitter, command, job_parms, (index_job_id,))
return bam_nickname, job_id, final_file
def _run_snap(read_tuple, aligner, samtools, job_submitter, index_job_id, reference, output_folder):
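    """
    Align one read set with SNAP, using the index built under
    <output_folder>/reference/snap, then convert, sort, and index the output
    with samtools.

    Returns:
        tuple: bam nickname, job id, path to bam file
    """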
import os
(name, read1) = read_tuple[0:2]
read2 = read_tuple[2] if len(read_tuple) >= 3 else ""
paired_string = "paired" if read2 else "single"
sampath = samtools[1]
(path, args, job_parms) = aligner[1:4]
aligner_name = "snap"
    # Only include read files that are actually present (read2 may be empty).
    read_string = " ".join(read for read in (read1, read2) if read)
    ref_dir = os.path.join(output_folder, "reference", aligner_name)
ncpus = job_parms['num_cpus']
bam_nickname = "%s-%s" % (name, aligner_name)
aligner_command = "%s %s %s %s -t %s -b %s -o -sam -" % (
path, paired_string, ref_dir, read_string, ncpus, args)
samview_command = "%s view -S -b -h -" % (sampath)
samsort_command = "%s sort - %s" % (sampath, bam_nickname)
samindex_command = "%s index %s.bam" % (sampath, bam_nickname)
command = "%s | %s | %s \n %s" % (aligner_command, samview_command, samsort_command, samindex_command)
work_dir = os.path.join(output_folder, aligner_name)
if not os.path.exists(work_dir):
os.makedirs(work_dir)
final_file = os.path.join(work_dir, "%s.bam" % bam_nickname)
job_parms['name'] = "nasp_%s_%s" % (aligner_name, name)
job_parms['work_dir'] = work_dir
job_id = _submit_job(job_submitter, command, job_parms, (index_job_id,))
return bam_nickname, job_id, final_file
def _run_gatk(nickname, bam_file, snpcaller, job_submitter, aligner_job_id, reference, output_folder):
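    """
    Call SNPs on a bam file with the GATK UnifiedGenotyper, emitting all
    confident sites.

    Returns:
        tuple: vcf nickname, job id, path to vcf file
    """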
import os
(path, args, job_parms) = snpcaller[1:4]
snpcaller_name = "gatk"
ncpus = job_parms['num_cpus']
memory = job_parms['mem_requested']
vcf_nickname = "%s-%s" % (nickname, snpcaller_name)
work_dir = os.path.join(output_folder, snpcaller_name)
if not os.path.exists(work_dir):
os.makedirs(work_dir)
command = "java -Xmx%sG -jar %s -T UnifiedGenotyper -dt NONE -glm BOTH -I %s -R %s -nt %s -o %s.vcf -out_mode EMIT_ALL_CONFIDENT_SITES -baq RECALCULATE %s" % (
memory, path, bam_file, reference, ncpus, vcf_nickname, args)
final_file = os.path.join(work_dir, "%s.vcf" % vcf_nickname)
job_parms['name'] = "nasp_%s_%s" % (snpcaller_name, nickname)
job_parms['work_dir'] = work_dir
job_id = _submit_job(job_submitter, command, job_parms, (aligner_job_id,))
return vcf_nickname, job_id, final_file
def _run_solsnp(nickname, bam_file, snpcaller, job_submitter, aligner_job_id, reference, output_folder):
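    """
    Call SNPs on a bam file with SolSNP; the bam is first symlinked into the
    solsnp work directory.

    Returns:
        tuple: vcf nickname, job id, path to vcf file
    """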
import os
(path, args, job_parms) = snpcaller[1:4]
snpcaller_name = "solsnp"
memory = job_parms['mem_requested']
vcf_nickname = "%s-%s" % (nickname, snpcaller_name)
work_dir = os.path.join(output_folder, snpcaller_name)
if not os.path.exists(work_dir):
os.makedirs(work_dir)
final_file = os.path.join(work_dir, "%s.vcf" % vcf_nickname)
bam_link = os.path.join(work_dir, os.path.splitext(os.path.basename(bam_file))[0])
os.symlink(bam_file, bam_link)
command = "java -Xmx%sG -jar %s INPUT=%s REFERENCE_SEQUENCE=%s OUTPUT=%s SUMMARY=true CALCULATE_ALLELIC_BALANCE=true MINIMUM_COVERAGE=1 PLOIDY=Haploid STRAND_MODE=None OUTPUT_FORMAT=VCF OUTPUT_MODE=AllCallable %s" % (
memory, path, bam_link, reference, final_file, args)
job_parms['name'] = "nasp_%s_%s" % (snpcaller_name, nickname)
job_parms['work_dir'] = work_dir
job_id = _submit_job(job_submitter, command, job_parms, (aligner_job_id,))
return vcf_nickname, job_id, final_file
def _run_varscan(nickname, bam_file, snpcaller, samtools, job_submitter, aligner_job_id, reference, output_folder):
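    """
    Call SNPs on a bam file with VarScan mpileup2cns, first writing a samtools
    mpileup and a one-line sample-list file.

    Returns:
        tuple: vcf nickname, job id, path to vcf file
    """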
import os
import re
(path, args, job_parms) = snpcaller[1:4]
sampath = samtools[1]
snpcaller_name = "varscan"
memory = job_parms['mem_requested']
vcf_nickname = "%s-%s" % (nickname, snpcaller_name)
test = re.match('^(.*)-[a-z]*$', nickname, re.IGNORECASE)
read_nickname = test.group(1) if test else nickname
work_dir = os.path.join(output_folder, snpcaller_name)
if not os.path.exists(work_dir):
os.makedirs(work_dir)
sample_list = os.path.join(work_dir, "%s.txt" % nickname)
pileup_file = os.path.join(os.path.dirname(bam_file), "%s.mpileup" % nickname)
final_file = os.path.join(work_dir, "%s.vcf" % vcf_nickname)
command_parts = ["echo %s > %s" % (read_nickname, sample_list),
"%s mpileup -B -d 10000000 -f %s %s > %s" % (sampath, reference, bam_file, pileup_file),
"java -Xmx%sG -jar %s mpileup2cns %s --output-vcf 1 --vcf-sample-list %s > %s %s" % (
memory, path, pileup_file, sample_list, final_file, args)]
command = "\n".join(command_parts)
job_parms['name'] = "nasp_%s_%s" % (snpcaller_name, nickname)
job_parms['work_dir'] = work_dir
job_id = _submit_job(job_submitter, command, job_parms, (aligner_job_id,))
return vcf_nickname, job_id, final_file
def _run_samtools(nickname, bam_file, snpcaller, samtools, job_submitter, aligner_job_id, reference, output_folder):
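    """
    Call SNPs by piping a samtools mpileup into the configured caller's
    'view -ceg' command.

    Returns:
        tuple: vcf nickname, job id, path to vcf file
    """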
import os
(path, args, job_parms) = snpcaller[1:4]
sampath = samtools[1]
snpcaller_name = "samtools"
vcf_nickname = "%s-%s" % (nickname, snpcaller_name)
work_dir = os.path.join(output_folder, snpcaller_name)
if not os.path.exists(work_dir):
os.makedirs(work_dir)
final_file = os.path.join(work_dir, "%s.vcf" % vcf_nickname)
command_parts = ["%s mpileup -uD -d 10000000 -f %s %s" % (sampath, reference, bam_file),
"%s view -ceg %s - > %s" % (path, args, final_file)]
command = " | ".join(command_parts)
job_parms['name'] = "nasp_%s_%s" % (snpcaller_name, nickname)
job_parms['work_dir'] = work_dir
job_id = _submit_job(job_submitter, command, job_parms, (aligner_job_id,))
return vcf_nickname, job_id, final_file
def _find_dups(configuration, index_job_id, reference):
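    """
    Submit a find_duplicates job that detects duplicated regions of the
    reference with nucmer.

    Returns:
        tuple: job id, path to duplicates.txt
    """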
import os
(name, path, _, job_parms) = configuration["dup_finder"]
command = "find_duplicates --nucmerpath %s --reference %s" % (path, reference)
work_dir = os.path.dirname(reference)
if not os.path.exists(work_dir):
# NOTE: This directory is implicitly created by _index_reference.
os.makedirs(work_dir)
final_file = os.path.join(work_dir, "duplicates.txt")
job_parms['name'] = "nasp_%s" % name
job_parms['work_dir'] = work_dir
job_id = _submit_job(configuration["job_submitter"], command, job_parms, (index_job_id,))
return job_id, final_file
def _convert_external_genome(assembly, configuration, index_job_id, reference):
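    """
    Format an external assembly and align it against the reference via
    convert_external_genome, producing <name>.frankenfasta.

    Returns:
        tuple: job id, path to frankenfasta file
    """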
import os
(tool, path, args, job_parms) = configuration["assembly_importer"]
(nucmer_path, nucmer_args) = configuration["dup_finder"][1:3]
(name, fasta) = assembly
work_dir = os.path.join(configuration["output_folder"], "external")
if not os.path.exists(work_dir):
os.makedirs(work_dir)
new_fasta = os.path.join(work_dir, os.path.basename(fasta))
command_parts = ["format_fasta --inputfasta %s --outputfasta %s" % (fasta, new_fasta),
"convert_external_genome --nucmerpath %s --nucmerargs \'%s\' --deltafilterpath %s --deltafilterargs \'%s\' --reference %s --external %s --name %s" % (
nucmer_path, nucmer_args, path, args, reference, new_fasta, name)]
command = "\n".join(command_parts)
final_file = os.path.join(work_dir, "%s.frankenfasta" % name)
job_parms['name'] = "nasp_%s_%s" % (tool, name)
job_parms['work_dir'] = work_dir
job_id = _submit_job(configuration["job_submitter"], command, job_parms, (index_job_id,))
return job_id, final_file
def _trim_adapters(read_tuple, configuration):
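    """
    Run Trimmomatic (PE or SE, depending on whether a second read file is
    present) on a read tuple.

    Returns:
        tuple: (name plus trimmed read paths, trimming job id)
    """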
import os
(_, path, args, job_parms) = configuration["read_trimmer"]
(name, read1) = read_tuple[0:2]
read2 = read_tuple[2] if len(read_tuple) >= 3 else None
trim_dir = os.path.join(configuration["output_folder"], 'trimmed')
if not os.path.exists(trim_dir):
os.makedirs(trim_dir)
#job_params = {'queue':'', 'mem_requested':6, 'num_cpus':4, 'walltime':8, 'args':''}
job_parms['name'] = "nasp_trim_%s" % name
job_parms['work_dir'] = trim_dir
if read2:
out_reads1 = [name+"_R1_trimmed.fastq", name+"_R1_unpaired.fastq"]
out_reads2 = [name+"_R2_trimmed.fastq", name+"_R2_unpaired.fastq"]
out_reads = [os.path.join(trim_dir, out_reads1[0]), os.path.join(trim_dir, out_reads2[0])]
command = "java -jar %s PE -threads %s %s %s %s %s %s %s %s" % (path, job_parms['num_cpus'], read1, read2, out_reads1[0], out_reads1[1], out_reads2[0], out_reads2[1], args)
else:
out_reads = [os.path.join(trim_dir, name+"_trimmed.fastq")]
command = "java -jar %s SE -threads %s %s %s %s" % (path, job_parms['num_cpus'], read1, out_reads[0], args)
jobid = _submit_job(configuration["job_submitter"], command, job_parms)
return (tuple([name] + out_reads), jobid)
def _align_reads(read_tuple, configuration, index_job_id, reference):
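    """
    Dispatch a read tuple to every configured aligner.

    Returns:
        list: (bam nickname, job id, path to bam file, aligner name) tuples
    """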
import re
aligner_output = []
for aligner in configuration["aligners"]:
name = aligner[0]
if re.search('bwa', name, re.IGNORECASE):
(bam_nickname, job_id, final_file) = _run_bwa(read_tuple, aligner, configuration["samtools"],
configuration["job_submitter"], index_job_id, reference,
configuration["output_folder"])
if job_id:
aligner_output.append((bam_nickname, job_id, final_file, name))
elif re.search('b(ow)?t(ie)?2', name, re.IGNORECASE):
(bam_nickname, job_id, final_file) = _run_bowtie2(read_tuple, aligner, configuration["samtools"],
configuration["job_submitter"], index_job_id, reference,
configuration["output_folder"])
if job_id:
aligner_output.append((bam_nickname, job_id, final_file, name))
elif re.search('novo', name, re.IGNORECASE):
(bam_nickname, job_id, final_file) = _run_novoalign(read_tuple, aligner, configuration["samtools"],
configuration["job_submitter"], index_job_id, reference,
configuration["output_folder"])
if job_id:
aligner_output.append((bam_nickname, job_id, final_file, name))
elif re.search('snap', name, re.IGNORECASE):
(bam_nickname, job_id, final_file) = _run_snap(read_tuple, aligner, configuration["samtools"],
configuration["job_submitter"], index_job_id, reference,
configuration["output_folder"])
if job_id:
aligner_output.append((bam_nickname, job_id, final_file, name))
else:
print("Unknown aligner \'%s\' found, don't know what to do. Skipping..." % name)
return aligner_output
def _call_snps(aligner_output, configuration, reference):
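    """
    Dispatch each aligned bam to every configured SNP caller.

    Returns:
        list: (vcf nickname, job id, path to vcf file, aligner name,
            snpcaller name) tuples
    """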
import re
snpcaller_output = []
for (nickname, aligner_job_id, bam_file, aligner_name) in aligner_output:
if aligner_job_id:
for snpcaller in configuration["snpcallers"]:
name = snpcaller[0]
if re.search('gatk', name, re.IGNORECASE):
(vcf_nickname, job_id, final_file) = _run_gatk(nickname, bam_file, snpcaller,
configuration["job_submitter"], aligner_job_id,
reference, configuration["output_folder"])
if job_id:
snpcaller_output.append((vcf_nickname, job_id, final_file, aligner_name, name))
elif re.search('solsnp', name, re.IGNORECASE):
(vcf_nickname, job_id, final_file) = _run_solsnp(nickname, bam_file, snpcaller,
configuration["job_submitter"], aligner_job_id,
reference, configuration["output_folder"])
if job_id:
snpcaller_output.append((vcf_nickname, job_id, final_file, aligner_name, name))
elif re.search('varscan', name, re.IGNORECASE):
(vcf_nickname, job_id, final_file) = _run_varscan(nickname, bam_file, snpcaller,
configuration["samtools"],
configuration["job_submitter"], aligner_job_id,
reference, configuration["output_folder"])
if job_id:
snpcaller_output.append((vcf_nickname, job_id, final_file, aligner_name, name))
elif re.search('samtools', name, re.IGNORECASE):
(vcf_nickname, job_id, final_file) = _run_samtools(nickname, bam_file, snpcaller,
configuration["samtools"],
configuration["job_submitter"], aligner_job_id,
reference, configuration["output_folder"])
if job_id:
snpcaller_output.append((vcf_nickname, job_id, final_file, aligner_name, name))
else:
print("Unknown SNP caller \'%s\' found, don't know what to do. Skipping..." % name)
return snpcaller_output
def _index_bams(configuration, index_job_id):
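    """
    Stage pre-aligned bams into <output_folder>/bams, adding a default
    ReadGroup header where one is missing, and index each with samtools.

    Returns:
        tuple: list of (name, bam path) pairs, job id
    """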
import os
alignments = configuration["alignments"]
output_folder = configuration["output_folder"]
job_parms = configuration["bam_index"][3]
sampath = configuration["samtools"][1]
picard_path = configuration["picard"][1] or ""
bam_folder = os.path.join(output_folder, "bams")
if not os.path.exists(bam_folder):
os.makedirs(bam_folder)
bam_files = []
command_parts = []
for (name, bam) in alignments:
new_file = os.path.join(bam_folder, "%s.bam" % name)
bam_files.append((name, new_file))
        # GATK requires bams to have a ReadGroup header. If one is not present,
        # assign a default value. Note: in_bam is the original alignment and
        # out_bam is the staged copy under bam_folder.
        command_parts.append((
            "if ! {samtools} view -H {in_bam} | grep \"^@RG\"; then\n"
            " java -jar {picard} AddOrReplaceReadGroups"
            " INPUT={in_bam}"
            " OUTPUT={out_bam}"
            " SORT_ORDER=coordinate"
            " ID=1"
            " SM={sample_name}"
            " LB={sample_name}"
            " PL=illumina"
            " PU=1\n"
            "else\n"
            " ln -s -f {in_bam} {out_bam}\n"
            "fi"
        ).format(samtools=sampath, picard=picard_path, in_bam=bam, out_bam=new_file, sample_name=name))
command_parts.append("%s index %s" % (sampath, new_file))
command = "\n".join(command_parts)
job_parms['work_dir'] = bam_folder
job_id = _submit_job(configuration["job_submitter"], command, job_parms, (index_job_id,))
return bam_files, job_id
def _create_matrices(configuration, reference, dups_file, vcf_files, franken_fastas, job_ids):
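    """
    Write the matrix DTO file and submit the matrix-generation job, waiting
    on every upstream job id.

    Returns:
        str: matrix job id
    """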
import nasp.matrix_DTO as matrix_DTO
import os
output_dir = configuration['output_folder']
path = configuration["matrix_generator"][1]
job_parms = configuration["matrix_generator"][3]
matrix_parms = {'reference-fasta': reference, 'reference-dups': dups_file}
if "coverage_filter" in configuration:
matrix_parms['minimum-coverage'] = configuration['coverage_filter']
if "proportion_filter" in configuration:
matrix_parms['minimum-proportion'] = configuration['proportion_filter']
matrix_parms['matrix-folder'] = os.path.join(output_dir, 'matrices')
if not os.path.exists(matrix_parms['matrix-folder']):
os.makedirs(matrix_parms['matrix-folder'])
matrix_parms['stats-folder'] = os.path.join(output_dir, 'statistics')
if not os.path.exists(matrix_parms['stats-folder']):
os.makedirs(matrix_parms['stats-folder'])
if 'filter_matrix_format' in configuration:
matrix_parms['filter-matrix-format'] = configuration['filter_matrix_format']
dto_file = os.path.join(output_dir, "matrix_dto.xml")
matrix_DTO.write_dto(matrix_parms, franken_fastas, vcf_files, dto_file)
jobs_to_wait_for = (":".join(job_ids), 'afterany') if job_ids else None
command = "%s matrix --dto-file %s --num-threads %s" % (path, dto_file, job_parms['num_cpus'])
job_parms['work_dir'] = output_dir
job_id = _submit_job(configuration["job_submitter"], command, job_parms, jobs_to_wait_for, notify=True)
return job_id
def _export_matrices(configuration, matrix_job_id):
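    """Submit a job that exports the bestsnp and missingdata matrices to both
    fasta and vcf formats."""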
import os
gonasp_path = configuration["matrix_generator"][1]
matrix_folder = os.path.join(configuration['output_folder'], 'matrices')
job_parms = {'name': 'nasp_export', 'num_cpus': '4', 'mem_requested': '4', 'walltime': '8', 'queue': '', 'args': '', 'work_dir': matrix_folder}
commands = []
    # The assembled command has the following form:
    #   gonasp export --type fasta bestsnp.tsv > bestsnp.fasta &
    #   gonasp export --type vcf bestsnp.tsv > bestsnp.vcf &
    #   wait
    # The '&' launches each export command as a background task, then 'wait'
    # blocks until all of them have finished.
    for matrix_type in ['vcf', 'fasta']:
        for exported_matrix in ['bestsnp', 'missingdata']:
            commands.append("{0} export --type {1} {2}.tsv > {2}.{1}".format(gonasp_path, matrix_type, exported_matrix))
commands.append('wait')
command = ' & '.join(commands)
job_id = _submit_job(configuration["job_submitter"], command, job_parms, (matrix_job_id,), notify=False)
def begin(configuration):
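    """
    Run the pipeline for a parsed configuration: index the reference, then fan
    out duplicate finding, external genome conversion, read trimming and
    alignment, and SNP calling; finish with matrix creation and export, and
    release the held index job.
    """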
(index_job_id, reference) = _index_reference(configuration)
if not index_job_id:
print("Failed to submit the index job, there is no point in continuing. Please try again.")
raise SystemExit()
dups_file = None
job_ids = []
vcf_files = []
franken_fastas = []
if configuration["find_dups"] != "False":
(job_id, dups_file) = _find_dups(configuration, index_job_id, reference)
if job_id:
job_ids.append(job_id)
for assembly in configuration["assemblies"]:
(job_id, final_file) = _convert_external_genome(assembly, configuration, index_job_id, reference)
if job_id:
job_ids.append(job_id)
franken_fastas.append((assembly[0], "nucmer", final_file))
if configuration["alignments"]:
pre_aligned = []
(bam_files, bamindex_job_id) = _index_bams(configuration, index_job_id)
for (name, bam) in bam_files:
pre_aligned.append((name, bamindex_job_id, bam, "pre-aligned"))
snpcaller_output = _call_snps(pre_aligned, configuration, reference)
for (vcf_nickname, job_id, final_file, aligner, snpcaller) in snpcaller_output:
if job_id:
job_ids.append(job_id)
vcf_files.append((vcf_nickname, aligner, snpcaller, final_file))
for read_tuple in configuration["reads"]:
dependent_job_id = None
if "trim_reads" in configuration and configuration["trim_reads"] == "True":
(read_tuple, dependent_job_id) = _trim_adapters(read_tuple, configuration)
dependencies = index_job_id
if dependent_job_id:
dependencies += ":"+dependent_job_id
aligner_output = _align_reads(read_tuple, configuration, dependencies, reference)
snpcaller_output = _call_snps(aligner_output, configuration, reference)
for (vcf_nickname, job_id, final_file, aligner, snpcaller) in snpcaller_output:
if job_id:
job_ids.append(job_id)
vcf_files.append((vcf_nickname, aligner, snpcaller, final_file))
for (name, vcf) in configuration["vcfs"]:
vcf_files.append((name, "pre-aligned", "pre-called", vcf))
matrix_job_id = _create_matrices(configuration, reference, dups_file, vcf_files, franken_fastas, job_ids)
_export_matrices(configuration, matrix_job_id)
_release_hold(configuration["job_submitter"], index_job_id)
def main():
import nasp.configuration_parser as configuration_parser
commandline_args = _parse_args()
configuration = configuration_parser.parse_config(commandline_args.config)
begin(configuration)
if __name__ == "__main__":
main()