summaryrefslogtreecommitdiffstats
path: root/python/google/protobuf/internal/encoder_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/google/protobuf/internal/encoder_test.py')
-rwxr-xr-xpython/google/protobuf/internal/encoder_test.py286
1 files changed, 286 insertions, 0 deletions
diff --git a/python/google/protobuf/internal/encoder_test.py b/python/google/protobuf/internal/encoder_test.py
new file mode 100755
index 0000000..bf75ea8
--- /dev/null
+++ b/python/google/protobuf/internal/encoder_test.py
@@ -0,0 +1,286 @@
+#! /usr/bin/python
+#
+# Protocol Buffers - Google's data interchange format
+# Copyright 2008 Google Inc. All rights reserved.
+# http://code.google.com/p/protobuf/
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test for google.protobuf.internal.encoder."""
+
+__author__ = 'robinson@google.com (Will Robinson)'
+
+import struct
+import logging
+import unittest
+from google.protobuf.internal import wire_format
+from google.protobuf.internal import encoder
+from google.protobuf.internal import output_stream
+from google.protobuf import message
+import mox
+
+
+class EncoderTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mox = mox.Mox()
+ self.encoder = encoder.Encoder()
+ self.mock_stream = self.mox.CreateMock(output_stream.OutputStream)
+ self.mock_message = self.mox.CreateMock(message.Message)
+ self.encoder._stream = self.mock_stream
+
+ def PackTag(self, field_number, wire_type):
+ return wire_format.PackTag(field_number, wire_type)
+
+ def AppendScalarTestHelper(self, test_name, encoder_method,
+ expected_stream_method_name,
+ wire_type, field_value,
+ expected_value=None, expected_length=None,
+ is_tag_test=True):
+ """Helper for testAppendScalars.
+
+ Calls one of the Encoder methods, and ensures that the Encoder
+ in turn makes the expected calls into its OutputStream.
+
+ Args:
+ test_name: Name of this test, used only for logging.
+ encoder_method: Callable on self.encoder. This is the Encoder
+ method we're testing. If is_tag_test=True, the encoder method
+ accepts a field_number and field_value. if is_tag_test=False,
+ the encoder method accepts a field_value.
+ expected_stream_method_name: (string) Name of the OutputStream
+ method we expect Encoder to call to actually put the value
+ on the wire.
+ wire_type: The WIRETYPE_* constant we expect encoder to
+ use in the specified encoder_method.
+ field_value: The value we're trying to encode. Passed
+ into encoder_method.
+ expected_value: The value we expect Encoder to pass into
+ the OutputStream method. If None, we expect field_value
+ to pass through unmodified.
+ expected_length: The length we expect Encoder to pass to the
+ AppendVarUInt32 method. If None we expect the length of the
+ field_value.
+ is_tag_test: A Boolean. If True (the default), we append the
+ the packed field number and wire_type to the stream before
+ the field value.
+ """
+ if expected_value is None:
+ expected_value = field_value
+
+ logging.info('Testing %s scalar output.\n'
+ 'Calling %r(%r), and expecting that to call the '
+ 'stream method %s(%r).' % (
+ test_name, encoder_method, field_value,
+ expected_stream_method_name, expected_value))
+
+ if is_tag_test:
+ field_number = 10
+ # Should first append the field number and type information.
+ self.mock_stream.AppendVarUInt32(self.PackTag(field_number, wire_type))
+ # If we're length-delimited, we should then append the length.
+ if wire_type == wire_format.WIRETYPE_LENGTH_DELIMITED:
+ if expected_length is None:
+ expected_length = len(field_value)
+ self.mock_stream.AppendVarUInt32(expected_length)
+
+ # Should then append the value itself.
+ # We have to use names instead of methods to work around some
+ # mox weirdness. (ResetAll() is overzealous).
+ expected_stream_method = getattr(self.mock_stream,
+ expected_stream_method_name)
+ expected_stream_method(expected_value)
+
+ self.mox.ReplayAll()
+ if is_tag_test:
+ encoder_method(field_number, field_value)
+ else:
+ encoder_method(field_value)
+ self.mox.VerifyAll()
+ self.mox.ResetAll()
+
+ VAL = 1.125 # Perfectly representable as a float (no rounding error).
+ LITTLE_FLOAT_VAL = '\x00\x00\x90?'
+ LITTLE_DOUBLE_VAL = '\x00\x00\x00\x00\x00\x00\xf2?'
+
+ def testAppendScalars(self):
+ utf8_bytes = '\xd0\xa2\xd0\xb5\xd1\x81\xd1\x82'
+ utf8_string = unicode(utf8_bytes, 'utf-8')
+ scalar_tests = [
+ ['int32', self.encoder.AppendInt32, 'AppendVarint32',
+ wire_format.WIRETYPE_VARINT, 0],
+ ['int64', self.encoder.AppendInt64, 'AppendVarint64',
+ wire_format.WIRETYPE_VARINT, 0],
+ ['uint32', self.encoder.AppendUInt32, 'AppendVarUInt32',
+ wire_format.WIRETYPE_VARINT, 0],
+ ['uint64', self.encoder.AppendUInt64, 'AppendVarUInt64',
+ wire_format.WIRETYPE_VARINT, 0],
+ ['fixed32', self.encoder.AppendFixed32, 'AppendLittleEndian32',
+ wire_format.WIRETYPE_FIXED32, 0],
+ ['fixed64', self.encoder.AppendFixed64, 'AppendLittleEndian64',
+ wire_format.WIRETYPE_FIXED64, 0],
+ ['sfixed32', self.encoder.AppendSFixed32, 'AppendLittleEndian32',
+ wire_format.WIRETYPE_FIXED32, -1, 0xffffffff],
+ ['sfixed64', self.encoder.AppendSFixed64, 'AppendLittleEndian64',
+ wire_format.WIRETYPE_FIXED64, -1, 0xffffffffffffffff],
+ ['float', self.encoder.AppendFloat, 'AppendRawBytes',
+ wire_format.WIRETYPE_FIXED32, self.VAL, self.LITTLE_FLOAT_VAL],
+ ['double', self.encoder.AppendDouble, 'AppendRawBytes',
+ wire_format.WIRETYPE_FIXED64, self.VAL, self.LITTLE_DOUBLE_VAL],
+ ['bool', self.encoder.AppendBool, 'AppendVarint32',
+ wire_format.WIRETYPE_VARINT, False],
+ ['enum', self.encoder.AppendEnum, 'AppendVarint32',
+ wire_format.WIRETYPE_VARINT, 0],
+ ['string', self.encoder.AppendString, 'AppendRawBytes',
+ wire_format.WIRETYPE_LENGTH_DELIMITED,
+ "You're in a maze of twisty little passages, all alike."],
+ ['utf8-string', self.encoder.AppendString, 'AppendRawBytes',
+ wire_format.WIRETYPE_LENGTH_DELIMITED, utf8_string,
+ utf8_bytes, len(utf8_bytes)],
+ # We test zigzag encoding routines more extensively below.
+ ['sint32', self.encoder.AppendSInt32, 'AppendVarUInt32',
+ wire_format.WIRETYPE_VARINT, -1, 1],
+ ['sint64', self.encoder.AppendSInt64, 'AppendVarUInt64',
+ wire_format.WIRETYPE_VARINT, -1, 1],
+ ]
+ # Ensure that we're testing different Encoder methods and using
+ # different test names in all test cases above.
+ self.assertEqual(len(scalar_tests), len(set(t[0] for t in scalar_tests)))
+ self.assert_(len(scalar_tests) >= len(set(t[1] for t in scalar_tests)))
+ for args in scalar_tests:
+ self.AppendScalarTestHelper(*args)
+
+ def testAppendScalarsWithoutTags(self):
+ scalar_no_tag_tests = [
+ ['int32', self.encoder.AppendInt32NoTag, 'AppendVarint32', None, 0],
+ ['int64', self.encoder.AppendInt64NoTag, 'AppendVarint64', None, 0],
+ ['uint32', self.encoder.AppendUInt32NoTag, 'AppendVarUInt32', None, 0],
+ ['uint64', self.encoder.AppendUInt64NoTag, 'AppendVarUInt64', None, 0],
+ ['fixed32', self.encoder.AppendFixed32NoTag,
+ 'AppendLittleEndian32', None, 0],
+ ['fixed64', self.encoder.AppendFixed64NoTag,
+ 'AppendLittleEndian64', None, 0],
+ ['sfixed32', self.encoder.AppendSFixed32NoTag,
+ 'AppendLittleEndian32', None, 0],
+ ['sfixed64', self.encoder.AppendSFixed64NoTag,
+ 'AppendLittleEndian64', None, 0],
+ ['float', self.encoder.AppendFloatNoTag,
+ 'AppendRawBytes', None, self.VAL, self.LITTLE_FLOAT_VAL],
+ ['double', self.encoder.AppendDoubleNoTag,
+ 'AppendRawBytes', None, self.VAL, self.LITTLE_DOUBLE_VAL],
+ ['bool', self.encoder.AppendBoolNoTag, 'AppendVarint32', None, 0],
+ ['enum', self.encoder.AppendEnumNoTag, 'AppendVarint32', None, 0],
+ ['sint32', self.encoder.AppendSInt32NoTag,
+ 'AppendVarUInt32', None, -1, 1],
+ ['sint64', self.encoder.AppendSInt64NoTag,
+ 'AppendVarUInt64', None, -1, 1],
+ ]
+
+ self.assertEqual(len(scalar_no_tag_tests),
+ len(set(t[0] for t in scalar_no_tag_tests)))
+ self.assert_(len(scalar_no_tag_tests) >=
+ len(set(t[1] for t in scalar_no_tag_tests)))
+ for args in scalar_no_tag_tests:
+ # For no tag tests, the wire_type is not used, so we put in None.
+ self.AppendScalarTestHelper(is_tag_test=False, *args)
+
+ def testAppendGroup(self):
+ field_number = 23
+ # Should first append the start-group marker.
+ self.mock_stream.AppendVarUInt32(
+ self.PackTag(field_number, wire_format.WIRETYPE_START_GROUP))
+ # Should then serialize itself.
+ self.mock_message.SerializeToString().AndReturn('foo')
+ self.mock_stream.AppendRawBytes('foo')
+ # Should finally append the end-group marker.
+ self.mock_stream.AppendVarUInt32(
+ self.PackTag(field_number, wire_format.WIRETYPE_END_GROUP))
+
+ self.mox.ReplayAll()
+ self.encoder.AppendGroup(field_number, self.mock_message)
+ self.mox.VerifyAll()
+
+ def testAppendMessage(self):
+ field_number = 23
+ byte_size = 42
+ # Should first append the field number and type information.
+ self.mock_stream.AppendVarUInt32(
+ self.PackTag(field_number, wire_format.WIRETYPE_LENGTH_DELIMITED))
+ # Should then append its length.
+ self.mock_message.ByteSize().AndReturn(byte_size)
+ self.mock_stream.AppendVarUInt32(byte_size)
+ # Should then serialize itself to the encoder.
+ self.mock_message.SerializeToString().AndReturn('foo')
+ self.mock_stream.AppendRawBytes('foo')
+
+ self.mox.ReplayAll()
+ self.encoder.AppendMessage(field_number, self.mock_message)
+ self.mox.VerifyAll()
+
+ def testAppendMessageSetItem(self):
+ field_number = 23
+ byte_size = 42
+ # Should first append the field number and type information.
+ self.mock_stream.AppendVarUInt32(
+ self.PackTag(1, wire_format.WIRETYPE_START_GROUP))
+ self.mock_stream.AppendVarUInt32(
+ self.PackTag(2, wire_format.WIRETYPE_VARINT))
+ self.mock_stream.AppendVarint32(field_number)
+ self.mock_stream.AppendVarUInt32(
+ self.PackTag(3, wire_format.WIRETYPE_LENGTH_DELIMITED))
+ # Should then append its length.
+ self.mock_message.ByteSize().AndReturn(byte_size)
+ self.mock_stream.AppendVarUInt32(byte_size)
+ # Should then serialize itself to the encoder.
+ self.mock_message.SerializeToString().AndReturn('foo')
+ self.mock_stream.AppendRawBytes('foo')
+ self.mock_stream.AppendVarUInt32(
+ self.PackTag(1, wire_format.WIRETYPE_END_GROUP))
+
+ self.mox.ReplayAll()
+ self.encoder.AppendMessageSetItem(field_number, self.mock_message)
+ self.mox.VerifyAll()
+
+ def testAppendSFixed(self):
+ # Most of our bounds-checking is done in output_stream.py,
+ # but encoder.py is responsible for transforming signed
+ # fixed-width integers into unsigned ones, so we test here
+ # to ensure that we're not losing any entropy when we do
+ # that conversion.
+ field_number = 10
+ self.assertRaises(message.EncodeError, self.encoder.AppendSFixed32,
+ 10, wire_format.UINT32_MAX + 1)
+ self.assertRaises(message.EncodeError, self.encoder.AppendSFixed32,
+ 10, -(1 << 32))
+ self.assertRaises(message.EncodeError, self.encoder.AppendSFixed64,
+ 10, wire_format.UINT64_MAX + 1)
+ self.assertRaises(message.EncodeError, self.encoder.AppendSFixed64,
+ 10, -(1 << 64))
+
+
+if __name__ == '__main__':
+ unittest.main()