diff --git a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java index 399e126dc..0696cd54c 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java +++ b/msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java @@ -940,12 +940,13 @@ public String unpackString() if (len > stringSizeLimit) { throw new MessageSizeException(String.format("cannot unpack a String of size larger than %,d: %,d", stringSizeLimit, len), len); } + + resetDecoder(); // should be invoked only once per value + if (buffer.size() - position >= len) { return decodeStringFastPath(len); } - resetDecoder(); - try { int rawRemaining = len; while (rawRemaining > 0) { @@ -1039,10 +1040,7 @@ private String decodeStringFastPath(int length) return s; } else { - resetDecoder(); - ByteBuffer bb = buffer.sliceAsByteBuffer(); - bb.limit(position + length); - bb.position(position); + ByteBuffer bb = buffer.sliceAsByteBuffer(position, length); CharBuffer cb; try { cb = decoder.decode(bb); diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java new file mode 100644 index 000000000..10b91d20a --- /dev/null +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java @@ -0,0 +1,81 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.core.buffer; + +import java.io.IOException; +import java.util.Enumeration; + +import static org.msgpack.core.Preconditions.checkNotNull; + +/** + * {@link MessageBufferInput} adapter for {@link MessageBufferInput} Enumeration + */ +public class SequenceMessageBufferInput + implements MessageBufferInput +{ + private Enumeration sequence; + private MessageBufferInput input; + + public SequenceMessageBufferInput(Enumeration sequence) + { + this.sequence = checkNotNull(sequence, "input sequence is null"); + try { + nextInput(); + } + catch (IOException ignore) { + } + } + + @Override + public MessageBuffer next() throws IOException + { + if (input == null) { + return null; + } + MessageBuffer buffer = input.next(); + if (buffer == null) { + nextInput(); + return next(); + } + + return buffer; + } + + private void nextInput() throws IOException + { + if (input != null) { + input.close(); + } + + if (sequence.hasMoreElements()) { + input = sequence.nextElement(); + if (input == null) { + throw new NullPointerException("An element in the MessageBufferInput sequence is null"); + } + } + else { + input = null; + } + } + + @Override + public void close() throws IOException + { + do { + nextInput(); + } while (input != null); + } +} diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 1c8864c7c..db512373b 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -17,11 +17,13 @@ package org.msgpack.core import java.io._ import java.nio.ByteBuffer +import java.util.Collections import org.msgpack.core.buffer._ import org.msgpack.value.ValueType import xerial.core.io.IOUtil._ +import scala.collection.JavaConversions._ import scala.util.Random object MessageUnpackerTest { @@ -205,6 +207,34 @@ class MessageUnpackerTest extends MessagePackSpec { builder.result() } + def unpackerCollectionWithVariousBuffers(data: Array[Byte], chunkSize: Int) : Seq[MessageUnpacker] = { + val seqBytes = Seq.newBuilder[MessageBufferInput] + val seqByteBuffers = Seq.newBuilder[MessageBufferInput] + val seqDirectBuffers = Seq.newBuilder[MessageBufferInput] + var left = data.length + var position = 0 + while (left > 0) { + val length = Math.min(chunkSize, left) + seqBytes += new ArrayBufferInput(data, position, length) + val bb = ByteBuffer.allocate(length) + val db = ByteBuffer.allocateDirect(length) + bb.put(data, position, length).flip() + db.put(data, position, length).flip() + seqByteBuffers += new ByteBufferInput(bb) + seqDirectBuffers += new ByteBufferInput(db) + left -= length + position += length + } + val builder = Seq.newBuilder[MessageUnpacker] + builder += MessagePack.newDefaultUnpacker(new SequenceMessageBufferInput(Collections.enumeration(seqBytes.result()))) + builder += MessagePack.newDefaultUnpacker(new SequenceMessageBufferInput(Collections.enumeration(seqByteBuffers.result()))) + if (!universal) { + builder += MessagePack.newDefaultUnpacker(new SequenceMessageBufferInput(Collections.enumeration(seqDirectBuffers.result()))) + } + + builder.result() + } + "MessageUnpacker" should { "parse message packed data" taggedAs ("unpack") in { @@ -330,21 +360,50 @@ class MessageUnpackerTest extends MessagePackSpec { new SplitTest {val data = testData3(30)}.run } - "read numeric data at buffer boundary" taggedAs("boundary2") in { + "read integer at MessageBuffer boundaries" taggedAs("integer-buffer-boundary") in { val packer = MessagePack.newDefaultBufferPacker() (0 until 1170).foreach{i => packer.packLong(0x0011223344556677L) - packer.packString("hello") } packer.close val data = packer.toByteArray - val unpacker = MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192)) - (0 until 1170).foreach { i => - unpacker.unpackLong() shouldBe 0x0011223344556677L - unpacker.unpackString() shouldBe "hello" + // Boundary test + withResource(MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192))) { unpacker => + (0 until 1170).foreach { i => + unpacker.unpackLong() shouldBe 0x0011223344556677L + } + } + + // Boundary test for sequences of ByteBuffer, DirectByteBuffer backed MessageInput. + for (unpacker <- unpackerCollectionWithVariousBuffers(data, 32)) { + (0 until 1170).foreach { i => + unpacker.unpackLong() shouldBe 0x0011223344556677L + } + } + } + + "read string at MessageBuffer boundaries" taggedAs("string-buffer-boundary") in { + val packer = MessagePack.newDefaultBufferPacker() + (0 until 1170).foreach{i => + packer.packString("hello world") + } + packer.close + val data = packer.toByteArray + + // Boundary test + withResource(MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192))) { unpacker => + (0 until 1170).foreach { i => + unpacker.unpackString() shouldBe "hello world" + } + } + + // Boundary test for sequences of ByteBuffer, DirectByteBuffer backed MessageInput. + for (unpacker <- unpackerCollectionWithVariousBuffers(data, 32)) { + (0 until 1170).foreach { i => + unpacker.unpackString() shouldBe "hello world" + } } - unpacker.close() } "be faster then msgpack-v6 skip" taggedAs ("cmp-skip") in {