# Composer: turns a stream of parser events into a graph of nodes.
__all__ = ['Composer', 'ComposerError'] | |
from .error import MarkedYAMLError | |
from .events import * | |
from .nodes import * | |
class ComposerError(MarkedYAMLError):
    """Error raised by the composer while turning events into nodes."""
class Composer:
    """Build a graph of nodes from a stream of parser events.

    This class has no event source or tag resolver of its own.  It calls
    ``self.check_event``, ``self.peek_event``, ``self.get_event``,
    ``self.resolve``, ``self.descend_resolver`` and ``self.ascend_resolver``,
    none of which are defined here, so it has to be combined with classes
    that provide them.
    """

    def __init__(self):
        # Anchor name -> node, for the document currently being composed.
        self.anchors = {}

    def check_node(self):
        """Return True if the stream has another document to compose."""
        # Drop the STREAM-START event.
        if self.check_event(StreamStartEvent):
            self.get_event()

        # Are there more documents available?
        return not self.check_event(StreamEndEvent)

    def get_node(self):
        """Return the root node of the next document, or None at stream end.

        STREAM-START is not dropped here; ``check_node`` is the method that
        does that.
        """
        if self.check_event(StreamEndEvent):
            return None
        return self.compose_document()

    def get_single_node(self):
        """Compose the only document of the stream and return its root node.

        Returns None when the stream holds no document.

        Raises:
            ComposerError: if a second document follows the first one.
        """
        # Drop the STREAM-START event.
        self.get_event()

        # Compose a document if the stream is not empty.
        document = None
        if not self.check_event(StreamEndEvent):
            document = self.compose_document()

        # Ensure that the stream contains no more documents.
        if not self.check_event(StreamEndEvent):
            event = self.get_event()
            raise ComposerError(
                "expected a single document in the stream",
                document.start_mark,
                "but found another document",
                event.start_mark)

        # Drop the STREAM-END event.
        self.get_event()

        return document

    def compose_document(self):
        """Compose one document and return its root node."""
        # Drop the DOCUMENT-START event.
        self.get_event()

        # Compose the root node.
        node = self.compose_node(None, None)

        # Drop the DOCUMENT-END event.
        self.get_event()

        # Anchors are only kept for the duration of a single document.
        self.anchors = {}
        return node

    def compose_node(self, parent, index):
        """Compose and return the node that starts at the next event.

        ``parent`` and ``index`` are passed through unchanged to
        ``self.descend_resolver``.

        Raises:
            ComposerError: for an alias whose anchor is unknown, for an
                anchor that is already defined in this document, or when the
                next event does not start a scalar, sequence or mapping.
        """
        if self.check_event(AliasEvent):
            event = self.get_event()
            anchor = event.anchor
            if anchor not in self.anchors:
                raise ComposerError(
                    None, None,
                    "found undefined alias %r" % anchor,
                    event.start_mark)
            # An alias yields the very same node object as its anchor.
            return self.anchors[anchor]

        event = self.peek_event()
        anchor = event.anchor
        if anchor is not None and anchor in self.anchors:
            raise ComposerError(
                "found duplicate anchor %r; first occurrence" % anchor,
                self.anchors[anchor].start_mark,
                "second occurrence",
                event.start_mark)

        self.descend_resolver(parent, index)
        if self.check_event(ScalarEvent):
            node = self.compose_scalar_node(anchor)
        elif self.check_event(SequenceStartEvent):
            node = self.compose_sequence_node(anchor)
        elif self.check_event(MappingStartEvent):
            node = self.compose_mapping_node(anchor)
        else:
            node = None
        self.ascend_resolver()

        if node is None:
            # Without this guard an unexpected event kind would surface as an
            # UnboundLocalError on ``node`` instead of a composer error.
            raise ComposerError(
                None, None,
                "expected a scalar, sequence or mapping event, but found %s"
                % type(event).__name__,
                getattr(event, 'start_mark', None))
        return node

    def compose_scalar_node(self, anchor):
        """Consume a scalar event and return the matching ``ScalarNode``.

        The node is registered under ``anchor`` when ``anchor`` is not None.
        """
        event = self.get_event()
        tag = event.tag
        if tag is None or tag == '!':
            # No usable explicit tag on the event: ask the resolver for one.
            tag = self.resolve(ScalarNode, event.value, event.implicit)
        node = ScalarNode(
            tag, event.value,
            event.start_mark, event.end_mark,
            style=event.style)
        if anchor is not None:
            self.anchors[anchor] = node
        return node

    def compose_sequence_node(self, anchor):
        """Consume a whole sequence and return the matching ``SequenceNode``.

        The node is registered under ``anchor`` when ``anchor`` is not None.
        """
        start_event = self.get_event()
        tag = start_event.tag
        if tag is None or tag == '!':
            tag = self.resolve(SequenceNode, None, start_event.implicit)
        node = SequenceNode(
            tag, [],
            start_event.start_mark, None,
            flow_style=start_event.flow_style)
        # Register the anchor before composing the items, so that an alias
        # inside the sequence can refer back to the sequence itself.
        if anchor is not None:
            self.anchors[anchor] = node

        index = 0
        while not self.check_event(SequenceEndEvent):
            node.value.append(self.compose_node(node, index))
            index += 1

        # The end mark is only known once SEQUENCE-END has been read.
        end_event = self.get_event()
        node.end_mark = end_event.end_mark
        return node

    def compose_mapping_node(self, anchor):
        """Consume a whole mapping and return the matching ``MappingNode``.

        The node value is a list of ``(key_node, value_node)`` pairs in stream
        order.  Keys are not checked for duplicates here.  The node is
        registered under ``anchor`` when ``anchor`` is not None.
        """
        start_event = self.get_event()
        tag = start_event.tag
        if tag is None or tag == '!':
            tag = self.resolve(MappingNode, None, start_event.implicit)
        node = MappingNode(
            tag, [],
            start_event.start_mark, None,
            flow_style=start_event.flow_style)
        # Register the anchor before composing the entries, so that an alias
        # inside the mapping can refer back to the mapping itself.
        if anchor is not None:
            self.anchors[anchor] = node

        while not self.check_event(MappingEndEvent):
            item_key = self.compose_node(node, None)
            # The key node is handed to the resolver as the value's index.
            item_value = self.compose_node(node, item_key)
            node.value.append((item_key, item_value))

        # The end mark is only known once MAPPING-END has been read.
        end_event = self.get_event()
        node.end_mark = end_event.end_mark
        return node