Spaces:
Running
Running
__all__ = ['Serializer', 'SerializerError'] | |
from .error import YAMLError | |
from .events import * | |
from .nodes import * | |
class SerializerError(YAMLError): | |
pass | |
class Serializer: | |
ANCHOR_TEMPLATE = 'id%03d' | |
def __init__(self, encoding=None, | |
explicit_start=None, explicit_end=None, version=None, tags=None): | |
self.use_encoding = encoding | |
self.use_explicit_start = explicit_start | |
self.use_explicit_end = explicit_end | |
self.use_version = version | |
self.use_tags = tags | |
self.serialized_nodes = {} | |
self.anchors = {} | |
self.last_anchor_id = 0 | |
self.closed = None | |
def open(self): | |
if self.closed is None: | |
self.emit(StreamStartEvent(encoding=self.use_encoding)) | |
self.closed = False | |
elif self.closed: | |
raise SerializerError("serializer is closed") | |
else: | |
raise SerializerError("serializer is already opened") | |
def close(self): | |
if self.closed is None: | |
raise SerializerError("serializer is not opened") | |
elif not self.closed: | |
self.emit(StreamEndEvent()) | |
self.closed = True | |
#def __del__(self): | |
# self.close() | |
def serialize(self, node): | |
if self.closed is None: | |
raise SerializerError("serializer is not opened") | |
elif self.closed: | |
raise SerializerError("serializer is closed") | |
self.emit(DocumentStartEvent(explicit=self.use_explicit_start, | |
version=self.use_version, tags=self.use_tags)) | |
self.anchor_node(node) | |
self.serialize_node(node, None, None) | |
self.emit(DocumentEndEvent(explicit=self.use_explicit_end)) | |
self.serialized_nodes = {} | |
self.anchors = {} | |
self.last_anchor_id = 0 | |
def anchor_node(self, node): | |
if node in self.anchors: | |
if self.anchors[node] is None: | |
self.anchors[node] = self.generate_anchor(node) | |
else: | |
self.anchors[node] = None | |
if isinstance(node, SequenceNode): | |
for item in node.value: | |
self.anchor_node(item) | |
elif isinstance(node, MappingNode): | |
for key, value in node.value: | |
self.anchor_node(key) | |
self.anchor_node(value) | |
def generate_anchor(self, node): | |
self.last_anchor_id += 1 | |
return self.ANCHOR_TEMPLATE % self.last_anchor_id | |
def serialize_node(self, node, parent, index): | |
alias = self.anchors[node] | |
if node in self.serialized_nodes: | |
self.emit(AliasEvent(alias)) | |
else: | |
self.serialized_nodes[node] = True | |
self.descend_resolver(parent, index) | |
if isinstance(node, ScalarNode): | |
detected_tag = self.resolve(ScalarNode, node.value, (True, False)) | |
default_tag = self.resolve(ScalarNode, node.value, (False, True)) | |
implicit = (node.tag == detected_tag), (node.tag == default_tag) | |
self.emit(ScalarEvent(alias, node.tag, implicit, node.value, | |
style=node.style)) | |
elif isinstance(node, SequenceNode): | |
implicit = (node.tag | |
== self.resolve(SequenceNode, node.value, True)) | |
self.emit(SequenceStartEvent(alias, node.tag, implicit, | |
flow_style=node.flow_style)) | |
index = 0 | |
for item in node.value: | |
self.serialize_node(item, node, index) | |
index += 1 | |
self.emit(SequenceEndEvent()) | |
elif isinstance(node, MappingNode): | |
implicit = (node.tag | |
== self.resolve(MappingNode, node.value, True)) | |
self.emit(MappingStartEvent(alias, node.tag, implicit, | |
flow_style=node.flow_style)) | |
for key, value in node.value: | |
self.serialize_node(key, node, None) | |
self.serialize_node(value, node, key) | |
self.emit(MappingEndEvent()) | |
self.ascend_resolver() | |