#!/usr/bin/env python3
__author__ = "David Smith"
__version__ = "0.9.9"
__email__ = "dsmith@tgen.org"
import logging
def _parse_args():
"""
Parse command line arguments and return an object whose attributes are the
settings provided, as per standard argparse behavior.
For backward compatibility when we were changing dispatchers, the options
could be directly provided on the command line, or a single argument could
specify the path to the XML file with the remaining arguments in it. As
the old dispatcher is deprecated, so perhaps is every option below except
'--dto-file', which would then be made required.
"""
import argparse
parser = argparse.ArgumentParser(description="Meant to be called from the pipeline automatically.")
parser.add_argument("--mode", required=True, choices=['commandline', 'xml'],
help="Data passing mode, must be set to 'commandline' or 'xml'.")
parser.add_argument("--reference-fasta", help="Path to input reference fasta file.")
parser.add_argument("--reference-dups", help="Path to input reference dups file.")
parser.add_argument("--input-files", nargs="+", help="Path to input VCF/fasta files for matrix conversion.")
parser.add_argument("--matrix-folder", default="matrices", help="Name of folder to write output matries to.")
parser.add_argument("--stats-folder", default="statistics", help="Name of folder to write statistics files to.")
parser.add_argument("--minimum-coverage", type=int, default=10, help="Minimum coverage depth at a position.")
parser.add_argument("--minimum-proportion", type=float, default=0.9,
help="Minimum proportion of reads that must match the call at a position.")
parser.add_argument("--num-threads", type=int, default=1, help="Number of threads to use when processing input.")
parser.add_argument("--dto-file", help="Path to a matrix_dto XML file that defines all the parameters.")
return parser.parse_args()
def _parse_input_config(commandline_args):
"""
Set the configuration options from the XML file and add them as attributes
to the object returned by _parse_args().
Since getting these options directly from the command line is perhaps
deprecated, we should probably store the configuration options from the
XML file in a separate dictionary, instead of mixing the two data sources
together as now.
"""
import nasp.matrix_DTO as matrix_DTO
(matrix_parms, input_files) = matrix_DTO.parse_dto(commandline_args.dto_file)
commandline_args.reference_fasta = matrix_parms['reference-fasta']
commandline_args.reference_dups = matrix_parms.get('reference-dups')
commandline_args.matrix_folder = matrix_parms['matrix-folder']
commandline_args.stats_folder = matrix_parms['stats-folder']
commandline_args.minimum_coverage = int(
matrix_parms['minimum-coverage']) if "minimum-coverage" in matrix_parms else 0
commandline_args.minimum_proportion = float(
matrix_parms['minimum-proportion']) if "minimum-proportion" in matrix_parms else 0
if "filter-matrix-format" in matrix_parms:
commandline_args.filter_matrix_format = matrix_parms['filter-matrix-format']
else:
commandline_args.filter_matrix_format = None
commandline_args.input_files = input_files
return commandline_args
[docs]def import_reference(reference, reference_path, dups_path):
"""
Take an empty reference object and populate it with the data from a
reference file, and a dups file if any.
Does not return anything, as the passed-in object is modified.
"""
reference.import_fasta_file(reference_path)
if dups_path is not None:
reference.import_dups_file(dups_path)
# from sys import stdout
#reference._genome._send_to_fasta_handle( stdout )
#reference._dups._send_to_fasta_handle( stdout )
[docs]def import_external_fasta(input_file):
"""
Create a FastaGenome object, set its metadata, and populate it with the
data from a fasta file.
Must return the data as the single item in an array, because other file
formats potentially contain several genomes.
"""
from nasp.nasp_objects import FastaGenome
genome = FastaGenome()
set_genome_metadata(genome, input_file)
genome.import_fasta_file(genome.file_path(), "franken::")
# from sys import stdout
#genome._genome._send_to_fasta_handle( stdout )
return [genome]
# FIXME split into a larger number of smaller more testable functions
# FIXME This belongs in VCFGenome object perhaps?
[docs]def read_vcf_file(reference, min_coverage, min_proportion, input_file):
"""
Submit VCF to be read in to VCF parser, populate genome data and filter
data from the parsed VCF data, return a list of the read-in genomes.
"""
genomes = {}
file_path = get_file_path(input_file)
with open(file_path, 'r') as vcf_filehandle:
from nasp.nasp_objects import VCFGenome, Genome, ReferenceCallMismatch, VCFRecord
vcf_record = VCFRecord(file_path)
vcf_samples = vcf_record.get_samples()
for vcf_sample in vcf_samples:
genomes[vcf_sample] = VCFGenome()
set_genome_metadata(genomes[vcf_sample], input_file)
genomes[vcf_sample].set_nickname(vcf_sample)
while vcf_record.fetch_next_record():
current_contig = vcf_record.get_contig()
current_pos = vcf_record.get_position()
# Skip if position isn't in reference; maybe user truncated reference to exclude an uninteresting region.
if current_pos <= reference.get_contig_length(current_contig):
reference_call = reference.get_call(current_pos, None, current_contig)
simplified_refcall = Genome.simple_call(reference_call)
if ( simplified_refcall != 'N' ) and (
simplified_refcall != Genome.simple_call(vcf_record.get_reference_call()[0]) ):
# Reference call from reference fasta differs from reference call in VCF file at the same position.
raise ReferenceCallMismatch(reference_call, vcf_record.get_reference_call(), file_path,
current_contig, current_pos)
for vcf_sample in vcf_samples:
sample_info = vcf_record.get_sample_info(vcf_sample)
# FIXME indels
if sample_info['call'] is not None:
genomes[vcf_sample].set_call(sample_info['call'], current_pos, 'X', current_contig)
if sample_info['was_called']:
genomes[vcf_sample].set_was_called('Y', current_pos, current_contig)
if sample_info['coverage'] is not None:
if sample_info['coverage'] == 'PASS' or sample_info['coverage'] >= min_coverage:
genomes[vcf_sample].set_coverage_pass('Y', current_pos, current_contig)
else:
genomes[vcf_sample].set_coverage_pass('N', current_pos, current_contig)
if sample_info['proportion'] is not None:
if sample_info['proportion'] == 'PASS' or sample_info['proportion'] >= min_proportion:
genomes[vcf_sample].set_proportion_pass('Y', current_pos, current_contig)
else:
genomes[vcf_sample].set_proportion_pass('N', current_pos, current_contig)
elif not sample_info['is_a_snp']:
# Some big SNP callers, like GATK, do not provide proportion information when
# the position is called reference. We cannot filter these positions.
genomes[vcf_sample].set_proportion_pass('-', current_pos, current_contig)
# from sys import stdout
#for genome in genomes:
# genomes[genome]._genome._send_to_fasta_handle( stdout )
return genomes.values()
# FIXME The information these three functions capture is in the XML file.
# FIXME These should all be deprecated and the XML data used instead.
[docs]def determine_file_type(input_file):
""" Get the file type from the packed input filename string """
import re
file_type = None
# Format is "type,aligner,caller,::path"
filename_match = re.match(r'^((?:[^,:]+,)+)::.*$', input_file)
if filename_match:
generator_array = filename_match.group(1).split(',')
file_type = generator_array[0]
# print( file_type )
return file_type
[docs]def get_file_path(input_file):
""" Get the file path from the packed input filename string """
import re
file_path = None
# Format is "type,aligner,caller,::path"
filename_match = re.match(r'^(?:[^,:]+,)+::(.*)$', input_file)
if filename_match:
file_path = filename_match.group(1)
else:
file_path = input_file
# print( file_path )
return file_path
# print( genome.identifier() )
[docs]def write_output_matrices(genomes, matrix_folder, matrix_format_choices):
"""
Write matrices from genome collection data.
Defines the matrix types for future expansion of custom matrix options.
This information eventually should come from the user interface and be
included in the XML configuration file, rather than hardcoded here.
The matrix_format_choices option comes from the XML to here.
"""
matrix_formats = [
{
'filename': '' + matrix_folder + '/master_matrix.tsv',
'dataformat': 'matrix',
'filter': 'allcallable'
},
{
'filename': '' + matrix_folder + '/bestsnp_matrix.tsv',
'dataformat': 'matrix',
'filter': 'bestsnp'
},
{
'filename': '' + matrix_folder + '/bestsnp_matrix.snpfasta',
'dataformat': 'fasta',
'filter': 'bestsnp'
},
{
'filename': '' + matrix_folder + '/bestsnp_matrix.vcf',
'dataformat': 'vcf',
'filter': 'bestsnp'
},
{
'filename': '' + matrix_folder + '/missingdata_matrix.tsv',
'dataformat': 'matrix',
'filter': 'missingdata'
},
{
'filename': '' + matrix_folder + '/missingdata_matrix.snpfasta',
'dataformat': 'fasta',
'filter': 'missingdata'
},
{
'filename': '' + matrix_folder + '/missingdata_matrix.vcf',
'dataformat': 'vcf',
'filter': 'missingdata'
}]
# The XML format transferring this information should be more elaborate someday.
if matrix_format_choices is not None and matrix_format_choices == "include_allref_pos":
matrix_formats.extend( [
{
'filename': '' + matrix_folder + '/withallrefpos_matrix.tsv',
'dataformat': 'matrix',
'filter': 'includeref'
},
{
'filename': '' + matrix_folder + '/withallrefpos_matrix.snpfasta',
'dataformat': 'fasta',
'filter': 'includeref'
},
{
'filename': '' + matrix_folder + '/withallrefpos_matrix.vcf',
'dataformat': 'vcf',
'filter': 'includeref'
}] )
genomes.write_to_matrices(matrix_formats)
[docs]def write_stats_data(genomes, stats_folder):
""" Write stats data from genome collection to preset filenames """
general_stats_file = '' + stats_folder + '/general_stats.tsv'
sample_stats_file = '' + stats_folder + '/sample_stats.tsv'
genomes.write_to_stats_files(general_stats_file, sample_stats_file)
[docs]def main():
"""
Main flow control function for the script. Is sequential over the
following steps:
1. Parse arguments
2. Read in reference
3. Read in all query genomes in parallel
4. Write output matrices
5. Write stats files
"""
commandline_args = _parse_args()
if commandline_args.dto_file:
commandline_args = _parse_input_config(commandline_args)
logging.basicConfig(level=logging.WARNING)
from nasp.nasp_objects import ReferenceGenome, GenomeCollection
reference = ReferenceGenome()
import_reference(reference, commandline_args.reference_fasta, commandline_args.reference_dups)
genomes = GenomeCollection()
genomes.set_reference(reference)
parse_input_files(commandline_args.input_files, commandline_args.num_threads, genomes,
commandline_args.minimum_coverage, commandline_args.minimum_proportion)
write_output_matrices(genomes, commandline_args.matrix_folder, commandline_args.filter_matrix_format)
write_stats_data(genomes, commandline_args.stats_folder)
if __name__ == "__main__":
main()