#!/usr/bin/env python3
__author__ = "David Smith"
__version__ = "1.0.0"
__email__ = "dsmith@tgen.org"
import logging
[docs]class GenomeStatus(object):
"""
Contains and manipulates any generic data that is per-contig-position.
This could be any single type of data, like actual bases, filter data,
depth information, insertion data, pileups, etc.
In the perl version, this object was originally a hash of lists, and
converted to a hash of strings for performance. In this version, it was
originally a dictionary of strings, and converted to a dictionary of
lists for performance and flexibility.
Whether single characters, numbers, boolean values, or a mixture of data
types are used does not seem to affect memory and performance.
Storing lists per-contig-position with this class is a complicated
affair, as several of the manipulation functions assume you mean to
manipulate a continuous range of positions instead of a single position
when you do that.
"""
"""
The conventions used for what data is stored are as follows:
Genomes:
A, C, G, T, U, a, c, g, t, u: The respective call.
N, n: Called "N" according to upstream analysis tools.
X: Not called by upstream analysis tools.
. or empty string: A deletion relative to reference.
String of length >1: An insertion relative to reference.
Any other single letter: A degeneracy.
!: An algorithmic failure.
Duplicate region data:
0: Position not in a region that is duplicated within the reference.
1: Position is in a region that is duplicated.
-: Duplicate checking at this position was skipped by the user.
!: An algorithmic failure.
Filters:
Y: This position passed its filter.
N: This position failed its filter.
?: The filter could not be checked, and so the position is assumed
to have failed.
-: The filter was not applicable, or skipped, or could not be checked
for a known reason, and so is assumed to have passed.
!: An algorithmic failure.
"""
# Arrays are zero-indexed, genome positions are one-indexed. Off-by-one errors? Never heard of 'em.
def __init__(self):
"""
Attributes:
_status_data: The dictionary of lists that stores the actual genome.
data. The keys of the dictionary are the contig names. The lists
correspond to the position data on that contig. Genome position is
list position + 1.
_current_contig: Tracks the most recently-referenced contig, for
convenience EG reading in fastas line-by-line.
"""
self._status_data = {}
self._current_contig = None
# NOTE(jtravis): Does not raise exception if contig name is None or the empty string as documented
[docs] def add_contig(self, contig_name):
"""
Defines a new empty contig in the genome.
By default, if an unrecognized contig is encountered, a new empty
contig will be created and then acted upon.
Otherwise, add_contig must be called on a new contig first, or an
InvalidContigName will be thrown.
Args:
contig_name (str): Unique contig description.
Raises:
InvalidContigName: If contig_name is undefined.
"""
if contig_name not in self._status_data:
self._status_data[contig_name] = []
self._current_contig = contig_name
# NOTE(jtravis): unused parameter create_contig
[docs] def set_current_contig(self, contig_name, create_contig=True):
"""
Sets the most-recently-referenced contig without actually performing
any action on the data.
Can be called to return the current contig without changing it if
given a contig_name of None.
Will create the contig if it has not been encountered yet by
default, or throw an InvalidContigName otherwise.
Args:
contig_name (str): Unique contig description or None to query the current contig name.
create_contig (bool): If True and the contig does not exist, an empty contig will be created.
Returns:
str: Name of the last accessed contig or None.
Raises:
InvalidContigName: If create_contig is False and the contig does not exist.
"""
if contig_name is None:
contig_name = self._current_contig
elif contig_name in self._status_data:
self._current_contig = contig_name
elif create_contig:
self.add_contig(contig_name)
else:
raise InvalidContigName(contig_name, self.get_contigs())
return contig_name
[docs] def get_contigs(self):
"""
Returns:
list: Sorted list of contig names.
"""
return sorted(self._status_data.keys())
[docs] def append_contig(self, genome_data, contig_name=None):
"""
Places the passed-in data at the position following the last
defined position on the contig. If passed a list, will give each
item in the list its own position.
Args:
genome_data (list): List of nucleotide symbols.
contig_name (str): Unique contig description.
"""
contig_name = self.set_current_contig(contig_name)
self._status_data[contig_name].extend(genome_data)
[docs] def extend_contig(self, new_length, missing_range_filler, contig_name=None):
"""
Ensures the contig is at least new_length positions long
Args:
new_length (int): Minimum contig length.
missing_range_filler (str): Placeholder character for undefined areas at the end of the contig.
contig_name (str): Unique contig description.
"""
contig_name = self.set_current_contig(contig_name)
if len(self._status_data[contig_name]) < new_length:
self._status_data[contig_name].extend(
[missing_range_filler] * ( new_length - len(self._status_data[contig_name]) ))
[docs] def set_value(self, new_data, position_number, missing_range_filler="!", contig_name=None):
"""
Sets the value at position_number on the contig.
If passed a list, will change the continuous range of positions
starting at position_number, one position per list item.
Will extend the contig with missing_range_filler filling undefined
values if the position to set is beyond the end of the contig.
Args:
new_data (str or list): Single or list of nucleotide symbols.
position_number (int): 1-indexed contig position number.
missing_range_filler (str): Filler for undefined regions before the set value. Modifies the data.
contig_name (str): Unique contig description
"""
contig_name = self.set_current_contig(contig_name)
self.extend_contig(position_number, missing_range_filler, contig_name)
if len(new_data) > 1:
self._status_data[contig_name][position_number - 1:position_number - 1 + len(new_data)] = new_data
else:
self._status_data[contig_name][position_number - 1] = new_data
[docs] def get_value(self, first_position, last_position=None, contig_name=None, filler_value=None):
"""
Args:
contig_name (str): Unique contig description.
first_position (int): 1-indexed first position number.
last_position (int): Optional last position to select a range or -1 to specify the end of the contig.
filler_value (str): Optional filler for undefined regions beyond the genome data. Does not modify the data.
Returns:
Returns the nucleotide at first_position, list of values from
first_position to last_position inclusive, or None.
"""
contig_name = self.set_current_contig(contig_name)
queried_value = filler_value
if last_position is None:
if first_position <= len(self._status_data[contig_name]):
queried_value = self._status_data[contig_name][first_position - 1]
else:
queried_value = []
if last_position == -1:
last_position = len(self._status_data[contig_name])
if last_position >= first_position and first_position <= len(self._status_data[contig_name]):
queried_value = self._status_data[contig_name][first_position - 1:last_position]
if filler_value is not None and len(queried_value) < last_position - first_position + 1:
queried_value.extend([filler_value] * ( last_position - first_position + 1 - len(queried_value) ))
return queried_value
[docs] def get_contig_length(self, contig_name=None):
"""
Args:
contig_name (str): Unique contig description.
Returns:
int: Number of positions defined in the contig
"""
contig_name = self.set_current_contig(contig_name)
return len(self._status_data[contig_name])
[docs] def send_to_fasta_handle(self, output_handle, contig_prefix="", max_chars_per_line=80):
"""
Assumes the genome data is in string format or stringifiable and one
character per position, and then writes it in to the handle open for
writing. The file format is like a typical fasta were the genome
data to be base calls (but no checks are performed).
The contigs are sorted by name, not the order they were created.
Args:
output_handle (file object): File to append FASTA string.
contig_prefix (str): Prefix for all contig names.
max_chars_per_line (int): A positive value will limit the max chars per line.
"""
for current_contig in self.get_contigs():
output_handle.write(">" + contig_prefix + current_contig + "\n")
if max_chars_per_line > 0:
i = 0
while ( max_chars_per_line * i ) < len(self._status_data[current_contig]):
output_handle.write(''.join(self._status_data[current_contig][
( max_chars_per_line * i ):( max_chars_per_line * ( i + 1 ) )]) + "\n")
i += 1
else:
output_handle.write(''.join(self._status_data[current_contig]) + "\n")
[docs] def write_to_fasta_file(self, output_filename, contig_prefix="", max_chars_per_line=80):
"""
Opens the passed filename and passes to send_to_fasta_handle.
This is a separate function so that unit testing is easier, and
file names or open file handles can be used as destinations.
Args:
output_filename (str): Output filename.
contig_prefix (str): Prefix for all contig names.
max_chars_per_line (int): A positive value will limit the max contig chars per line.
"""
with open(output_filename, 'w') as output_handle:
self.send_to_fasta_handle(output_handle, contig_prefix, max_chars_per_line)
[docs]class Genome(GenomeStatus):
"""
A special type of GenomeStatus where the genome information being stored
is always actual base calls, as strings.
"""
def __init__(self):
"""
Attributes:
_genome is an alias of _status_data, provided for code clarity
when working with an actual genome
"""
GenomeStatus.__init__(self)
self._genome = self._status_data
[docs] def set_call(self, new_data, first_position, missing_range_filler="X", contig_name=None):
""" Alias of set_value, for code clarity """
self.set_value(new_data, first_position, missing_range_filler, contig_name)
[docs] def get_call(self, first_position, last_position=None, contig_name=None, filler_value="X"):
""" Alias of get_value, for code clarity """
return self.get_value( first_position, last_position, contig_name, filler_value)
# NOTE: contig_prefix is unused
def _import_fasta_line(self, line_from_fasta, contig_prefix=""):
"""
Assumes the string passed in is a line from a fasta file, and
populates the genome with the information contained. Not meant
to be called on any data except a full fasta file in order by
line top to bottom.
Args:
line_from_fasta (str): the current line to parse
contig_prefix (str): the prefix will be removed from the parsed contig name
"""
import re
# Parse the contig name discarding the prefix and surrounding whitespace characters
contig_match = re.match(r'^>' + re.escape(contig_prefix) + r'([^\s]+)(?:\s|$)', line_from_fasta)
if contig_match:
self.add_contig(contig_match.group(1))
else:
# Parse the contig sequence discarding trailing whitespace characters
data_match = re.match(r'^([A-Za-z.-]+)\s*$', line_from_fasta)
if data_match:
self.append_contig(list(data_match.group(1)))
# contig_prefix is used by vcf_to_matrix to discard the frankenfasta contig name prefix.
[docs] def import_fasta_file(self, fasta_filename, contig_prefix=""):
""" Read in a fasta file.
Args:
fasta_filename (str): fasta file to import
contig_prefix (str): the prefix will be removed from the parsed contig names
"""
with open(fasta_filename, 'r') as fasta_handle:
for line_from_fasta in fasta_handle:
self._import_fasta_line(line_from_fasta, contig_prefix)
@staticmethod
[docs] def reverse_complement(dna_string):
"""
Args:
dna_string (str): nucleotide sequence to reverse complement
Returns:
string: nucleotide sequence reverse complement
"""
return dna_string.translate(
''.maketrans('ABCDGHMNRSTUVWXYabcdghmnrstuvwxy', 'TVGHCDKNYSAABWXRtvghcdknysaabwxr'))[::-1]
@staticmethod
[docs] def simple_call(dna_string, allow_x=False, allow_del=False):
"""
Standardizes the DNA call assumed to be the base at position one.
Discards insertion data, changes 'U' to 'T', and changes degeneracies to 'N'.
'X' and deletes are changed to 'N' by default.
Args:
dna_string (str): only the first position is considered
allow_x (bool):
allow_del (bool):
Returns:
string: 'A', 'C', 'G', 'T', or 'N' with optional 'X' and '.'
"""
simple_base = 'N'
if len(dna_string) > 0:
simple_base = dna_string[0].upper()
elif allow_del:
simple_base = '.'
if simple_base == 'U':
simple_base = 'T'
if simple_base not in ['A', 'C', 'G', 'T', 'X', '.']:
simple_base = 'N'
if not allow_x and ( simple_base == 'X' ):
simple_base = 'N'
if not allow_del and ( simple_base == '.' ):
simple_base = 'N'
return simple_base
[docs]class IndelList(object):
"""
For storing indel data separately from the reference-indexed position
data. Mostly a relic from when calls were stored as a long string with
one character per position. Might still have some use for indel
implementation, but might be no longer useful.
"""
def __init__(self):
"""
Attributes:
_indels (dict):
"""
self._indels = {}
[docs]class ReferenceGenome(Genome):
"""
A special type of genome that is to be used as our reference.
Unlike other genomes, we know there will only be one, it carries duplicate
region data, and we don't need to store any metadata about it.
"""
def __init__(self):
"""
Attributes:
_dups (GenomeStatus): carries data about whether a particular
region of the reference was found to be very similar to another region
in the same reference.
"""
Genome.__init__(self)
self._dups = GenomeStatus()
[docs] def get_dups_call(self, first_position, last_position=None, contig_name=None):
"""
Args:
first_position (int):
last_position (int):
contig_name (str):
Returns:
list:
"""
return self._dups.get_value(first_position, last_position, contig_name, "?")
def _import_dups_line(self, line_from_dups_file, contig_prefix=""):
"""
Just like importing any other fasta-like file line-by-line, but
specific to duplicate region data.
Args:
line_from_dups_file (str):
contig_prefix (str):
"""
import re
contig_match = re.match(r'^>' + re.escape(contig_prefix) + r'([^\s]+)(?:\s|$)', line_from_dups_file)
if contig_match:
self.add_contig(contig_match.group(1))
self._dups.add_contig(contig_match.group(1))
else:
data_match = re.match(r'^([01-]+)\s*$', line_from_dups_file)
if data_match:
self._dups.append_contig(list(data_match.group(1)))
[docs] def import_dups_file(self, dups_filename, contig_prefix=""):
""" Wrapper for _import_dups_line for flexibility and testing.
Args:
dups_filename (str):
contig_prefix (str):
"""
with open(dups_filename, 'r') as dups_handle:
for line_from_dups_file in dups_handle:
self._import_dups_line(line_from_dups_file, contig_prefix)
[docs]class FastaGenome(Genome, GenomeMeta):
"""
A special type of genome where we know the data came from a fasta file,
and so we can omit the depth and proportion filters. Meant to mimic
a VCFGenome object, with filter checks hard-coded.
"""
def __init__(self):
Genome.__init__(self)
GenomeMeta.__init__(self)
self._indels = IndelList()
# FIXME This data should be put into a real structure when the fasta code is pulled in
[docs] def get_was_called(self, current_pos, contig_name=None):
"""
A stand-in for the was-called filter that just makes sure the position
isn't an "N".
"""
return_value = "N"
call_to_check = self.get_call(current_pos, None, contig_name, "X")
if call_to_check != "X" and call_to_check != "N":
return_value = "Y"
return return_value
[docs] def get_coverage_pass(self, current_pos, contig_name=None):
""" This filter is not applicable. """
return "-"
[docs] def get_proportion_pass(self, current_pos, contig_name=None):
""" This filter is not applicable. """
return "-"
[docs]class VCFGenome(Genome, GenomeMeta):
"""
A standard sample for analysis. Has genome data, metadata, and data for
the three filters.
"""
def __init__(self):
Genome.__init__(self)
GenomeMeta.__init__(self)
self._indels = IndelList()
self._was_called = GenomeStatus()
self._passed_coverage = GenomeStatus()
self._passed_proportion = GenomeStatus()
[docs] def set_was_called(self, pass_value, current_pos, contig_name=None):
self._was_called.set_value(pass_value, current_pos, "N", contig_name)
[docs] def set_coverage_pass(self, pass_value, current_pos, contig_name=None):
self._passed_coverage.set_value(pass_value, current_pos, "?", contig_name)
[docs] def set_proportion_pass(self, pass_value, current_pos, contig_name=None):
self._passed_proportion.set_value(pass_value, current_pos, "?", contig_name)
[docs] def get_was_called(self, current_pos, contig_name=None):
return self._was_called.get_value(current_pos, None, contig_name, "N")
[docs] def get_coverage_pass(self, current_pos, contig_name=None):
return self._passed_coverage.get_value(current_pos, None, contig_name, "?")
[docs] def get_proportion_pass(self, current_pos, contig_name=None):
return self._passed_proportion.get_value(current_pos, None, contig_name, "?")
[docs]class CollectionStatistics(object):
"""
Stores a running tally for the statistics for the run.
Stats are in two categories: per-contig and per-sample.
Contig stats are basic counts, and percentages based on reference length.
Sample stats are tallied as x out of y for each position, and then
counts for each sample, all samples, and any samples, can be computed
automatically.
For this reason, the object needs to know when the run moves on to the
next position, and the flush_cumulative_stat_cache function does this.
"""
def __init__(self):
"""
_cumulative_cache keeps a running tally of sample stats at the
current position, and then writes the results to _sample_stats when
flush_cumulative_stat_cache() is called.
"""
self._contig_stats = {}
self._sample_stats = {}
self._cumulative_cache = {}
def _increment_by_contig(self, stat_id, contig_name):
""" Makes sure the contig stat is defined, then increments it. """
if ( stat_id, contig_name ) not in self._contig_stats:
self._contig_stats[( stat_id, contig_name )] = 0
self._contig_stats[( stat_id, contig_name )] += 1
[docs] def increment_contig_stat(self, stat_id, contig_name=None):
"""
Increments the stat for the specified contig, and automatically
increments the count for the all-contigs tally on the same stat.
"""
self._increment_by_contig(stat_id, contig_name)
if contig_name is not None:
self._increment_by_contig(stat_id, None)
[docs] def get_contig_stat(self, stat_id, contig_name=None):
return_value = 0
if ( stat_id, contig_name ) in self._contig_stats:
return_value = self._contig_stats[( stat_id, contig_name )]
return return_value
def _cache_cumulative_stats(self, stat_id, sample_nickname, did_pass):
"""
Writes data to the stat cache for the current position. stat_type
is either "p" for pass/positive or "t" for total. Flushing the cache
involves checking the p/t ratio. When updating the cache,
failed/negative samples just increment t, while pass/positive samples
increment p and t.
Cumulative stats are across all samples at a position, and then once
each for all sample-analyses on a sample.
"""
if ( stat_id, sample_nickname, 't' ) not in self._cumulative_cache:
self._cumulative_cache[( stat_id, sample_nickname, 't' )] = 0
self._cumulative_cache[( stat_id, sample_nickname, 'p' )] = 0
self._cumulative_cache[( stat_id, sample_nickname, 't' )] += 1
if did_pass:
self._cumulative_cache[( stat_id, sample_nickname, 'p' )] += 1
def _increment_by_sample(self, stat_id, sample_nickname, sample_info, cum_type):
""" Defines if necessary, then increments, the sample stat. """
if ( stat_id, sample_nickname, sample_info, cum_type ) not in self._sample_stats:
self._sample_stats[( stat_id, sample_nickname, sample_info, cum_type )] = 0
self._sample_stats[( stat_id, sample_nickname, sample_info, cum_type )] += 1
[docs] def record_sample_stat(self, stat_id, sample_nickname, sample_identifier, sample_path, did_pass):
"""
Updates the sample stat, and then the cumulative stat cache.
Increments the cache for all samples, and for all analyses
on this sample.
"""
if did_pass:
self._increment_by_sample(stat_id, sample_nickname, ( sample_identifier, sample_path ), None)
self._cache_cumulative_stats(stat_id, sample_nickname, did_pass)
self._cache_cumulative_stats(stat_id, None, did_pass)
[docs] def get_sample_stat(self, stat_id, sample_nickname, sample_identifier, sample_path):
return_value = 0
if ( stat_id, sample_nickname, ( sample_identifier, sample_path ), None ) in self._sample_stats:
return_value = self._sample_stats[( stat_id, sample_nickname, ( sample_identifier, sample_path ), None )]
return return_value
[docs] def get_cumulative_stat(self, stat_id, cum_type, sample_nickname=None):
return_value = 0
if ( stat_id, sample_nickname, None, cum_type ) in self._sample_stats:
return_value = self._sample_stats[( stat_id, sample_nickname, None, cum_type )]
return return_value
[docs] def flush_cumulative_stat_cache(self):
"""
Does the math on the p/t ratio for the stat cache for the current
position, and writes that to the sample stats for the any/all counts.
p > 0: any++
p = t: all++
"""
for ( stat_id, sample_nickname, stat_type ) in self._cumulative_cache:
if stat_type == 'p':
if self._cumulative_cache[( stat_id, sample_nickname, 'p' )] == self._cumulative_cache[
( stat_id, sample_nickname, 't' )]:
self._increment_by_sample(stat_id, sample_nickname, None, 'all')
if self._cumulative_cache[( stat_id, sample_nickname, 'p' )] > 0:
self._increment_by_sample(stat_id, sample_nickname, None, 'any')
self._cumulative_cache = {}
[docs]class GenomeCollection(CollectionStatistics):
"""
A "master matrix" object, of sorts.
Carries all the data necessary to make a matrix, although some of it
is computed on-the-fly as the matrix is actually written, for
performance reasons. Stats aren't available until after the matrix
is written, for this reason.
"""
def __init__(self):
"""
_failed_genomes is just a list of filenames that had errors during
import. Added as blank columns to inform the user.
"""
CollectionStatistics.__init__(self)
self._reference = None
self._genomes = []
self._genome_identifiers = {}
self._failed_genomes = []
@staticmethod
def _get_key(genome):
"""
A wrapper getter for a genome's identifier, for the purpose of passing
informative data to the sort function. This is used to make sample
order predictable and preserved between different runs.
"""
return genome.identifier()
[docs] def set_reference(self, reference):
self._reference = reference
[docs] def reference(self):
return self._reference
[docs] def get_dups_call(self, first_position, last_position=None, contig_name=None):
return self._reference.get_dups_call(first_position, last_position, contig_name)
[docs] def add_genome(self, genome):
"""
Adds the genome to the collection, then makes sure the genome list is
properly set and in order.
"""
self._genomes.append(genome)
genome_nickname = genome.nickname()
if genome_nickname not in self._genome_identifiers:
self._genome_identifiers[genome_nickname] = {}
self._genome_identifiers[genome_nickname][( genome.identifier(), genome.file_path() )] = True
self._genomes.sort(key=GenomeCollection._get_key)
[docs] def add_failed_genome(self, genome_path):
if genome_path not in self._failed_genomes:
self._failed_genomes.append(genome_path)
[docs] def set_current_contig(self, contig_name):
contig_name = self._reference.set_current_contig(contig_name)
for genome in self._genomes:
genome.set_current_contig(contig_name)
return contig_name
[docs] def get_contigs(self):
return self._reference.get_contigs()
# FIXME this function is starting to become a bit of a cluster
def _format_matrix_line(self, current_contig, current_pos, matrix_formats, pattern_data):
"""
A matrix line represents all the sample data at one reference
contig-position.
This function contains a large portion of the ultimate NASP logic that
goes into generating a matrix, like filter application and consensus
checking.
A section below must be executed linearly and single-threaded for all
contig-position-sample-analyses. This is the "expensive loop".
The monolithic nature of this function, its massive size, and the fact
that it makes little distinction between calculating something and
writing something to a file, are all problems that will probably need
to eventually be corrected. I fear, however, that there is a
significant tradeoff with it works / it's fast to run / it was fast to
develop / it's easily maintained / it's beautiful.
"""
all_matrices = []
all_vcfs = []
allcallable_matrices = []
snp_matrices_best = []
snp_matrices_md = []
fasta_pending_data = {}
vcf_pending_data = []
current_pattern = ''
# Key None stores next unused, value 1 reserved for reference
current_pattern_legend = {None: 2}
encountered_calls = []
for matrix_format in matrix_formats:
if matrix_format['dataformat'] == 'matrix':
all_matrices.append(matrix_format)
if matrix_format['filter'] == 'allcallable':
allcallable_matrices.append(matrix_format)
elif matrix_format['filter'] == 'bestsnp' or matrix_format['filter'] == 'includeref':
snp_matrices_best.append(matrix_format)
elif matrix_format['filter'] == 'missingdata':
snp_matrices_md.append(matrix_format)
elif matrix_format['dataformat'] == 'vcf':
all_vcfs.append(matrix_format)
genome_count = len(self._genomes)
failed_genome_count = len(self._failed_genomes)
for matrix_format in all_matrices:
matrix_format['linetowrite'] = "{0}::{1}\t".format(current_contig, str(current_pos))
for matrix_format in all_vcfs:
matrix_format['linetowrite'] = "{0}\t{1}\t.\t".format(current_contig, str(current_pos))
reference_call = self._reference.get_call(current_pos, None, current_contig)
simplified_refcall = Genome.simple_call(reference_call)
fasta_pending_data['Reference'] = simplified_refcall
if simplified_refcall == 'N':
current_pattern += 'N'
else:
current_pattern_legend[simplified_refcall] = 1
current_pattern += '1'
self.increment_contig_stat('reference_length', current_contig)
if simplified_refcall != 'N':
self.increment_contig_stat('reference_clean', current_contig)
for matrix_format in ( all_matrices + all_vcfs ):
matrix_format['linetowrite'] += "{0}\t".format(reference_call)
dups_call = self._reference.get_dups_call(current_pos, None, current_contig)
if dups_call == "1":
dups_call = True
self.increment_contig_stat('reference_duplicated', current_contig)
else:
dups_call = False
call_data = {'A': 0, 'C': 0, 'G': 0, 'T': 0, 'N': 0, 'indel': 0, 'snpcall': 0, 'indelcall': 0, 'refcall': 0,
'callstring': '', 'covstring': '', 'propstring': '', 'called': 0, 'passcov': 0, 'passprop': 0}
consensus_check = {}
# The expensive loop, single threaded and runs for every sample-analysis-contig-position
for genome in self._genomes:
sample_call = genome.get_call(current_pos, None, current_contig, 'X')
simplified_sample_call = Genome.simple_call(sample_call)
call_data[simplified_sample_call] += 1
genome_nickname = genome.nickname()
genome_identifier = genome.identifier()
genome_path = genome.file_path()
was_called = genome.get_was_called(current_pos, current_contig)
call_data['callstring'] += was_called
if was_called == 'Y':
was_called = True
call_data['called'] += 1
else:
was_called = False
self.record_sample_stat('was_called', genome_nickname, genome_identifier, genome_path, was_called)
passed_coverage = genome.get_coverage_pass(current_pos, current_contig)
call_data['covstring'] += passed_coverage
if passed_coverage == 'Y' or passed_coverage == '-':
passed_coverage = True
call_data['passcov'] += 1
else:
passed_coverage = False
self.record_sample_stat('passed_coverage_filter', genome_nickname, genome_identifier, genome_path,
passed_coverage)
passed_proportion = genome.get_proportion_pass(current_pos, current_contig)
call_data['propstring'] += passed_proportion
if passed_proportion == 'Y' or passed_proportion == '-':
passed_proportion = True
call_data['passprop'] += 1
else:
passed_proportion = False
self.record_sample_stat('passed_proportion_filter', genome_nickname, genome_identifier, genome_path,
passed_proportion)
if was_called and passed_coverage and passed_proportion:
if genome_nickname in consensus_check:
if consensus_check[genome_nickname] != simplified_sample_call:
consensus_check[genome_nickname] = 'N'
else:
consensus_check[genome_nickname] = simplified_sample_call
else:
consensus_check[genome_nickname] = 'N'
# FIXME indels
if was_called and passed_coverage and passed_proportion and simplified_refcall != 'N':
if not dups_call:
self.record_sample_stat('quality_breadth', genome_nickname, genome_identifier, genome_path, True)
if simplified_refcall == simplified_sample_call:
call_data['refcall'] += 1
if not dups_call:
self.record_sample_stat('called_reference', genome_nickname, genome_identifier, genome_path,
True)
self.record_sample_stat('called_snp', genome_nickname, genome_identifier, genome_path, False)
self.record_sample_stat('called_indel', genome_nickname, genome_identifier, genome_path, False)
self.record_sample_stat('called_degen', genome_nickname, genome_identifier, genome_path, False)
elif simplified_sample_call != 'N':
call_data['snpcall'] += 1
if not dups_call:
self.record_sample_stat('called_reference', genome_nickname, genome_identifier, genome_path,
False)
self.record_sample_stat('called_snp', genome_nickname, genome_identifier, genome_path, True)
self.record_sample_stat('called_indel', genome_nickname, genome_identifier, genome_path, False)
self.record_sample_stat('called_degen', genome_nickname, genome_identifier, genome_path, False)
else:
if not dups_call:
self.record_sample_stat('called_reference', genome_nickname, genome_identifier, genome_path,
False)
self.record_sample_stat('called_snp', genome_nickname, genome_identifier, genome_path, False)
self.record_sample_stat('called_indel', genome_nickname, genome_identifier, genome_path, False)
self.record_sample_stat('called_degen', genome_nickname, genome_identifier, genome_path, True)
elif simplified_refcall != 'N':
if not dups_call:
self.record_sample_stat('quality_breadth', genome_nickname, genome_identifier, genome_path, False)
for matrix_format in ( allcallable_matrices + snp_matrices_best ):
matrix_format['linetowrite'] += "{0}\t".format(sample_call)
fasta_pending_data[genome_identifier] = simplified_sample_call
vcf_current_data = { 'GT': '.', 'was_called': was_called, 'passed_coverage': passed_coverage, 'passed_proportion': passed_proportion }
if was_called and passed_coverage and passed_proportion and simplified_sample_call != 'N':
if simplified_sample_call not in current_pattern_legend:
current_pattern_legend[simplified_sample_call] = current_pattern_legend[None]
current_pattern_legend[None] += 1
# FIXME indels
encountered_calls.append(simplified_sample_call)
current_pattern += str(current_pattern_legend[simplified_sample_call])
vcf_current_data['GT'] = current_pattern_legend[simplified_sample_call] - 1
for matrix_format in snp_matrices_md:
matrix_format['linetowrite'] += "{0}\t".format(sample_call)
elif not was_called:
current_pattern += 'N'
fasta_pending_data[genome_identifier] = 'N'
for matrix_format in snp_matrices_md:
matrix_format['linetowrite'] += "X\t"
else:
current_pattern += 'N'
fasta_pending_data[genome_identifier] = 'N'
for matrix_format in snp_matrices_md:
matrix_format['linetowrite'] += "N\t"
vcf_pending_data.append(vcf_current_data)
for matrix_format in all_matrices:
matrix_format['linetowrite'] += "\t" * failed_genome_count
for genome_nickname in consensus_check:
if consensus_check[genome_nickname] != 'N':
self.record_sample_stat('consensus', genome_nickname, None, None, True)
else:
self.record_sample_stat('consensus', genome_nickname, None, None, False)
for matrix_format in all_matrices:
matrix_format['linetowrite'] += "{0}\t{1}\t{2}\t".format(str(call_data['snpcall']),
str(call_data['indelcall']),
str(call_data['refcall']))
matrix_format['linetowrite'] += "{0}/{3}\t{1}/{3}\t{2}/{3}\t".format(str(call_data['called']),
str(call_data['passcov']),
str(call_data['passprop']),
str(genome_count))
matrix_format['linetowrite'] += "{0}\t{1}\t{2}\t{3}\t{4}\t{5}\t".format(str(call_data['A']),
str(call_data['C']),
str(call_data['G']),
str(call_data['T']),
str(call_data['indel']),
str(call_data['N']))
matrix_format['linetowrite'] += "{0}\t{1}\t".format(current_contig, str(current_pos))
if 'N' not in consensus_check.values():
consensus_check = True
self.increment_contig_stat('all_passed_consensus', current_contig)
else:
consensus_check = False
if call_data['called'] == genome_count:
self.increment_contig_stat('all_called', current_contig)
if call_data['passcov'] == genome_count:
self.increment_contig_stat('all_passed_coverage', current_contig)
if call_data['passprop'] == genome_count:
self.increment_contig_stat('all_passed_proportion', current_contig)
if consensus_check and not dups_call and call_data['called'] == genome_count and call_data[
'passcov'] == genome_count and call_data['passprop'] == genome_count and call_data['N'] == 0:
self.increment_contig_stat('quality_breadth', current_contig)
if call_data['snpcall'] > 0:
self.increment_contig_stat('best_snps', current_contig)
if not dups_call and call_data['snpcall'] > 0:
self.increment_contig_stat('any_snps', current_contig)
for matrix_format in all_matrices:
matrix_format['linetowrite'] += "{0}\t{1}\t".format(str(dups_call), str(consensus_check))
for matrix_format in ( allcallable_matrices + snp_matrices_md ):
matrix_format['linetowrite'] += "{0}\t{1}\t{2}\t".format(str(call_data['callstring']), str(call_data['covstring']), str(call_data['propstring']))
for matrix_format in all_matrices:
if current_pattern not in pattern_data:
pattern_data[current_pattern] = pattern_data[None]
pattern_data[None] += 1
matrix_format['linetowrite'] += "'{0}'\t{1}\n".format(current_pattern, str(pattern_data[current_pattern]))
for matrix_format in all_vcfs:
if len(encountered_calls) > 0:
matrix_format['linetowrite'] += ",".join(encountered_calls)
else:
matrix_format['linetowrite'] += "."
matrix_format['linetowrite'] += "\t.\tPASS\tAN={0};NS={1}\tGT:FT".format(len(encountered_calls)+1, str(call_data['snpcall']+call_data['indelcall']+call_data['refcall']))
for vcf_current_data in vcf_pending_data:
matrix_format['linetowrite'] += "\t{0}:".format(str(vcf_current_data['GT']))
if not vcf_current_data['was_called']:
matrix_format['linetowrite'] += "NoCall"
elif not vcf_current_data['passed_coverage']:
matrix_format['linetowrite'] += "CovFail"
elif not vcf_current_data['passed_proportion']:
matrix_format['linetowrite'] += "PropFail"
else:
matrix_format['linetowrite'] += "PASS"
matrix_format['linetowrite'] += "\n"
# Determine if a line should be present in a particular matrix
# bestsnp outputs
if call_data['snpcall'] == 0 or call_data['indelcall'] > 0 or call_data['snpcall'] + call_data[
'refcall'] < genome_count or dups_call or not consensus_check:
for matrix_format in matrix_formats:
if matrix_format['filter'] == 'bestsnp':
if matrix_format['dataformat'] == 'matrix' or matrix_format['dataformat'] == 'vcf':
matrix_format['linetowrite'] = None
else:
for matrix_format in matrix_formats:
if matrix_format['dataformat'] == 'fasta' and matrix_format['filter'] == 'bestsnp':
for genome_identifier in fasta_pending_data:
matrix_format['fastadata'].append_contig(fasta_pending_data[genome_identifier],
genome_identifier)
# missing data outputs
if call_data['snpcall'] == 0 or call_data['indelcall'] > 0 or dups_call:
for matrix_format in matrix_formats:
if matrix_format['filter'] == "missingdata":
if matrix_format['dataformat'] == 'matrix' or matrix_format['dataformat'] == 'vcf':
matrix_format['linetowrite'] = None
else:
for matrix_format in matrix_formats:
if matrix_format['dataformat'] == 'fasta' and matrix_format['filter'] == 'missingdata':
for genome_identifier in fasta_pending_data:
matrix_format['fastadata'].append_contig(fasta_pending_data[genome_identifier],
genome_identifier)
# including ref outputs
if call_data['indelcall'] > 0 or call_data['snpcall'] + call_data[
'refcall'] < genome_count or dups_call or not consensus_check:
for matrix_format in matrix_formats:
if matrix_format['filter'] == 'includeref':
if matrix_format['dataformat'] == 'matrix' or matrix_format['dataformat'] == 'vcf':
matrix_format['linetowrite'] = None
else:
for matrix_format in matrix_formats:
if matrix_format['dataformat'] == 'fasta' and matrix_format['filter'] == 'includeref':
for genome_identifier in fasta_pending_data:
matrix_format['fastadata'].append_contig(fasta_pending_data[genome_identifier],
genome_identifier)
self.flush_cumulative_stat_cache()
# NOTE(jtravis): Fix documentation _write_matrix_line() does not exist
[docs] def send_to_matrix_handles(self, matrix_formats):
"""
Writes headers and handles per-matrix logic. Calls _write_matrix_line
to handle the per-line computation and analysis.
"""
for matrix_format in matrix_formats:
if matrix_format['dataformat'] == 'matrix':
matrix_format['handle'].write("LocusID\tReference\t")
for genome in self._genomes:
matrix_format['handle'].write("{0}\t".format(genome.identifier()))
for genome_path in self._failed_genomes:
matrix_format['handle'].write("{0}\t".format(genome_path))
if matrix_format['filter'] == 'bestsnp' or matrix_format['filter'] == 'includeref':
matrix_format['handle'].write(
"#SNPcall\t#Indelcall\t#Refcall\t#CallWasMade\t#PassedDepthFilter\t#PassedProportionFilter\t#A\t#C\t#G\t#T\t#Indel\t#NXdegen\tContig\tPosition\tInDupRegion\tSampleConsensus\tPattern\tPattern#\n")
else:
# must be all callable or missing data
matrix_format['handle'].write(
"#SNPcall\t#Indelcall\t#Refcall\t#CallWasMade\t#PassedDepthFilter\t#PassedProportionFilter\t#A\t#C\t#G\t#T\t#Indel\t#NXdegen\t" +
"Contig\tPosition\tInDupRegion\tSampleConsensus\tCallWasMade\tPassedDepthFilter\tPassedProportionFilter\tPattern\tPattern#\n")
matrix_format['linetowrite'] = ''
elif matrix_format['dataformat'] == 'fasta':
matrix_format['fastadata'] = GenomeStatus()
for genome in self._genomes:
matrix_format['fastadata'].add_contig(genome.identifier())
elif matrix_format['dataformat'] == 'vcf':
matrix_format['handle'].write("##fileFormat=VCFv4.2\n##source=NASPv{0}\n".format(__version__))
for current_contig in self._reference.get_contigs():
matrix_format['handle'].write("##contig=<ID=\"{0}\",length={1}>\n".format(current_contig, self._reference.get_contig_length(current_contig)))
for genome in self._genomes:
matrix_format['handle'].write("##SAMPLE=<ID=\"{0}\",Genomes=\"{0}\",Mixture=1.0>\n".format(genome.identifier()))
matrix_format['handle'].write("##INFO=<ID=NS,Number=1,Type=Integer,Description=\"Number of Samples With Data\">\n")
matrix_format['handle'].write("##FILTER=<ID=NoCall,Description=\"No call for this sample at this position\">\n")
matrix_format['handle'].write("##FILTER=<ID=CovFail,Description=\"Insufficient depth of coverage for this sample at this position\">\n")
matrix_format['handle'].write("##FILTER=<ID=PropFail,Description=\"Insufficient proportion of reads were variant for this sample at this position\">\n")
matrix_format['handle'].write("##FORMAT=<ID=GT,Number=1,Type=String,Description=\"Genotype\">\n")
matrix_format['handle'].write("##FORMAT=<ID=FT,Number=1,Type=String,Description=\"Filters that failed for this sample at this position\">\n")
matrix_format['handle'].write("#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT")
for genome in self._genomes:
matrix_format['handle'].write("\t" + genome.identifier())
matrix_format['handle'].write("\n")
# Key None stores next unused
pattern_data = {None: 1}
for current_contig in self.get_contigs():
for current_pos in range(1, self._reference.get_contig_length(current_contig) + 1):
self._format_matrix_line(current_contig, current_pos, matrix_formats, pattern_data)
for matrix_format in matrix_formats:
if matrix_format['dataformat'] == 'matrix' and matrix_format['linetowrite'] is not None:
matrix_format['handle'].write(matrix_format['linetowrite'])
matrix_format['linetowrite'] = ''
if matrix_format['dataformat'] == 'vcf' and matrix_format['linetowrite'] is not None:
matrix_format['handle'].write(matrix_format['linetowrite'])
matrix_format['linetowrite'] = ''
for matrix_format in matrix_formats:
if matrix_format['dataformat'] == 'fasta':
matrix_format['fastadata'].send_to_fasta_handle(matrix_format['handle'])
[docs] def write_to_matrices(self, matrix_formats):
""" Opens files for writing; abstracted for flexibility/testing. """
for matrix_format in matrix_formats:
matrix_format['handle'] = open(matrix_format['filename'], 'w')
self.send_to_matrix_handles(matrix_formats)
for matrix_format in matrix_formats:
matrix_format['handle'].close()
def _write_general_stats(self, general_handle):
""" Writes finalized general stats to file. """
general_stat_array = ['reference_length', 'reference_clean', 'reference_duplicated', 'all_called',
'all_passed_coverage', 'all_passed_proportion', 'all_passed_consensus', 'quality_breadth',
'any_snps', 'best_snps']
denominator_stat = 'reference_length'
general_handle.write("Contig\t")
for current_stat in general_stat_array:
general_handle.write("{0}\t".format(current_stat))
if current_stat != denominator_stat:
general_handle.write("{0} (%)\t".format(current_stat))
general_handle.write("\n")
general_handle.write("\tstat descriptions go here\n") # FIXME
for current_contig in ( [None] + self.get_contigs() ):
denominator_value = self.get_contig_stat(denominator_stat, current_contig)
if current_contig is None:
general_handle.write("Whole Genome\t")
else:
general_handle.write("{0}\t".format(current_contig))
for current_stat in general_stat_array:
general_handle.write("{0}\t".format(str(self.get_contig_stat(current_stat, current_contig))))
if current_stat != denominator_stat:
general_handle.write(
"%.2f%%\t" % ( self.get_contig_stat(current_stat, current_contig) / denominator_value * 100 ))
general_handle.write("\n")
def _write_sample_stats(self, sample_handle):
"""
Writes finalized sample stats to file.
Includes per-sample counts, any/all counts across each set of
sample-analyses, and any/all counts overall.
"""
sample_stat_array = ['was_called', 'passed_coverage_filter', 'passed_proportion_filter', 'quality_breadth',
'called_reference', 'called_snp', 'called_degen']
denominator_stat = 'reference_length'
denominator_value = self.get_contig_stat(denominator_stat)
sample_handle.write("Sample\tSample::Analysis\t")
for current_stat in sample_stat_array:
sample_handle.write("{0}\t".format(current_stat))
sample_handle.write("{0} (%)\t".format(current_stat))
sample_handle.write("\n")
sample_handle.write("\tstat descriptions go here\n\n") # FIXME
for current_sample in ( [None] + sorted(self._genome_identifiers.keys()) ):
for current_analysis in ['any', 'all']:
if current_sample is not None:
sample_handle.write("{0}\t".format(current_sample))
sample_handle.write("[{0}]\t".format(current_analysis))
if current_sample is None:
sample_handle.write("\t")
for current_stat in sample_stat_array:
sample_handle.write("{0}\t".format(str(self.get_cumulative_stat(current_stat, current_analysis, current_sample))))
if current_stat != denominator_stat:
sample_handle.write("%.2f%%\t" % (self.get_cumulative_stat(current_stat, current_analysis, current_sample) / denominator_value * 100))
sample_handle.write("\n")
if current_sample is not None:
for current_analysis in sorted(self._genome_identifiers[current_sample].keys()):
( sample_identifier, sample_path ) = current_analysis
sample_handle.write("{0}\t{1}\t".format(current_sample, sample_identifier))
for current_stat in sample_stat_array:
sample_handle.write("{0}\t".format(str(self.get_sample_stat(current_stat, current_sample, sample_identifier, sample_path))))
if current_stat != denominator_stat:
sample_handle.write("%.2f%%\t" % (self.get_sample_stat(current_stat, current_sample, sample_identifier, sample_path) / denominator_value * 100))
sample_handle.write("\n")
sample_handle.write("\n")
[docs] def write_to_stats_files(self, general_filename, sample_filename):
""" Opens files for writing; abstracted for flexibility/testing. """
with open(general_filename, 'w') as general_handle, open(sample_filename, 'w') as sample_handle:
self._write_general_stats(general_handle)
self._write_sample_stats(sample_handle)
# print( self._stats._contig_stats )
# print( self._stats._sample_stats )
[docs]class VCFRecord(object):
""" VCF parser, object representing an input VCF being read. """
def __init__(self, file_path):
self._file_path = file_path
self._file_handle = open(self._file_path, 'r')
self._header_list = []
self._sample_list = []
self._current_record = {}
self._get_header_map()
def _get_header_map(self):
"""
Pulls the headers from the file, so that we know which columns mean
what as we read the file in line-by-line.
The header map is expected to start with "#CHROM".
"""
current_line = ''
while current_line[0:6] != "#CHROM":
current_line = self._file_handle.readline()
if current_line == '':
raise MalformedInputFile(self._file_path, "mandatory VCF header not found or not recognized")
header_list = current_line.rstrip()[1:].split("\t")
sample_headers_started = False
for current_header in header_list:
self._header_list.append(current_header)
if sample_headers_started:
self._sample_list.append(current_header)
elif current_header == "FORMAT":
sample_headers_started = True
if not sample_headers_started:
self._sample_list.append('vcf_sample')
# print( self._header_list )
# print( self._sample_list )
def _get_record_alts(self):
"""
Get and parse the list of non-reference calls that might appear in
one or more samples.
"""
self._current_record['alts'] = [self._current_record['global']['REF']]
if self._current_record['global']['ALT'] != '.':
self._current_record['alts'] += self._current_record['global']['ALT'].split(',')
def _get_record_info(self):
self._current_record['info'] = {}
info_strings = self._current_record['global']['INFO'].split(';')
for info_string in info_strings:
info_keyval = info_string.split('=', 1)
if len(info_keyval) > 1:
self._current_record['info'][info_keyval[0]] = info_keyval[1]
else:
self._current_record['info'][info_keyval[0]] = None
def _get_record_samples(self):
if 'FORMAT' in self._current_record['global']:
self._current_record['samples'] = {}
sample_header = self._current_record['global']['FORMAT'].split(':')
for current_sample in self._sample_list:
self._current_record['samples'][current_sample] = dict(
zip(sample_header, self._current_record['global'][current_sample].split(':')))
[docs] def fetch_next_record(self):
"""
We're done with the current line, let's move to the next.
Populates all the information at the position we'll need for parsing
parts of the next line.
"""
current_line = "#"
while current_line[0:1] == "#":
current_line = self._file_handle.readline()
return_value = False
if current_line != '':
record_list = current_line.rstrip().split("\t")
self._current_record['global'] = dict(zip(self._header_list, record_list))
self._get_record_alts()
self._get_record_info()
self._get_record_samples()
return_value = True
# print( self._current_record )
return return_value
[docs] def get_samples(self):
return self._sample_list
[docs] def get_contig(self):
return self._current_record['global']['CHROM']
[docs] def get_position(self):
return int(self._current_record['global']['POS'])
[docs] def get_reference_call(self):
return self._current_record['global']['REF']
[docs] def get_sample_call(self, current_sample):
# FIXME indels
return_value = None
if len(self._current_record['alts']) == 1:
return_value = self._current_record['alts'][0]
elif 'FORMAT' in self._current_record['global']:
if 'GT' in self._current_record['samples'][current_sample]:
alt_number = self._current_record['samples'][current_sample]['GT'].split('/', 1)[0].split('|', 1)[0]
if alt_number.isdigit():
return_value = self._current_record['alts'][int(alt_number)]
# OMG varscan
if len(self._current_record['global']['REF']) > 1 and (
len(self._current_record['global']['REF']) - 1 ) == len(return_value) and \
self._current_record['global']['REF'][:len(return_value)] != return_value and \
self._current_record['global']['REF'][-len(return_value):] == return_value:
return_value = self._current_record['alts'][0]
#elif len(self._current_record['alts']) > 1:
# return_value = self._current_record['alts'][1]
return return_value
[docs] def get_coverage(self, current_sample):
sample_coverage = None
if 'FORMAT' in self._current_record['global'] and 'DP' in self._current_record['samples'][current_sample] and \
self._current_record['samples'][current_sample]['DP'] is not None and \
self._current_record['samples'][current_sample]['DP'].isdigit():
sample_coverage = int(self._current_record['samples'][current_sample]['DP'])
elif 'DP' in self._current_record['info'] and self._current_record['info']['DP'] is not None and \
self._current_record['info']['DP'].isdigit():
sample_coverage = int(self._current_record['info']['DP']) / len(self._sample_list)
elif 'ADP' in self._current_record['info'] and self._current_record['info']['ADP'] is not None and \
self._current_record['info']['ADP'].isdigit():
sample_coverage = int(self._current_record['info']['ADP']) / len(self._sample_list)
# NASP output
elif 'FORMAT' in self._current_record['global'] and 'FT' in self._current_record['samples'][current_sample]:
failed_filters = self._current_record['samples'][current_sample]['FT'].split(',')
if 'CovFail' in failed_filters:
sample_coverage = -1
elif 'PASS' in failed_filters or 'PropFail' in failed_filters:
sample_coverage = 'PASS'
return sample_coverage
[docs] def get_proportion(self, current_sample, sample_coverage, is_a_snp):
sample_proportion = None
if 'FORMAT' in self._current_record['global'] and 'AD' in self._current_record['samples'][current_sample]:
call_depths = self._current_record['samples'][current_sample]['AD'].split(',')
# gatk, reliable and documented
if len(call_depths) > 1:
alt_number = self._current_record['samples'][current_sample]['GT'].split('/', 1)[0].split('|', 1)[0]
if alt_number.isdigit():
sample_proportion = int(call_depths[int(alt_number)]) / sample_coverage
# varscan, reliable and documented
elif is_a_snp:
sample_proportion = int(call_depths[0]) / sample_coverage
elif not is_a_snp and 'RD' in self._current_record['samples'][current_sample]:
sample_proportion = int(self._current_record['samples'][current_sample]['RD']) / sample_coverage
# solsnp, undocumented, no multi-sample support
elif 'AR' in self._current_record['info']:
sample_proportion = float(self._current_record['info']['AR'])
if not is_a_snp:
sample_proportion = 1 - sample_proportion
# samtools, estimate, dubious accuracy
elif 'DP4' in self._current_record['info']:
call_depths = self._current_record['info']['DP4'].split(',')
if is_a_snp:
sample_proportion = ( int(call_depths[2]) + int(call_depths[3]) ) / (
sample_coverage * len(self._sample_list) )
else:
sample_proportion = ( int(call_depths[0]) + int(call_depths[1]) ) / (
sample_coverage * len(self._sample_list) )
# NASP output
elif 'FORMAT' in self._current_record['global'] and 'FT' in self._current_record['samples'][current_sample]:
failed_filters = self._current_record['samples'][current_sample]['FT'].split(',')
if 'PropFail' in failed_filters:
sample_proportion = -1
elif 'PASS' in failed_filters:
sample_proportion = 'PASS'
return sample_proportion
[docs] def get_sample_info(self, current_sample):
sample_info = {'call': self.get_sample_call(current_sample)}
# FIXME indels
if sample_info['call'] is not None and len(sample_info['call']) > 1:
sample_info['call'] = sample_info['call'][0]
sample_info['was_called'] = False
sample_info['is_a_snp'] = False
if sample_info['call'] is not None and sample_info['call'] != 'N':
sample_info['was_called'] = True
if Genome.simple_call(sample_info['call']) != Genome.simple_call(self._current_record['global']['REF']):
sample_info['is_a_snp'] = True
# FIXME indels
sample_info['is_an_insert'] = None
sample_info['is_a_delete'] = None
sample_info['coverage'] = self.get_coverage(current_sample)
sample_info['proportion'] = self.get_proportion(current_sample, sample_info['coverage'],
sample_info['is_a_snp'])
return sample_info
[docs]class InvalidContigName(Exception):
"""
A contig was referenced that is not known to exist in this run.
For the most part, this exception isn't used because encountering
an unknown contig either means we should create it or we should
discard the data that follows. However, there are potential times
where we need to take more extreme action.
"""
def __init__(self, invalid_contig, contig_list):
self._invalid_contig = invalid_contig
self._contig_list = contig_list
def __str__(self):
return "Contig '{0}' not in {1}!".format(self._invalid_contig, str(self._contig_list))
[docs]class ReferenceCallMismatch(Exception):
"""
At two different points in the run, the reference call appeared, and the
two calls disagreed. This probably means the user is analyzing two
different sets of data as if they belong together, in error. This is
a very bad thing.
"""
def __init__(self, old_call, new_call, data_file=None, contig_name=None, position_number=None):
self._old_call = old_call
self._new_call = new_call
self._data_file = data_file
self._contig_name = contig_name
self._position_number = position_number
def __str__(self):
"""
The file, contig, and position are optional. When given in any
combination, what's given will be in the error message.
"""
return_value = "Expected reference call of '{0}' but got '{1}'".format(self._old_call, self._new_call)
if self._data_file is not None:
return_value += " while reading '{0}'".format(self._data_file)
if self._position_number is not None:
return_value += " at position {0}".format(str(self._position_number))
if self._contig_name is not None:
return_value += " on contig '{0}'".format(self._contig_name)
return_value += "!"
return return_value
[docs]def main():
print("This is just a class definition, to be included from other python programs.")
print(" ...You sillyface.")
if __name__ == "__main__":
main()