diff options
Diffstat (limited to 'python/google/protobuf/text_format.py')
-rwxr-xr-x | python/google/protobuf/text_format.py | 586 |
1 files changed, 193 insertions, 393 deletions
diff --git a/python/google/protobuf/text_format.py b/python/google/protobuf/text_format.py index 2429fa5..cc6ac90 100755 --- a/python/google/protobuf/text_format.py +++ b/python/google/protobuf/text_format.py @@ -1,6 +1,6 @@ # Protocol Buffers - Google's data interchange format # Copyright 2008 Google Inc. All rights reserved. -# https://developers.google.com/protocol-buffers/ +# http://code.google.com/p/protobuf/ # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are @@ -28,10 +28,6 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -#PY25 compatible for GAE. -# -# Copyright 2007 Google Inc. All Rights Reserved. - """Contains routines for printing protocol messages in text format.""" __author__ = 'kenton@google.com (Kenton Varda)' @@ -39,92 +35,46 @@ __author__ = 'kenton@google.com (Kenton Varda)' import cStringIO import re +from collections import deque from google.protobuf.internal import type_checkers from google.protobuf import descriptor -from google.protobuf import text_encoding - -__all__ = ['MessageToString', 'PrintMessage', 'PrintField', - 'PrintFieldValue', 'Merge'] +__all__ = [ 'MessageToString', 'PrintMessage', 'PrintField', + 'PrintFieldValue', 'Merge' ] -_INTEGER_CHECKERS = (type_checkers.Uint32ValueChecker(), - type_checkers.Int32ValueChecker(), - type_checkers.Uint64ValueChecker(), - type_checkers.Int64ValueChecker()) -_FLOAT_INFINITY = re.compile('-?inf(?:inity)?f?', re.IGNORECASE) -_FLOAT_NAN = re.compile('nanf?', re.IGNORECASE) -_FLOAT_TYPES = frozenset([descriptor.FieldDescriptor.CPPTYPE_FLOAT, - descriptor.FieldDescriptor.CPPTYPE_DOUBLE]) +# Infinity and NaN are not explicitly supported by Python pre-2.6, and +# float('inf') does not work on Windows (pre-2.6). +_INFINITY = 1e10000 # overflows, thus will actually be infinity. +_NAN = _INFINITY * 0 -class Error(Exception): - """Top-level module error for text_format.""" - -class ParseError(Error): +class ParseError(Exception): """Thrown in case of ASCII parsing error.""" -def MessageToString(message, as_utf8=False, as_one_line=False, - pointy_brackets=False, use_index_order=False, - float_format=None): - """Convert protobuf message to text format. - - Floating point values can be formatted compactly with 15 digits of - precision (which is the most that IEEE 754 "double" can guarantee) - using float_format='.15g'. - - Args: - message: The protocol buffers message. - as_utf8: Produce text output in UTF8 format. - as_one_line: Don't introduce newlines between fields. - pointy_brackets: If True, use angle brackets instead of curly braces for - nesting. - use_index_order: If True, print fields of a proto message using the order - defined in source code instead of the field number. By default, use the - field number order. - float_format: If set, use this to specify floating point number formatting - (per the "Format Specification Mini-Language"); otherwise, str() is used. - - Returns: - A string of the text formatted protocol buffer message. - """ +def MessageToString(message): out = cStringIO.StringIO() - PrintMessage(message, out, as_utf8=as_utf8, as_one_line=as_one_line, - pointy_brackets=pointy_brackets, - use_index_order=use_index_order, - float_format=float_format) + PrintMessage(message, out) result = out.getvalue() out.close() - if as_one_line: - return result.rstrip() return result -def PrintMessage(message, out, indent=0, as_utf8=False, as_one_line=False, - pointy_brackets=False, use_index_order=False, - float_format=None): - fields = message.ListFields() - if use_index_order: - fields.sort(key=lambda x: x[0].index) - for field, value in fields: +def PrintMessage(message, out, indent = 0): + for field, value in message.ListFields(): if field.label == descriptor.FieldDescriptor.LABEL_REPEATED: for element in value: - PrintField(field, element, out, indent, as_utf8, as_one_line, - pointy_brackets=pointy_brackets, - float_format=float_format) + PrintField(field, element, out, indent) else: - PrintField(field, value, out, indent, as_utf8, as_one_line, - pointy_brackets=pointy_brackets, - float_format=float_format) + PrintField(field, value, out, indent) -def PrintField(field, value, out, indent=0, as_utf8=False, as_one_line=False, - pointy_brackets=False, float_format=None): +def PrintField(field, value, out, indent = 0): """Print a single field name/value pair. For repeated fields, the value should be a single element.""" - out.write(' ' * indent) + out.write(' ' * indent); if field.is_extension: out.write('[') if (field.containing_type.GetOptions().message_set_wire_format and @@ -146,168 +96,54 @@ def PrintField(field, value, out, indent=0, as_utf8=False, as_one_line=False, # don't include it. out.write(': ') - PrintFieldValue(field, value, out, indent, as_utf8, as_one_line, - pointy_brackets=pointy_brackets, - float_format=float_format) - if as_one_line: - out.write(' ') - else: - out.write('\n') + PrintFieldValue(field, value, out, indent) + out.write('\n') -def PrintFieldValue(field, value, out, indent=0, as_utf8=False, - as_one_line=False, pointy_brackets=False, - float_format=None): +def PrintFieldValue(field, value, out, indent = 0): """Print a single field value (not including name). For repeated fields, the value should be a single element.""" - if pointy_brackets: - openb = '<' - closeb = '>' - else: - openb = '{' - closeb = '}' - if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE: - if as_one_line: - out.write(' %s ' % openb) - PrintMessage(value, out, indent, as_utf8, as_one_line, - pointy_brackets=pointy_brackets, - float_format=float_format) - out.write(closeb) - else: - out.write(' %s\n' % openb) - PrintMessage(value, out, indent + 2, as_utf8, as_one_line, - pointy_brackets=pointy_brackets, - float_format=float_format) - out.write(' ' * indent + closeb) + out.write(' {\n') + PrintMessage(value, out, indent + 2) + out.write(' ' * indent + '}') elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_ENUM: - enum_value = field.enum_type.values_by_number.get(value, None) - if enum_value is not None: - out.write(enum_value.name) - else: - out.write(str(value)) + out.write(field.enum_type.values_by_number[value].name) elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_STRING: out.write('\"') - if isinstance(value, unicode): - out_value = value.encode('utf-8') - else: - out_value = value - if field.type == descriptor.FieldDescriptor.TYPE_BYTES: - # We need to escape non-UTF8 chars in TYPE_BYTES field. - out_as_utf8 = False - else: - out_as_utf8 = as_utf8 - out.write(text_encoding.CEscape(out_value, out_as_utf8)) + out.write(_CEscape(value)) out.write('\"') elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_BOOL: if value: - out.write('true') + out.write("true") else: - out.write('false') - elif field.cpp_type in _FLOAT_TYPES and float_format is not None: - out.write('{1:{0}}'.format(float_format, value)) + out.write("false") else: out.write(str(value)) -def _ParseOrMerge(lines, message, allow_multiple_scalars): - """Converts an ASCII representation of a protocol message into a message. - - Args: - lines: Lines of a message's ASCII representation. - message: A protocol buffer message to merge into. - allow_multiple_scalars: Determines if repeated values for a non-repeated - field are permitted, e.g., the string "foo: 1 foo: 2" for a - required/optional field named "foo". - - Raises: - ParseError: On ASCII parsing problems. - """ - tokenizer = _Tokenizer(lines) - while not tokenizer.AtEnd(): - _MergeField(tokenizer, message, allow_multiple_scalars) - - -def Parse(text, message): - """Parses an ASCII representation of a protocol message into a message. - - Args: - text: Message ASCII representation. - message: A protocol buffer message to merge into. - - Returns: - The same message passed as argument. - - Raises: - ParseError: On ASCII parsing problems. - """ - if not isinstance(text, str): text = text.decode('utf-8') - return ParseLines(text.split('\n'), message) - - def Merge(text, message): - """Parses an ASCII representation of a protocol message into a message. - - Like Parse(), but allows repeated values for a non-repeated field, and uses - the last one. + """Merges an ASCII representation of a protocol message into a message. Args: text: Message ASCII representation. message: A protocol buffer message to merge into. - Returns: - The same message passed as argument. - - Raises: - ParseError: On ASCII parsing problems. - """ - return MergeLines(text.split('\n'), message) - - -def ParseLines(lines, message): - """Parses an ASCII representation of a protocol message into a message. - - Args: - lines: An iterable of lines of a message's ASCII representation. - message: A protocol buffer message to merge into. - - Returns: - The same message passed as argument. - - Raises: - ParseError: On ASCII parsing problems. - """ - _ParseOrMerge(lines, message, False) - return message - - -def MergeLines(lines, message): - """Parses an ASCII representation of a protocol message into a message. - - Args: - lines: An iterable of lines of a message's ASCII representation. - message: A protocol buffer message to merge into. - - Returns: - The same message passed as argument. - Raises: ParseError: On ASCII parsing problems. """ - _ParseOrMerge(lines, message, True) - return message + tokenizer = _Tokenizer(text) + while not tokenizer.AtEnd(): + _MergeField(tokenizer, message) -def _MergeField(tokenizer, message, allow_multiple_scalars): +def _MergeField(tokenizer, message): """Merges a single protocol message field into a message. Args: tokenizer: A tokenizer to parse the field name and values. message: A protocol message to record the data. - allow_multiple_scalars: Determines if repeated values for a non-repeated - field are permitted, e.g., the string "foo: 1 foo: 2" for a - required/optional field named "foo". Raises: ParseError: In case of ASCII parsing problems. @@ -323,9 +159,7 @@ def _MergeField(tokenizer, message, allow_multiple_scalars): raise tokenizer.ParseErrorPreviousToken( 'Message type "%s" does not have extensions.' % message_descriptor.full_name) - # pylint: disable=protected-access field = message.Extensions._FindExtensionByName(name) - # pylint: enable=protected-access if not field: raise tokenizer.ParseErrorPreviousToken( 'Extension "%s" not registered.' % name) @@ -374,31 +208,23 @@ def _MergeField(tokenizer, message, allow_multiple_scalars): sub_message = message.Extensions[field] else: sub_message = getattr(message, field.name) - sub_message.SetInParent() + sub_message.SetInParent() while not tokenizer.TryConsume(end_token): if tokenizer.AtEnd(): raise tokenizer.ParseErrorPreviousToken('Expected "%s".' % (end_token)) - _MergeField(tokenizer, sub_message, allow_multiple_scalars) + _MergeField(tokenizer, sub_message) else: - _MergeScalarField(tokenizer, message, field, allow_multiple_scalars) - - # For historical reasons, fields may optionally be separated by commas or - # semicolons. - if not tokenizer.TryConsume(','): - tokenizer.TryConsume(';') + _MergeScalarField(tokenizer, message, field) -def _MergeScalarField(tokenizer, message, field, allow_multiple_scalars): +def _MergeScalarField(tokenizer, message, field): """Merges a single protocol message scalar field into a message. Args: tokenizer: A tokenizer to parse the field value. message: A protocol message to record the data. field: The descriptor of the field to be merged. - allow_multiple_scalars: Determines if repeated values for a non-repeated - field are permitted, e.g., the string "foo: 1 foo: 2" for a - required/optional field named "foo". Raises: ParseError: In case of ASCII parsing problems. @@ -431,7 +257,24 @@ def _MergeScalarField(tokenizer, message, field, allow_multiple_scalars): elif field.type == descriptor.FieldDescriptor.TYPE_BYTES: value = tokenizer.ConsumeByteString() elif field.type == descriptor.FieldDescriptor.TYPE_ENUM: - value = tokenizer.ConsumeEnum(field) + # Enum can be specified by a number (the enum value), or by + # a string literal (the enum name). + enum_descriptor = field.enum_type + if tokenizer.LookingAtInteger(): + number = tokenizer.ConsumeInt32() + enum_value = enum_descriptor.values_by_number.get(number, None) + if enum_value is None: + raise tokenizer.ParseErrorPreviousToken( + 'Enum type "%s" has no value with number %d.' % ( + enum_descriptor.full_name, number)) + else: + identifier = tokenizer.ConsumeIdentifier() + enum_value = enum_descriptor.values_by_name.get(identifier, None) + if enum_value is None: + raise tokenizer.ParseErrorPreviousToken( + 'Enum type "%s" has no value named %s.' % ( + enum_descriptor.full_name, identifier)) + value = enum_value.number else: raise RuntimeError('Unknown field type %d' % field.type) @@ -442,19 +285,9 @@ def _MergeScalarField(tokenizer, message, field, allow_multiple_scalars): getattr(message, field.name).append(value) else: if field.is_extension: - if not allow_multiple_scalars and message.HasExtension(field): - raise tokenizer.ParseErrorPreviousToken( - 'Message type "%s" should not have multiple "%s" extensions.' % - (message.DESCRIPTOR.full_name, field.full_name)) - else: - message.Extensions[field] = value + message.Extensions[field] = value else: - if not allow_multiple_scalars and message.HasField(field.name): - raise tokenizer.ParseErrorPreviousToken( - 'Message type "%s" should not have multiple "%s" fields.' % - (message.DESCRIPTOR.full_name, field.name)) - else: - setattr(message, field.name, value) + setattr(message, field.name, value) class _Tokenizer(object): @@ -472,19 +305,26 @@ class _Tokenizer(object): '[0-9+-][0-9a-zA-Z_.+-]*|' # a number '\"([^\"\n\\\\]|\\\\.)*(\"|\\\\?$)|' # a double-quoted string '\'([^\'\n\\\\]|\\\\.)*(\'|\\\\?$)') # a single-quoted string - _IDENTIFIER = re.compile(r'\w+') + _IDENTIFIER = re.compile('\w+') + _INTEGER_CHECKERS = [type_checkers.Uint32ValueChecker(), + type_checkers.Int32ValueChecker(), + type_checkers.Uint64ValueChecker(), + type_checkers.Int64ValueChecker()] + _FLOAT_INFINITY = re.compile('-?inf(inity)?f?', re.IGNORECASE) + _FLOAT_NAN = re.compile("nanf?", re.IGNORECASE) + + def __init__(self, text_message): + self._text_message = text_message - def __init__(self, lines): self._position = 0 self._line = -1 self._column = 0 self._token_start = None self.token = '' - self._lines = iter(lines) + self._lines = deque(text_message.split('\n')) self._current_line = '' self._previous_line = 0 self._previous_column = 0 - self._more_lines = True self._SkipWhitespace() self.NextToken() @@ -494,27 +334,25 @@ class _Tokenizer(object): Returns: True iff the end was reached. """ - return not self.token + return not self._lines and not self._current_line def _PopLine(self): - while len(self._current_line) <= self._column: - try: - self._current_line = self._lines.next() - except StopIteration: + while not self._current_line: + if not self._lines: self._current_line = '' - self._more_lines = False return - else: - self._line += 1 - self._column = 0 + self._line += 1 + self._column = 0 + self._current_line = self._lines.popleft() def _SkipWhitespace(self): while True: self._PopLine() - match = self._WHITESPACE.match(self._current_line, self._column) + match = re.match(self._WHITESPACE, self._current_line) if not match: break length = len(match.group(0)) + self._current_line = self._current_line[length:] self._column += length def TryConsume(self, token): @@ -543,6 +381,17 @@ class _Tokenizer(object): if not self.TryConsume(token): raise self._ParseError('Expected "%s".' % token) + def LookingAtInteger(self): + """Checks if the current token is an integer. + + Returns: + True iff the current token is an integer. + """ + if not self.token: + return False + c = self.token[0] + return (c >= '0' and c <= '9') or c == '-' or c == '+' + def ConsumeIdentifier(self): """Consumes protocol message field identifier. @@ -553,7 +402,7 @@ class _Tokenizer(object): ParseError: If an identifier couldn't be consumed. """ result = self.token - if not self._IDENTIFIER.match(result): + if not re.match(self._IDENTIFIER, result): raise self._ParseError('Expected identifier.') self.NextToken() return result @@ -568,9 +417,9 @@ class _Tokenizer(object): ParseError: If a signed 32bit integer couldn't be consumed. """ try: - result = ParseInteger(self.token, is_signed=True, is_long=False) + result = self._ParseInteger(self.token, is_signed=True, is_long=False) except ValueError, e: - raise self._ParseError(str(e)) + raise self._IntegerParseError(e) self.NextToken() return result @@ -584,9 +433,9 @@ class _Tokenizer(object): ParseError: If an unsigned 32bit integer couldn't be consumed. """ try: - result = ParseInteger(self.token, is_signed=False, is_long=False) + result = self._ParseInteger(self.token, is_signed=False, is_long=False) except ValueError, e: - raise self._ParseError(str(e)) + raise self._IntegerParseError(e) self.NextToken() return result @@ -600,9 +449,9 @@ class _Tokenizer(object): ParseError: If a signed 64bit integer couldn't be consumed. """ try: - result = ParseInteger(self.token, is_signed=True, is_long=True) + result = self._ParseInteger(self.token, is_signed=True, is_long=True) except ValueError, e: - raise self._ParseError(str(e)) + raise self._IntegerParseError(e) self.NextToken() return result @@ -616,9 +465,9 @@ class _Tokenizer(object): ParseError: If an unsigned 64bit integer couldn't be consumed. """ try: - result = ParseInteger(self.token, is_signed=False, is_long=True) + result = self._ParseInteger(self.token, is_signed=False, is_long=True) except ValueError, e: - raise self._ParseError(str(e)) + raise self._IntegerParseError(e) self.NextToken() return result @@ -631,10 +480,21 @@ class _Tokenizer(object): Raises: ParseError: If a floating point number couldn't be consumed. """ + text = self.token + if re.match(self._FLOAT_INFINITY, text): + self.NextToken() + if text.startswith('-'): + return -_INFINITY + return _INFINITY + + if re.match(self._FLOAT_NAN, text): + self.NextToken() + return _NAN + try: - result = ParseFloat(self.token) + result = float(text) except ValueError, e: - raise self._ParseError(str(e)) + raise self._FloatParseError(e) self.NextToken() return result @@ -647,12 +507,14 @@ class _Tokenizer(object): Raises: ParseError: If a boolean value couldn't be consumed. """ - try: - result = ParseBool(self.token) - except ValueError, e: - raise self._ParseError(str(e)) - self.NextToken() - return result + if self.token == 'true': + self.NextToken() + return True + elif self.token == 'false': + self.NextToken() + return False + else: + raise self._ParseError('Expected "true" or "false".') def ConsumeString(self): """Consumes a string value. @@ -663,11 +525,7 @@ class _Tokenizer(object): Raises: ParseError: If a string value couldn't be consumed. """ - the_bytes = self.ConsumeByteString() - try: - return unicode(the_bytes, 'utf-8') - except UnicodeDecodeError, e: - raise self._StringParseError(e) + return unicode(self.ConsumeByteString(), 'utf-8') def ConsumeByteString(self): """Consumes a byte array value. @@ -678,11 +536,10 @@ class _Tokenizer(object): Raises: ParseError: If a byte array value couldn't be consumed. """ - the_list = [self._ConsumeSingleByteString()] - while self.token and self.token[0] in ('\'', '"'): - the_list.append(self._ConsumeSingleByteString()) - return ''.encode('latin1').join(the_list) ##PY25 -##!PY25 return b''.join(the_list) + list = [self._ConsumeSingleByteString()] + while len(self.token) > 0 and self.token[0] in ('\'', '"'): + list.append(self._ConsumeSingleByteString()) + return "".join(list) def _ConsumeSingleByteString(self): """Consume one token of a string literal. @@ -693,24 +550,48 @@ class _Tokenizer(object): """ text = self.token if len(text) < 1 or text[0] not in ('\'', '"'): - raise self._ParseError('Expected string.') + raise self._ParseError('Exptected string.') if len(text) < 2 or text[-1] != text[0]: raise self._ParseError('String missing ending quote.') try: - result = text_encoding.CUnescape(text[1:-1]) + result = _CUnescape(text[1:-1]) except ValueError, e: raise self._ParseError(str(e)) self.NextToken() return result - def ConsumeEnum(self, field): - try: - result = ParseEnum(field, self.token) - except ValueError, e: - raise self._ParseError(str(e)) - self.NextToken() + def _ParseInteger(self, text, is_signed=False, is_long=False): + """Parses an integer. + + Args: + text: The text to parse. + is_signed: True if a signed integer must be parsed. + is_long: True if a long integer must be parsed. + + Returns: + The integer value. + + Raises: + ValueError: Thrown Iff the text is not a valid integer. + """ + pos = 0 + if text.startswith('-'): + pos += 1 + + base = 10 + if text.startswith('0x', pos) or text.startswith('0X', pos): + base = 16 + elif text.startswith('0', pos): + base = 8 + + # Do the actual parsing. Exception handling is propagated to caller. + result = int(text, base) + + # Check if the integer is sane. Exceptions handled by callers. + checker = self._INTEGER_CHECKERS[2 * int(is_long) + int(is_signed)] + checker.CheckValue(result) return result def ParseErrorPreviousToken(self, message): @@ -730,144 +611,63 @@ class _Tokenizer(object): return ParseError('%d:%d : %s' % ( self._line + 1, self._column + 1, message)) - def _StringParseError(self, e): - return self._ParseError('Couldn\'t parse string: ' + str(e)) + def _IntegerParseError(self, e): + return self._ParseError('Couldn\'t parse integer: ' + str(e)) + + def _FloatParseError(self, e): + return self._ParseError('Couldn\'t parse number: ' + str(e)) def NextToken(self): """Reads the next meaningful token.""" self._previous_line = self._line self._previous_column = self._column - - self._column += len(self.token) - self._SkipWhitespace() - - if not self._more_lines: + if self.AtEnd(): self.token = '' return + self._column += len(self.token) + + # Make sure there is data to work on. + self._PopLine() - match = self._TOKEN.match(self._current_line, self._column) + match = re.match(self._TOKEN, self._current_line) if match: token = match.group(0) + self._current_line = self._current_line[len(token):] self.token = token else: - self.token = self._current_line[self._column] - - -def ParseInteger(text, is_signed=False, is_long=False): - """Parses an integer. - - Args: - text: The text to parse. - is_signed: True if a signed integer must be parsed. - is_long: True if a long integer must be parsed. - - Returns: - The integer value. - - Raises: - ValueError: Thrown Iff the text is not a valid integer. - """ - # Do the actual parsing. Exception handling is propagated to caller. - try: - # We force 32-bit values to int and 64-bit values to long to make - # alternate implementations where the distinction is more significant - # (e.g. the C++ implementation) simpler. - if is_long: - result = long(text, 0) - else: - result = int(text, 0) - except ValueError: - raise ValueError('Couldn\'t parse integer: %s' % text) - - # Check if the integer is sane. Exceptions handled by callers. - checker = _INTEGER_CHECKERS[2 * int(is_long) + int(is_signed)] - checker.CheckValue(result) - return result - - -def ParseFloat(text): - """Parse a floating point number. - - Args: - text: Text to parse. - - Returns: - The number parsed. - - Raises: - ValueError: If a floating point number couldn't be parsed. - """ - try: - # Assume Python compatible syntax. - return float(text) - except ValueError: - # Check alternative spellings. - if _FLOAT_INFINITY.match(text): - if text[0] == '-': - return float('-inf') - else: - return float('inf') - elif _FLOAT_NAN.match(text): - return float('nan') - else: - # assume '1.0f' format - try: - return float(text.rstrip('f')) - except ValueError: - raise ValueError('Couldn\'t parse float: %s' % text) - - -def ParseBool(text): - """Parse a boolean value. - - Args: - text: Text to parse. + self.token = self._current_line[0] + self._current_line = self._current_line[1:] + self._SkipWhitespace() - Returns: - Boolean values parsed - Raises: - ValueError: If text is not a valid boolean. - """ - if text in ('true', 't', '1'): - return True - elif text in ('false', 'f', '0'): - return False - else: - raise ValueError('Expected "true" or "false".') +# text.encode('string_escape') does not seem to satisfy our needs as it +# encodes unprintable characters using two-digit hex escapes whereas our +# C++ unescaping function allows hex escapes to be any length. So, +# "\0011".encode('string_escape') ends up being "\\x011", which will be +# decoded in C++ as a single-character string with char code 0x11. +def _CEscape(text): + def escape(c): + o = ord(c) + if o == 10: return r"\n" # optional escape + if o == 13: return r"\r" # optional escape + if o == 9: return r"\t" # optional escape + if o == 39: return r"\'" # optional escape + if o == 34: return r'\"' # necessary escape + if o == 92: return r"\\" # necessary escape -def ParseEnum(field, value): - """Parse an enum value. + if o >= 127 or o < 32: return "\\%03o" % o # necessary escapes + return c + return "".join([escape(c) for c in text]) - The value can be specified by a number (the enum value), or by - a string literal (the enum name). - Args: - field: Enum field descriptor. - value: String value. +_CUNESCAPE_HEX = re.compile('\\\\x([0-9a-fA-F]{2}|[0-9a-f-A-F])') - Returns: - Enum value number. - Raises: - ValueError: If the enum value could not be parsed. - """ - enum_descriptor = field.enum_type - try: - number = int(value, 0) - except ValueError: - # Identifier. - enum_value = enum_descriptor.values_by_name.get(value, None) - if enum_value is None: - raise ValueError( - 'Enum type "%s" has no value named %s.' % ( - enum_descriptor.full_name, value)) - else: - # Numeric value. - enum_value = enum_descriptor.values_by_number.get(number, None) - if enum_value is None: - raise ValueError( - 'Enum type "%s" has no value with number %d.' % ( - enum_descriptor.full_name, number)) - return enum_value.number +def _CUnescape(text): + def ReplaceHex(m): + return chr(int(m.group(0)[2:], 16)) + # This is required because the 'string_escape' encoding doesn't + # allow single-digit hex escapes (like '\xf'). + result = _CUNESCAPE_HEX.sub(ReplaceHex, text) + return result.decode('string_escape') |