comparison planemo/lib/python3.7/site-packages/ruamel/yaml/emitter.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 # coding: utf-8
2
3 from __future__ import absolute_import
4 from __future__ import print_function
5
6 # Emitter expects events obeying the following grammar:
7 # stream ::= STREAM-START document* STREAM-END
8 # document ::= DOCUMENT-START node DOCUMENT-END
9 # node ::= SCALAR | sequence | mapping
10 # sequence ::= SEQUENCE-START node* SEQUENCE-END
11 # mapping ::= MAPPING-START (node node)* MAPPING-END
12
13 import sys
14 from ruamel.yaml.error import YAMLError, YAMLStreamError
15 from ruamel.yaml.events import * # NOQA
16
17 # fmt: off
18 from ruamel.yaml.compat import utf8, text_type, PY2, nprint, dbg, DBG_EVENT, \
19 check_anchorname_char
20 # fmt: on
21
22 if False: # MYPY
23 from typing import Any, Dict, List, Union, Text, Tuple, Optional # NOQA
24 from ruamel.yaml.compat import StreamType # NOQA
25
26 __all__ = ['Emitter', 'EmitterError']
27
28
class EmitterError(YAMLError):
    """Error raised by the emitter.

    Used for events that arrive out of order and for invalid versions,
    tag handles, tag prefixes, tags and anchors.
    """
31
32
class ScalarAnalysis(object):
    """Plain record holding the outcome of analysing one scalar value.

    The emitter stores an instance in ``self.analysis`` and consults the
    ``allow_*`` flags, ``empty`` and ``multiline`` when it picks the style
    used to write the scalar.
    """

    def __init__(
        self,
        scalar,
        empty,
        multiline,
        allow_flow_plain,
        allow_block_plain,
        allow_single_quoted,
        allow_double_quoted,
        allow_block,
    ):
        # type: (Any, Any, Any, bool, bool, bool, bool, bool) -> None
        # the analysed text and its basic shape
        self.scalar, self.empty, self.multiline = scalar, empty, multiline
        # plain (unquoted) output permitted in flow / block context
        self.allow_flow_plain, self.allow_block_plain = allow_flow_plain, allow_block_plain
        # quoted output permitted
        self.allow_single_quoted, self.allow_double_quoted = (
            allow_single_quoted,
            allow_double_quoted,
        )
        # block scalar ('|' or '>') output permitted
        self.allow_block = allow_block
54
55
class Indents(object):
    """Stack of ``(indent, is_sequence)`` pairs.

    Replaces the plain list of None/int indents: next to every saved indent
    it remembers the ``seq`` flag that was passed when it was pushed.
    """

    def __init__(self):
        # type: () -> None
        self.values = []  # type: List[Tuple[int, bool]]

    def append(self, val, seq):
        # type: (Any, Any) -> None
        """Push indent ``val`` together with its ``seq`` flag."""
        entry = (val, seq)
        self.values.append(entry)

    def pop(self):
        # type: () -> Any
        """Remove the top entry and return only its indent value."""
        top = self.values.pop()
        return top[0]

    def last_seq(self):
        # type: () -> bool
        """Return the seq flag of the entry below the top one.

        That is the element added before the last one in increase_indent();
        False is returned when fewer than two entries are stacked.
        """
        if len(self.values) < 2:
            return False
        return self.values[-2][1]

    def seq_flow_align(self, seq_indent, column):
        # type: (int, int) -> int
        """Return the number of extra spaces needed because of the dash.

        0 is returned unless at least two entries are stacked and the top
        entry was pushed with a true ``seq`` flag.
        """
        if len(self.values) < 2:
            return 0
        indent, in_sequence = self.values[-1][0], self.values[-1][1]
        if not in_sequence:
            return 0
        if indent is None:
            indent = 0
        # -1 for the dash
        return indent + seq_indent - column - 1

    def __len__(self):
        # type: () -> int
        """Number of stacked entries."""
        return len(self.values)
91
92
93 class Emitter(object):
94 # fmt: off
95 DEFAULT_TAG_PREFIXES = {
96 u'!': u'!',
97 u'tag:yaml.org,2002:': u'!!',
98 }
99 # fmt: on
100
101 MAX_SIMPLE_KEY_LENGTH = 128
102
def __init__(
    self,
    stream,
    canonical=None,
    indent=None,
    width=None,
    allow_unicode=None,
    line_break=None,
    block_seq_indent=None,
    top_level_colon_align=None,
    prefix_colon=None,
    brace_single_entry_mapping_in_flow_sequence=None,
    dumper=None,
):
    # type: (StreamType, Any, Optional[int], Optional[int], Optional[bool], Any, Optional[int], Optional[bool], Any, Optional[bool], Any) -> None  # NOQA
    """Initialise the emitter state machine and its formatting settings.

    stream: output object; assigned through the ``stream`` property.
    canonical, allow_unicode, top_level_colon_align and
        brace_single_entry_mapping_in_flow_sequence are stored unchanged.
    indent: used as sequence/mapping indent when ``1 < indent < 10``
        (otherwise 2); always kept as ``requested_indent``.
    width: used as ``best_width`` when larger than twice the sequence
        indent, otherwise 80.
    line_break: used when it is one of CR, LF or CRLF, otherwise LF.
    block_seq_indent: stored as ``sequence_dash_offset`` (0 when falsy).
    prefix_colon: text put in front of ``:`` to form ``prefixed_colon``.
    dumper: stored; its ``_emitter`` is set to this object when unset.
    """
    self.dumper = dumper
    # register with the dumper unless it already knows an emitter
    if dumper is not None and getattr(dumper, '_emitter', None) is None:
        dumper._emitter = self
    self.stream = stream

    # Encoding can be overridden by STREAM-START.
    self.encoding = None  # type: Optional[Text]
    self.allow_space_break = None

    # Emitter is a state machine with a stack of states to handle nested
    # structures.
    self.states = []  # type: List[Any]
    self.state = self.expect_stream_start  # type: Any

    # Current event and the event queue.
    self.events = []  # type: List[Any]
    self.event = None  # type: Any

    # The current indentation level and the stack of previous indents.
    self.indents = Indents()
    self.indent = None  # type: Optional[int]

    # flow_context is an expanding/shrinking list consisting of '{' and '['
    # for each unclosed flow context. An empty list means block context.
    self.flow_context = []  # type: List[Text]

    # Contexts.
    self.root_context = False
    self.sequence_context = False
    self.mapping_context = False
    self.simple_key_context = False

    # Characteristics of the last emitted character:
    #  - current position.
    #  - is it a whitespace?
    #  - is it an indention character
    #    (indentation space, '-', '?', or ':')?
    self.line = 0
    self.column = 0
    self.whitespace = True
    self.indention = True
    self.compact_seq_seq = True  # dash after dash
    self.compact_seq_map = True  # key after dash
    # self.compact_ms = False   # dash after key, only when explicit key with ?
    self.no_newline = None  # type: Optional[bool]  # set if directly after `- `

    # Whether the document requires an explicit document end indicator
    self.open_ended = False

    # colon handling
    self.colon = u':'
    if prefix_colon is None:
        self.prefixed_colon = self.colon
    else:
        self.prefixed_colon = prefix_colon + self.colon
    # single entry mappings in flow sequence
    self.brace_single_entry_mapping_in_flow_sequence = (
        brace_single_entry_mapping_in_flow_sequence
    )  # NOQA

    # Formatting details.
    self.canonical = canonical
    self.allow_unicode = allow_unicode
    # set to False to get "\Uxxxxxxxx" for non-basic unicode like emojis
    self.unicode_supplementary = sys.maxunicode > 0xffff
    self.sequence_dash_offset = block_seq_indent or 0
    self.top_level_colon_align = top_level_colon_align
    self.requested_indent = indent  # specific for literal zero indent
    usable_indent = indent and 1 < indent < 10
    self.best_sequence_indent = indent if usable_indent else 2
    self.best_map_indent = self.best_sequence_indent
    usable_width = width and width > self.best_sequence_indent * 2
    self.best_width = width if usable_width else 80
    known_break = line_break in (u'\r', u'\n', u'\r\n')
    self.best_line_break = line_break if known_break else u'\n'  # type: Any

    # Tag prefixes.
    self.tag_prefixes = None  # type: Any

    # Prepared anchor and tag.
    self.prepared_anchor = None  # type: Any
    self.prepared_tag = None  # type: Any

    # Scalar analysis and style.
    self.analysis = None  # type: Any
    self.style = None  # type: Any
206
@property
def stream(self):
    # type: () -> Any
    """Return the output stream.

    Raises YAMLStreamError when no stream has been assigned yet.
    """
    try:
        return self._stream
    except AttributeError:
        raise YAMLStreamError('output stream needs to be specified')

@stream.setter
def stream(self, val):
    # type: (Any) -> None
    """Assign the output stream.

    ``None`` is ignored, so a previously assigned stream (if any) is kept.
    Raises YAMLStreamError when ``val`` has no ``write`` attribute.
    """
    if val is None:
        return
    if not hasattr(val, 'write'):
        raise YAMLStreamError('stream argument needs to have a write() method')
    self._stream = val
223
@property
def serializer(self):
    # type: () -> Any
    """Return the serializer reachable through the dumper.

    A dumper with a ``typ`` attribute provides it as ``serializer``, any
    other dumper as ``_serializer``. When the lookup fails with
    AttributeError the emitter itself is returned (cyaml).
    """
    try:
        dumper = self.dumper
        attr_name = 'serializer' if hasattr(dumper, 'typ') else '_serializer'
        return getattr(dumper, attr_name)
    except AttributeError:
        return self  # cyaml
233
@property
def flow_level(self):
    # type: () -> int
    """Return the number of entries in ``flow_context``.

    Callers test this for truthiness / ``== 0`` to tell whether any flow
    collection is currently open.
    """
    return len(self.flow_context)
238
def dispose(self):
    # type: () -> None
    """Drop the current state and the state stack.

    Both hold bound methods of this object, so clearing them removes the
    self-references.
    """
    self.state = None
    self.states = []  # type: List[Any]
244
def emit(self, event):
    # type: (Any) -> None
    """Queue ``event`` and run the state machine on the buffered events.

    Events are taken from the front of the queue and handed to the current
    state handler until need_more_events() asks for more input.
    """
    if dbg(DBG_EVENT):
        nprint(event)
    self.events.append(event)
    while True:
        if self.need_more_events():
            break
        self.event = self.events.pop(0)
        self.state()
        self.event = None
254
255 # In some cases, we wait for a few next events before emitting.
256
def need_more_events(self):
    # type: () -> bool
    """Tell whether processing must wait for further events.

    An empty queue always needs more. A queue headed by a document,
    sequence or mapping start defers to need_events() with 1, 2 or 3;
    any other head event can be processed right away.
    """
    if not self.events:
        return True
    head = self.events[0]
    # (type of the head event, count passed on to need_events())
    lookahead = (
        (DocumentStartEvent, 1),
        (SequenceStartEvent, 2),
        (MappingStartEvent, 3),
    )
    for kind, count in lookahead:
        if isinstance(head, kind):
            return self.need_events(count)
    return False
270
def need_events(self, count):
    # type: (int) -> bool
    """Tell whether fewer than ``count`` events follow the head event.

    Returns False early once the events after the head close the structure
    the head opened (nesting level below zero) or the stream ends.
    """
    depth = 0
    for queued in self.events[1:]:
        if isinstance(queued, (DocumentStartEvent, CollectionStartEvent)):
            depth += 1
        elif isinstance(queued, (DocumentEndEvent, CollectionEndEvent)):
            depth -= 1
            if depth < 0:
                return False
        elif isinstance(queued, StreamEndEvent):
            return False
    return len(self.events) <= count
284
def increase_indent(self, flow=False, sequence=None, indentless=False):
    # type: (bool, Optional[bool], bool) -> None
    """Save the current indent on the stack and compute the new one.

    At top level (no indent yet) the new indent is ``requested_indent`` for
    flow context and 0 otherwise. Deeper down the indent grows by the
    sequence or mapping step, unless ``indentless`` is set.
    """
    self.indents.append(self.indent, sequence)
    if self.indent is None:  # top level
        self.indent = self.requested_indent if flow else 0
        return
    if indentless:
        return
    if self.indents.last_seq():
        step = self.best_sequence_indent
    else:
        step = self.best_map_indent
    self.indent += step
300 # if self.indents.last_seq():
301 # if self.indent == 0: # top level block sequence
302 # self.indent = self.best_sequence_indent - self.sequence_dash_offset
303 # else:
304 # self.indent += self.best_sequence_indent
305 # else:
306 # self.indent += self.best_map_indent
307
308 # States.
309
310 # Stream handlers.
311
def expect_stream_start(self):
    # type: () -> None
    """Handle the first event, which has to be a StreamStartEvent.

    The event's encoding is adopted only when the output stream does not
    bring its own: on Python 2 a missing or falsy ``encoding`` attribute
    counts, on Python 3 only a missing one. Raises EmitterError for any
    other event.
    """
    if not isinstance(self.event, StreamStartEvent):
        raise EmitterError('expected StreamStartEvent, but got %s' % (self.event,))
    if self.event.encoding:
        if PY2:
            stream_has_encoding = bool(getattr(self.stream, 'encoding', None))
        else:
            stream_has_encoding = hasattr(self.stream, 'encoding')
        if not stream_has_encoding:
            self.encoding = self.event.encoding
    self.write_stream_start()
    self.state = self.expect_first_document_start
325
def expect_nothing(self):
    # type: () -> None
    """State handler that rejects every event.

    Always raises EmitterError naming the event that was received.
    """
    raise EmitterError('expected nothing, but got %s' % (self.event,))
329
330 # Document handlers.
331
def expect_first_document_start(self):
    # type: () -> Any
    """Run expect_document_start() with ``first=True``."""
    return self.expect_document_start(first=True)
335
def expect_document_start(self, first=False):
    # type: (bool) -> None
    """Handle a DocumentStartEvent or the StreamEndEvent.

    For a document start this writes the optional ``...`` terminator, the
    version and tag directives and, unless the start may stay implicit,
    the ``---`` marker; the next state is expect_document_root.
    For a stream end it closes the stream and switches to expect_nothing.
    Raises EmitterError for any other event.
    """
    if isinstance(self.event, DocumentStartEvent):
        # directives after an open-ended document need an explicit '...'
        if (self.event.version or self.event.tags) and self.open_ended:
            self.write_indicator(u'...', True)
            self.write_indent()
        if self.event.version:
            version_text = self.prepare_version(self.event.version)
            self.write_version_directive(version_text)
        # every document starts again from the default prefix table
        self.tag_prefixes = self.DEFAULT_TAG_PREFIXES.copy()
        if self.event.tags:
            handles = sorted(self.event.tags.keys())
            for handle in handles:
                prefix = self.event.tags[handle]
                # tag_prefixes maps prefix -> handle (reverse of event.tags)
                self.tag_prefixes[prefix] = handle
                handle_text = self.prepare_tag_handle(handle)
                prefix_text = self.prepare_tag_prefix(prefix)
                self.write_tag_directive(handle_text, prefix_text)
        # '---' may only be left out for the first document, and only when
        # nothing (explicit flag, canonical mode, directives, an empty
        # document) requires it
        implicit = (
            first
            and not self.event.explicit
            and not self.canonical
            and not self.event.version
            and not self.event.tags
            and not self.check_empty_document()
        )
        if not implicit:
            self.write_indent()
            self.write_indicator(u'---', True)
            if self.canonical:
                self.write_indent()
        self.state = self.expect_document_root
    elif isinstance(self.event, StreamEndEvent):
        if self.open_ended:
            self.write_indicator(u'...', True)
            self.write_indent()
        self.write_stream_end()
        self.state = self.expect_nothing
    else:
        raise EmitterError('expected DocumentStartEvent, but got %s' % (self.event,))
376
def expect_document_end(self):
    # type: () -> None
    """Handle the DocumentEndEvent that closes the current document.

    Writes ``...`` when the event is explicit, flushes the stream and makes
    expect_document_start the next state. Raises EmitterError for any other
    event.
    """
    if not isinstance(self.event, DocumentEndEvent):
        raise EmitterError('expected DocumentEndEvent, but got %s' % (self.event,))
    self.write_indent()
    if self.event.explicit:
        self.write_indicator(u'...', True)
        self.write_indent()
    self.flush_stream()
    self.state = self.expect_document_start
388
def expect_document_root(self):
    # type: () -> None
    """Process the root node of a document.

    expect_document_end is pushed first so that it becomes the state once
    the root node has been handled.
    """
    self.states.append(self.expect_document_end)
    self.expect_node(root=True)
393
394 # Node handlers.
395
def expect_node(self, root=False, sequence=False, mapping=False, simple_key=False):
    # type: (bool, bool, bool, bool) -> None
    """Dispatch the current event to the matching node handler.

    The four arguments are stored as the context flags for this node.
    Aliases go to expect_alias(); scalars and collection starts first get
    their anchor and tag written and are then routed to the scalar, flow or
    block handlers. Raises EmitterError for any other event.
    """
    self.root_context = root
    self.sequence_context = sequence  # not used in PyYAML
    self.mapping_context = mapping
    self.simple_key_context = simple_key
    if isinstance(self.event, AliasEvent):
        self.expect_alias()
    elif isinstance(self.event, (ScalarEvent, CollectionStartEvent)):
        # an anchored scalar no longer counts as being in sequence context
        if (
            self.process_anchor(u'&')
            and isinstance(self.event, ScalarEvent)
            and self.sequence_context
        ):
            self.sequence_context = False
        self.process_tag()
        if isinstance(self.event, ScalarEvent):
            # nprint('@', self.indention, self.no_newline, self.column)
            self.expect_scalar()
        elif isinstance(self.event, SequenceStartEvent):
            # nprint('@', self.indention, self.no_newline, self.column)
            # remember the flags so they can be restored after the comments
            i2, n2 = self.indention, self.no_newline  # NOQA
            if self.event.comment:
                if self.event.flow_style is False and self.event.comment:
                    if self.write_post_comment(self.event):
                        self.indention = False
                        self.no_newline = True
                if self.write_pre_comment(self.event):
                    self.indention = i2
                    self.no_newline = not self.indention
            # flow style is used inside flow context, in canonical mode,
            # on request and for empty sequences
            if (
                self.flow_level
                or self.canonical
                or self.event.flow_style
                or self.check_empty_sequence()
            ):
                self.expect_flow_sequence()
            else:
                self.expect_block_sequence()
        elif isinstance(self.event, MappingStartEvent):
            if self.event.flow_style is False and self.event.comment:
                self.write_post_comment(self.event)
            if self.event.comment and self.event.comment[1]:
                self.write_pre_comment(self.event)
            # same rule as for sequences
            if (
                self.flow_level
                or self.canonical
                or self.event.flow_style
                or self.check_empty_mapping()
            ):
                self.expect_flow_mapping(single=self.event.nr_items == 1)
            else:
                self.expect_block_mapping()
    else:
        raise EmitterError('expected NodeEvent, but got %s' % (self.event,))
451
def expect_alias(self):
    # type: () -> None
    """Write an alias (``*anchor``) and return to the previous state.

    Raises EmitterError when the alias event carries no anchor.
    """
    target = self.event.anchor
    if target is None:
        raise EmitterError('anchor is not specified for alias')
    self.process_anchor(u'*')
    resumed = self.states.pop()
    self.state = resumed
458
def expect_scalar(self):
    # type: () -> None
    """Write the current scalar and return to the previous state.

    The indent is increased (flow rules) only for the duration of the
    write and restored afterwards.
    """
    self.increase_indent(flow=True)
    self.process_scalar()
    restored_indent = self.indents.pop()
    self.indent = restored_indent
    self.state = self.states.pop()
465
466 # Flow sequence handlers.
467
def expect_flow_sequence(self):
    # type: () -> None
    """Open a flow sequence: write ``[`` and enter flow context.

    The bracket is preceded by the alignment padding computed by
    Indents.seq_flow_align().
    """
    padding = self.indents.seq_flow_align(self.best_sequence_indent, self.column)
    opener = u' ' * padding + u'['
    self.write_indicator(opener, True, whitespace=True)
    self.increase_indent(flow=True, sequence=True)
    self.flow_context.append('[')
    self.state = self.expect_first_flow_sequence_item
475
def expect_first_flow_sequence_item(self):
    # type: () -> None
    """Handle the event right after the opening ``[``.

    A SequenceEndEvent closes the (empty) sequence; any other event is
    processed as the first item.
    """
    if not isinstance(self.event, SequenceEndEvent):
        if self.canonical or self.column > self.best_width:
            self.write_indent()
        self.states.append(self.expect_flow_sequence_item)
        self.expect_node(sequence=True)
        return
    # empty flow sequence
    self.indent = self.indents.pop()
    opener = self.flow_context.pop()
    assert opener == '['
    self.write_indicator(u']', False)
    if self.event.comment and self.event.comment[0]:
        # eol comment on empty flow sequence
        self.write_post_comment(self.event)
    elif self.flow_level == 0:
        self.write_line_break()
    self.state = self.states.pop()
494
def expect_flow_sequence_item(self):
    # type: () -> None
    """Handle an event after the first item of a flow sequence.

    A SequenceEndEvent writes the closing ``]`` and returns to the previous
    state; any other event is written as a further item after a ``,``.
    """
    if isinstance(self.event, SequenceEndEvent):
        self.indent = self.indents.pop()
        popped = self.flow_context.pop()
        assert popped == '['
        # canonical output puts a ',' after the last item as well
        if self.canonical:
            self.write_indicator(u',', False)
            self.write_indent()
        self.write_indicator(u']', False)
        if self.event.comment and self.event.comment[0]:
            # eol comment on flow sequence
            self.write_post_comment(self.event)
        else:
            self.no_newline = False
        self.state = self.states.pop()
    else:
        self.write_indicator(u',', False)
        # break the line in canonical mode or once it got too long
        if self.canonical or self.column > self.best_width:
            self.write_indent()
        self.states.append(self.expect_flow_sequence_item)
        self.expect_node(sequence=True)
517
518 # Flow mapping handlers.
519
def expect_flow_mapping(self, single=False):
    # type: (Optional[bool]) -> None
    """Open a flow mapping and enter flow context.

    ``{`` is written unless the mapping has a single entry, sits directly
    in a flow sequence, and neither canonical mode nor
    brace_single_entry_mapping_in_flow_sequence demands braces. Whatever
    was written ('{' or '') is what gets pushed on ``flow_context``.
    """
    padding = self.indents.seq_flow_align(self.best_sequence_indent, self.column)
    # single map item with flow context, no curly braces necessary
    braceless = (
        single
        and self.flow_level
        and self.flow_context[-1] == '['
        and not self.canonical
        and not self.brace_single_entry_mapping_in_flow_sequence
    )
    map_init = u'' if braceless else u'{'
    self.write_indicator(u' ' * padding + map_init, True, whitespace=True)
    self.flow_context.append(map_init)
    self.increase_indent(flow=True, sequence=False)
    self.state = self.expect_first_flow_mapping_key
537
def expect_first_flow_mapping_key(self):
    # type: () -> None
    """Handle the event right after a flow mapping was opened.

    A MappingEndEvent closes the (empty) mapping; any other event is
    processed as the first key, written as a simple key when
    check_simple_key() allows it and with an explicit ``?`` otherwise.
    """
    if isinstance(self.event, MappingEndEvent):
        self.indent = self.indents.pop()
        popped = self.flow_context.pop()
        assert popped == '{'  # empty flow mapping
        self.write_indicator(u'}', False)
        if self.event.comment and self.event.comment[0]:
            # eol comment on empty mapping
            self.write_post_comment(self.event)
        elif self.flow_level == 0:
            self.write_line_break()
        self.state = self.states.pop()
    else:
        # break the line in canonical mode or once it got too long
        if self.canonical or self.column > self.best_width:
            self.write_indent()
        if not self.canonical and self.check_simple_key():
            self.states.append(self.expect_flow_mapping_simple_value)
            self.expect_node(mapping=True, simple_key=True)
        else:
            self.write_indicator(u'?', True)
            self.states.append(self.expect_flow_mapping_value)
            self.expect_node(mapping=True)
561
def expect_flow_mapping_key(self):
    # type: () -> None
    """Handle an event after the first key/value pair of a flow mapping.

    A MappingEndEvent closes the mapping and returns to the previous state;
    any other event is written as a further key after a ``,``.
    """
    if isinstance(self.event, MappingEndEvent):
        # if self.event.comment and self.event.comment[1]:
        #     self.write_pre_comment(self.event)
        self.indent = self.indents.pop()
        popped = self.flow_context.pop()
        # u'' marks a mapping that was opened without a brace
        assert popped in [u'{', u'']
        if self.canonical:
            self.write_indicator(u',', False)
            self.write_indent()
        # only close a brace that was actually written
        if popped != u'':
            self.write_indicator(u'}', False)
        if self.event.comment and self.event.comment[0]:
            # eol comment on flow mapping, never reached on empty mappings
            self.write_post_comment(self.event)
        else:
            self.no_newline = False
        self.state = self.states.pop()
    else:
        self.write_indicator(u',', False)
        if self.canonical or self.column > self.best_width:
            self.write_indent()
        if not self.canonical and self.check_simple_key():
            self.states.append(self.expect_flow_mapping_simple_value)
            self.expect_node(mapping=True, simple_key=True)
        else:
            self.write_indicator(u'?', True)
            self.states.append(self.expect_flow_mapping_value)
            self.expect_node(mapping=True)
592
def expect_flow_mapping_simple_value(self):
    # type: () -> None
    """Write the colon after a simple flow-mapping key, then the value.

    expect_flow_mapping_key is pushed as the state to resume with after
    the value node.
    """
    self.write_indicator(self.prefixed_colon, False)
    self.states.append(self.expect_flow_mapping_key)
    self.expect_node(mapping=True)
598
def expect_flow_mapping_value(self):
    # type: () -> None
    """Write the colon after an explicit (``?``) flow-mapping key, then the value.

    The line is broken first in canonical mode or when the current column
    exceeds ``best_width``. expect_flow_mapping_key is pushed as the state
    to resume with after the value node.
    """
    if self.canonical or self.column > self.best_width:
        self.write_indent()
    self.write_indicator(self.prefixed_colon, True)
    self.states.append(self.expect_flow_mapping_key)
    self.expect_node(mapping=True)
606
607 # Block sequence handlers.
608
def expect_block_sequence(self):
    # type: () -> None
    """Enter a block sequence.

    Inside a mapping the sequence is indentless when the last written
    character was not an indention character. A line break is written
    first when dash-after-dash compaction is off and the column is not 0.
    """
    indentless = (not self.indention) if self.mapping_context else False
    if not (self.compact_seq_seq or self.column == 0):
        self.write_line_break()
    self.increase_indent(flow=False, sequence=True, indentless=indentless)
    self.state = self.expect_first_block_sequence_item
619
def expect_first_block_sequence_item(self):
    # type: () -> Any
    """Run expect_block_sequence_item() with ``first=True``."""
    return self.expect_block_sequence_item(first=True)
623
def expect_block_sequence_item(self, first=False):
    # type: (bool) -> None
    """Handle the next event inside a block sequence.

    A SequenceEndEvent (never accepted as the first event) ends the
    sequence and restores indent and state; any other event is written as
    an item behind a ``-`` indicator.
    """
    if not first and isinstance(self.event, SequenceEndEvent):
        if self.event.comment and self.event.comment[1]:
            # final comments on a block list e.g. empty line
            self.write_pre_comment(self.event)
        self.indent = self.indents.pop()
        self.state = self.states.pop()
        self.no_newline = False
    else:
        if self.event.comment and self.event.comment[1]:
            self.write_pre_comment(self.event)
        # keep the no_newline flag only when still at the start of a line
        nonl = self.no_newline if self.column == 0 else False
        self.write_indent()
        ind = self.sequence_dash_offset  # if len(self.indents) > 1 else 0
        self.write_indicator(u' ' * ind + u'-', True, indention=True)
        # also set when dash offset + 2 exceeds the sequence indent
        if nonl or self.sequence_dash_offset + 2 > self.best_sequence_indent:
            self.no_newline = True
        self.states.append(self.expect_block_sequence_item)
        self.expect_node(sequence=True)
644
645 # Block mapping handlers.
646
def expect_block_mapping(self):
    # type: () -> None
    """Enter a block mapping.

    A line break is written first unless the mapping is a mapping value,
    key-after-dash compaction is on, or the column is already 0.
    """
    stay_on_line = self.mapping_context or self.compact_seq_map or self.column == 0
    if not stay_on_line:
        self.write_line_break()
    self.increase_indent(flow=False, sequence=False)
    self.state = self.expect_first_block_mapping_key
653
def expect_first_block_mapping_key(self):
    # type: () -> None
    """Run expect_block_mapping_key() with ``first=True``."""
    return self.expect_block_mapping_key(first=True)
657
def expect_block_mapping_key(self, first=False):
    # type: (Any) -> None
    """Handle the next event inside a block mapping.

    A MappingEndEvent (never accepted as the first event) ends the mapping
    and restores indent and state. Any other event is a key: written as a
    simple key when check_simple_key() allows it, otherwise as an explicit
    ``?`` key.
    """
    if not first and isinstance(self.event, MappingEndEvent):
        if self.event.comment and self.event.comment[1]:
            # final comments from a doc
            self.write_pre_comment(self.event)
        self.indent = self.indents.pop()
        self.state = self.states.pop()
    else:
        if self.event.comment and self.event.comment[1]:
            # final comments from a doc
            self.write_pre_comment(self.event)
        self.write_indent()
        if self.check_simple_key():
            if not isinstance(
                self.event, (SequenceStartEvent, MappingStartEvent)
            ):  # sequence keys
                # a key whose style is '?' keeps its explicit indicator
                try:
                    if self.event.style == '?':
                        self.write_indicator(u'?', True, indention=True)
                except AttributeError:  # aliases have no style
                    pass
            self.states.append(self.expect_block_mapping_simple_value)
            self.expect_node(mapping=True, simple_key=True)
            if isinstance(self.event, AliasEvent):
                # NOTE(review): written straight to the stream, not through
                # write_indicator(); presumably this skips column and
                # encoding handling - confirm
                self.stream.write(u' ')
        else:
            self.write_indicator(u'?', True, indention=True)
            self.states.append(self.expect_block_mapping_value)
            self.expect_node(mapping=True)
688
def expect_block_mapping_simple_value(self):
    # type: () -> None
    """Write the colon after a simple block-mapping key, then the value.

    No colon is written when the current event has style '?'. At indent 0
    with ``top_level_colon_align`` set, a non-prefixed colon is padded with
    spaces up to that column; otherwise the prefixed colon is used.
    """
    explicit = getattr(self.event, 'style', None) == '?'
    if not explicit:
        aligned = self.indent == 0 and self.top_level_colon_align is not None
        if aligned:
            # write non-prefixed colon
            padding = u' ' * (self.top_level_colon_align - self.column)
            colon_text = padding + self.colon
        else:
            colon_text = self.prefixed_colon
        self.write_indicator(colon_text, False)
    self.states.append(self.expect_block_mapping_key)
    self.expect_node(mapping=True)
701
def expect_block_mapping_value(self):
    # type: () -> None
    """Write the colon after an explicit (``?``) block-mapping key, then the value.

    The colon goes on a freshly indented line. expect_block_mapping_key is
    pushed as the state to resume with after the value node.
    """
    self.write_indent()
    self.write_indicator(self.prefixed_colon, True, indention=True)
    self.states.append(self.expect_block_mapping_key)
    self.expect_node(mapping=True)
708
709 # Checkers.
710
def check_empty_sequence(self):
    # type: () -> bool
    """Tell whether the current event opens a sequence that is closed by the very next queued event."""
    if not isinstance(self.event, SequenceStartEvent):
        return False
    if not self.events:
        return False
    return isinstance(self.events[0], SequenceEndEvent)
718
def check_empty_mapping(self):
    # type: () -> bool
    """Tell whether the current event opens a mapping that is closed by the very next queued event."""
    if not isinstance(self.event, MappingStartEvent):
        return False
    if not self.events:
        return False
    return isinstance(self.events[0], MappingEndEvent)
726
def check_empty_document(self):
    # type: () -> bool
    """Tell whether the document being started consists of one empty scalar.

    True-ish only when the current event is a DocumentStartEvent and the
    next queued event is an implicit ScalarEvent without anchor and tag
    whose value is the empty string.
    """
    if not isinstance(self.event, DocumentStartEvent):
        return False
    if not self.events:
        return False
    upcoming = self.events[0]
    return (
        isinstance(upcoming, ScalarEvent)
        and upcoming.anchor is None
        and upcoming.tag is None
        and upcoming.implicit
        and upcoming.value == ""
    )
739
def check_simple_key(self):
    # type: () -> bool
    """Tell whether the current event can be written as a simple key.

    The combined length of anchor, tag and scalar text must stay below
    MAX_SIMPLE_KEY_LENGTH, and the event must be an alias, a flow-style or
    empty collection, or a scalar that is neither empty nor multiline.
    As a side effect the prepared anchor, prepared tag and scalar analysis
    are computed and cached on the emitter if not already present.
    """
    length = 0
    if isinstance(self.event, NodeEvent) and self.event.anchor is not None:
        if self.prepared_anchor is None:
            self.prepared_anchor = self.prepare_anchor(self.event.anchor)
        length += len(self.prepared_anchor)
    if (
        isinstance(self.event, (ScalarEvent, CollectionStartEvent))
        and self.event.tag is not None
    ):
        if self.prepared_tag is None:
            self.prepared_tag = self.prepare_tag(self.event.tag)
        length += len(self.prepared_tag)
    if isinstance(self.event, ScalarEvent):
        if self.analysis is None:
            self.analysis = self.analyze_scalar(self.event.value)
        length += len(self.analysis.scalar)
    return length < self.MAX_SIMPLE_KEY_LENGTH and (
        isinstance(self.event, AliasEvent)
        or (isinstance(self.event, SequenceStartEvent) and self.event.flow_style is True)
        or (isinstance(self.event, MappingStartEvent) and self.event.flow_style is True)
        or (
            isinstance(self.event, ScalarEvent)
            and not self.analysis.empty
            and not self.analysis.multiline
        )
        or self.check_empty_sequence()
        or self.check_empty_mapping()
    )
770
771 # Anchor, Tag, and Scalar processors.
772
def process_anchor(self, indicator):
    # type: (Any) -> bool
    """Write the anchor of the current event behind ``indicator``.

    Returns False when the event has no anchor and True otherwise. The
    cached ``prepared_anchor`` is cleared in both cases.
    """
    if self.event.anchor is None:
        self.prepared_anchor = None
        return False
    if self.prepared_anchor is None:
        self.prepared_anchor = self.prepare_anchor(self.event.anchor)
    anchor_text = self.prepared_anchor
    if anchor_text:
        self.write_indicator(indicator + anchor_text, True)
        # issue 288
        self.no_newline = False
    self.prepared_anchor = None
    return True
786
def process_tag(self):
    # type: () -> None
    """Write the tag of the current event, if one has to be written.

    Nothing is written when the tag is implicit for the chosen style and
    either canonical mode is off or there is no tag. Raises EmitterError
    when a tag is required but the event has none. The cached
    ``prepared_tag`` is cleared on every path that returns.
    """
    tag = self.event.tag
    if isinstance(self.event, ScalarEvent):
        if self.style is None:
            self.style = self.choose_scalar_style()
        # implicit[0] is consulted for plain style (""), implicit[1] for
        # every other style
        if (not self.canonical or tag is None) and (
            (self.style == "" and self.event.implicit[0])
            or (self.style != "" and self.event.implicit[1])
        ):
            self.prepared_tag = None
            return
        # a non-plain scalar without tag gets the non-specific tag '!'
        if self.event.implicit[0] and tag is None:
            tag = u'!'
            self.prepared_tag = None
    else:
        if (not self.canonical or tag is None) and self.event.implicit:
            self.prepared_tag = None
            return
    if tag is None:
        raise EmitterError('tag is not specified')
    if self.prepared_tag is None:
        self.prepared_tag = self.prepare_tag(tag)
    if self.prepared_tag:
        self.write_indicator(self.prepared_tag, True)
        # a tagged scalar item of a block sequence sets no_newline
        if (
            self.sequence_context
            and not self.flow_level
            and isinstance(self.event, ScalarEvent)
        ):
            self.no_newline = True
    self.prepared_tag = None
819
def choose_scalar_style(self):
    # type: () -> Any
    """Pick the style used to write the current scalar event.

    Returns ``""`` (plain), ``"'"``, ``'"'`` or the event's own block style
    (``|`` / ``>``). The scalar analysis is computed and cached first if
    missing. Note that ``analysis.allow_block`` is forced to True once the
    plain style has been ruled out.
    """
    if self.analysis is None:
        self.analysis = self.analyze_scalar(self.event.value)
    if self.event.style == '"' or self.canonical:
        return '"'
    # plain style: no (or '?') style requested and implicit flags permit it
    if (not self.event.style or self.event.style == '?') and (
        self.event.implicit[0] or not self.event.implicit[2]
    ):
        # a simple key must not be empty or multiline, and the analysis
        # has to allow plain output for the current (flow/block) context
        if not (
            self.simple_key_context and (self.analysis.empty or self.analysis.multiline)
        ) and (
            self.flow_level
            and self.analysis.allow_flow_plain
            or (not self.flow_level and self.analysis.allow_block_plain)
        ):
            return ""
    self.analysis.allow_block = True
    # block styles only outside flow context and not for simple keys
    if self.event.style and self.event.style in '|>':
        if (
            not self.flow_level
            and not self.simple_key_context
            and self.analysis.allow_block
        ):
            return self.event.style
    # unstyled text containing a single quote or newline is double quoted
    if not self.event.style and self.analysis.allow_double_quoted:
        if "'" in self.event.value or '\n' in self.event.value:
            return '"'
    if not self.event.style or self.event.style == "'":
        if self.analysis.allow_single_quoted and not (
            self.simple_key_context and self.analysis.multiline
        ):
            return "'"
    # fallback
    return '"'
854
def process_scalar(self):
    # type: () -> None
    """Write the current scalar in its chosen style.

    Analysis and style are computed when not cached yet and are both reset
    afterwards. A scalar that is a block-sequence item gets an indent
    first; a comment attached to the event is written last.
    """
    if self.analysis is None:
        self.analysis = self.analyze_scalar(self.event.value)
    if self.style is None:
        self.style = self.choose_scalar_style()
    # simple keys must not be split over lines
    may_split = not self.simple_key_context
    if self.sequence_context and not self.flow_level:
        self.write_indent()
    style, text = self.style, self.analysis.scalar
    if style == '"':
        self.write_double_quoted(text, may_split)
    elif style == "'":
        self.write_single_quoted(text, may_split)
    elif style == '>':
        self.write_folded(text)
    elif style == '|':
        self.write_literal(text, self.event.comment)
    else:
        self.write_plain(text, may_split)
    self.analysis = None
    self.style = None
    if self.event.comment:
        self.write_post_comment(self.event)
882
883 # Analyzers.
884
def prepare_version(self, version):
    # type: (Any) -> Any
    """Format a ``(major, minor)`` pair as ``major.minor`` text.

    Raises EmitterError when the major version is not 1.
    """
    major, minor = version
    if major == 1:
        return u'%d.%d' % (major, minor)
    raise EmitterError('unsupported YAML version: %d.%d' % (major, minor))
891
def prepare_tag_handle(self, handle):
    # type: (Any) -> Any
    """Validate a tag handle and return it unchanged.

    A handle has to start and end with ``!``; the characters in between
    may only be ASCII letters, digits, ``-`` or ``_``.
    Raises EmitterError for an empty or malformed handle.
    """
    if not handle:
        raise EmitterError('tag handle must not be empty')
    if handle[0] != u'!' or handle[-1] != u'!':
        # one-element tuple: a bare operand would be unpacked by '%' if it
        # happened to be a tuple itself
        raise EmitterError("tag handle must start and end with '!': %r" % (utf8(handle),))
    for ch in handle[1:-1]:
        if not (
            u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' or ch in u'-_'
        ):
            raise EmitterError(
                'invalid character %r in the tag handle: %r' % (utf8(ch), utf8(handle))
            )
    return handle
906
def prepare_tag_prefix(self, prefix):
    # type: (Any) -> Any
    """Return ``prefix`` with characters outside the allowed set %-escaped.

    ASCII letters, digits and the punctuation listed below are copied
    through (a leading ``!`` always is); every other character is replaced
    by ``%XX`` escapes. Raises EmitterError for an empty prefix.
    """
    if not prefix:
        raise EmitterError('tag prefix must not be empty')
    chunks = []  # type: List[Any]
    # prefix[start:end] is the pending run of characters copied verbatim
    start = end = 0
    if prefix[0] == u'!':
        end = 1
    while end < len(prefix):
        ch = prefix[end]
        if (
            u'0' <= ch <= u'9'
            or u'A' <= ch <= u'Z'
            or u'a' <= ch <= u'z'
            or ch in u"-;/?!:@&=+$,_.~*'()[]"
        ):
            end += 1
        else:
            # flush the verbatim run, then skip over the offending character
            if start < end:
                chunks.append(prefix[start:end])
            start = end = end + 1
            # NOTE(review): one escape is written per element of utf8(ch);
            # whether these are UTF-8 bytes or the code point depends on
            # what compat.utf8 returns - confirm for Python 3
            data = utf8(ch)
            for ch in data:
                chunks.append(u'%%%02X' % ord(ch))
    if start < end:
        chunks.append(prefix[start:end])
    return "".join(chunks)
934
def prepare_tag(self, tag):
    # type: (Any) -> Any
    """Return the text to write for ``tag``.

    When a registered prefix matches, the result is that prefix's handle
    followed by the escaped remainder; otherwise it is the verbatim form
    ``!<...>``. The tag ``!`` is returned as is. Raises EmitterError for an
    empty tag.
    """
    if not tag:
        raise EmitterError('tag must not be empty')
    if tag == u'!':
        return tag
    handle = None
    suffix = tag
    # prefixes are tried in sorted order; the last match wins
    prefixes = sorted(self.tag_prefixes.keys())
    for prefix in prefixes:
        # except for '!', a prefix must be shorter than the tag so that a
        # non-empty suffix remains
        if tag.startswith(prefix) and (prefix == u'!' or len(prefix) < len(tag)):
            handle = self.tag_prefixes[prefix]
            suffix = tag[len(prefix) :]
    chunks = []  # type: List[Any]
    # suffix[start:end] is the pending run of characters copied verbatim
    start = end = 0
    while end < len(suffix):
        ch = suffix[end]
        if (
            u'0' <= ch <= u'9'
            or u'A' <= ch <= u'Z'
            or u'a' <= ch <= u'z'
            or ch in u"-;/?:@&=+$,_.~*'()[]"
            or (ch == u'!' and handle != u'!')
        ):
            end += 1
        else:
            # flush the verbatim run, then skip over the offending character
            if start < end:
                chunks.append(suffix[start:end])
            start = end = end + 1
            # NOTE(review): one escape is written per element of utf8(ch);
            # whether these are UTF-8 bytes or the code point depends on
            # what compat.utf8 returns - confirm for Python 3
            data = utf8(ch)
            for ch in data:
                chunks.append(u'%%%02X' % ord(ch))
    if start < end:
        chunks.append(suffix[start:end])
    suffix_text = "".join(chunks)
    if handle:
        return u'%s%s' % (handle, suffix_text)
    else:
        return u'!<%s>' % suffix_text
974
def prepare_anchor(self, anchor):
    # type: (Any) -> Any
    """Validate an anchor name and return it unchanged.

    Raises EmitterError for an empty anchor or for the first character
    rejected by check_anchorname_char().
    """
    if not anchor:
        raise EmitterError('anchor must not be empty')
    for ch in anchor:
        if check_anchorname_char(ch):
            continue
        raise EmitterError(
            'invalid character %r in the anchor: %r' % (utf8(ch), utf8(anchor))
        )
    return anchor
985
986 def analyze_scalar(self, scalar):
987 # type: (Any) -> Any
# Inspect `scalar` and return a ScalarAnalysis recording whether it is
# empty, whether it contains line breaks (`multiline`), and which of the
# output styles (flow plain, block plain, single quoted, double quoted,
# block) may be used to write it.
# Reads self.serializer.use_version, self.unicode_supplementary,
# self.allow_unicode and self.allow_space_break.
988 # Empty scalar is a special case.
989 if not scalar:
990 return ScalarAnalysis(
991 scalar=scalar,
992 empty=True,
993 multiline=False,
994 allow_flow_plain=False,
995 allow_block_plain=True,
996 allow_single_quoted=True,
997 allow_double_quoted=True,
998 allow_block=False,
999 )
1000
1001 # Indicators and special characters.
1002 block_indicators = False
1003 flow_indicators = False
1004 line_breaks = False
1005 special_characters = False
1006
1007 # Important whitespace combinations.
1008 leading_space = False
1009 leading_break = False
1010 trailing_space = False
1011 trailing_break = False
1012 break_space = False
1013 space_break = False
1014
1015 # Check document indicators.
1016 if scalar.startswith(u'---') or scalar.startswith(u'...'):
1017 block_indicators = True
1018 flow_indicators = True
1019
1020 # First character or preceded by a whitespace.
1021 preceeded_by_whitespace = True
1022
1023 # Last character or followed by a whitespace.
1024 followed_by_whitespace = len(scalar) == 1 or scalar[1] in u'\0 \t\r\n\x85\u2028\u2029'
1025
1026 # The previous character is a space.
1027 previous_space = False
1028
1029 # The previous character is a break.
1030 previous_break = False
1031
# Single pass over the scalar. The look-behind / look-ahead whitespace flags
# above describe the current character and are refreshed at the bottom of
# every iteration.
1032 index = 0
1033 while index < len(scalar):
1034 ch = scalar[index]
1035
1036 # Check for indicators.
1037 if index == 0:
1038 # Leading indicators are special characters.
1039 if ch in u'#,[]{}&*!|>\'"%@`':
1040 flow_indicators = True
1041 block_indicators = True
# A leading '?' or ':' is treated differently depending on the YAML version
# in use, on whether it is the whole scalar, and on what follows it.
1042 if ch in u'?:': # ToDo
1043 if self.serializer.use_version == (1, 1):
1044 flow_indicators = True
1045 elif len(scalar) == 1: # single character
1046 flow_indicators = True
1047 if followed_by_whitespace:
1048 block_indicators = True
1049 if ch == u'-' and followed_by_whitespace:
1050 flow_indicators = True
1051 block_indicators = True
1052 else:
1053 # Some indicators cannot appear within a scalar as well.
1054 if ch in u',[]{}': # http://yaml.org/spec/1.2/spec.html#id2788859
1055 flow_indicators = True
1056 if ch == u'?' and self.serializer.use_version == (1, 1):
1057 flow_indicators = True
1058 if ch == u':':
1059 if followed_by_whitespace:
1060 flow_indicators = True
1061 block_indicators = True
1062 if ch == u'#' and preceeded_by_whitespace:
1063 flow_indicators = True
1064 block_indicators = True
1065
1066 # Check for line breaks, special, and unicode characters.
1067 if ch in u'\n\x85\u2028\u2029':
1068 line_breaks = True
# Characters other than '\n' and printable ASCII may mark the scalar as
# containing special characters; self.allow_unicode takes part in that
# decision for the Unicode ranges listed below (the BOM is never accepted).
1069 if not (ch == u'\n' or u'\x20' <= ch <= u'\x7E'):
1070 if (
1071 ch == u'\x85'
1072 or u'\xA0' <= ch <= u'\uD7FF'
1073 or u'\uE000' <= ch <= u'\uFFFD'
1074 or (self.unicode_supplementary and (u'\U00010000' <= ch <= u'\U0010FFFF'))
1075 ) and ch != u'\uFEFF':
1076 # unicode_characters = True
1077 if not self.allow_unicode:
1078 special_characters = True
1079 else:
1080 special_characters = True
1081
1082 # Detect important whitespace combinations.
1083 if ch == u' ':
1084 if index == 0:
1085 leading_space = True
1086 if index == len(scalar) - 1:
1087 trailing_space = True
1088 if previous_break:
1089 break_space = True
1090 previous_space = True
1091 previous_break = False
1092 elif ch in u'\n\x85\u2028\u2029':
1093 if index == 0:
1094 leading_break = True
1095 if index == len(scalar) - 1:
1096 trailing_break = True
1097 if previous_space:
1098 space_break = True
1099 previous_space = False
1100 previous_break = True
1101 else:
1102 previous_space = False
1103 previous_break = False
1104
1105 # Prepare for the next character.
1106 index += 1
1107 preceeded_by_whitespace = ch in u'\0 \t\r\n\x85\u2028\u2029'
1108 followed_by_whitespace = (
1109 index + 1 >= len(scalar) or scalar[index + 1] in u'\0 \t\r\n\x85\u2028\u2029'
1110 )
1111
# Every style starts out allowed; the checks below only ever withdraw
# permissions. Nothing in this method clears allow_double_quoted.
1112 # Let's decide what styles are allowed.
1113 allow_flow_plain = True
1114 allow_block_plain = True
1115 allow_single_quoted = True
1116 allow_double_quoted = True
1117 allow_block = True
1118
1119 # Leading and trailing whitespaces are bad for plain scalars.
1120 if leading_space or leading_break or trailing_space or trailing_break:
1121 allow_flow_plain = allow_block_plain = False
1122
1123 # We do not permit trailing spaces for block scalars.
1124 if trailing_space:
1125 allow_block = False
1126
1127 # Spaces at the beginning of a new line are only acceptable for block
1128 # scalars.
1129 if break_space:
1130 allow_flow_plain = allow_block_plain = allow_single_quoted = False
1131
1132 # Spaces followed by breaks, as well as special character are only
1133 # allowed for double quoted scalars.
1134 if special_characters:
1135 allow_flow_plain = allow_block_plain = allow_single_quoted = allow_block = False
1136 elif space_break:
1137 allow_flow_plain = allow_block_plain = allow_single_quoted = False
1138 if not self.allow_space_break:
1139 allow_block = False
1140
1141 # Although the plain scalar writer supports breaks, we never emit
1142 # multiline plain scalars.
1143 if line_breaks:
1144 allow_flow_plain = allow_block_plain = False
1145
1146 # Flow indicators are forbidden for flow plain scalars.
1147 if flow_indicators:
1148 allow_flow_plain = False
1149
1150 # Block indicators are forbidden for block plain scalars.
1151 if block_indicators:
1152 allow_block_plain = False
1153
1154 return ScalarAnalysis(
1155 scalar=scalar,
1156 empty=False,
1157 multiline=line_breaks,
1158 allow_flow_plain=allow_flow_plain,
1159 allow_block_plain=allow_block_plain,
1160 allow_single_quoted=allow_single_quoted,
1161 allow_double_quoted=allow_double_quoted,
1162 allow_block=allow_block,
1163 )
1164
1165 # Writers.
1166
def flush_stream(self):
    # type: () -> None
    """Flush ``self.stream`` if it provides a ``flush`` attribute."""
    if not hasattr(self.stream, 'flush'):
        return
    self.stream.flush()
1171
def write_stream_start(self):
    # type: () -> None
    """Write a byte order mark when the encoding name starts with 'utf-16'."""
    encoding = self.encoding
    if not encoding or not encoding.startswith('utf-16'):
        return
    bom = u'\uFEFF'.encode(encoding)
    self.stream.write(bom)
1177
def write_stream_end(self):
    # type: () -> None
    """Finish the stream; the only action taken is ``self.flush_stream()``."""
    self.flush_stream()
    return None
1181
def write_indicator(self, indicator, need_whitespace, whitespace=False, indention=False):
    # type: (Any, Any, bool, bool) -> None
    """Write *indicator*, preceded by one space when a separator is required.

    A space is prepended only if *need_whitespace* is true and the emitter is
    not already positioned after whitespace. Afterwards ``self.whitespace``
    is set to *whitespace*, ``self.indention`` stays true only if it was true
    and *indention* is true, ``self.column`` advances by the written length
    and ``self.open_ended`` is cleared.
    """
    separator_needed = need_whitespace and not self.whitespace
    data = (u' ' + indicator) if separator_needed else indicator
    self.column += len(data)
    self.whitespace = whitespace
    self.indention = self.indention and indention
    self.open_ended = False
    payload = data.encode(self.encoding) if bool(self.encoding) else data
    self.stream.write(payload)
1195
def write_indent(self):
    # type: () -> None
    """Move the output position to the current indentation column.

    A line break is emitted first (or, if ``self.no_newline`` is set, that
    flag is cleared instead) when the emitter is not in indentation state,
    is already past the target column, or sits exactly on it without
    trailing whitespace. Any remaining gap is then filled with spaces.
    """
    target = self.indent or 0
    on_target_without_space = self.column == target and not self.whitespace
    if not self.indention or self.column > target or on_target_without_space:
        if bool(self.no_newline):
            self.no_newline = False
        else:
            self.write_line_break()
    # Measured only now: a line break above may have changed self.column.
    gap = target - self.column
    if gap > 0:
        self.whitespace = True
        padding = u' ' * gap
        self.column = target
        if self.encoding:
            padding = padding.encode(self.encoding)
        self.stream.write(padding)
1215
def write_line_break(self, data=None):
    # type: (Any) -> None
    """Write a line break and reset the position bookkeeping.

    *data* is the break to write; ``self.best_line_break`` is used when it is
    None. The line counter is incremented, the column returns to 0, and both
    the whitespace and indentation flags are set.
    """
    line_break = self.best_line_break if data is None else data
    self.line += 1
    self.column = 0
    self.whitespace = True
    self.indention = True
    payload = line_break.encode(self.encoding) if bool(self.encoding) else line_break
    self.stream.write(payload)
1227
def write_version_directive(self, version_text):
    # type: (Any) -> None
    """Write a ``%YAML <version_text>`` directive followed by a line break."""
    directive = u'%%YAML %s' % version_text
    payload = directive.encode(self.encoding) if self.encoding else directive
    self.stream.write(payload)
    self.write_line_break()
1235
def write_tag_directive(self, handle_text, prefix_text):
    # type: (Any, Any) -> None
    """Write a ``%TAG <handle_text> <prefix_text>`` directive and a line break."""
    directive = u'%%TAG %s %s' % (handle_text, prefix_text)
    payload = directive.encode(self.encoding) if self.encoding else directive
    self.stream.write(payload)
    self.write_line_break()
1243
1244 # Scalar streams.
1245
1246 def write_single_quoted(self, text, split=True):
1247 # type: (Any, Any) -> None
# Write `text` as a single-quoted scalar: an opening "'" indicator, the
# text itself, and a closing "'" indicator. Each "'" in the text is written
# doubled ("''"). `split` takes part in the decision whether a single space
# may be replaced by an indented continuation once self.column exceeds
# self.best_width.
# The root-context prelude consults self.requested_indent before the opening
# quote is written.
1248 if self.root_context:
1249 if self.requested_indent is not None:
1250 self.write_line_break()
1251 if self.requested_indent != 0:
1252 self.write_indent()
1253 self.write_indicator(u"'", True)
# `spaces` / `breaks` record whether the previous character was a space or a
# line break; they select which of the three branches below handles the run
# that ends at `end`. `start` marks the beginning of the pending run.
1254 spaces = False
1255 breaks = False
1256 start = end = 0
# The loop runs one step past the last character (ch is None) so that the
# final pending run is flushed.
1257 while end <= len(text):
1258 ch = None
1259 if end < len(text):
1260 ch = text[end]
1261 if spaces:
1262 if ch is None or ch != u' ':
1263 if (
1264 start + 1 == end
1265 and self.column > self.best_width
1266 and split
1267 and start != 0
1268 and end != len(text)
1269 ):
1270 self.write_indent()
1271 else:
1272 data = text[start:end]
1273 self.column += len(data)
1274 if bool(self.encoding):
1275 data = data.encode(self.encoding)
1276 self.stream.write(data)
1277 start = end
1278 elif breaks:
1279 if ch is None or ch not in u'\n\x85\u2028\u2029':
# A run of breaks that starts with '\n' gets one additional line break
# before the breaks of the run are written.
1280 if text[start] == u'\n':
1281 self.write_line_break()
1282 for br in text[start:end]:
1283 if br == u'\n':
1284 self.write_line_break()
1285 else:
1286 self.write_line_break(br)
1287 self.write_indent()
1288 start = end
1289 else:
1290 if ch is None or ch in u' \n\x85\u2028\u2029' or ch == u"'":
1291 if start < end:
1292 data = text[start:end]
1293 self.column += len(data)
1294 if bool(self.encoding):
1295 data = data.encode(self.encoding)
1296 self.stream.write(data)
1297 start = end
# Escape a quote by doubling it; the pending run then restarts after it.
1298 if ch == u"'":
1299 data = u"''"
1300 self.column += 2
1301 if bool(self.encoding):
1302 data = data.encode(self.encoding)
1303 self.stream.write(data)
1304 start = end + 1
1305 if ch is not None:
1306 spaces = ch == u' '
1307 breaks = ch in u'\n\x85\u2028\u2029'
1308 end += 1
1309 self.write_indicator(u"'", False)
1310
# Characters with a short backslash escape in double-quoted scalars, mapped
# to the character that is written after the backslash.
ESCAPE_REPLACEMENTS = {
    # control characters
    u'\0': u'0',
    u'\x07': u'a',
    u'\x08': u'b',
    u'\x09': u't',
    u'\x0A': u'n',
    u'\x0B': u'v',
    u'\x0C': u'f',
    u'\x0D': u'r',
    u'\x1B': u'e',
    # the quote and the escape character themselves
    u'"': u'"',
    u'\\': u'\\',
    # non-ASCII characters
    u'\x85': u'N',
    u'\xA0': u'_',
    u'\u2028': u'L',
    u'\u2029': u'P',
}
1328
1329 def write_double_quoted(self, text, split=True):
1330 # type: (Any, Any) -> None
# Write `text` as a double-quoted scalar between two '"' indicators.
# Characters that cannot be written literally are emitted as backslash
# escapes: the short form from ESCAPE_REPLACEMENTS when one exists,
# otherwise \xXX, \uXXXX or \UXXXXXXXX depending on the code point.
# `split` takes part in the decision whether the output may be continued on
# a new line once it would exceed self.best_width.
1331 if self.root_context:
1332 if self.requested_indent is not None:
1333 self.write_line_break()
1334 if self.requested_indent != 0:
1335 self.write_indent()
1336 self.write_indicator(u'"', True)
1337 start = end = 0
# The loop runs one step past the last character (ch is None) so that the
# final pending run of literal characters is flushed.
1338 while end <= len(text):
1339 ch = None
1340 if end < len(text):
1341 ch = text[end]
# True at end of text and for every character that needs escaping: quote,
# backslash, \x85, \u2028, \u2029, the BOM, and anything outside printable
# ASCII unless self.allow_unicode admits it.
1342 if (
1343 ch is None
1344 or ch in u'"\\\x85\u2028\u2029\uFEFF'
1345 or not (
1346 u'\x20' <= ch <= u'\x7E'
1347 or (
1348 self.allow_unicode
1349 and (u'\xA0' <= ch <= u'\uD7FF' or u'\uE000' <= ch <= u'\uFFFD')
1350 )
1351 )
1352 ):
1353 if start < end:
1354 data = text[start:end]
1355 self.column += len(data)
1356 if bool(self.encoding):
1357 data = data.encode(self.encoding)
1358 self.stream.write(data)
1359 start = end
1360 if ch is not None:
1361 if ch in self.ESCAPE_REPLACEMENTS:
1362 data = u'\\' + self.ESCAPE_REPLACEMENTS[ch]
1363 elif ch <= u'\xFF':
1364 data = u'\\x%02X' % ord(ch)
1365 elif ch <= u'\uFFFF':
1366 data = u'\\u%04X' % ord(ch)
1367 else:
1368 data = u'\\U%08X' % ord(ch)
1369 self.column += len(data)
1370 if bool(self.encoding):
1371 data = data.encode(self.encoding)
1372 self.stream.write(data)
1373 start = end + 1
# Line continuation: the pending text is written followed by a backslash,
# then write_indent() is called. A continuation line that would begin with
# a space gets a leading backslash.
1374 if (
1375 0 < end < len(text) - 1
1376 and (ch == u' ' or start >= end)
1377 and self.column + (end - start) > self.best_width
1378 and split
1379 ):
1380 data = text[start:end] + u'\\'
1381 if start < end:
1382 start = end
1383 self.column += len(data)
1384 if bool(self.encoding):
1385 data = data.encode(self.encoding)
1386 self.stream.write(data)
1387 self.write_indent()
1388 self.whitespace = False
1389 self.indention = False
1390 if text[start] == u' ':
1391 data = u'\\'
1392 self.column += len(data)
1393 if bool(self.encoding):
1394 data = data.encode(self.encoding)
1395 self.stream.write(data)
1396 end += 1
1397 self.write_indicator(u'"', False)
1398
def determine_block_hints(self, text):
    # type: (Any) -> Any
    """Work out the block scalar header hints for *text*.

    Returns a tuple ``(hints, indent, indicator)``:

    * ``indent`` is ``self.best_sequence_indent`` when the text starts with a
      space or line break, or when, in root context, it contains a line
      starting with ``---`` or ``...`` followed by a space, CR or LF;
      otherwise 0.
    * ``indicator`` is ``'-'`` when the text does not end in a line break,
      ``'+'`` when it ends in more than one (or consists of a single break),
      and empty otherwise.
    * ``hints`` is the indent (only in the leading space/break case) followed
      by the indicator.
    """
    line_breaks = u'\n\x85\u2028\u2029'
    indent = 0
    indicator = u''
    hints = u''
    if not text:
        return hints, indent, indicator

    if text[0] in u' \n\x85\u2028\u2029':
        indent = self.best_sequence_indent
        hints += text_type(indent)
    elif self.root_context:
        for marker in ['\n---', '\n...']:
            pos = text.find(marker, 0)
            while pos != -1:
                follower = pos + 4
                # A marker at the very end of the text has no follower and
                # does not count.
                if follower < len(text) and text[follower] in ' \r\n':
                    break
                pos = text.find(marker, pos + 1)
            if pos > -1:
                break
        if pos > 0:
            indent = self.best_sequence_indent

    if text[-1] not in line_breaks:
        indicator = u'-'
    elif len(text) == 1 or text[-2] in line_breaks:
        indicator = u'+'
    hints += indicator
    return hints, indent, indicator
1431
def write_folded(self, text):
    # type: (Any) -> None
    """Write *text* as a folded block scalar (``>``).

    The header is ``>`` plus the hints from ``determine_block_hints``; a
    ``+`` indicator marks the emitter as open ended. In the body a ``\\a``
    character is a fold marker: it and the character after it are replaced
    by a line break plus indentation.

    Raises EmitterError when a fold marker is not followed by two further
    characters, or when the second of those is whitespace.
    """
    hints, _indent, _indicator = self.determine_block_hints(text)
    self.write_indicator(u'>' + hints, True)
    if _indicator == u'+':
        self.open_ended = True
    self.write_line_break()
    leading_space = True
    # `spaces` / `breaks` describe the previous character and pick the branch
    # that handles the run ending at `end`; the body starts on a fresh line.
    spaces = False
    breaks = True
    start = end = 0
    # One extra iteration with ch None flushes the final pending run.
    while end <= len(text):
        ch = None
        if end < len(text):
            ch = text[end]
        if breaks:
            if ch is None or ch not in u'\n\x85\u2028\u2029\a':
                if (
                    not leading_space
                    and ch is not None
                    and ch != u' '
                    and text[start] == u'\n'
                ):
                    self.write_line_break()
                leading_space = ch == u' '
                for br in text[start:end]:
                    if br == u'\n':
                        self.write_line_break()
                    else:
                        self.write_line_break(br)
                if ch is not None:
                    self.write_indent()
                start = end
        elif spaces:
            if ch != u' ':
                if start + 1 == end and self.column > self.best_width:
                    self.write_indent()
                else:
                    data = text[start:end]
                    self.column += len(data)
                    if bool(self.encoding):
                        data = data.encode(self.encoding)
                    self.stream.write(data)
                start = end
        else:
            if ch is None or ch in u' \n\x85\u2028\u2029\a':
                data = text[start:end]
                self.column += len(data)
                if bool(self.encoding):
                    data = data.encode(self.encoding)
                self.stream.write(data)
                if ch == u'\a':
                    # text[end + 2] must exist before it is inspected; a
                    # marker too close to the end is reported as an emitter
                    # error instead of escaping as an IndexError.
                    if end + 2 < len(text) and not text[end + 2].isspace():
                        self.write_line_break()
                        self.write_indent()
                        end += 2  # \a and the space that is inserted on the fold
                    else:
                        raise EmitterError('unexcpected fold indicator \\a before space')
                if ch is None:
                    self.write_line_break()
                start = end
        if ch is not None:
            breaks = ch in u'\n\x85\u2028\u2029'
            spaces = ch == u' '
        end += 1
1497
def write_literal(self, text, comment=None):
    # type: (Any, Any) -> None
    """Write *text* as a literal block scalar (``|``).

    The header is ``|`` plus the hints from ``determine_block_hints``. If
    ``comment[1][0]`` exists and is truthy it is written right after the
    header; a missing or differently shaped *comment* is ignored. A ``+``
    indicator marks the emitter as open ended.

    In root context each content line is prefixed with
    ``_indent + self.indent`` spaces (``self.indent`` counting as 0 when it
    is None); otherwise ``write_indent()`` is used. Everything written
    directly to the stream is encoded when ``self.encoding`` is set.
    """
    hints, _indent, _indicator = self.determine_block_hints(text)
    self.write_indicator(u'|' + hints, True)
    try:
        comment = comment[1][0]
        if comment:
            # Encode text like every other writer does, so the comment is
            # not lost to the TypeError handler on a bytes stream.
            if bool(self.encoding) and isinstance(comment, type(u'')):
                comment = comment.encode(self.encoding)
            self.stream.write(comment)
    except (TypeError, IndexError):
        pass
    if _indicator == u'+':
        self.open_ended = True
    self.write_line_break()
    # `breaks` records whether the previous character was a line break; the
    # body starts on a fresh line.
    breaks = True
    start = end = 0
    # One extra iteration with ch None flushes the final pending run.
    while end <= len(text):
        ch = None
        if end < len(text):
            ch = text[end]
        if breaks:
            if ch is None or ch not in u'\n\x85\u2028\u2029':
                for br in text[start:end]:
                    if br == u'\n':
                        self.write_line_break()
                    else:
                        self.write_line_break(br)
                if ch is not None:
                    if self.root_context:
                        idnx = self.indent if self.indent is not None else 0
                        padding = u' ' * (_indent + idnx)
                        if bool(self.encoding):
                            padding = padding.encode(self.encoding)
                        self.stream.write(padding)
                    else:
                        self.write_indent()
                start = end
        else:
            if ch is None or ch in u'\n\x85\u2028\u2029':
                data = text[start:end]
                if bool(self.encoding):
                    data = data.encode(self.encoding)
                self.stream.write(data)
                if ch is None:
                    self.write_line_break()
                start = end
        if ch is not None:
            breaks = ch in u'\n\x85\u2028\u2029'
        end += 1
1543
def write_plain(self, text, split=True):
    # type: (Any, Any) -> None
    """Write *text* as a plain (unquoted) scalar.

    In root context a requested indent causes a line break (and, when it is
    non-zero, indentation) first; without one the emitter is marked open
    ended. Empty text writes nothing further. A separating space is written
    when the emitter is not already positioned after whitespace.

    When *split* is true a single space may be replaced by an indented
    continuation once ``self.column`` exceeds ``self.best_width``.
    """
    if self.root_context:
        if self.requested_indent is not None:
            self.write_line_break()
            if self.requested_indent != 0:
                self.write_indent()
        else:
            self.open_ended = True
    if not text:
        return
    if not self.whitespace:
        data = u' '
        self.column += len(data)
        if self.encoding:
            data = data.encode(self.encoding)
        self.stream.write(data)
    self.whitespace = False
    self.indention = False
    # `spaces` / `breaks` describe the previous character and pick the branch
    # that handles the run ending at `end`.
    spaces = False
    breaks = False
    start = end = 0
    # One extra iteration with ch None flushes the final pending run.
    while end <= len(text):
        ch = None
        if end < len(text):
            ch = text[end]
        if spaces:
            if ch != u' ':
                if start + 1 == end and self.column > self.best_width and split:
                    self.write_indent()
                    self.whitespace = False
                    self.indention = False
                else:
                    data = text[start:end]
                    self.column += len(data)
                    if self.encoding:
                        data = data.encode(self.encoding)
                    self.stream.write(data)
                start = end
        elif breaks:
            # ch is None at the end of the text; `None not in <str>` would
            # raise TypeError, so test for it first as the quoted and block
            # writers do.
            if ch is None or ch not in u'\n\x85\u2028\u2029':
                if text[start] == u'\n':
                    self.write_line_break()
                for br in text[start:end]:
                    if br == u'\n':
                        self.write_line_break()
                    else:
                        self.write_line_break(br)
                self.write_indent()
                self.whitespace = False
                self.indention = False
                start = end
        else:
            if ch is None or ch in u' \n\x85\u2028\u2029':
                data = text[start:end]
                self.column += len(data)
                if self.encoding:
                    data = data.encode(self.encoding)
                try:
                    self.stream.write(data)
                except:  # NOQA
                    # Diagnostic aid only: show the offending data, then let
                    # the original exception propagate.
                    sys.stdout.write(repr(data) + '\n')
                    raise
                start = end
        if ch is not None:
            spaces = ch == u' '
            breaks = ch in u'\n\x85\u2028\u2029'
        end += 1
1612
def write_comment(self, comment, pre=False):
    # type: (Any, bool) -> None
    """Write the text of *comment* at (approximately) its original column.

    *comment* must provide ``value`` (the comment text) and
    ``start_mark.column`` (the column it was read at). Unless *pre* is true,
    one trailing newline is dropped from the text and a line break is
    written afterwards.

    The text is padded with spaces up to its original column. If that column
    has already been passed, a single separating space is used instead,
    except at the start of a line, for whitespace-only text, or for text
    starting with a newline, which are written without padding.
    """
    value = comment.value
    # Slicing keeps an empty comment from raising IndexError.
    if not pre and value[-1:] == '\n':
        value = value[:-1]
    # get original column position
    col = comment.start_mark.column
    if comment.value and comment.value.startswith('\n'):
        # never inject extra spaces if the comment starts with a newline
        # and is not a real comment (e.g. an empty line following a key-value)
        col = self.column
    # at least one space if the current column >= the start column of the comment
    # but not at the start of a line
    nr_spaces = col - self.column
    if self.column and value.strip() and nr_spaces < 1 and value[0] != '\n':
        nr_spaces = 1
    value = ' ' * nr_spaces + value
    try:
        if bool(self.encoding):
            value = value.encode(self.encoding)
    except UnicodeDecodeError:
        # Best effort: text that cannot be encoded is written unchanged.
        pass
    self.stream.write(value)
    if not pre:
        self.write_line_break()
1648
def write_pre_comment(self, event):
    # type: (Any) -> bool
    """Write the comments stored in ``event.comment[1]``.

    Returns False when there are none (the entry is None), True otherwise.
    Each comment is written on its own line via ``write_comment(..., pre=True)``.
    For mapping and sequence start events a comment is flagged ``pre_done``
    once written and is skipped if already flagged.
    """
    pending = event.comment[1]
    if pending is None:
        return False
    try:
        is_collection_start = isinstance(event, (MappingStartEvent, SequenceStartEvent))
        for item in pending:
            if is_collection_start and getattr(item, 'pre_done', None):
                continue
            if self.column != 0:
                self.write_line_break()
            self.write_comment(item, pre=True)
            if is_collection_start:
                item.pre_done = True
    except TypeError:
        # Diagnostic aid only: report the event, then re-raise.
        sys.stdout.write('eventtt {} {}'.format(type(event), event))
        raise
    return True
1668
def write_post_comment(self, event):
    # type: (Any) -> bool
    """Write the comment stored in ``event.comment[0]``, if any.

    Returns False when that entry is None and True after the comment has
    been passed to ``write_comment``. Both the check and the write use the
    *event* argument, so a None comment is never handed to ``write_comment``.
    """
    comment = event.comment[0]
    if comment is None:
        return False
    self.write_comment(comment)
    return True