#! /usr/bin/env python
# -*- coding: utf-8 -*-
##############################################################################
## DendroPy Phylogenetic Computing Library.
##
## Copyright 2010-2015 Jeet Sukumaran and Mark T. Holder.
## All rights reserved.
##
## See "LICENSE.rst" for terms and conditions of usage.
##
## If you use this work or any portion thereof in published work,
## please cite it as:
##
## Sukumaran, J. and M. T. Holder. 2010. DendroPy: a Python library
## for phylogenetic computing. Bioinformatics 26: 1569-1571.
##
##############################################################################
"""
Implementation of NEXUS-schema data reader.
"""
from dendropy.utility import error
from dendropy.dataio import ioservice
from dendropy.dataio import nexusprocessing
from dendropy.dataio import newickreader
###############################################################################
## NexusReader
class NexusReader(ioservice.DataReader):
"Encapsulates loading and parsing of a NEXUS schema file."
class BlockTerminatedException(Exception):
pass
class NexusReaderError(error.DataParseError):
def __init__(self, message,
line_num=None,
col_num=None,
stream=None):
error.DataParseError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class NotNexusFileError(NexusReaderError):
def __init__(self, message,
line_num=None,
col_num=None,
stream=None):
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class LinkRequiredError(NexusReaderError):
def __init__(self, message,
line_num=None,
col_num=None,
stream=None):
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class NoCharacterBlocksFoundError(NexusReaderError):
def __init__(self, message,
line_num=None,
col_num=None,
stream=None):
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class UndefinedBlockError(NexusReaderError):
def __init__(self, message,
line_num=None,
col_num=None,
stream=None):
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class MultipleBlockWithSameTitleError(NexusReaderError):
def __init__(self, message,
line_num=None,
col_num=None,
stream=None):
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class InvalidCharacterStateSymbolError(NexusReaderError):
def __init__(self, message,
line_num=None,
col_num=None,
stream=None):
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class InvalidContinuousCharacterValueError(NexusReaderError):
def __init__(self, message,
line_num=None,
col_num=None,
stream=None):
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class TooManyTaxaError(NexusReaderError):
def __init__(self,
taxon_namespace,
max_taxa,
label,
line_num=None,
col_num=None,
stream=None):
message = "Cannot add taxon with label '{}': Declared number of taxa ({}) already defined: {}".format(
label,
max_taxa,
str(["{}".format(t.label) for t in taxon_namespace]))
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class UndefinedTaxonError(NexusReaderError):
def __init__(self,
taxon_namespace,
label,
line_num=None,
col_num=None,
stream=None):
message = "Taxon '{}' is not in the set of defined taxa: {}".format(
label,
str(["{}".format(t.label) for t in taxon_namespace]))
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class TooManyCharactersError(NexusReaderError):
def __init__(self,
max_characters,
character,
line_num=None,
col_num=None,
stream=None):
message = "Cannot add '{}' to sequence: declared sequence length ({}) will be exceeded".format(
character, max_characters)
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
class IncompleteBlockError(NexusReaderError):
def __init__(self, message,
line_num=None,
col_num=None,
stream=None):
NexusReader.NexusReaderError.__init__(self,
message=message,
line_num=line_num,
col_num=col_num,
stream=stream)
###########################################################################
## Life-cycle and Setup
def __init__(self, **kwargs):
"""
Keyword Arguments
-----------------
rooting : string, {['default-unrooted'], 'default-rooted', 'force-unrooted', 'force-rooted'}
            Specifies how trees in the data source should be interpreted with
respect to their rooting:
'default-unrooted' [default]:
All trees are interpreted as unrooted unless a '[&R]'
comment token explicitly specifies them as rooted.
'default-rooted'
All trees are interpreted as rooted unless a '[&U]'
comment token explicitly specifies them as unrooted.
'force-unrooted'
All trees are unconditionally interpreted as unrooted.
'force-rooted'
All trees are unconditionally interpreted as rooted.
edge_length_type : type, default: ``float``
Specifies the type of the edge lengths (``int`` or ``float``). Tokens
interpreted as branch lengths will be cast to this type.
Defaults to ``float``.
suppress_edge_lengths : boolean, default: |False|
If |True|, edge length values will not be processed. If |False|,
edge length values will be processed.
extract_comment_metadata : boolean, default: |True|
If |True| (default), any comments that begin with '&' or '&&' will
be parsed and stored as part of the annotation set of the
corresponding object (accessible through the ``annotations``
attribute of the object). This requires that the comment
contents conform to a particular format (NHX or BEAST: 'field =
value'). If |False|, then the comments will not be parsed,
but will be instead stored directly as elements of the ``comments``
list attribute of the associated object.
store_tree_weights : boolean, default: |False|
If |True|, process the tree weight (e.g. "[&W 1/2]") comment
associated with each tree, if any. Defaults to |False|.
encode_splits : boolean, default: |False|
If |True|, split hash bitmasks will be calculated and attached to
the edges.
finish_node_fn : function object, default: |None|
If specified, this function will be applied to each node after
it has been constructed.
case_sensitive_taxon_labels : boolean, default: |False|
            If |True|, then taxon labels are case sensitive (e.g., "P.regius"
            and "P.REGIUS" will be treated as different operational taxonomic
            unit concepts). Otherwise, taxon label interpretation will be made
            without regard for case.
preserve_underscores : boolean, default: |False|
            If |True|, unquoted underscores in labels will *not* be converted
            to spaces. Defaults to |False|: all underscores not protected by
            quotes will be converted to spaces.
suppress_internal_node_taxa : boolean, default: |True|
            If |False|, internal node labels will be instantiated into
            |Taxon| objects. If |True|, internal node labels
            will *not* be instantiated into |Taxon| objects.
        suppress_leaf_node_taxa : boolean, default: |False|
            If |False|, leaf (external) node labels will be instantiated
            into |Taxon| objects. If |True|, leaf (external) node
            labels will *not* be instantiated into |Taxon| objects.
terminating_semicolon_required : boolean, default: |True|
If |True| [default], then a tree statement that does not end in a
semi-colon is an error. If |False|, then no error will be raised.
unconstrained_taxa_accumulation_mode : bool
If |True|, then no error is raised even if the number of taxon
names defined exceeds the number of declared taxa (as specified by
'NTAX'). Defaults to |False|.
        automatically_substitute_missing_taxa_blocks : bool
            If |True| and a taxon namespace is linked to by title but not
            given in the data file, then: if one and exactly one other taxon
            namespace has been given in the data file, that taxon namespace
            will be used; if there are multiple taxon namespaces and
            ``automatically_create_missing_taxa_blocks`` is |True|, a new
            taxon namespace will be created; otherwise an error is raised.
            Default is |False|: if a taxon namespace is linked to by title but
            is not given in the data file, then an error is raised.
automatically_create_missing_taxa_blocks : bool
            If |True| then taxon namespaces linked to by title but not given in
            the data file will be automatically created. If |False|, taxon
            namespaces linked to by title but not given in the data file will
            result in an error.
        exclude_chars : bool
            If |True|, then character data will not be read. Defaults to
            |False|: character data will be read.
        exclude_trees : bool
            If |True|, then tree data will not be read. Defaults to
            |False|: tree data will be read.
store_ignored_blocks : bool
            If |True|, then ignored NEXUS blocks will be stored under the
            annotation (NOT attribute!) ``ignored_nexus_blocks``.
            To dereference, e.g.: ``dataset.annotations["ignored_nexus_blocks"]``.
            Defaults to |False|: blocks other than character and tree blocks
            will not be stored.
attached_taxon_namespace : |TaxonNamespace|
Unify all operational taxonomic unit definitions in this namespace.
ignore_unrecognized_keyword_arguments : boolean, default: |False|
If |True|, then unsupported or unrecognized keyword arguments will
not result in an error. Default is |False|: unsupported keyword
arguments will result in an error.
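
        Examples
        --------
        A minimal usage sketch: in typical use, this reader is invoked
        indirectly via the ``schema="nexus"`` argument of the ``get``
        factory methods (the file name below is illustrative)::

            import dendropy
            trees = dendropy.TreeList.get(
                    path="sample.nex",
                    schema="nexus",
                    preserve_underscores=True)
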
"""
# base
ioservice.DataReader.__init__(self)
# Following are NEXUS-parsing specific (i.e., not used by NEWICK
# parsers), and need to be removed so as not to cause problems with our
# keyword validation scheme
self.exclude_chars = kwargs.pop("exclude_chars", False)
self.exclude_trees = kwargs.pop("exclude_trees", False)
self.store_ignored_blocks = kwargs.pop("store_ignored_blocks", False)
self._data_type = kwargs.pop("data_type", "standard")
self.attached_taxon_namespace = kwargs.pop("attached_taxon_namespace", None)
# Following are undocumented for a GOOD reason! They are experimental and subject to change!
self.unconstrained_taxa_accumulation_mode = kwargs.pop("unconstrained_taxa_accumulation_mode", False)
self.automatically_create_missing_taxa_blocks = kwargs.pop("automatically_create_missing_taxa_blocks", False)
self.automatically_substitute_missing_taxa_blocks = kwargs.pop("automatically_substitute_missing_taxa_blocks", False)
        # The following are used by NewickReader in addition to NexusReader, so
        # they are extracted/set here and then forwarded on ...
self.preserve_underscores = kwargs.get('preserve_underscores', False)
self.case_sensitive_taxon_labels = kwargs.get('case_sensitive_taxon_labels', False)
self.extract_comment_metadata = kwargs.get('extract_comment_metadata', True)
        # As above, but the NEXUS format default is different from the NEWICK
        # default, hence this rather convoluted approach:
# self.extract_comment_metadata = kwargs.pop('extract_comment_metadata', True)
# kwargs["extract_comment_metadata"] = self.extract_comment_metadata
# Create newick handler
self.newick_reader = newickreader.NewickReader(**kwargs)
# Set up parsing meta-variables
self._interleave = False
self._symbols = ""
self._gap_char = '-'
self._missing_char = '?'
self._match_char = frozenset('.')
self._file_specified_ntax = None
self._file_specified_nchar = None
self._nexus_tokenizer = None
self._taxon_namespace_factory = None
self._tree_list_factory = None
self._char_matrix_factory = None
self._global_annotations_target = None
self._taxon_namespaces = []
self._char_matrices = []
self._tree_lists = []
self._product = None
self._ignored_blocks = []
###########################################################################
## Reader Implementation
def _read(self,
stream,
taxon_namespace_factory=None,
tree_list_factory=None,
char_matrix_factory=None,
state_alphabet_factory=None,
global_annotations_target=None):
"""
Instantiates and returns a DataSet object based on the
NEXUS-formatted contents given in the file-like object ``stream``.
"""
self._taxon_namespace_factory = taxon_namespace_factory
self._tree_list_factory = tree_list_factory
if self._tree_list_factory is None:
self.exclude_trees = True
self._char_matrix_factory = char_matrix_factory
if self._char_matrix_factory is None:
self.exclude_chars = True
self._state_alphabet_factory = state_alphabet_factory
self._global_annotations_target = global_annotations_target
self._parse_nexus_stream(stream)
self._product = self.Product(
taxon_namespaces=self._taxon_namespaces,
tree_lists=self._tree_lists,
char_matrices=self._char_matrices)
if self._global_annotations_target is not None and self._ignored_blocks:
a = self._global_annotations_target.annotations.find(name="ignored_nexus_blocks")
if a is None:
self._global_annotations_target.annotations.add_new(
name="ignored_nexus_blocks",
value=self._ignored_blocks,
datatype_hint="xsd:list",
)
else:
a.extend(self._ignored_blocks)
return self._product
###########################################################################
## Tokenizer Control
def create_tokenizer(self, stream, **kwargs):
self._nexus_tokenizer = nexusprocessing.NexusTokenizer(
stream, **kwargs)
return self._nexus_tokenizer
def set_stream(self, stream):
return self._nexus_tokenizer.set_stream(stream)
###########################################################################
## Book-keeping Control
def _nexus_error(self, message, error_type=None):
if error_type is None:
error_type = NexusReader.NexusReaderError
e = error_type(
message=message,
line_num=self._nexus_tokenizer.token_line_num,
col_num=self._nexus_tokenizer.token_column_num,
stream=self._nexus_tokenizer.src)
return e
def _too_many_taxa_error(self, taxon_namespace, label):
e = NexusReader.TooManyTaxaError(
taxon_namespace=taxon_namespace,
max_taxa=self._file_specified_ntax,
label=label,
line_num=self._nexus_tokenizer.token_line_num,
col_num=self._nexus_tokenizer.token_column_num,
stream=self._nexus_tokenizer.src)
return e
def _undefined_taxon_error(self, taxon_namespace, label):
e = NexusReader.UndefinedTaxonError(
taxon_namespace=taxon_namespace,
label=label,
line_num=self._nexus_tokenizer.token_line_num,
col_num=self._nexus_tokenizer.token_column_num,
stream=self._nexus_tokenizer.src)
return e
def _too_many_characters_error(self, character):
e = NexusReader.TooManyCharactersError(
max_characters=self._file_specified_nchar,
character=character,
line_num=self._nexus_tokenizer.token_line_num,
col_num=self._nexus_tokenizer.token_column_num,
stream=self._nexus_tokenizer.src)
return e
def _debug_print(self, message=None, out=None):
import sys
if out is None:
out = sys.stdout
if message is None:
message = ""
else:
message = " --- ({})".format(message)
out.write("--- Current Position: Line {}, Column {}; Current token [starting at line {} and column {}]: '{}'{}\n".format(
self._nexus_tokenizer.current_line_num,
self._nexus_tokenizer.current_column_num,
self._nexus_tokenizer.token_line_num,
self._nexus_tokenizer.token_column_num,
self._nexus_tokenizer.current_token,
message))
###########################################################################
## Data Management
def _new_taxon_namespace(self, title=None):
if self.attached_taxon_namespace is not None:
return self.attached_taxon_namespace
taxon_namespace = self._taxon_namespace_factory(label=title)
self._taxon_namespaces.append(taxon_namespace)
return taxon_namespace
def _get_taxon_namespace(self, title=None):
if self.attached_taxon_namespace is not None:
return self.attached_taxon_namespace
if title is None:
if len(self._taxon_namespaces) == 0:
return self._new_taxon_namespace(title=title)
elif len(self._taxon_namespaces) == 1:
return self._taxon_namespaces[0]
else:
raise self._nexus_error("Multiple taxa blocks defined: require 'LINK' statement", NexusReader.LinkRequiredError)
else:
found = []
for tns in self._taxon_namespaces:
if tns.label is not None and tns.label.upper() == title.upper():
found.append(tns)
if len(found) == 0:
if self.automatically_substitute_missing_taxa_blocks:
if len(self._taxon_namespaces) == 1:
return self._taxon_namespaces[0]
elif not self.automatically_create_missing_taxa_blocks:
raise self._nexus_error("Taxa block with title '{}' not found, and multiple taxa blocks are defined for this file: unable to automatically substitute".format(title), NexusReader.UndefinedBlockError)
if self.automatically_create_missing_taxa_blocks:
return self._new_taxon_namespace(title=title)
raise self._nexus_error("Taxa block with title '{}' not found".format(title), NexusReader.UndefinedBlockError)
elif len(found) > 1:
raise self._nexus_error("Multiple taxa blocks with title '{}' defined".format(title), NexusReader.MultipleBlockWithSameTitleError)
return found[0]
def _get_taxon_symbol_mapper(self, taxon_namespace, enable_lookup_by_taxon_number=True):
taxon_symbol_mapper = nexusprocessing.NexusTaxonSymbolMapper(
taxon_namespace=taxon_namespace,
enable_lookup_by_taxon_number=enable_lookup_by_taxon_number,
case_sensitive=self.case_sensitive_taxon_labels)
return taxon_symbol_mapper
def _new_char_matrix(self, data_type, taxon_namespace, title=None):
# if data_type is None:
# data_type = "standard"
char_matrix = self._char_matrix_factory(
data_type,
taxon_namespace=taxon_namespace,
label=title)
self._char_matrices.append(char_matrix)
return char_matrix
def _new_state_alphabet(self, *args, **kwargs):
return self._state_alphabet_factory(*args, **kwargs)
def _get_char_matrix(self, title=None):
if title is None:
if len(self._char_matrices) == 1:
return self._char_matrices[0]
elif len(self._char_matrices) == 0:
raise self._nexus_error("No character matrices defined", NexusReader.NoCharacterBlocksFoundError)
else:
raise self._nexus_error("Multiple character matrices defined: require 'LINK' statement", NexusReader.LinkRequiredError)
else:
found = []
for cm in self._char_matrices:
if cm.label.upper() == title.upper():
found.append(cm)
if len(found) == 0:
raise self._nexus_error("Character block with title '{}' not found".format(title), NexusReader.UndefinedBlockError)
elif len(found) > 1:
raise self._nexus_error("Multiple character blocks with title '{}' defined".format(title), NexusReader.MultipleBlockWithSameTitleError)
return found[0]
def _new_tree_list(self, taxon_namespace, title=None):
tree_list = self._tree_list_factory(
taxon_namespace=taxon_namespace,
label=title)
self._tree_lists.append(tree_list)
return tree_list
def _get_tree_list(self, title=None):
if title is None:
if len(self._tree_lists) == 1:
return self._tree_lists[0]
elif len(self._tree_lists) == 0:
raise self._nexus_error("No tree blocks defined", NexusReader.NoCharacterBlocksFoundError)
else:
raise self._nexus_error("Multiple tree blocks defined: require 'LINK' statement", NexusReader.LinkRequiredError)
else:
found = []
for tlst in self._tree_lists:
if tlst.label.upper() == title.upper():
found.append(tlst)
if len(found) == 0:
raise self._nexus_error("Trees block with title '{}' not found".format(title), NexusReader.UndefinedBlockError)
elif len(found) > 1:
raise self._nexus_error("Multiple trees blocks with title '{}' defined".format(title), NexusReader.MultipleBlockWithSameTitleError)
return found[0]
###########################################################################
## Main Stream Parse Driver
def _parse_nexus_stream(self, stream):
"Main file parsing driver."
if self._nexus_tokenizer is None:
self.create_tokenizer(stream,
preserve_unquoted_underscores=self.preserve_underscores)
else:
self._nexus_tokenizer.set_stream(stream)
        token = self._nexus_tokenizer.next_token()
        if token is None or token.upper() != "#NEXUS":
            raise self._nexus_error("Expecting '#NEXUS', but found '{}'".format(token),
                    NexusReader.NotNexusFileError)
while not self._nexus_tokenizer.is_eof():
token = self._nexus_tokenizer.next_token_ucase()
            while token is not None and token != 'BEGIN' and not self._nexus_tokenizer.is_eof():
token = self._nexus_tokenizer.next_token_ucase()
self._nexus_tokenizer.process_and_clear_comments_for_item(
self._global_annotations_target,
self.extract_comment_metadata)
token = self._nexus_tokenizer.next_token_ucase()
if token == 'TAXA':
self._parse_taxa_block()
elif token == 'CHARACTERS' or token == 'DATA':
self._parse_characters_data_block()
elif token == 'TREES':
self._parse_trees_block()
elif token in ['SETS', 'ASSUMPTIONS', 'CODONS']:
if not self.exclude_chars:
self._nexus_tokenizer.skip_to_semicolon() # move past BEGIN command
link_title = None
block_title = None
                    while not (token == 'END' or token == 'ENDBLOCK') \
                            and not self._nexus_tokenizer.is_eof() \
                            and token is not None:
token = self._nexus_tokenizer.next_token_ucase()
if token == 'TITLE':
block_title = self._parse_title_statement()
elif token == "LINK":
link_title = self._parse_link_statement().get('characters')
elif token == 'CHARSET':
self._parse_charset_statement(block_title=block_title, link_title=link_title)
elif token == 'BEGIN':
raise self._nexus_error("'BEGIN' found without completion of previous block",
NexusReader.IncompleteBlockError)
self._nexus_tokenizer.skip_to_semicolon() # move past END command
elif token == 'BEGIN':
raise self._nexus_error("'BEGIN' found without completion of previous block",
NexusReader.IncompleteBlockError)
else:
# unknown block
if token is not None and self.store_ignored_blocks:
b = self._read_block_without_processing(token=token)
self._ignored_blocks.append(b)
else:
token = self._consume_to_end_of_block(token)
###########################################################################
## TAXA BLOCK
def _parse_taxa_block(self):
token = ''
self._nexus_tokenizer.allow_eof = False
self._nexus_tokenizer.skip_to_semicolon() # move past BEGIN statement
title = None
taxon_namespace = None
#while not (token == 'END' or token == 'ENDBLOCK') \
# and not self._nexus_tokenizer.is_eof() \
# and not token==None:
while not (token == 'END' or token == 'ENDBLOCK'):
token = self._nexus_tokenizer.next_token_ucase()
if token == "TITLE":
token = self._parse_title_statement()
taxon_namespace = self._new_taxon_namespace(token)
if token == 'DIMENSIONS':
self._parse_dimensions_statement()
if token == 'TAXLABELS':
if taxon_namespace is None:
taxon_namespace = self._new_taxon_namespace()
self._nexus_tokenizer.process_and_clear_comments_for_item(
self._global_annotations_target,
self.extract_comment_metadata)
self._parse_taxlabels_statement(taxon_namespace)
self._nexus_tokenizer.skip_to_semicolon() # move past END statement
self._nexus_tokenizer.allow_eof = True
def _get_taxon(self, taxon_namespace, label):
if not self._file_specified_ntax or len(taxon_namespace) < self._file_specified_ntax:
taxon = taxon_namespace.require_taxon(label=label,
is_case_sensitive=self.case_sensitive_taxon_labels)
else:
taxon = taxon_namespace.get_taxon(label=label,
is_case_sensitive=self.case_sensitive_taxon_labels)
if taxon is None:
raise self._too_many_taxa_error(taxon_namespace=taxon_namespace, label=label)
return taxon
def _parse_taxlabels_statement(self, taxon_namespace=None):
"""
Processes a TAXLABELS command. Assumes that the file reader is
positioned right after the "TAXLABELS" token in a TAXLABELS command.
"""
if taxon_namespace is None:
taxon_namespace = self._get_taxon_namespace()
token = self._nexus_tokenizer.next_token()
# Construct label lookup set
# The get_taxon call is expensive for large taxon namespaces as it requires
# a linear search. This causes significant performance penalties for loading
# very large trees into an empty taxon namespace as each new taxon requires
# a worst case search of the existing namespace before it can be inserted.
# To alleviate this, we build a temporary one-time set of all the labels
# in the taxon namespace. Now we can determine in constant-time whether
# a label token corresponds to a new taxon that requires insertion,
# or if an existing taxon can be fetched with get_taxon.
label_set = set([])
for taxon in taxon_namespace._taxa:
if taxon_namespace.is_case_sensitive:
label_set.add(taxon.label)
else:
label_set.add(taxon.lower_cased_label)
while token != ';':
label = token
# Convert the token to the appropriate case to check against label set
if taxon_namespace.is_case_sensitive:
check_label = label
else:
check_label = label.lower()
if check_label in label_set:
taxon = taxon_namespace.get_taxon(label=label)
else:
                if (self._file_specified_ntax is not None
                        and len(taxon_namespace) >= self._file_specified_ntax
                        and not self.attached_taxon_namespace
                        and not self.unconstrained_taxa_accumulation_mode):
raise self._too_many_taxa_error(taxon_namespace=taxon_namespace, label=label)
taxon = taxon_namespace.new_taxon(label=label)
# Add the new label to the label lookup set too
if taxon_namespace.is_case_sensitive:
label_set.add(taxon.label)
else:
label_set.add(taxon.lower_cased_label)
token = self._nexus_tokenizer.next_token()
self._nexus_tokenizer.process_and_clear_comments_for_item(taxon,
self.extract_comment_metadata)
###########################################################################
## LINK/TITLE PARSERS (How Mesquite handles multiple TAXA blocks)
def _parse_title_statement(self):
"""
Processes a MESQUITE 'TITLE' statement.
Assumes current token is 'TITLE'
"""
if self._nexus_tokenizer.cast_current_token_to_ucase() != "TITLE":
raise self._nexus_error("Expecting 'TITLE' token, but instead found '{}'".format(self._nexus_tokenizer.cast_current_token_to_ucase()))
title = self._nexus_tokenizer.require_next_token()
sc = self._nexus_tokenizer.require_next_token()
if sc != ";":
raise self._nexus_error("Expecting ';' token, but instead found '{}'".format(sc))
return title
def _parse_link_statement(self):
"""
Processes a MESQUITE 'LINK' statement.
"""
# TODO: this is now pretty ugly
# need to refactor with more abstraction
links = {}
token = self._nexus_tokenizer.next_token_ucase()
while token != ';':
if token == 'TAXA':
token = self._nexus_tokenizer.next_token()
if token != "=":
raise self._nexus_error("expecting '=' after link taxa")
token = self._nexus_tokenizer.next_token()
links['taxa'] = token
token = self._nexus_tokenizer.next_token()
if token == 'CHARACTERS':
token = self._nexus_tokenizer.next_token()
if token != "=":
raise self._nexus_error("expecting '=' after link characters")
token = self._nexus_tokenizer.next_token()
links['characters'] = token
token = self._nexus_tokenizer.next_token()
if token != ";":
self._nexus_tokenizer.skip_to_semicolon()
return links
###########################################################################
## CHARACTER/DATA BLOCK PARSERS AND SUPPORT
def _parse_characters_data_block(self):
token = self._nexus_tokenizer.cast_current_token_to_ucase()
if token != "CHARACTERS" and token != "DATA":
raise self._nexus_error("Expecting 'CHARACTERS' or 'DATA' token, but instead found '{}'".format(token))
if self.exclude_chars:
self._consume_to_end_of_block(self._nexus_tokenizer.current_token)
return
self._nexus_tokenizer.skip_to_semicolon() # move past BEGIN command
block_title = None
link_title = None
self._data_type = "standard" # set as default
        while (token != 'END'
                and token != 'ENDBLOCK'
                and not self._nexus_tokenizer.is_eof()
                and token is not None):
token = self._nexus_tokenizer.next_token_ucase()
if token == 'TITLE':
block_title = self._parse_title_statement()
elif token == "LINK":
link_title = self._parse_link_statement().get('taxa')
elif token == 'DIMENSIONS':
self._parse_dimensions_statement()
elif token == 'FORMAT':
self._parse_format_statement()
elif token == 'MATRIX':
self._parse_matrix_statement(block_title=block_title, link_title=link_title)
elif token == 'BEGIN':
raise self._nexus_error("'BEGIN' found without completion of previous block",
NexusReader.IncompleteBlockError)
# token = self._nexus_tokenizer.cast_current_token_to_ucase()
self._nexus_tokenizer.skip_to_semicolon() # move past END command
def _build_state_alphabet(self, char_block, symbols):
if self._gap_char and self._gap_char in symbols:
symbols = [s for s in symbols if s != self._gap_char]
sa = self._new_state_alphabet(
fundamental_states=symbols,
no_data_symbol=self._missing_char,
gap_symbol=self._gap_char,
case_sensitive=False)
char_block.state_alphabets = [sa]
char_block.default_state_alphabet = char_block.state_alphabets[0]
def _parse_format_statement(self):
"""
Processes a FORMAT command. Assumes that the file reader is
positioned right after the "FORMAT" token in a FORMAT command.
"""
token = self._nexus_tokenizer.require_next_token_ucase()
while token != ';':
if token == 'DATATYPE':
token = self._nexus_tokenizer.require_next_token_ucase()
if token == '=':
token = self._nexus_tokenizer.require_next_token_ucase()
if token == "DNA" or token == "NUCLEOTIDES":
self._data_type = "dna"
elif token == "RNA":
self._data_type = "rna"
elif token == "NUCLEOTIDE":
self._data_type = "nucleotide"
elif token == "PROTEIN":
self._data_type = "protein"
elif token == "CONTINUOUS":
self._data_type = "continuous"
                    else:
                        # unrecognized datatype tokens (including
                        # "STANDARD" itself) default to STANDARD
                        self._data_type = "standard"
                        self._symbols = "0123456789"
else:
raise self._nexus_error("Expecting '=' after DATATYPE keyword")
token = self._nexus_tokenizer.require_next_token_ucase()
elif token == 'SYMBOLS':
token = self._nexus_tokenizer.require_next_token_ucase()
if token == '=':
token = self._nexus_tokenizer.require_next_token_ucase()
if token == '"':
self._symbols = ""
token = self._nexus_tokenizer.require_next_token_ucase()
while token != '"':
if token not in self._symbols:
self._symbols = self._symbols + token
token = self._nexus_tokenizer.require_next_token_ucase()
else:
raise self._nexus_error("Expecting '\"' before beginning SYMBOLS list")
else:
raise self._nexus_error("Expecting '=' after SYMBOLS keyword")
token = self._nexus_tokenizer.require_next_token_ucase()
elif token == 'GAP':
token = self._nexus_tokenizer.require_next_token_ucase()
if token == '=':
token = self._nexus_tokenizer.require_next_token_ucase()
self._gap_char = token
else:
raise self._nexus_error("Expecting '=' after GAP keyword")
token = self._nexus_tokenizer.require_next_token_ucase()
elif token == 'INTERLEAVE':
token = self._nexus_tokenizer.require_next_token_ucase()
if token == '=':
token = self._nexus_tokenizer.require_next_token_ucase()
if token.startswith("N"):
self._interleave = False
else:
self._interleave = True
token = self._nexus_tokenizer.require_next_token_ucase()
else:
self._interleave = True
elif token == 'MISSING':
token = self._nexus_tokenizer.require_next_token_ucase()
if token == '=':
token = self._nexus_tokenizer.require_next_token_ucase()
self._missing_char = token
else:
raise self._nexus_error("Expecting '=' after MISSING keyword")
token = self._nexus_tokenizer.require_next_token_ucase()
elif token == 'MATCHCHAR':
token = self._nexus_tokenizer.require_next_token_ucase()
if token == '=':
token = self._nexus_tokenizer.require_next_token_ucase()
self._match_char = frozenset([token, token.lower()])
else:
raise self._nexus_error("Expecting '=' after MISSING keyword")
token = self._nexus_tokenizer.require_next_token_ucase()
elif token == 'BEGIN':
raise self._nexus_error("'BEGIN' found without completion of previous block",
NexusReader.IncompleteBlockError)
else:
token = self._nexus_tokenizer.require_next_token_ucase()
def _parse_dimensions_statement(self):
"""
Processes a DIMENSIONS command. Assumes that the file reader is
positioned right after the "DIMENSIONS" token in a DIMENSIONS command.
"""
token = self._nexus_tokenizer.require_next_token_ucase()
while token != ';':
if token == 'NTAX':
token = self._nexus_tokenizer.require_next_token_ucase()
if token == '=':
token = self._nexus_tokenizer.require_next_token_ucase()
if token.isdigit():
self._file_specified_ntax = int(token)
else:
raise self._nexus_error('Expecting numeric value for NTAX')
else:
raise self._nexus_error("Expecting '=' after NTAX keyword")
elif token == 'NCHAR':
token = self._nexus_tokenizer.require_next_token_ucase()
if token == '=':
token = self._nexus_tokenizer.require_next_token_ucase()
if token.isdigit():
self._file_specified_nchar = int(token)
else:
raise self._nexus_error("Expecting numeric value for NCHAR")
else:
raise self._nexus_error("Expecting '=' after NCHAR keyword")
elif token == 'BEGIN':
raise self._nexus_error("'BEGIN' found without completion of previous block",
NexusReader.IncompleteBlockError)
token = self._nexus_tokenizer.require_next_token_ucase()
def _parse_matrix_statement(self, block_title=None, link_title=None):
"""
Processes a MATRIX command. Assumes that the file reader
is positioned right after the "MATRIX" token in a MATRIX command,
and that NTAX and NCHAR have been specified accurately.
"""
if not self._file_specified_ntax:
raise self._nexus_error('NTAX must be defined by DIMENSIONS command to non-zero value before MATRIX command')
elif not self._file_specified_nchar:
raise self._nexus_error('NCHAR must be defined by DIMENSIONS command to non-zero value before MATRIX command')
taxon_namespace = self._get_taxon_namespace(link_title)
char_block = self._new_char_matrix(
self._data_type,
taxon_namespace=taxon_namespace,
title=block_title)
if self._data_type == "continuous":
self._process_continuous_matrix_data(char_block)
else:
self._process_discrete_matrix_data(char_block)
def _process_continuous_matrix_data(self, char_block):
taxon_namespace = char_block.taxon_namespace
token = self._nexus_tokenizer.next_token()
first_sequence_defined = None
if self._interleave:
try:
while token != ";" and not self._nexus_tokenizer.is_eof():
taxon = self._get_taxon(taxon_namespace=taxon_namespace, label=token)
self._read_continuous_character_values(char_block[taxon])
# if first_sequence_defined is None:
# first_sequence_defined = char_block[taxon]
token = self._nexus_tokenizer.next_token()
except NexusReader.BlockTerminatedException:
token = self._nexus_tokenizer.next_token()
else:
while token != ';' and not self._nexus_tokenizer.is_eof():
taxon = self._get_taxon(taxon_namespace=taxon_namespace, label=token)
self._read_continuous_character_values(char_block[taxon])
# if first_sequence_defined is None:
# first_sequence_defined = char_block[taxon]
if len(char_block[taxon]) < self._file_specified_nchar:
raise self._nexus_error("Insufficient characters given for taxon '{}': expecting {} but only found {} ('{}')".format(taxon.label, self._file_specified_nchar, len(char_block[taxon]), char_block[taxon].symbols_as_string()))
token = self._nexus_tokenizer.next_token()
# if self._interleave:
# raise NotImplementedError("Continuous interleaved characters in NEXUS schema not yet supported")
# taxon_namespace = char_block.taxon_namespace
# token = self._nexus_tokenizer.next_token()
# while token != ';' and not self._nexus_tokenizer.is_eof():
# taxon = self._get_taxon(taxon_namespace=taxon_namespace, label=token)
# while len(char_block[taxon]) < self._file_specified_nchar and not self._nexus_tokenizer.is_eof():
# # char_group = self._nexus_tokenizer.next_token(ignore_punctuation="-+")
# char_group = self._nexus_tokenizer.next_token()
# char_block[taxon].append(dataobject.CharacterDataCell(value=float(char_group)))
# if len(char_block[taxon]) < self._file_specified_nchar:
# raise self._nexus_error("Insufficient characters given for taxon '%s': expecting %d but only found %d ('%s')" \
# % (taxon.label, self._file_specified_nchar, len(char_block[taxon]), char_block[taxon].symbols_as_string()))
# token = self._nexus_tokenizer.next_token()
def _process_discrete_matrix_data(self, char_block):
if self._data_type == "standard":
self._build_state_alphabet(char_block, self._symbols)
taxon_namespace = char_block.taxon_namespace
token = self._nexus_tokenizer.next_token()
state_alphabet = char_block.default_state_alphabet
first_sequence_defined = None
if self._interleave:
try:
while token != ";" and not self._nexus_tokenizer.is_eof():
taxon = self._get_taxon(taxon_namespace=taxon_namespace, label=token)
self._read_character_states(char_block[taxon], state_alphabet, first_sequence_defined)
if first_sequence_defined is None:
first_sequence_defined = char_block[taxon]
token = self._nexus_tokenizer.next_token()
except NexusReader.BlockTerminatedException:
token = self._nexus_tokenizer.next_token()
else:
while token != ';' and not self._nexus_tokenizer.is_eof():
taxon = self._get_taxon(taxon_namespace=taxon_namespace, label=token)
self._read_character_states(char_block[taxon], state_alphabet, first_sequence_defined)
if first_sequence_defined is None:
first_sequence_defined = char_block[taxon]
if len(char_block[taxon]) < self._file_specified_nchar:
raise self._nexus_error("Insufficient characters given for taxon '%s': expecting %d but only found %d ('%s')" \
% (taxon.label, self._file_specified_nchar, len(char_block[taxon]), char_block[taxon].symbols_as_string()))
token = self._nexus_tokenizer.next_token()
def _get_state_for_multistate_tokens(self,
state_char_seq,
multistate_type,
state_alphabet):
try:
state = state_alphabet.match_state(state_char_seq,
state_denomination=multistate_type)
except KeyError:
try:
if multistate_type == state_alphabet.AMBIGUOUS_STATE:
sae = state_alphabet.new_ambiguous_state(
symbol=None,
member_state_symbols=state_char_seq)
else:
sae = state_alphabet.new_polymorphic_state(
symbol=None,
member_state_symbols=state_char_seq)
except KeyError:
raise self._nexus_error("Unrecognized state symbols encountered in multistate sequence: '{}'".format(state_char_seq))
else:
return sae
else:
return state
###########################################################################
## TREE / TREE BLOCK PARSERS
def _parse_tree_statement(self, tree_factory, taxon_symbol_mapper):
"""
Processes a TREE command. Assumes that the file reader is
positioned right after the "TREE" token in a TREE command.
Calls on the NewickStatementParser of the trees module.
"""
token = self._nexus_tokenizer.next_token()
if token == '*':
token = self._nexus_tokenizer.next_token()
tree_name = token
token = self._nexus_tokenizer.next_token()
pre_tree_comments = self._nexus_tokenizer.pull_captured_comments()
if token != '=':
raise self._nexus_error("Expecting '=' in definition of Tree '%s' but found '%s'" % (tree_name, token))
tree_comments = self._nexus_tokenizer.pull_captured_comments()
# advance to '('; comments will be processed by newick reader
self._nexus_tokenizer.next_token()
tree = self._build_tree_from_newick_tree_string(tree_factory, taxon_symbol_mapper)
tree.label = tree_name
nexusprocessing.process_comments_for_item(tree, pre_tree_comments, self.extract_comment_metadata)
nexusprocessing.process_comments_for_item(tree, tree_comments, self.extract_comment_metadata)
# if self.extract_comment_metadata:
# annotations = nexustokenizer.parse_comment_metadata(tree_comments)
# for annote in annotations:
# tree.annotations.add(annote)
# if pre_tree_metadata_comments:
# pre_tree_annotations = nexustokenizer.parse_comment_metadata(pre_tree_metadata_comments)
# for annote in pre_annotations:
# tree.annotations.add(annote)
# if tree_comments is not None and len(tree_comments) > 0:
# tree.comments.extend(tree_comments)
# if self._nexus_tokenizer.current_token != ';':
# self._nexus_tokenizer.skip_to_semicolon()
return tree
def _build_tree_from_newick_tree_string(self, tree_factory, taxon_symbol_mapper):
tree = self.newick_reader._parse_tree_statement(
nexus_tokenizer=self._nexus_tokenizer,
tree_factory=tree_factory,
taxon_symbol_map_fn=taxon_symbol_mapper.require_taxon_for_symbol)
return tree
def _parse_translate_statement(self, taxon_namespace, taxon_symbol_mapper=None):
"""
Processes a TRANSLATE command. Assumes that the file reader is
positioned right after the "TRANSLATE" token in a TRANSLATE command.
"""
token = self._nexus_tokenizer.current_token
if taxon_symbol_mapper is None:
taxon_symbol_mapper = self._get_taxon_symbol_mapper(taxon_namespace=taxon_namespace)
else:
assert taxon_symbol_mapper.taxon_namespace is taxon_namespace
if self._file_specified_ntax is None:
# Not yet parsed TAXA block: NEXUS file without TAXA block
# Badly-formed NEXUS file, yet widely-found in the wild
# Override namespace modification lock
taxon_namespace.is_mutable = True
while True:
translation_token = self._nexus_tokenizer.next_token()
if translation_token == ";" and not self._nexus_tokenizer.is_token_quoted:
raise self._nexus_error("Expecting translation token but found ';' instead")
translation_label = self._nexus_tokenizer.next_token()
try:
taxon = taxon_namespace.require_taxon(label=translation_label)
except error.ImmutableTaxonNamespaceError:
exc = self._undefined_taxon_error(taxon_namespace=taxon_namespace, label=translation_label)
exc.__context__ = None # Python 3.0, 3.1, 3.2
exc.__cause__ = None # Python 3.3, 3.4
raise exc
taxon_symbol_mapper.add_translate_token(translation_token, taxon)
token = self._nexus_tokenizer.next_token() # ","
if (not token) or (token == ';'):
break
if token != ',':
raise self._nexus_error("Expecting ',' in TRANSLATE statement after definition for %s = '%s', but found '%s' instead." % (translation_token, translation_label, token))
return taxon_symbol_mapper
def _parse_trees_block(self):
"""
Expectations:
- current token: "TREES" [part of "BEGIN TREES"]
"""
token = self._nexus_tokenizer.cast_current_token_to_ucase()
if token != "TREES":
raise self._nexus_error("Expecting 'TREES' token, but instead found '{}'".format(token))
if self.exclude_trees:
self._consume_to_end_of_block(self._nexus_tokenizer.current_token)
return
self._nexus_tokenizer.skip_to_semicolon() # move past "BEGIN TREES" command
link_title = None
taxon_namespace = None
taxon_symbol_mapper = None
trees_block = None
block_title = None
# while ((not self._nexus_tokenizer.is_eof())
# and self._nexus_tokenizer.current_token is not None
        #         and self._nexus_tokenizer.current_token != 'END'
        #         and self._nexus_tokenizer.current_token != 'ENDBLOCK'):
while ((not self._nexus_tokenizer.is_eof())
and token is not None
and token != 'END'
and token != 'ENDBLOCK'):
token = self._nexus_tokenizer.next_token_ucase()
if token == 'LINK':
link_title = self._parse_link_statement().get("taxa")
elif token == 'TITLE':
block_title = self._parse_title_statement()
token = "" # clear; repopulate at start of loop
elif token == 'TRANSLATE':
if taxon_namespace is None:
taxon_namespace = self._get_taxon_namespace(link_title)
taxon_symbol_mapper = self._parse_translate_statement(taxon_namespace)
token = "" # clear; repopulate at start of loop
elif token == 'TREE':
if taxon_namespace is None:
taxon_namespace = self._get_taxon_namespace(link_title)
if taxon_symbol_mapper is None:
taxon_symbol_mapper = self._get_taxon_symbol_mapper(taxon_namespace=taxon_namespace)
pre_tree_comments = self._nexus_tokenizer.pull_captured_comments()
if trees_block is None:
trees_block = self._new_tree_list(taxon_namespace=taxon_namespace, title=block_title)
# All comments leading up to the first 'TREE' statement assumed
# to belong to the TreeList corresponding to the TREES block
nexusprocessing.process_comments_for_item(
trees_block,
pre_tree_comments,
self.extract_comment_metadata)
tree_factory = trees_block.new_tree
while True:
## After the following, the current token
## will be the token immediately following
## the terminating semi-colon of a tree
## statement. Typically, this will be
## 'TREE' if there is another tree, or
## 'END'/'ENDBLOCK'.
tree = self._parse_tree_statement(
tree_factory=tree_factory,
taxon_symbol_mapper=taxon_symbol_mapper)
if self._nexus_tokenizer.is_eof() or not self._nexus_tokenizer.current_token:
break
if self._nexus_tokenizer.cast_current_token_to_ucase() != "TREE":
token = self._nexus_tokenizer.current_token
break
elif token == 'BEGIN':
raise self._nexus_error("'BEGIN' found without completion of previous block",
NexusReader.IncompleteBlockError)
self._nexus_tokenizer.skip_to_semicolon() # move past END command
def _parse_charset_statement(self, block_title=None, link_title=None):
"""
Parses a character set description. Assumes token stream is positioned right after 'charset' command.
"""
char_matrix = self._get_char_matrix(title=link_title)
keyword = self._nexus_tokenizer.current_token
token = self._nexus_tokenizer.next_token()
        if self._nexus_tokenizer.is_eof() or not token:
            raise self._nexus_error("Unexpected end of file or null token")
        charset_name = token
        token = self._nexus_tokenizer.next_token()
        if not token:
            raise self._nexus_error("Unexpected end of file or null token")
        elif token != '=':
            raise self._nexus_error('Expecting "=" after character set name "%s", but instead found "%s"' % (charset_name, token))
        else:
            positions = self._parse_positions(adjust_to_zero_based=True)
            char_matrix.new_character_subset(charset_name, positions)
def _parse_positions(self, adjust_to_zero_based=True, verify=True):
"""
Parses a character position list. Expects next character read to be the first item in a position list.
"""
positions = []
# hyphens_as_tokens = self._nexus_tokenizer.hyphens_as_tokens
# self._nexus_tokenizer.hyphens_as_tokens = True
self._nexus_tokenizer.set_hyphens_as_captured_delimiters(True)
token = self._nexus_tokenizer.next_token()
max_positions = self._file_specified_nchar
if self._nexus_tokenizer.is_eof() or not token:
raise self._nexus_error('Unexpected end of file or null token')
while token != ';' and token != ',' and not self._nexus_tokenizer.is_eof():
if not token:
break
if token.upper() == 'ALL':
positions = range(1, max_positions + 1)
break
elif token.isdigit():
start = int(token)
token = self._nexus_tokenizer.next_token()
if token:
if token == ',' or token.isdigit() or token == ';':
positions.append(start)
elif token == '-':
token = self._nexus_tokenizer.next_token()
if token:
if token.isdigit() or token == '.':
if token == '.':
end = max_positions
#token = self._nexus_tokenizer.next_token()
else:
end = int(token)
#token = self._nexus_tokenizer.next_token()
token = self._nexus_tokenizer.next_token()
if token:
if token == '\\' or token == '/': # (NEXUS standard only accepts '\')
token = self._nexus_tokenizer.next_token()
if token:
if token.isdigit():
step = int(token)
#token = self._nexus_tokenizer.next_token()
else:
raise self._nexus_error('Expecting digit but found "%s".' % (token))
else:
raise self._nexus_error(r'Expecting other tokens after "\", but no more found.')
token = self._nexus_tokenizer.next_token()
else:
step = 1
else:
step = 1
for q in range(start, end+1, step):
if q <= max_positions:
positions.append(q)
else:
raise self._nexus_error('Expecting digit or ".", but found "%s".' % (token))
else:
raise self._nexus_error('Expecting other tokens after "-", but no more found.')
else:
raise self._nexus_error('Expecting digit or "all", but found "%s".' % (token))
else:
positions.append(start)
self._nexus_tokenizer.set_hyphens_as_captured_delimiters(False)
positions = list(set(positions))
positions.sort()
if verify:
for position in positions:
if position > max_positions:
raise self._nexus_error("Specified position %d, but maximum position is %d" % (position, max_positions))
if adjust_to_zero_based:
positions = [position - 1 for position in positions]
        return positions
def _consume_to_end_of_block(self, token=None):
if token:
token = token.upper()
else:
token = "DUMMY"
        while not (token == 'END' or token == 'ENDBLOCK') \
                and not self._nexus_tokenizer.is_eof() \
                and token is not None:
self._nexus_tokenizer.skip_to_semicolon()
token = self._nexus_tokenizer.next_token_ucase()
return token
def _read_block_without_processing(self, token=None):
# used for unknown blocks we want to save
# NOT (really) TESTED
# Everybody else except Jeet: (REALLY) DO NOT USE!
# Jeet: SORTA DO NOT USE WITHOUT MORE TESTING
if token:
token = token.upper()
block = ["BEGIN", token]
old_uncaptured_delimiters = self._nexus_tokenizer.uncaptured_delimiters
old_captured_delimiters = self._nexus_tokenizer.captured_delimiters
to_switch = "\n\r"
for ch in to_switch:
self._nexus_tokenizer.uncaptured_delimiters.discard(ch)
self._nexus_tokenizer.captured_delimiters.add(ch)
        while not (token == 'END' or token == 'ENDBLOCK') \
                and not self._nexus_tokenizer.is_eof() \
                and token is not None:
token = self._nexus_tokenizer.require_next_token()
uctoken = token.upper()
if uctoken == "END" or uctoken == "ENDBLOCK":
token = uctoken
block.append(token)
self._nexus_tokenizer.uncaptured_delimiters = old_uncaptured_delimiters
self._nexus_tokenizer.captured_delimiters = old_captured_delimiters
self._nexus_tokenizer.skip_to_semicolon() # move past end
block.append(";")
return " ".join(block)
def _read_character_states(self,
character_data_vector,
state_alphabet,
first_sequence_defined,
):
"""
Reads character sequence data substatement until the number of
character states read is equal to ``self._file_specified_nchar`` (with
multi-state characters, such as '(AG)' counting as a single
state) or, if ``self._interleave`` is |True|, until an EOL is
reached.
Given a sequence of characters, with ambiguities denoted by
`{<STATES>}`, this returns a list of state alphabet elements.
For example, the following sequence:
"ACTG(AC)GGT(CGG)(CG)GG"
will result in a list such as:
[<A>, <C>, <T>, <G>, <AC>, <G>, <G>, <T>, <CGG>, <CG>, <G>, <G>]
where `<.>` is a StateIdentity object with the characters within the
brackets as symbol(s).
"""
if self._interleave:
self._nexus_tokenizer.set_capture_eol(True)
states_to_add = []
while len(character_data_vector) + len(states_to_add) < self._file_specified_nchar:
token = self._nexus_tokenizer.require_next_token()
if token == "{" or token == "(":
if token == "{":
# multistate_type = dataobject.StateIdentity.AMBIGUOUS_STATE
multistate_type = state_alphabet.AMBIGUOUS_STATE
closing_token = "}"
else:
# multistate_type = dataobject.StateIdentity.POLYMORPHIC_STATE
multistate_type = state_alphabet.POLYMORPHIC_STATE
closing_token = ")"
multistate_tokens = []
while True:
token = self._nexus_tokenizer.require_next_token()
if token == closing_token:
break
multistate_tokens.append(token)
c = "".join(multistate_tokens)
state = self._get_state_for_multistate_tokens(c, multistate_type, state_alphabet)
if len(character_data_vector) + len(states_to_add) == self._file_specified_nchar:
raise self._too_many_characters_error(c)
states_to_add.append(state)
elif token == "\r" or token == "\n":
if self._interleave:
break
elif token == ";":
raise NexusReader.BlockTerminatedException
else:
for c in token:
if c in self._match_char:
try:
state = first_sequence_defined[len(character_data_vector) + len(states_to_add)]
except TypeError:
exc = self._nexus_error("Cannot dereference MATCHCHAR '{}' on first sequence".format(c), NexusReader.NexusReaderError)
exc.__context__ = None # Python 3.0, 3.1, 3.2
exc.__cause__ = None # Python 3.3, 3.4
raise exc
except IndexError:
exc = self._nexus_error("Cannot dereference MATCHCHAR '{}': current position ({}) exceeds length of first sequence ({})".format(c,
len(character_data_vector) + len(states_to_add) + 1,
len(first_sequence_defined),
NexusReader.NexusReaderError))
exc.__context__ = None # Python 3.0, 3.1, 3.2
exc.__cause__ = None # Python 3.3, 3.4
raise exc
else:
try:
state = state_alphabet.full_symbol_state_map[c]
except KeyError:
exc = self._nexus_error("Unrecognized character state symbol for state alphabet '{}' ({}) : '{}'".format(
state_alphabet.label,
state_alphabet.__class__.__name__,
c),
NexusReader.InvalidCharacterStateSymbolError)
exc.__context__ = None # Python 3.0, 3.1, 3.2
exc.__cause__ = None # Python 3.3, 3.4
raise exc
if len(character_data_vector) + len(states_to_add) == self._file_specified_nchar:
raise self._too_many_characters_error(c)
states_to_add.append(state)
if self._interleave:
self._nexus_tokenizer.set_capture_eol(False)
character_data_vector.extend(states_to_add)
return character_data_vector
def _read_continuous_character_values(self,
character_data_vector,
datatype=float,
):
"""
Reads character sequence data substatement until the number of
character states read is equal to ``self._file_specified_nchar`` (with
multi-state characters, such as '(AG)' counting as a single
state) or, if ``self._interleave`` is |True|, until an EOL is
reached.
"""
if self._interleave:
self._nexus_tokenizer.set_capture_eol(True)
while len(character_data_vector) < self._file_specified_nchar:
token = self._nexus_tokenizer.require_next_token()
if token == "\r" or token == "\n":
if self._interleave:
break
elif token == ";":
raise NexusReader.BlockTerminatedException
else:
                try:
                    state = datatype(token)
                except ValueError:
                    exc = self._nexus_error(
                            "Invalid value for continuous character of type {datatype}: '{invalid_value}'".format(
                                datatype=datatype.__name__,
                                invalid_value=token),
                            NexusReader.InvalidContinuousCharacterValueError)
exc.__context__ = None # Python 3.0, 3.1, 3.2
exc.__cause__ = None # Python 3.3, 3.4
raise exc
# if c in self._match_char:
# try:
# state = first_sequence_defined[len(character_data_vector)]
# except TypeError:
# exc = self._nexus_error("Cannot dereference MATCHCHAR '{}' on first sequence".format(c), NexusReader.NexusReaderError)
# exc.__context__ = None # Python 3.0, 3.1, 3.2
# exc.__cause__ = None # Python 3.3, 3.4
# raise exc
# except IndexError:
# exc = self._nexus_error("Cannot dereference MATCHCHAR '{}': current position ({}) exceeds length of first sequence ({})".format(c,
# len(character_data_vector)+1,
# len(first_sequence_defined),
# NexusReader.NexusReaderError))
# exc.__context__ = None # Python 3.0, 3.1, 3.2
# exc.__cause__ = None # Python 3.3, 3.4
# raise exc
# else:
# try:
# state = state_alphabet.full_symbol_state_map[c]
# except KeyError:
# exc = self._nexus_error("Unrecognized character state symbol for state alphabet '{}' ({}) : '{}'".format(
# state_alphabet.label,
# state_alphabet.__class__.__name__,
# c),
# NexusReader.InvalidCharacterStateSymbolError)
# exc.__context__ = None # Python 3.0, 3.1, 3.2
# exc.__cause__ = None # Python 3.3, 3.4
# raise exc
if len(character_data_vector) == self._file_specified_nchar:
raise self._too_many_characters_error(token)
character_data_vector.append(state)
if self._interleave:
self._nexus_tokenizer.set_capture_eol(False)
return character_data_vector