#! /usr/bin/env python
# -*- coding: utf-8 -*-
##############################################################################
## DendroPy Phylogenetic Computing Library.
##
## Copyright 2010-2015 Jeet Sukumaran and Mark T. Holder.
## All rights reserved.
##
## See "LICENSE.rst" for terms and conditions of usage.
##
## If you use this work or any portion thereof in published work,
## please cite it as:
##
## Sukumaran, J. and M. T. Holder. 2010. DendroPy: a Python library
## for phylogenetic computing. Bioinformatics 26: 1569-1571.
##
##############################################################################
"""
Implementation of PHYLIP-format data reader.
"""
import re
from dendropy.dataio import ioservice
from dendropy.utility import filesys
from dendropy.utility import error
class PhylipReader(ioservice.DataReader):
    """
    Implements the DataReader interface for parsing PHYLIP files.

    Four flavors of the format are handled, selected by the ``strict`` and
    ``interleaved`` keyword arguments: strict/relaxed label parsing combined
    with sequential/interleaved sequence layout. Parse errors are reported
    using one of the four nested exception classes, according to the mode
    the reader is operating in.
    """

    # Patterns are compiled once, as they are applied to every line of input.
    _DESCRIPTION_LINE_PATTERN = re.compile(r'\s*(\d+)\s+(\d+)\s*$')
    _SINGLE_SPACE_DELIMITER_PATTERN = re.compile(r'[ \t]{1,}')
    _MULTISPACE_DELIMITER_PATTERN = re.compile(r'[ \t]{2,}')

    class PhylipStrictSequentialError(error.DataParseError):
        """Parse error raised when reading in strict, sequential mode."""

    class PhylipStrictInterleavedError(error.DataParseError):
        """Parse error raised when reading in strict, interleaved mode."""

    class PhylipRelaxedSequentialError(error.DataParseError):
        """Parse error raised when reading in relaxed, sequential mode."""

    class PhylipRelaxedInterleavedError(error.DataParseError):
        """Parse error raised when reading in relaxed, interleaved mode."""

    def __init__(self, **kwargs):
        """
        Keyword Arguments
        -----------------
        data_type: str
            When reading into a |DataSet| object, the type of data must be
            specified: "dna", "rna", "protein", "restriction", "infinite",
            "standard", or "continuous".
        default_state_alphabet: |StateAlphabet| instance
            A |StateAlphabet| object to be used to manage the alphabet of the
            characters (|StandardCharacterMatrix| **only**). If given without
            ``data_type``, the data type is taken to be "standard".
        strict : bool
            If |True|, then data is given in 'strict' format, where first 10
            characters are the taxon label and remaining characters are the
            sequence. Default is |False|: relaxed format, where taxon labels
            are of arbitrary length and separation of label and sequence is
            by one or more (if ``multispace_delimiter`` is |False|) or two or
            more (if ``multispace_delimiter`` is |True|) spaces.
        interleaved : bool
            If |True|, then data is in interleaved format.
            Default is |False|: data is non-interleaved.
        multispace_delimiter: bool
            If |True| (and ``strict`` is |False|), then at least two spaces are
            required to delimit taxon label and associated sequence. Default is
            |False|: one or more spaces delimit taxon label and associated
            sequence.
        underscores_to_spaces: bool
            If |True|, then underscores in taxon labels are converted to
            spaces. Default is |False|: underscores are not converted.
        ignore_invalid_chars : bool
            If |True| then any invalid characters in sequences will be ignored.
            Default is |False|: invalid characters result in errors.
        ignore_unrecognized_keyword_arguments : boolean, default: |False|
            If |True|, then unsupported or unrecognized keyword arguments will
            not result in an error. Default is |False|: unsupported keyword
            arguments will result in an error.

        Raises
        ------
        ValueError
            If ``default_state_alphabet`` is given together with a
            ``data_type`` other than "standard".
        """
        ioservice.DataReader.__init__(self)
        self.data_type = kwargs.pop("data_type", None)
        self.strict = kwargs.pop("strict", False)
        self.interleaved = kwargs.pop("interleaved", False)
        self.multispace_delimiter = kwargs.pop("multispace_delimiter", False)
        self.underscores_to_spaces = kwargs.pop("underscores_to_spaces", False)
        self.ignore_invalid_chars = kwargs.pop("ignore_invalid_chars", False)
        self.default_state_alphabet = kwargs.pop("default_state_alphabet", None)
        if self.default_state_alphabet is not None:
            # A custom alphabet only makes sense for "standard" data.
            if self.data_type is None:
                self.data_type = "standard"
            elif self.data_type != "standard":
                raise ValueError("Cannot specify 'default_state_alphabet' with data type of '{}'".format(self.data_type))
        # Whatever is left in ``kwargs`` at this point was not consumed above.
        self.check_for_unused_keyword_arguments(kwargs)
        # Per-read parsing state; (re-)initialized by ``reset()`` on each read.
        self.ntax = None
        self.taxa_processed = None
        self.nchar = None
        self.char_matrix = None
        self.taxon_namespace = None
        # Read by ``_data_parse_error()``, so it must always exist.
        self.stream = None

    def describe_mode(self):
        """
        Returns a description of the parsing mode, e.g. "strict, interleaved"
        or "relaxed, sequential".
        """
        return ", ".join([
            "strict" if self.strict else "relaxed",
            "interleaved" if self.interleaved else "sequential",
        ])

    def reset(self):
        """
        Clears all state accumulated while reading a data source.
        """
        self.ntax = None
        self.nchar = None
        self.char_matrix = None
        self.taxon_namespace = None
        self.stream = None
        self.taxa_processed = set()

    def _read(self,
            stream,
            taxon_namespace_factory=None,
            tree_list_factory=None,
            char_matrix_factory=None,
            state_alphabet_factory=None,
            global_annotations_target=None):
        """
        Parses PHYLIP-formatted data from ``stream`` into a new character
        matrix.

        The first line of the source must give the number of taxa and the
        number of characters; remaining lines are parsed as interleaved or
        sequential data according to ``self.interleaved``.

        ``tree_list_factory`` and ``global_annotations_target`` are accepted
        as part of the reader interface but are not used here.

        Returns
        -------
        A ``self.Product`` object with ``char_matrices`` set to a list
        containing the single character matrix read; ``taxon_namespaces``
        and ``tree_lists`` are |None|.

        Raises
        ------
        TypeError
            If no data type was specified when constructing the reader.
        DataParseError
            If the source is empty, too short, has an invalid description
            line, declares zero taxa or characters, or the number of taxa
            found does not match the number declared.
        """
        self.reset()
        self.stream = stream
        # Validate before invoking any factory, so that a call that is bound
        # to fail does not create a taxon namespace as a side effect.
        if self.data_type is None:
            raise TypeError("Data type must be specified for this schema")
        self.taxon_namespace = taxon_namespace_factory(label=None)
        if self.data_type == "standard" and self.default_state_alphabet is not None:
            self.char_matrix = char_matrix_factory(
                    self.data_type,
                    label=None,
                    taxon_namespace=self.taxon_namespace,
                    default_state_alphabet=self.default_state_alphabet,
                    )
        else:
            self.char_matrix = char_matrix_factory(
                    self.data_type,
                    label=None,
                    taxon_namespace=self.taxon_namespace)
            if self.data_type == "standard":
                # No alphabet supplied by the client: fall back to digits.
                state_alphabet = state_alphabet_factory(
                    fundamental_states="0123456789",
                    no_data_symbol="?",
                    gap_symbol="-",
                    case_sensitive=False)
                self.char_matrix.state_alphabets.append(state_alphabet)
        lines = filesys.get_lines(stream)
        if len(lines) == 0:
            raise error.DataParseError("No data in source", stream=self.stream)
        elif len(lines) < 2:
            # Minimum valid source: description line plus one sequence line.
            raise error.DataParseError("Expecting at least 2 lines in PHYLIP format data source", stream=self.stream)
        desc_line = lines[0]
        lines = lines[1:]
        m = self._DESCRIPTION_LINE_PATTERN.match(desc_line)
        if m is None:
            raise self._data_parse_error("Invalid data description line: '%s'" % desc_line)
        self.ntax = int(m.group(1))
        self.nchar = int(m.group(2))
        if self.ntax == 0 or self.nchar == 0:
            raise error.DataParseError("No data in source", stream=self.stream)
        if self.interleaved:
            self._parse_interleaved(lines)
        else:
            self._parse_sequential(lines)
        if len(self.taxa_processed) != self.ntax:
            self._taxon_error(num_expected=self.ntax, found=self.taxa_processed)
        product = self.Product(
                taxon_namespaces=None,
                tree_lists=None,
                char_matrices=[self.char_matrix])
        return product

    def _parse_taxon_from_line(self, line, line_index):
        """
        Splits a taxon label off the front of ``line``.

        In strict mode the label is the first 10 characters; otherwise it is
        everything up to the first run of spaces/tabs (at least two if
        ``self.multispace_delimiter`` is set). The corresponding taxon is
        looked up or created in the matrix's taxon namespace, and a new
        sequence is started for it if the matrix does not have one yet.

        Parameters
        ----------
        line : str
            The line to be parsed.
        line_index : int
            0-based index of the line in the list of lines following the
            description line; used for error reporting.

        Returns
        -------
        A tuple consisting of the taxon and the remainder of the line.

        Raises
        ------
        DataParseError
            If the label is empty, if the taxon already has the full declared
            number of characters, or if more taxa than declared are found.
        """
        if self.strict:
            seq_label = line[:10].strip()
            line = line[10:]
        else:
            if self.multispace_delimiter:
                parts = self._MULTISPACE_DELIMITER_PATTERN.split(line, maxsplit=1)
            else:
                parts = self._SINGLE_SPACE_DELIMITER_PATTERN.split(line, maxsplit=1)
            seq_label = parts[0]
            if len(parts) < 2:
                # Label only: sequence data starts on a following line.
                line = ''
            else:
                line = parts[1]
        seq_label = seq_label.strip()
        if not seq_label:
            raise self._data_parse_error("Expecting taxon label", line_index=line_index)
        if self.underscores_to_spaces:
            seq_label = seq_label.replace('_', ' ')
        current_taxon = self.char_matrix.taxon_namespace.require_taxon(label=seq_label)
        if current_taxon not in self.char_matrix:
            self.char_matrix[current_taxon] = self.char_matrix.new_sequence(taxon=current_taxon)
        else:
            num_chars = len(self.char_matrix[current_taxon])
            if num_chars >= self.nchar:
                # Report the character count, not the sequence object, so
                # that the "%d" conversion cannot itself fail.
                raise self._data_parse_error("Cannot add characters to sequence for taxon '%s': already has declared number of characters (%d)" \
                        % (current_taxon.label, num_chars), line_index=line_index)
        self.taxa_processed.add(current_taxon)
        if len(self.taxa_processed) > self.ntax:
            self._taxon_error(num_expected=self.ntax, found=self.taxa_processed)
        return current_taxon, line

    def _parse_sequence_from_line(self, current_taxon, line, line_index):
        """
        Appends the character states found in ``line`` to the sequence of
        ``current_taxon``.

        For "continuous" data, the line is split on whitespace and each token
        converted to a float. For all other data types, each character other
        than space or tab is looked up in the matrix's default state alphabet.
        Invalid tokens/symbols raise a parse error unless
        ``self.ignore_invalid_chars`` is set, in which case they are skipped.
        """
        sequence = self.char_matrix[current_taxon]
        if self.data_type == "continuous":
            for token in line.split():
                try:
                    state = float(token)
                except ValueError:
                    if not self.ignore_invalid_chars:
                        raise self._data_parse_error("Invalid state for taxon '%s': '%s'" % (current_taxon.label, token),
                                line_index=line_index)
                else:
                    sequence.append(state)
        else:
            state_alphabet = self.char_matrix.default_state_alphabet
            for c in line:
                if c in (' ', '\t'):
                    continue
                try:
                    state = state_alphabet[c]
                except KeyError:
                    if not self.ignore_invalid_chars:
                        raise self._data_parse_error("Invalid state symbol for taxon '%s': '%s'" % (current_taxon.label, c),
                                line_index=line_index)
                else:
                    sequence.append(state)

    def _parse_sequential(self, lines, line_num_start=1):
        """
        Parses ``lines`` as sequential (non-interleaved) data: each taxon
        label is followed by that taxon's entire sequence, possibly spread
        over several lines. A new label is expected once the current taxon
        has at least ``self.nchar`` characters. Blank lines are skipped.

        ``line_num_start`` is not used; it is retained for compatibility with
        existing callers.
        """
        current_taxon = None
        for line_index, line in enumerate(lines):
            line = line.rstrip()
            if line == '':
                continue
            if current_taxon is None:
                current_taxon, line = self._parse_taxon_from_line(line, line_index)
            self._parse_sequence_from_line(current_taxon, line, line_index)
            if len(self.char_matrix[current_taxon]) >= self.nchar:
                # Sequence complete: next non-blank line starts a new taxon.
                current_taxon = None

    def _parse_interleaved(self, lines, line_num_start=1):
        """
        Parses ``lines`` as interleaved data: the first block has one labeled
        row per taxon, and rows of subsequent ("paged") blocks are unlabeled
        and assigned to taxa in the order in which the taxa first appeared.
        Blank lines are skipped.

        ``line_num_start`` is not used; it is retained for compatibility with
        existing callers.
        """
        # Taxa in order of first appearance in *this* source. The taxon
        # namespace is not used for this, as it may already hold taxa that
        # were defined before this source was read.
        row_taxa = []
        paged = False
        paged_row = -1
        for line_index, line in enumerate(lines):
            line = line.rstrip()
            if line == '':
                continue
            paged_row += 1
            if paged_row >= self.ntax:
                paged_row = 0
            if paged:
                current_taxon = row_taxa[paged_row]
            else:
                num_taxa_seen = len(self.taxa_processed)
                current_taxon, line = self._parse_taxon_from_line(line, line_index)
                if len(self.taxa_processed) > num_taxa_seen:
                    # Label not encountered before in this source.
                    row_taxa.append(current_taxon)
                if len(row_taxa) == self.ntax:
                    # All labels read: remaining rows are unlabeled.
                    paged = True
                    paged_row = -1
            self._parse_sequence_from_line(current_taxon, line, line_index)

    def _data_parse_error(self, message, line_index=None):
        """
        Builds (but does not raise) the parse error appropriate to the
        current parsing mode.

        Parameters
        ----------
        message : str
            Description of the error.
        line_index : int or None
            0-based index of the offending line in the list of lines
            following the description line, or |None| if not applicable.

        Returns
        -------
        An instance of one of the nested ``Phylip*Error`` classes, for the
        caller to raise.
        """
        if line_index is None:
            row = None
        else:
            # +1 for the description line, +1 for 1-based line numbers.
            row = line_index + 2
        if self.strict and self.interleaved:
            error_type = PhylipReader.PhylipStrictInterleavedError
        elif self.strict:
            error_type = PhylipReader.PhylipStrictSequentialError
        elif self.interleaved:
            error_type = PhylipReader.PhylipRelaxedInterleavedError
        else:
            error_type = PhylipReader.PhylipRelaxedSequentialError
        return error_type(message, line_num=row, stream=self.stream)

    def _taxon_error(self, num_expected, found):
        """
        Raises a parse error reporting a mismatch between the number of taxa
        declared and the taxa actually found.

        Parameters
        ----------
        num_expected : int
            Number of taxa declared in the description line.
        found : collection of taxa
            The taxa actually encountered.

        Raises
        ------
        DataParseError
            Always.
        """
        num_found = len(found)
        n1 = "taxon" if num_expected == 1 else "taxa"
        n2 = "taxon" if num_found == 1 else "taxa"
        a = "only " if num_expected > num_found else ""
        # Sorted so that the message does not depend on set iteration order.
        found_desc = ", ".join(sorted("{}".format(t) for t in found))
        raise error.DataParseError("{} {} expected but {}{} {} found: {}".format(
            num_expected,
            n1,
            a,
            num_found,
            n2,
            found_desc),
            stream=self.stream)