summaryrefslogtreecommitdiffstats
path: root/simple/simple-http/src/main/java/org/simpleframework/http/message/FileUploadConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'simple/simple-http/src/main/java/org/simpleframework/http/message/FileUploadConsumer.java')
-rw-r--r--simple/simple-http/src/main/java/org/simpleframework/http/message/FileUploadConsumer.java272
1 files changed, 272 insertions, 0 deletions
diff --git a/simple/simple-http/src/main/java/org/simpleframework/http/message/FileUploadConsumer.java b/simple/simple-http/src/main/java/org/simpleframework/http/message/FileUploadConsumer.java
new file mode 100644
index 0000000..8318013
--- /dev/null
+++ b/simple/simple-http/src/main/java/org/simpleframework/http/message/FileUploadConsumer.java
@@ -0,0 +1,272 @@
+/*
+ * FileUploadConsumer.java February 2013
+ *
+ * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.http.message;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.simpleframework.common.buffer.Allocator;
+import org.simpleframework.transport.ByteCursor;
+
+/**
+ * The <code>FileUploadConsumer</code> object is used to consume a
+ * list of parts encoded in the multipart format. This is can consume
+ * any number of parts from a cursor. Each part consumed is added to an
+ * internal part list which can be used to acquire the contents of the
+ * upload and inspect the headers provided for each uploaded part. To
+ * ensure that only a fixed number of bytes are consumed this wraps
+ * the provided cursor with a counter to ensure reads a limited amount.
+ *
+ * @author Niall Gallagher
+ */
+public class FileUploadConsumer implements BodyConsumer {
+
+ /**
+ * This is used to read and parse the contents of the part series.
+ */
+ private final BodyConsumer consumer;
+
+ /**
+ * This counts the number of bytes remaining the the part series.
+ */
+ private final AtomicLong count;
+
+ /**
+ * Constructor for the <code>FileUploadConsumer</code> object.
+ * This is used to create an object that read a series of parts
+ * from a fixed length body. When consuming the body this will not
+ * read any more than the content length from the cursor.
+ *
+ * @param allocator this is the allocator used to allocate buffers
+ * @param boundary this is the boundary that is used by this
+ * @param length this is the number of bytes for this part series
+ */
+ public FileUploadConsumer(Allocator allocator, byte[] boundary, long length) {
+ this.consumer = new PartSeriesConsumer(allocator, boundary, length);
+ this.count = new AtomicLong(length);
+ }
+
+ /**
+ * This is used to acquire the body that has been consumed. This
+ * will return a body which can be used to read the content of
+ * the message, also if the request is multipart upload then all
+ * of the parts are provided as <code>Part</code> objects.
+ * Each part can then be read as an individual message.
+ *
+ * @return the body that has been consumed by this instance
+ */
+ public Body getBody() {
+ return consumer.getBody();
+ }
+
+ /**
+ * This method is used to consume bytes from the provided cursor.
+ * Consuming of bytes from the cursor should be done in such a
+ * way that it does not block. So typically only the number of
+ * ready bytes in the <code>ByteCursor</code> object should be
+ * read. If there are no ready bytes then this will return.
+ *
+ * @param cursor used to consume the bytes from the HTTP pipeline
+ */
+ public void consume(ByteCursor cursor) throws IOException {
+ ByteCounter counter = new ByteCounter(cursor);
+
+ while(counter.isReady()) {
+ if(consumer.isFinished()) {
+ break;
+ }
+ consumer.consume(counter);
+ }
+ }
+
+ /**
+ * This is used to determine whether the consumer has finished
+ * reading. The consumer is considered finished if it has read a
+ * terminal token or if it has exhausted the stream and can not
+ * read any more. Once finished the consumed bytes can be parsed.
+ *
+ * @return true if the consumer has finished reading its content
+ */
+ public boolean isFinished() {
+ long remaining = count.get();
+
+ if(consumer.isFinished()) {
+ return true;
+ }
+ return remaining <= 0;
+ }
+
+ /**
+ * The <code>ByteCounter</code> is a wrapper for a cursor that can
+ * be used to restrict the number of bytes consumed. This will
+ * count the bytes consumed and ensure that any requested data is
+ * restricted to a chunk less than or equal to the remaining bytes.
+ */
+ private class ByteCounter implements ByteCursor {
+
+ /**
+ * This is the cursor that this counter will delegate to.
+ */
+ private final ByteCursor cursor;
+
+ /**
+ * Constructor for the <code>Counter</code> object. This is used
+ * to create a special cursor that counts the bytes read and
+ * limits reads to the remaining bytes left in the part series.
+ *
+ * @param cursor this is the cursor that is delegated to
+ */
+ public ByteCounter(ByteCursor cursor) {
+ this.cursor = cursor;
+ }
+
+ /**
+ * Determines whether the cursor is still open. The cursor is
+ * considered open if there are still bytes to read. If there is
+ * still bytes buffered and the underlying transport is closed
+ * then the cursor is still considered open.
+ *
+ * @return true if the read method does not return a -1 value
+ */
+ public boolean isOpen() throws IOException {
+ return cursor.isOpen();
+ }
+
+ /**
+ * Determines whether the cursor is ready for reading. When the
+ * cursor is ready then it guarantees that some amount of bytes
+ * can be read from the underlying stream without blocking.
+ *
+ * @return true if some data can be read without blocking
+ */
+ public boolean isReady() throws IOException {
+ long limit = count.get();
+
+ if(limit > 0) {
+ return cursor.isReady();
+ }
+ return false;
+ }
+
+ /**
+ * Provides the number of bytes that can be read from the stream
+ * without blocking. This is typically the number of buffered or
+ * available bytes within the stream. When this reaches zero then
+ * the cursor may perform a blocking read.
+ *
+ * @return the number of bytes that can be read without blocking
+ */
+ public int ready() throws IOException {
+ int limit = (int)count.get();
+ int ready = cursor.ready();
+
+ if(ready > limit) {
+ return limit;
+ }
+ return ready;
+ }
+
+ /**
+ * Reads a block of bytes from the underlying stream. This will
+ * read up to the requested number of bytes from the underlying
+ * stream. If there are no ready bytes on the stream this can
+ * return zero, representing the fact that nothing was read.
+ *
+ * @param data this is the array to read the bytes in to
+ *
+ * @return this returns the number of bytes read from the stream
+ */
+ public int read(byte[] data) throws IOException {
+ return read(data, 0, data.length);
+ }
+
+ /**
+ * Reads a block of bytes from the underlying stream. This will
+ * read up to the requested number of bytes from the underlying
+ * stream. If there are no ready bytes on the stream this can
+ * return zero, representing the fact that nothing was read.
+ *
+ * @param data this is the array to read the bytes in to
+ * @param off this is the offset to begin writing the bytes to
+ * @param len this is the number of bytes that are requested
+ *
+ * @return this returns the number of bytes read from the stream
+ */
+ public int read(byte[] data, int off, int len) throws IOException {
+ int limit = (int)count.get();
+ int size = Math.min(limit, len);
+ int chunk = cursor.read(data, off, size);
+
+ if(chunk > 0) {
+ count.addAndGet(-chunk);
+ }
+ return chunk;
+ }
+
+ /**
+ * Pushes the provided data on to the cursor. Data pushed on to
+ * the cursor will be the next data read from the cursor. This
+ * complements the <code>reset</code> method which will reset
+ * the cursors position on a stream. Allowing data to be pushed
+ * on to the cursor allows more flexibility.
+ *
+ * @param data this is the data to be pushed on to the cursor
+ */
+ public void push(byte[] data) throws IOException {
+ push(data, 0, data.length);
+ }
+
+ /**
+ * Pushes the provided data on to the cursor. Data pushed on to
+ * the cursor will be the next data read from the cursor. This
+ * complements the <code>reset</code> method which will reset
+ * the cursors position on a stream. Allowing data to be pushed
+ * on to the cursor allows more flexibility.
+ *
+ * @param data this is the data to be pushed on to the cursor
+ * @param off this is the offset to begin reading the bytes
+ * @param len this is the number of bytes that are to be used
+ */
+ public void push(byte[] data, int off, int len) throws IOException {
+ if(len > 0) {
+ count.addAndGet(len);
+ }
+ cursor.push(data, off, len);
+ }
+
+ /**
+ * Moves the cursor backward within the stream. This ensures
+ * that any bytes read from the last read can be pushed back
+ * in to the stream so that they can be read again. This will
+ * throw an exception if the reset can not be performed.
+ *
+ * @param len this is the number of bytes to reset back
+ *
+ * @return this is the number of bytes that have been reset
+ */
+ public int reset(int len) throws IOException {
+ int reset = cursor.reset(len);
+
+ if(reset > 0) {
+ count.addAndGet(reset);
+ }
+ return reset;
+ }
+ }
+}