summaryrefslogtreecommitdiffstats
path: root/o3d/import
diff options
context:
space:
mode:
authorapatrick@google.com <apatrick@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2009-08-03 19:49:35 +0000
committerapatrick@google.com <apatrick@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2009-08-03 19:49:35 +0000
commit91240c947358b33cc19c0923a06d116ab36737dd (patch)
tree7897c99ad9b41d7735c7a42e0ea75fe6f76760b3 /o3d/import
parent2e25a9f4152a90a179ad37206bff79e1198379d7 (diff)
downloadchromium_src-91240c947358b33cc19c0923a06d116ab36737dd.zip
chromium_src-91240c947358b33cc19c0923a06d116ab36737dd.tar.gz
chromium_src-91240c947358b33cc19c0923a06d116ab36737dd.tar.bz2
Asynchronous tick now uses NPN_PluginAsyncCall.URL streaming callbacks are now also asynchronous.Implemented NPN_PluginAsyncCall for IE.Allowed WM_PAINT handler to be reentered because it no longer calls into the browser (except to schedule an asynchronous tick if none is pending).Fixed a bug where the EventManager would crash if an event callback called cleanUp on the client.Cleanup destroys all the packs. Doing this in NPP_Destroy seems to make Chrome timeout and fail to load the next page.Tar and GZ decoding happens on a new thread.
Review URL: http://codereview.chromium.org/155733 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@22305 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'o3d/import')
-rw-r--r--o3d/import/archive.gyp5
-rw-r--r--o3d/import/build.scons2
-rw-r--r--o3d/import/cross/archive_processor.cc71
-rw-r--r--o3d/import/cross/archive_processor.h22
-rw-r--r--o3d/import/cross/archive_request.cc64
-rw-r--r--o3d/import/cross/archive_request.h21
-rw-r--r--o3d/import/cross/file_output_stream_processor.cc14
-rw-r--r--o3d/import/cross/file_output_stream_processor.h6
-rw-r--r--o3d/import/cross/gz_compressor.cc44
-rw-r--r--o3d/import/cross/gz_compressor.h13
-rw-r--r--o3d/import/cross/gz_compressor_test.cc76
-rw-r--r--o3d/import/cross/gz_decompressor.cc35
-rw-r--r--o3d/import/cross/gz_decompressor.h5
-rw-r--r--o3d/import/cross/gz_decompressor_test.cc52
-rw-r--r--o3d/import/cross/iarchive_generator.h5
-rw-r--r--o3d/import/cross/main_thread_archive_callback_client.cc138
-rw-r--r--o3d/import/cross/main_thread_archive_callback_client.h85
-rw-r--r--o3d/import/cross/main_thread_archive_callback_client_test.cc219
-rw-r--r--o3d/import/cross/memory_stream.h16
-rw-r--r--o3d/import/cross/tar_generator.cc3
-rw-r--r--o3d/import/cross/tar_generator.h7
-rw-r--r--o3d/import/cross/tar_generator_test.cc41
-rw-r--r--o3d/import/cross/tar_processor.cc23
-rw-r--r--o3d/import/cross/tar_processor.h4
-rw-r--r--o3d/import/cross/tar_processor_test.cc34
-rw-r--r--o3d/import/cross/targz_generator.cc18
-rw-r--r--o3d/import/cross/targz_generator.h16
-rw-r--r--o3d/import/cross/targz_generator_test.cc36
-rw-r--r--o3d/import/cross/targz_processor.cc11
-rw-r--r--o3d/import/cross/targz_processor.h8
-rw-r--r--o3d/import/cross/targz_processor_test.cc41
-rw-r--r--o3d/import/cross/threaded_stream_processor.cc116
-rw-r--r--o3d/import/cross/threaded_stream_processor.h72
-rw-r--r--o3d/import/cross/threaded_stream_processor_test.cc155
34 files changed, 1228 insertions, 250 deletions
diff --git a/o3d/import/archive.gyp b/o3d/import/archive.gyp
index 73e12fb..b2e2fc6 100644
--- a/o3d/import/archive.gyp
+++ b/o3d/import/archive.gyp
@@ -37,6 +37,8 @@
'cross/memory_buffer.h',
'cross/memory_stream.cc',
'cross/memory_stream.h',
+ 'cross/main_thread_archive_callback_client.cc',
+ 'cross/main_thread_archive_callback_client.h',
'cross/raw_data.cc',
'cross/raw_data.h',
'cross/tar_processor.cc',
@@ -44,6 +46,8 @@
'cross/targz_generator.h',
'cross/targz_processor.cc',
'cross/targz_processor.h',
+ 'cross/threaded_stream_processor.cc',
+ 'cross/threaded_stream_processor.h',
],
},
{
@@ -61,6 +65,7 @@
'cross/raw_data_test.cc',
'cross/tar_processor_test.cc',
'cross/targz_processor_test.cc',
+ 'thread_stream_processor_test.cc',
],
},
'copies': [
diff --git a/o3d/import/build.scons b/o3d/import/build.scons
index c0789032..8aa5a88 100644
--- a/o3d/import/build.scons
+++ b/o3d/import/build.scons
@@ -84,9 +84,11 @@ archive_inputs = [
'cross/archive_request.cc',
'cross/gz_decompressor.cc',
'cross/memory_stream.cc',
+ 'cross/main_thread_archive_callback_client.cc',
'cross/raw_data.cc',
'cross/tar_processor.cc',
'cross/targz_processor.cc',
+ 'cross/threaded_stream_processor.cc',
]
conditioner_inputs = ['cross/collada_conditioner.cc']
diff --git a/o3d/import/cross/archive_processor.cc b/o3d/import/cross/archive_processor.cc
index 8601695..a10af1b 100644
--- a/o3d/import/cross/archive_processor.cc
+++ b/o3d/import/cross/archive_processor.cc
@@ -41,72 +41,30 @@ const int kChunkSize = 16384;
namespace o3d {
-#ifdef _DEBUG
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-// For debugging only, report a zlib or i/o error
-void zerr(int ret) {
- LOG(ERROR) << "ArchiveProcessor: ";
-
- switch (ret) {
- case Z_ERRNO:
- if (ferror(stdin))
- LOG(ERROR) << "error reading stdin\n";
- if (ferror(stdout))
- LOG(ERROR) << "error writing stdout\n";
- break;
- case Z_STREAM_ERROR:
- LOG(ERROR) << "invalid compression level\n";
- break;
- case Z_DATA_ERROR:
- LOG(ERROR) << "invalid or incomplete deflate data\n";
- break;
- case Z_MEM_ERROR:
- LOG(ERROR) << "out of memory\n";
- break;
- case Z_VERSION_ERROR:
- LOG(ERROR) << "zlib version mismatch!\n";
- break;
- }
-}
-#endif // _DEBUG
-
-// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-int ArchiveProcessor::ProcessEntireStream(MemoryReadStream *stream) {
- int result = Z_OK;
- bool has_error = false;
+StreamProcessor::Status ArchiveProcessor::ProcessEntireStream(
+ MemoryReadStream *stream) {
+ Status status;
// decompress until deflate stream ends or error
do {
int remaining = stream->GetRemainingByteCount();
int process_this_time = remaining < kChunkSize ? remaining : kChunkSize;
- result = ProcessCompressedBytes(stream, process_this_time);
-
- has_error = (result != Z_OK && result != Z_STREAM_END);
+ status = ProcessBytes(stream, process_this_time);
+ } while (status == IN_PROGRESS);
-#ifdef _DEBUG
- if (has_error) {
- zerr(result);
- }
-#endif
- } while (result != Z_STREAM_END && !has_error);
-
- if (result == Z_STREAM_END) {
- // if we got to the end of stream, then we're good...
- result = Z_OK;
- }
-
- return result;
+ return status;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-int ArchiveProcessor::ProcessFile(const char *filename) {
+StreamProcessor::Status ArchiveProcessor::ProcessFile(const char *filename) {
struct stat file_info;
int result = stat(filename, &file_info);
- if (result != 0) return -1;
+ if (result != 0) return FAILURE;
int file_length = file_info.st_size;
- if (file_length == 0) return -1;
+ if (file_length == 0) return FAILURE;
MemoryBuffer<uint8> buffer;
buffer.Allocate(file_length);
@@ -115,20 +73,13 @@ int ArchiveProcessor::ProcessFile(const char *filename) {
// Test by reading in a tar.gz file and sending through the
// progressive streaming system
FILE *fp = fopen(filename, "rb");
- if (!fp) return -1; // can't open file!
+ if (!fp) return FAILURE; // can't open file!
fread(p, sizeof(uint8), file_length, fp);
fclose(fp);
MemoryReadStream stream(p, file_length);
- result = ProcessEntireStream(&stream);
-
- if (result == Z_STREAM_END) {
- // if we got to the end of stream, then we're good...
- result = Z_OK;
- }
-
- return result;
+ return ProcessEntireStream(&stream);
}
} // namespace o3d
diff --git a/o3d/import/cross/archive_processor.h b/o3d/import/cross/archive_processor.h
index 8380c37..d404249 100644
--- a/o3d/import/cross/archive_processor.h
+++ b/o3d/import/cross/archive_processor.h
@@ -53,8 +53,6 @@ class ArchiveFileInfo {
private:
std::string filename_;
int file_size_;
-
- DISALLOW_COPY_AND_ASSIGN(ArchiveFileInfo);
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -63,10 +61,11 @@ class ArchiveCallbackClient {
virtual ~ArchiveCallbackClient() {}
virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info) = 0;
virtual bool ReceiveFileData(MemoryReadStream *stream, size_t nbytes) = 0;
+ virtual void Close(bool success) = 0;
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-class ArchiveProcessor {
+class ArchiveProcessor: public StreamProcessor {
public:
explicit ArchiveProcessor(ArchiveCallbackClient *callback_client) {}
@@ -75,25 +74,16 @@ class ArchiveProcessor {
// Call to "push" bytes into the processor. They will be decompressed and
// the appropriate callbacks on |callback_client| will happen
// as files come in...
- //
- // Return values (using zlib error codes):
- // Z_OK : Processing was successful - but not yet done
- // Z_STREAM_END : We're done - archive is completely/successfully processed
- // any other value indicates an error condition
- //
- // Note: even archive formats not based on zlib should use these codes
- // (Z_OK, Z_STREAM_END)
- //
- virtual int ProcessCompressedBytes(MemoryReadStream *stream,
- size_t bytes_to_process) = 0;
+ virtual Status ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process) = 0;
// Decompresses the complete file archive, making file callbacks as the files
// come in...
- virtual int ProcessFile(const char *filename);
+ virtual Status ProcessFile(const char *filename);
// Decompresses the complete archive from memory,
// making file callbacks as the files come in...
- virtual int ProcessEntireStream(MemoryReadStream *stream);
+ virtual Status ProcessEntireStream(MemoryReadStream *stream);
protected:
DISALLOW_COPY_AND_ASSIGN(ArchiveProcessor);
diff --git a/o3d/import/cross/archive_request.cc b/o3d/import/cross/archive_request.cc
index 84448e4..c02e1a5 100644
--- a/o3d/import/cross/archive_request.cc
+++ b/o3d/import/cross/archive_request.cc
@@ -33,8 +33,11 @@
#include "import/cross/archive_request.h"
-#include "import/cross/targz_processor.h"
#include "core/cross/pack.h"
+#include "core/cross/imain_thread_task_poster.h"
+#include "import/cross/targz_processor.h"
+#include "import/cross/main_thread_archive_callback_client.h"
+#include "import/cross/threaded_stream_processor.h"
#define DEBUG_ARCHIVE_CALLBACKS 0
@@ -61,12 +64,25 @@ ArchiveRequest::ArchiveRequest(ServiceLocator* service_locator,
ready_state_(0),
stream_length_(0),
bytes_received_(0) {
- archive_processor_ = new TarGzProcessor(this);
+ IMainThreadTaskPoster* main_thread_task_poster =
+ service_locator->GetService<IMainThreadTaskPoster>();
+ if (main_thread_task_poster->IsSupported()) {
+ main_thread_archive_callback_client_ = new MainThreadArchiveCallbackClient(
+ service_locator, this);
+ extra_processor_ = new TarGzProcessor(main_thread_archive_callback_client_);
+ archive_processor_ = new ThreadedStreamProcessor(extra_processor_);
+ } else {
+ main_thread_archive_callback_client_ = NULL;
+ extra_processor_ = NULL;
+ archive_processor_ = new TarGzProcessor(this);
+ }
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ArchiveRequest::~ArchiveRequest() {
delete archive_processor_;
+ delete extra_processor_;
+ delete main_thread_archive_callback_client_;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -85,7 +101,7 @@ void ArchiveRequest::NewStreamCallback(DownloadStream *stream) {
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
int32 ArchiveRequest::WriteReadyCallback(DownloadStream *stream) {
// Setting this too high causes Firefox to timeout in the Write callback.
- return 1024;
+ return 128 * 1024;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -99,10 +115,10 @@ int32 ArchiveRequest::WriteCallback(DownloadStream *stream,
MemoryReadStream memory_stream(reinterpret_cast<uint8*>(data), length);
// Progressively decompress the bytes we've just been given
- int result =
- archive_processor_->ProcessCompressedBytes(&memory_stream, length);
+ StreamProcessor::Status status =
+ archive_processor_->ProcessBytes(&memory_stream, length);
- if (result != Z_OK && result != Z_STREAM_END) {
+ if (status == StreamProcessor::FAILURE) {
set_success(false);
set_error("Invalid gzipped tar file");
stream->Cancel(); // tell the browser to stop downloading
@@ -120,21 +136,7 @@ void ArchiveRequest::FinishedCallback(DownloadStream *stream,
bool success,
const std::string &filename,
const std::string &mime_type) {
- set_ready_state(ArchiveRequest::STATE_LOADED);
-
- // Since the standard codes only go far enough to tell us that the download
- // succeeded, we set the success [and implicitly the done] flags to give the
- // rest of the story.
- set_success(success);
- if (!success) {
- // I have no idea if an error is already set here but one MUST be set
- // so let's check.
- if (error().empty()) {
- set_error(String("Could not download archive: ") + uri());
- }
- }
- if (onreadystatechange())
- onreadystatechange()->Run();
+ archive_processor_->Close(success);
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -212,7 +214,7 @@ bool ArchiveRequest::ReceiveFileData(MemoryReadStream *input_stream,
return false;
}
} else {
- onfileavailable()->Run();
+ onfileavailable()->Run(raw_data_);
}
// If data hasn't been discarded (inside callback) then writes out to
@@ -227,4 +229,22 @@ bool ArchiveRequest::ReceiveFileData(MemoryReadStream *input_stream,
return true;
}
+void ArchiveRequest::Close(bool success) {
+ set_ready_state(ArchiveRequest::STATE_LOADED);
+
+ // Since the standard codes only go far enough to tell us that the download
+ // succeeded, we set the success [and implicitly the done] flags to give the
+ // rest of the story.
+ set_success(success);
+ if (!success) {
+ // I have no idea if an error is already set here but one MUST be set
+ // so let's check.
+ if (error().empty()) {
+ set_error(String("Could not download archive: ") + uri());
+ }
+ }
+ if (onreadystatechange())
+ onreadystatechange()->Run();
+}
+
} // namespace o3d
diff --git a/o3d/import/cross/archive_request.h b/o3d/import/cross/archive_request.h
index 0a73864..65158bb 100644
--- a/o3d/import/cross/archive_request.h
+++ b/o3d/import/cross/archive_request.h
@@ -50,7 +50,8 @@
namespace o3d {
-typedef Closure ArchiveRequestCallback;
+typedef Closure ArchiveReadyStateChangeCallback;
+typedef Callback1<RawData*> ArchiveFileAvailableCallback;
// An ArchiveRequest object is used to carry out an asynchronous request
// for a file to be loaded.
@@ -104,22 +105,24 @@ class ArchiveRequest : public ObjectBase, public ArchiveCallbackClient {
// ArchiveCallbackClient methods
virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info);
virtual bool ReceiveFileData(MemoryReadStream *stream, size_t nbytes);
+ virtual void Close(bool success);
Pack *pack() {
return pack_.Get(); // Set at creation time and never changed.
}
- ArchiveRequestCallback *onfileavailable() {
+ ArchiveFileAvailableCallback *onfileavailable() {
return onfileavailable_.get();
}
- void set_onfileavailable(ArchiveRequestCallback *onfileavailable) {
+ void set_onfileavailable(ArchiveFileAvailableCallback *onfileavailable) {
onfileavailable_.reset(onfileavailable);
}
- ArchiveRequestCallback *onreadystatechange() {
+ ArchiveReadyStateChangeCallback *onreadystatechange() {
return onreadystatechange_.get();
}
- void set_onreadystatechange(ArchiveRequestCallback *onreadystatechange) {
+ void set_onreadystatechange(
+ ArchiveReadyStateChangeCallback *onreadystatechange) {
onreadystatechange_.reset(onreadystatechange);
}
@@ -174,8 +177,8 @@ class ArchiveRequest : public ObjectBase, public ArchiveCallbackClient {
ArchiveRequest(ServiceLocator* service_locator, Pack *pack);
Pack::Ref pack_;
- scoped_ptr<ArchiveRequestCallback> onreadystatechange_;
- scoped_ptr<ArchiveRequestCallback> onfileavailable_;
+ scoped_ptr<ArchiveReadyStateChangeCallback> onreadystatechange_;
+ scoped_ptr<ArchiveFileAvailableCallback> onfileavailable_;
String uri_;
// Request state
@@ -184,7 +187,9 @@ class ArchiveRequest : public ObjectBase, public ArchiveCallbackClient {
int ready_state_; // Like the XMLHttpRequest variable of the same name.
String error_; // Set after completion on failure.
- TarGzProcessor *archive_processor_;
+ StreamProcessor *archive_processor_;
+ StreamProcessor *extra_processor_;
+ ArchiveCallbackClient* main_thread_archive_callback_client_;
std::vector<RawData::Ref> raw_data_list_;
RawData::Ref raw_data_;
MemoryBuffer<uint8> temp_buffer_;
diff --git a/o3d/import/cross/file_output_stream_processor.cc b/o3d/import/cross/file_output_stream_processor.cc
index 3983faa..eab955b 100644
--- a/o3d/import/cross/file_output_stream_processor.cc
+++ b/o3d/import/cross/file_output_stream_processor.cc
@@ -41,12 +41,20 @@ FileOutputStreamProcessor::FileOutputStreamProcessor(FILE* file)
DCHECK(file != NULL);
}
-int FileOutputStreamProcessor::ProcessBytes(MemoryReadStream *stream,
- size_t bytes_to_process) {
+StreamProcessor::Status FileOutputStreamProcessor::ProcessBytes(
+ MemoryReadStream *stream,
+ size_t bytes_to_process) {
+ DCHECK(file_ != NULL);
size_t num_written = fwrite(stream->GetDirectMemoryPointer(),
1,
bytes_to_process,
file_);
- return num_written == bytes_to_process ? 0 : -1;
+ return num_written == bytes_to_process ? IN_PROGRESS : FAILURE;
}
+
+void FileOutputStreamProcessor::Close(bool success) {
+ fclose(file_);
+ file_ = NULL;
+}
+
} // namespace o3d
diff --git a/o3d/import/cross/file_output_stream_processor.h b/o3d/import/cross/file_output_stream_processor.h
index 3c04562..ff3b24e 100644
--- a/o3d/import/cross/file_output_stream_processor.h
+++ b/o3d/import/cross/file_output_stream_processor.h
@@ -44,8 +44,10 @@ class FileOutputStreamProcessor : public StreamProcessor {
public:
explicit FileOutputStreamProcessor(FILE* file);
- virtual int ProcessBytes(MemoryReadStream *stream,
- size_t bytes_to_process);
+ virtual Status ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process);
+ virtual void Close(bool success);
+
private:
FILE* file_;
DISALLOW_COPY_AND_ASSIGN(FileOutputStreamProcessor);
diff --git a/o3d/import/cross/gz_compressor.cc b/o3d/import/cross/gz_compressor.cc
index e42f2e7..d6a98cf 100644
--- a/o3d/import/cross/gz_compressor.cc
+++ b/o3d/import/cross/gz_compressor.cc
@@ -54,17 +54,19 @@ GzCompressor::GzCompressor(StreamProcessor *callback_client)
strm_.opaque = Z_NULL;
// Store this, so we can later check if it's OK to start processing
- init_result_ = deflateInit2(
+ int result = deflateInit2(
&strm_,
Z_DEFAULT_COMPRESSION,
Z_DEFLATED,
MAX_WBITS + 16, // 16 means write out gzip header/trailer
DEF_MEM_LEVEL,
Z_DEFAULT_STRATEGY);
+
+ initialized_ = result == Z_OK;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-void GzCompressor::Finalize() {
+void GzCompressor::Close(bool success) {
if (!stream_is_closed_) {
// Flush the compression stream
MemoryReadStream stream(NULL, 0);
@@ -74,39 +76,38 @@ void GzCompressor::Finalize() {
deflateEnd(&strm_);
stream_is_closed_ = true;
}
+
+ callback_client_->Close(success);
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
GzCompressor::~GzCompressor() {
- // Finalize() turns out to be a "nop" if the user has already called it
- Finalize();
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-int GzCompressor::ProcessBytes(MemoryReadStream *stream,
- size_t bytes_to_process) {
+StreamProcessor::Status GzCompressor::ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process) {
// Basic sanity check
if (stream->GetDirectMemoryPointer() == NULL || bytes_to_process == 0) {
- return -1;
+ return FAILURE;
}
return CompressBytes(stream, bytes_to_process, false);
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-int GzCompressor::CompressBytes(MemoryReadStream *stream,
- size_t bytes_to_process,
- bool flush) {
+StreamProcessor::Status GzCompressor::CompressBytes(MemoryReadStream *stream,
+ size_t bytes_to_process,
+ bool flush) {
// Don't even bother trying if we didn't get initialized properly
- if (init_result_ != Z_OK) return init_result_;
+ if (!initialized_) return FAILURE;
uint8 out[kChunkSize];
- int result = Z_OK;
// Don't try to read more than our stream has
size_t remaining = stream->GetRemainingByteCount();
if (bytes_to_process > remaining) {
- return Z_STREAM_ERROR;
+ return FAILURE;
}
// Use direct memory access on the MemoryStream object
@@ -120,30 +121,29 @@ int GzCompressor::CompressBytes(MemoryReadStream *stream,
// We need to flush the stream when we reach the end
int flush_code = flush ? Z_FINISH : Z_NO_FLUSH;
- // Run inflate() on input until output buffer not full
+ // Run deflate() on input until output buffer not full
+ int result;
do {
strm_.avail_out = kChunkSize;
strm_.next_out = out;
result = deflate(&strm_, flush_code);
-
- // error check here - return error codes if necessary
- assert(result != Z_STREAM_ERROR); // state not clobbered
+ if (result == Z_STREAM_ERROR)
+ return FAILURE;
size_t have = kChunkSize - strm_.avail_out;
// Callback with the compressed byte stream
MemoryReadStream decompressed_stream(out, have);
if (have > 0 && callback_client_) {
- int client_result =
- callback_client_->ProcessBytes(&decompressed_stream, have);
- if (client_result != 0) {
- return client_result; // propagate callback errors
+ if (callback_client_->ProcessBytes(&decompressed_stream,
+ have) == FAILURE) {
+ return FAILURE;
}
}
} while (strm_.avail_out == 0);
- return result;
+ return result == Z_OK ? IN_PROGRESS : FAILURE;
}
} // namespace o3d
diff --git a/o3d/import/cross/gz_compressor.h b/o3d/import/cross/gz_compressor.h
index bdf13a1..43dbf0f 100644
--- a/o3d/import/cross/gz_compressor.h
+++ b/o3d/import/cross/gz_compressor.h
@@ -49,18 +49,19 @@ class GzCompressor : public StreamProcessor {
explicit GzCompressor(StreamProcessor *callback_client);
virtual ~GzCompressor();
- virtual int ProcessBytes(MemoryReadStream *stream, size_t bytes_to_process);
+ virtual Status ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process);
// Must call when all bytes to compress have been sent (with ProcessBytes)
- void Finalize();
+ virtual void Close(bool success);
private:
- int CompressBytes(MemoryReadStream *stream,
- size_t bytes_to_process,
- bool flush);
+ Status CompressBytes(MemoryReadStream *stream,
+ size_t bytes_to_process,
+ bool flush);
z_stream strm_; // low-level zlib state
- int init_result_;
+ bool initialized_;
bool stream_is_closed_;
StreamProcessor *callback_client_;
diff --git a/o3d/import/cross/gz_compressor_test.cc b/o3d/import/cross/gz_compressor_test.cc
index 787ab06..4cebb004 100644
--- a/o3d/import/cross/gz_compressor_test.cc
+++ b/o3d/import/cross/gz_compressor_test.cc
@@ -56,10 +56,12 @@ class DecompressorClient : public o3d::StreamProcessor {
public:
explicit DecompressorClient(size_t uncompressed_byte_size)
: buffer_(uncompressed_byte_size),
- write_stream_(buffer_, uncompressed_byte_size) {}
+ write_stream_(buffer_, uncompressed_byte_size),
+ closed_(false),
+ success_(false) {}
- virtual int ProcessBytes(MemoryReadStream *stream,
- size_t bytes_to_process) {
+ virtual Status ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process) {
// Make sure the output stream isn't full yet
EXPECT_TRUE(write_stream_.GetRemainingByteCount() >= bytes_to_process);
@@ -68,7 +70,12 @@ class DecompressorClient : public o3d::StreamProcessor {
stream->Skip(bytes_to_process);
write_stream_.Write(p, bytes_to_process);
- return 0; // return OK
+ return SUCCESS;
+ }
+
+ virtual void Close(bool success) {
+ closed_ = true;
+ success_ = success;
}
void VerifyDecompressedOutput(uint8 *original_data) {
@@ -80,10 +87,20 @@ class DecompressorClient : public o3d::StreamProcessor {
EXPECT_EQ(0, memcmp(original_data, buffer_, buffer_.GetLength()));
}
+ bool closed() const {
+ return closed_;
+ }
+
+ bool success() const {
+ return success_;
+ }
+
private:
size_t uncompressed_byte_size_;
MemoryBuffer<uint8> buffer_;
MemoryWriteStream write_stream_;
+ bool closed_;
+ bool success_;
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -92,24 +109,27 @@ class CompressorClient : public o3d::StreamProcessor {
public:
explicit CompressorClient(size_t uncompressed_byte_size)
: decompressor_client_(uncompressed_byte_size),
- decompressor_(&decompressor_client_) {
+ decompressor_(&decompressor_client_),
+ closed_(false),
+ success_(false) {
};
- virtual int ProcessBytes(MemoryReadStream *stream,
+ virtual Status ProcessBytes(MemoryReadStream *stream,
size_t bytes_to_process) {
// We're receiving compressed bytes here, so feed them back into
// the decompressor. Since we're making a compression / decompression
// round trip, we should end up with the same (initial) byte stream
// we can verify this at the end
- int result = decompressor_.ProcessBytes(stream, bytes_to_process);
+ Status status = decompressor_.ProcessBytes(stream, bytes_to_process);
- // Verify the result code:
- // zlib FAQ says Z_BUF_ERROR is OK
- // and may occur in the middle of decompressing the stream
- EXPECT_TRUE(result == Z_OK || result == Z_STREAM_END ||
- result == Z_BUF_ERROR);
+ EXPECT_TRUE(status == IN_PROGRESS || status == SUCCESS);
+
+ return SUCCESS;
+ }
- return 0; // no error
+ virtual void Close(bool success) {
+ closed_ = true;
+ success_ = success;
}
void VerifyDecompressedOutput(uint8 *original_data) {
@@ -117,15 +137,24 @@ class CompressorClient : public o3d::StreamProcessor {
decompressor_client_.VerifyDecompressedOutput(original_data);
}
+ bool closed() const {
+ return closed_;
+ }
+
+ bool success() const {
+ return success_;
+ }
+
private:
DecompressorClient decompressor_client_;
o3d::GzDecompressor decompressor_;
+ bool closed_;
+ bool success_;
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-// int main (int argc, const char * argv[]) {
TEST_F(GzCompressorTest, RoundTripCompressionDecompression) {
// We'll compress this file
String filepath = *g_program_path + "/archive_files/keyboard.jpg";
@@ -148,14 +177,27 @@ TEST_F(GzCompressorTest, RoundTripCompressionDecompression) {
// Since we're making a compression / decompression
// round trip, we should end up with the same (initial) byte stream
// we can verify this at the end by calling VerifyDecompressedOutput()
- int result = compressor.ProcessBytes(&input_file_stream, file_length);
- EXPECT_TRUE(result == Z_OK || result == Z_STREAM_END);
+ StreamProcessor::Status status =
+ compressor.ProcessBytes(&input_file_stream, file_length);
+ EXPECT_NE(StreamProcessor::FAILURE, status);
- compressor.Finalize();
+ compressor.Close(true);
compressor_client.VerifyDecompressedOutput(p);
+ EXPECT_TRUE(compressor_client.closed());
+ EXPECT_TRUE(compressor_client.success());
free(p);
}
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+TEST_F(GzCompressorTest, PassesFailureThroughToClient) {
+ CompressorClient compressor_client(1000);
+ GzCompressor compressor(&compressor_client);
+ compressor.Close(false);
+
+ EXPECT_TRUE(compressor_client.closed());
+ EXPECT_FALSE(compressor_client.success());
+}
+
} // namespace o3d
diff --git a/o3d/import/cross/gz_decompressor.cc b/o3d/import/cross/gz_decompressor.cc
index 7fa7fee..5499958 100644
--- a/o3d/import/cross/gz_decompressor.cc
+++ b/o3d/import/cross/gz_decompressor.cc
@@ -54,7 +54,7 @@ GzDecompressor::GzDecompressor(StreamProcessor *callback_client)
strm_.next_in = Z_NULL;
// Store this, so we can later check if it's OK to start processing
- init_result_ = inflateInit2(&strm_, 31);
+ initialized_ = inflateInit2(&strm_, 31) == Z_OK;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -64,10 +64,10 @@ GzDecompressor::~GzDecompressor() {
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
-int GzDecompressor::ProcessBytes(MemoryReadStream *stream,
- size_t bytes_to_process) {
+StreamProcessor::Status GzDecompressor::ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process) {
// Don't even bother trying if we didn't get initialized properly
- if (init_result_ != Z_OK) return init_result_;
+ if (!initialized_) return FAILURE;
uint8 out[kChunkSize];
int result;
@@ -76,7 +76,7 @@ int GzDecompressor::ProcessBytes(MemoryReadStream *stream,
// Don't try to read more than our stream has
size_t remaining = stream->GetRemainingByteCount();
if (bytes_to_process > remaining) {
- return Z_STREAM_ERROR; // could have our own error code, but good enough...
+ return FAILURE;
}
// Use direct memory access on the MemoryStream object
@@ -95,11 +95,9 @@ int GzDecompressor::ProcessBytes(MemoryReadStream *stream,
assert(result != Z_STREAM_ERROR); /* state not clobbered */
switch (result) {
case Z_NEED_DICT:
- result = Z_DATA_ERROR; /* and fall through */
-
case Z_DATA_ERROR:
case Z_MEM_ERROR:
- return result;
+ return FAILURE;
}
have = kChunkSize - strm_.avail_out;
@@ -107,15 +105,26 @@ int GzDecompressor::ProcessBytes(MemoryReadStream *stream,
// Callback with the decompressed byte stream
MemoryReadStream decompressed_stream(out, have);
if (callback_client_) {
- int client_result =
- callback_client_->ProcessBytes(&decompressed_stream, have);
- if (client_result != 0) {
- return client_result; // propagate callback errors
+ if (callback_client_->ProcessBytes(&decompressed_stream,
+ have) == FAILURE) {
+ return FAILURE;
}
}
} while (strm_.avail_out == 0);
- return result;
+ switch (result) {
+ case Z_OK:
+ case Z_BUF_ERROR: // Zlib docs say this is expected.
+ return IN_PROGRESS;
+ case Z_STREAM_END:
+ return SUCCESS;
+ default:
+ return FAILURE;
+ }
+}
+
+void GzDecompressor::Close(bool success) {
+ callback_client_->Close(success);
}
} // namespace o3d
diff --git a/o3d/import/cross/gz_decompressor.h b/o3d/import/cross/gz_decompressor.h
index bd56687..02d2c7a 100644
--- a/o3d/import/cross/gz_decompressor.h
+++ b/o3d/import/cross/gz_decompressor.h
@@ -49,12 +49,13 @@ class GzDecompressor : public StreamProcessor {
explicit GzDecompressor(StreamProcessor *callback_client);
virtual ~GzDecompressor();
- virtual int ProcessBytes(MemoryReadStream *stream,
+ virtual Status ProcessBytes(MemoryReadStream *stream,
size_t bytes_to_process);
+ virtual void Close(bool success);
private:
z_stream strm_; // low-level zlib state
- int init_result_;
+ bool initialized_;
StreamProcessor *callback_client_;
DISALLOW_COPY_AND_ASSIGN(GzDecompressor);
diff --git a/o3d/import/cross/gz_decompressor_test.cc b/o3d/import/cross/gz_decompressor_test.cc
index af45ecd..4bb71d6 100644
--- a/o3d/import/cross/gz_decompressor_test.cc
+++ b/o3d/import/cross/gz_decompressor_test.cc
@@ -51,12 +51,14 @@ class GzDecompressorTest : public testing::Test {
//
class GzTestClient : public StreamProcessor {
public:
- explicit GzTestClient(size_t uncompressed_size) {
+ explicit GzTestClient(size_t uncompressed_size)
+ : closed_(false),
+ success_(false) {
buffer_.Allocate(uncompressed_size);
stream_.Assign(buffer_, uncompressed_size);
}
- virtual int ProcessBytes(MemoryReadStream *input_stream,
+ virtual Status ProcessBytes(MemoryReadStream *input_stream,
size_t bytes_to_process) {
// Buffer the uncompressed bytes we're given
const uint8 *p = input_stream->GetDirectMemoryPointer();
@@ -65,16 +67,31 @@ class GzTestClient : public StreamProcessor {
size_t bytes_written = stream_.Write(p, bytes_to_process);
EXPECT_EQ(bytes_written, bytes_to_process);
- return Z_OK;
+ return SUCCESS;
+ }
+
+ virtual void Close(bool success) {
+ closed_ = true;
+ success_ = success;
}
// When we're done decompressing, we can check the results here
uint8 *GetResultBuffer() { return buffer_; }
size_t GetResultLength() const { return stream_.GetStreamPosition(); }
+ bool closed() const {
+ return closed_;
+ }
+
+ bool success() const {
+ return success_;
+ }
+
private:
MemoryBuffer<uint8> buffer_;
MemoryWriteStream stream_;
+ bool closed_;
+ bool success_;
};
// Loads a tar.gz file, runs it through the processor.
@@ -109,19 +126,20 @@ TEST_F(GzDecompressorTest, LoadGzFile) {
MemoryReadStream compressed_stream(compressed_data, compressed_size);
size_t bytes_to_process = compressed_size;
- int result = Z_OK;
+ StreamProcessor::Status status = StreamProcessor::SUCCESS;
while (bytes_to_process > 0) {
size_t bytes_this_time =
bytes_to_process < kChunkSize ? bytes_to_process : kChunkSize;
- result = decompressor.ProcessBytes(&compressed_stream, bytes_this_time);
- EXPECT_TRUE(result == Z_OK || result == Z_STREAM_END);
+ status = decompressor.ProcessBytes(&compressed_stream, bytes_this_time);
+ EXPECT_TRUE(status != StreamProcessor::FAILURE);
bytes_to_process -= bytes_this_time;
}
- // When the decompressor has finished it should return Z_STREAM_END
- EXPECT_TRUE(result == Z_STREAM_END);
+ decompressor.Close(true);
+
+ EXPECT_TRUE(status == StreamProcessor::SUCCESS);
// Now let's verify that what we just decompressed matches exactly
// what's in the reference file...
@@ -130,13 +148,25 @@ TEST_F(GzDecompressorTest, LoadGzFile) {
EXPECT_EQ(decompressor_client.GetResultLength(), uncompressed_size);
// Now check the data
- result = memcmp(decompressor_client.GetResultBuffer(),
- expected_uncompressed_data,
- uncompressed_size);
+ int result = memcmp(decompressor_client.GetResultBuffer(),
+ expected_uncompressed_data,
+ uncompressed_size);
EXPECT_EQ(0, result);
+ EXPECT_TRUE(decompressor_client.closed());
+ EXPECT_TRUE(decompressor_client.success());
+
free(compressed_data);
free(expected_uncompressed_data);
}
+TEST_F(GzDecompressorTest, PassesFailureThroughToClient) {
+ GzTestClient decompressor_client(1000);
+ GzDecompressor decompressor(&decompressor_client);
+ decompressor.Close(false);
+
+ EXPECT_TRUE(decompressor_client.closed());
+ EXPECT_FALSE(decompressor_client.success());
+}
+
} // namespace o3d
diff --git a/o3d/import/cross/iarchive_generator.h b/o3d/import/cross/iarchive_generator.h
index 4f662da..04f3a46 100644
--- a/o3d/import/cross/iarchive_generator.h
+++ b/o3d/import/cross/iarchive_generator.h
@@ -48,13 +48,16 @@ class IArchiveGenerator {
// Call AddFile() for each file entry, followed by calls to AddFileBytes()
// for the file's data
- virtual void AddFile(const String& file_name,
+ virtual bool AddFile(const String& file_name,
size_t file_size) = 0;
// Call with the file's data (after calling AddFile)
// may be called one time will all the file's data, or multiple times
// until all the data is provided
virtual int AddFileBytes(MemoryReadStream* stream, size_t n) = 0;
+
+ // Must be called to finish the archiving operation.
+ virtual void Close(bool success) = 0;
};
} // namespace o3d
diff --git a/o3d/import/cross/main_thread_archive_callback_client.cc b/o3d/import/cross/main_thread_archive_callback_client.cc
new file mode 100644
index 0000000..2fbbaaa
--- /dev/null
+++ b/o3d/import/cross/main_thread_archive_callback_client.cc
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2009, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// MainThreadArchiveCallbackClient forwards data from one thread to
+// another client that will run on the main thread.
+
+#include "core/cross/imain_thread_task_poster.h"
+#include "core/cross/service_locator.h"
+#include "import/cross/main_thread_archive_callback_client.h"
+#include "base/task.h"
+
+namespace o3d {
+
+// Called on main thread.
+MainThreadArchiveCallbackClient::MainThreadArchiveCallbackClient(
+ ServiceLocator* service_locator,
+ ArchiveCallbackClient *receiver)
+ : main_thread_task_poster_(
+ service_locator->GetService<IMainThreadTaskPoster>()),
+ receiver_(receiver),
+ success_(new Success) {
+ success_->value = true;
+}
+
+// Called on main thread.
+MainThreadArchiveCallbackClient::~MainThreadArchiveCallbackClient() {
+ // Signal pending tasks on main thread not to call the receiver. They are not
+ // running concurrently because this destructor is also called on the main
+ // thread.
+ success_->value = false;
+}
+
+// Called on other thread.
+void MainThreadArchiveCallbackClient::ReceiveFileHeader(
+ const ArchiveFileInfo &file_info) {
+ main_thread_task_poster_->PostTask(NewRunnableFunction(
+ &MainThreadArchiveCallbackClient::ForwardReceiveFileHeader,
+ success_,
+ receiver_,
+ file_info));
+}
+
+// Called on other thread.
+bool MainThreadArchiveCallbackClient::ReceiveFileData(
+ MemoryReadStream *stream, size_t size) {
+ // Copy stream into temporary buffer. Deleted after it is received by the main
+ // thread.
+ uint8* buffer = new uint8[size];
+ stream->Read(buffer, size);
+
+ main_thread_task_poster_->PostTask(NewRunnableFunction(
+ &MainThreadArchiveCallbackClient::ForwardReceiveFileData,
+ success_,
+ receiver_,
+ buffer,
+ size));
+
+ return success_->value;
+}
+
+void MainThreadArchiveCallbackClient::Close(bool success) {
+ main_thread_task_poster_->PostTask(NewRunnableFunction(
+ &MainThreadArchiveCallbackClient::ForwardClose,
+ success_,
+ receiver_,
+ success));
+}
+
+// Called on main thread.
+void MainThreadArchiveCallbackClient::ForwardReceiveFileHeader(
+ SuccessPtr success,
+ ArchiveCallbackClient* client,
+ ArchiveFileInfo file_info) {
+ if (success->value) {
+ client->ReceiveFileHeader(file_info);
+ }
+}
+
+// Called on main thread.
+void MainThreadArchiveCallbackClient::ForwardReceiveFileData(
+ SuccessPtr success,
+ ArchiveCallbackClient* client,
+ uint8* bytes, size_t size) {
+ // Just delete the buffer if there was a previous failure.
+ if (!success->value) {
+ delete[] bytes;
+ return;
+ }
+
+ MemoryReadStream stream(bytes, size);
+ if (!client->ReceiveFileData(&stream, size)) {
+ // Do not set this to true on success. That might overwrite a false written
+ // by the other thread intended to indicate that the receiver is no
+ // longer valid.
+ success->value = false;
+ }
+
+ // Delete temporary buffer allocated by other thread when the main thread is
+ // finished with it.
+ delete[] bytes;
+}
+
+void MainThreadArchiveCallbackClient::ForwardClose(
+ SuccessPtr success_ptr,
+ ArchiveCallbackClient* client,
+ bool success) {
+ client->Close(success && success_ptr->value);
+}
+
+} // namespace o3d
diff --git a/o3d/import/cross/main_thread_archive_callback_client.h b/o3d/import/cross/main_thread_archive_callback_client.h
new file mode 100644
index 0000000..7da39f7
--- /dev/null
+++ b/o3d/import/cross/main_thread_archive_callback_client.h
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// MainThreadArchiveCallbackClient forwards data from one thread to
+// another client that will run on the main thread.
+
+#ifndef O3D_IMPORT_CROSS_MAIN_THREAD_ARCHIVE_CALLBACK_CLIENT_H_
+#define O3D_IMPORT_CROSS_MAIN_THREAD_ARCHIVE_CALLBACK_CLIENT_H_
+
+#include "base/basictypes.h"
+#include "base/thread.h"
+#include "import/cross/archive_processor.h"
+#include "import/cross/memory_stream.h"
+
+namespace o3d {
+
+class IMainThreadTaskPoster;
+class ServiceLocator;
+
+class MainThreadArchiveCallbackClient : public ArchiveCallbackClient {
+ public:
+ MainThreadArchiveCallbackClient(ServiceLocator* service_locator,
+ ArchiveCallbackClient *receiver);
+ virtual ~MainThreadArchiveCallbackClient();
+
+ virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info);
+ virtual bool ReceiveFileData(MemoryReadStream *stream, size_t size);
+ virtual void Close(bool success);
+
+ private:
+ struct Success : ::base::RefCountedThreadSafe< Success > {
+ bool value;
+ };
+
+ typedef scoped_refptr<Success> SuccessPtr;
+
+ static void ForwardReceiveFileHeader(SuccessPtr success,
+ ArchiveCallbackClient* client,
+ ArchiveFileInfo file_info);
+
+ static void ForwardReceiveFileData(SuccessPtr success,
+ ArchiveCallbackClient* client,
+ uint8* bytes, size_t size);
+
+ static void ForwardClose(SuccessPtr success_ptr,
+ ArchiveCallbackClient* client,
+ bool success);
+
+ IMainThreadTaskPoster* main_thread_task_poster_;
+ ArchiveCallbackClient* receiver_;
+ SuccessPtr success_;
+
+ DISALLOW_COPY_AND_ASSIGN(MainThreadArchiveCallbackClient);
+};
+} // namespace o3d
+
+#endif // O3D_IMPORT_CROSS_MAIN_THREAD_ARCHIVE_CALLBACK_CLIENT_H_
diff --git a/o3d/import/cross/main_thread_archive_callback_client_test.cc b/o3d/import/cross/main_thread_archive_callback_client_test.cc
new file mode 100644
index 0000000..ac3243a
--- /dev/null
+++ b/o3d/import/cross/main_thread_archive_callback_client_test.cc
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2009, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+// Tests functionality of the MainThreadArchiveCallbackClient class
+
+#include "tests/common/win/testing_common.h"
+#include "base/task.h"
+#include "core/cross/imain_thread_task_poster.h"
+#include "core/cross/service_implementation.h"
+#include "import/cross/main_thread_archive_callback_client.h"
+
+namespace o3d {
+
+// Simple standin for IMainThreadTaskPoster that invokes tasks asynchronously.
+class TestMainThreadTaskPoster : IMainThreadTaskPoster {
+ public:
+ TestMainThreadTaskPoster(ServiceLocator* service_locator)
+ : service_(service_locator, this) {
+ }
+
+ virtual bool IsSupported() {
+ return true;
+ }
+
+ virtual void PostTask(Task* task) {
+ tasks_.push_back(task);
+ }
+
+ void Flush() {
+ for (size_t i = 0; i < tasks_.size(); ++i) {
+ tasks_[i]->Run();
+ delete tasks_[i];
+ }
+ tasks_.clear();
+ }
+
+ private:
+ ServiceImplementation<IMainThreadTaskPoster> service_;
+ std::vector<Task*> tasks_;
+};
+
+class MockArchiveCallbackClient : public ArchiveCallbackClient {
+ public:
+ MockArchiveCallbackClient()
+ : receive_file_header_calls_(0),
+ receive_file_header_calls_file_info_("", 0),
+ receive_file_data_calls_(0),
+ receive_file_data_stream_(NULL),
+ receive_file_data_size_(0),
+ receive_file_data_result_(false),
+ close_calls_(0),
+ close_success_(false) {
+ }
+
+ virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info) {
+ ++receive_file_header_calls_;
+ receive_file_header_calls_file_info_ = file_info;
+ }
+
+ virtual bool ReceiveFileData(MemoryReadStream* stream, size_t size) {
+ ++receive_file_data_calls_;
+ receive_file_data_stream_.reset(new uint8[size]);
+ stream->Read(receive_file_data_stream_.get(), size);
+ receive_file_data_size_ = size;
+ return receive_file_data_result_;
+ }
+
+ virtual void Close(bool success) {
+ ++close_calls_;
+ close_success_ = success;
+ }
+
+ int receive_file_header_calls_;
+ ArchiveFileInfo receive_file_header_calls_file_info_;
+
+ int receive_file_data_calls_;
+ scoped_array<uint8> receive_file_data_stream_;
+ size_t receive_file_data_size_;
+ bool receive_file_data_result_;
+
+ int close_calls_;
+ bool close_success_;
+};
+
+class MainThreadArchiveCallbackClientTest : public testing::Test {
+ public:
+ MainThreadArchiveCallbackClientTest()
+ : main_thread_task_poster_(&service_locator_) {
+ }
+
+ protected:
+ virtual void SetUp() {
+ receiver_client_ = new MockArchiveCallbackClient;
+ main_thread_client_ = new MainThreadArchiveCallbackClient(
+ &service_locator_, receiver_client_);
+ }
+
+ virtual void TearDown() {
+ delete receiver_client_;
+ delete main_thread_client_;
+ }
+
+ ServiceLocator service_locator_;
+ TestMainThreadTaskPoster main_thread_task_poster_;
+ MockArchiveCallbackClient* receiver_client_;
+ MainThreadArchiveCallbackClient* main_thread_client_;
+};
+
+TEST_F(MainThreadArchiveCallbackClientTest,
+ ReceiveFileHeaderForwardsToReceiver) {
+ ArchiveFileInfo info("hello", 7);
+ main_thread_client_->ReceiveFileHeader(info);
+ main_thread_task_poster_.Flush();
+
+ EXPECT_EQ(1, receiver_client_->receive_file_header_calls_);
+ EXPECT_EQ("hello",
+ receiver_client_->receive_file_header_calls_file_info_.GetFileName());
+ EXPECT_EQ(7,
+ receiver_client_->receive_file_header_calls_file_info_.GetFileSize());
+}
+
+TEST_F(MainThreadArchiveCallbackClientTest,
+ ReceiveFileDataForwardsToReceiverAndReturnsTrueFirstTime) {
+ uint8 buffer[] = {1, 2, 3};
+ MemoryReadStream stream(buffer, sizeof(buffer));
+ receiver_client_->receive_file_data_result_ = true;
+ EXPECT_TRUE(main_thread_client_->ReceiveFileData(&stream, sizeof(buffer)));
+ main_thread_task_poster_.Flush();
+
+ EXPECT_EQ(1, receiver_client_->receive_file_data_calls_);
+ EXPECT_EQ(0, memcmp(receiver_client_->receive_file_data_stream_.get(),
+ buffer, sizeof(buffer)));
+ EXPECT_EQ(sizeof(buffer), receiver_client_->receive_file_data_size_);
+}
+
+TEST_F(MainThreadArchiveCallbackClientTest,
+ ReceiveFileDataReportsFailureInSubsequentCall) {
+ receiver_client_->receive_file_data_result_ = true;
+
+ uint8 buffer[] = {1, 2, 3, 4, 5, 6};
+ MemoryReadStream stream(buffer, sizeof(buffer));
+
+ receiver_client_->receive_file_data_result_ = false;
+ EXPECT_TRUE(main_thread_client_->ReceiveFileData(&stream, 3));
+ main_thread_task_poster_.Flush();
+ EXPECT_EQ(1, receiver_client_->receive_file_data_calls_);
+
+
+ receiver_client_->receive_file_data_result_ = false;
+ EXPECT_FALSE(main_thread_client_->ReceiveFileData(&stream, 3));
+ main_thread_task_poster_.Flush();
+ EXPECT_EQ(1, receiver_client_->receive_file_data_calls_);
+}
+
+TEST_F(MainThreadArchiveCallbackClientTest,
+ ReceiveFileHeaderForwardsUnsuccessfulClose) {
+ main_thread_client_->Close(false);
+ main_thread_task_poster_.Flush();
+
+ EXPECT_EQ(1, receiver_client_->close_calls_);
+ EXPECT_FALSE(receiver_client_->close_success_);
+}
+
+TEST_F(MainThreadArchiveCallbackClientTest,
+ ReceiveFileHeaderForwardsSuccessfulClose) {
+ main_thread_client_->Close(true);
+ main_thread_task_poster_.Flush();
+
+ EXPECT_EQ(1, receiver_client_->close_calls_);
+ EXPECT_TRUE(receiver_client_->close_success_);
+}
+
+TEST_F(MainThreadArchiveCallbackClientTest,
+ CloseForwardsFailureOfPreviousUnreportedCall) {
+ receiver_client_->receive_file_data_result_ = true;
+
+ uint8 buffer[] = {1, 2, 3};
+ MemoryReadStream stream(buffer, sizeof(buffer));
+
+ receiver_client_->receive_file_data_result_ = false;
+ EXPECT_TRUE(main_thread_client_->ReceiveFileData(&stream, sizeof(buffer)));
+ main_thread_task_poster_.Flush();
+ EXPECT_EQ(1, receiver_client_->receive_file_data_calls_);
+
+ main_thread_client_->Close(true);
+ main_thread_task_poster_.Flush();
+ EXPECT_FALSE(receiver_client_->close_success_);
+}
+
+} // namespace o3d
diff --git a/o3d/import/cross/memory_stream.h b/o3d/import/cross/memory_stream.h
index e71379a..2801b2b 100644
--- a/o3d/import/cross/memory_stream.h
+++ b/o3d/import/cross/memory_stream.h
@@ -266,9 +266,21 @@ class MemoryWriteStream {
// Abstract interface to process a memory stream
class StreamProcessor {
public:
+
+// Status is defined in Linux
+#ifdef Status
+#undef Status
+#endif
+ enum Status {
+ IN_PROGRESS,
+ SUCCESS,
+ FAILURE,
+ };
+
virtual ~StreamProcessor() {}
- virtual int ProcessBytes(MemoryReadStream *stream,
- size_t bytes_to_process) = 0;
+ virtual Status ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process) = 0;
+ virtual void Close(bool success) = 0;
};
} // namespace o3d
diff --git a/o3d/import/cross/tar_generator.cc b/o3d/import/cross/tar_generator.cc
index ed72e05..cbfab24 100644
--- a/o3d/import/cross/tar_generator.cc
+++ b/o3d/import/cross/tar_generator.cc
@@ -234,8 +234,9 @@ void TarGenerator::FlushDataBuffer(bool flush_padding_zeroes) {
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-void TarGenerator::Finalize() {
+void TarGenerator::Close(bool success) {
FlushDataBuffer(true);
+ callback_client_->Close(success);
}
} // namespace o3d
diff --git a/o3d/import/cross/tar_generator.h b/o3d/import/cross/tar_generator.h
index 21bc733..7bb0ac4 100644
--- a/o3d/import/cross/tar_generator.h
+++ b/o3d/import/cross/tar_generator.h
@@ -50,21 +50,20 @@
#include <string>
#include "base/basictypes.h"
#include "core/cross/types.h"
+#include "import/cross/iarchive_generator.h"
#include "import/cross/memory_buffer.h"
#include "import/cross/memory_stream.h"
namespace o3d {
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-class TarGenerator {
+class TarGenerator : public IArchiveGenerator {
public:
explicit TarGenerator(StreamProcessor *callback_client)
: callback_client_(callback_client),
data_block_buffer_(TAR_BLOCK_SIZE), // initialized to zeroes
data_buffer_stream_(data_block_buffer_, TAR_BLOCK_SIZE) {}
- virtual ~TarGenerator() { Finalize(); }
-
// Call AddFile() for each file entry, followed by calls to AddFileBytes()
// for the file's data. Returns true on success.
virtual bool AddFile(const String &file_name, size_t file_size);
@@ -75,7 +74,7 @@ class TarGenerator {
virtual int AddFileBytes(MemoryReadStream *stream, size_t n);
// Must call this after all files and file data have been written
- virtual void Finalize();
+ virtual void Close(bool success);
private:
// Returns true on success.
diff --git a/o3d/import/cross/tar_generator_test.cc b/o3d/import/cross/tar_generator_test.cc
index 9403d62..0e6dccb 100644
--- a/o3d/import/cross/tar_generator_test.cc
+++ b/o3d/import/cross/tar_generator_test.cc
@@ -132,15 +132,30 @@ class CallbackClient : public StreamProcessor {
: state_(VALIDATE_DIRECTORY_HEADER1),
total_bytes_received_(0),
memory_block_(kBlockSize),
- write_stream_(memory_block_, kBlockSize) {
+ write_stream_(memory_block_, kBlockSize),
+ closed_(false),
+ success_(false) {
}
- virtual int ProcessBytes(MemoryReadStream *stream,
+ virtual Status ProcessBytes(MemoryReadStream *stream,
size_t bytes_to_process);
+ virtual void Close(bool success) {
+ closed_ = true;
+ success_ = success;
+ }
+
size_t GetTotalBytesReceived() { return total_bytes_received_; }
int GetState() { return state_; }
+ bool closed() const {
+ return closed_;
+ }
+
+ bool success() const {
+ return success_;
+ }
+
private:
bool IsOctalDigit(uint8 c);
bool IsOctalString(uint8 *p);
@@ -159,11 +174,13 @@ class CallbackClient : public StreamProcessor {
size_t total_bytes_received_;
MemoryBuffer<uint8> memory_block_;
MemoryWriteStream write_stream_;
+ bool closed_;
+ bool success_;
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-int CallbackClient::ProcessBytes(MemoryReadStream *stream,
- size_t bytes_to_process) {
+StreamProcessor::Status CallbackClient::ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process) {
total_bytes_received_ += bytes_to_process;
while (bytes_to_process > 0) {
@@ -254,7 +271,7 @@ int CallbackClient::ProcessBytes(MemoryReadStream *stream,
bytes_to_process -= bytes_this_time;
}
- return 0;
+ return IN_PROGRESS;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -403,7 +420,7 @@ TEST_F(TarGeneratorTest, CreateSimpleArchive) {
kFileLength3);
generator.AddFileBytes(&file4_stream, kFileLength3);
- generator.Finalize();
+ generator.Close(true);
// Verify that the tar byte stream produced is exactly divisible by
// the block size
@@ -412,6 +429,18 @@ TEST_F(TarGeneratorTest, CreateSimpleArchive) {
// Make sure the state machine is in the expected state
EXPECT_EQ(CallbackClient::FINISHED, client.GetState());
+
+ EXPECT_TRUE(client.closed());
+ EXPECT_TRUE(client.success());
+}
+
+TEST_F(TarGeneratorTest, PassesThroughFailure) {
+ CallbackClient client;
+ TarGenerator generator(&client);
+ generator.Close(false);
+
+ EXPECT_TRUE(client.closed());
+ EXPECT_FALSE(client.success());
}
} // namespace
diff --git a/o3d/import/cross/tar_processor.cc b/o3d/import/cross/tar_processor.cc
index 7b14c22..b4c268e 100644
--- a/o3d/import/cross/tar_processor.cc
+++ b/o3d/import/cross/tar_processor.cc
@@ -41,7 +41,8 @@ static const int kFileSizeOffset = 124;
static const int kLinkFlagOffset = 156;
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) {
+StreamProcessor::Status TarProcessor::ProcessBytes(MemoryReadStream *stream,
+ size_t n) {
// Keep processing the byte-stream until we've consumed all we're given
//
size_t bytes_to_consume = n;
@@ -56,7 +57,7 @@ int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) {
stream->Read(reinterpret_cast<uint8*>(header_ + header_bytes_read_),
bytes_to_read);
if (bytes_read != bytes_to_read) {
- return -1;
+ return FAILURE;
}
header_bytes_read_ += bytes_to_read;
@@ -73,7 +74,7 @@ int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) {
getting_filename_ = true;
// We should pick some size that's too large.
if (file_size > 1024) {
- return -1;
+ return FAILURE;
}
} else {
getting_filename_ = false;
@@ -92,10 +93,10 @@ int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) {
ArchiveFileInfo info(filename, file_size);
callback_client_->ReceiveFileHeader(info);
} else if (header_[0] == 0) {
- // If filename is NULL due to zero-padding then file size
- // should also be NULL
- // TODO(gman): Won't this crash the plugin if I make a bad tar?
- assert(file_size == 0);
+ // If filename is empty due to zero-padding then file size
+ // should also be zero.
+ if (file_size != 0)
+ return FAILURE;
}
}
@@ -134,7 +135,7 @@ int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) {
} else {
if (!callback_client_->ReceiveFileData(&client_read_stream,
client_bytes_this_time)) {
- return -1;
+ return FAILURE;
}
}
@@ -166,7 +167,11 @@ int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) {
}
}
- return 0;
+ return IN_PROGRESS;
+}
+
+void TarProcessor::Close(bool success) {
+ callback_client_->Close(success);
}
} // namespace o3d
diff --git a/o3d/import/cross/tar_processor.h b/o3d/import/cross/tar_processor.h
index 76976c3..66d4b4aa 100644
--- a/o3d/import/cross/tar_processor.h
+++ b/o3d/import/cross/tar_processor.h
@@ -66,7 +66,9 @@ class TarProcessor : public StreamProcessor {
// Call to "push" bytes to be processed - the appropriate callback will get
// called when we have enough data
- virtual int ProcessBytes(MemoryReadStream *stream, size_t n);
+ virtual Status ProcessBytes(MemoryReadStream *stream, size_t n);
+
+ virtual void Close(bool success);
private:
enum {TAR_HEADER_SIZE = 512};
diff --git a/o3d/import/cross/tar_processor_test.cc b/o3d/import/cross/tar_processor_test.cc
index 1317042..77a2522 100644
--- a/o3d/import/cross/tar_processor_test.cc
+++ b/o3d/import/cross/tar_processor_test.cc
@@ -67,12 +67,27 @@ class TarTestClient : public ArchiveCallbackClient {
virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info);
virtual bool ReceiveFileData(MemoryReadStream *stream, size_t nbytes);
+ virtual void Close(bool success) {
+ closed_ = true;
+ success_ = success;
+ }
+
int GetFileCount() const { return file_count_; }
size_t GetNumTotalBytesReceived() const { return index_; }
+ bool closed() const {
+ return closed_;
+ }
+
+ bool success() const {
+ return success_;
+ }
+
private:
int file_count_;
int index_;
+ bool closed_;
+ bool success_;
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -142,18 +157,31 @@ TEST_F(TarProcessorTest, LoadTarFile) {
MemoryReadStream tar_stream(tar_data, file_size);
size_t bytes_to_process = file_size;
- int result = Z_OK;
while (bytes_to_process > 0) {
size_t bytes_this_time =
bytes_to_process < kChunkSize ? bytes_to_process : kChunkSize;
- result = tar_processor.ProcessBytes(&tar_stream, bytes_this_time);
- EXPECT_TRUE(result == Z_OK);
+ StreamProcessor::Status status = tar_processor.ProcessBytes(
+ &tar_stream, bytes_this_time);
+ EXPECT_TRUE(status != StreamProcessor::FAILURE);
bytes_to_process -= bytes_this_time;
}
+ tar_processor.Close(true);
+
+ EXPECT_TRUE(callback_client.closed());
+ EXPECT_TRUE(callback_client.success());
free(tar_data);
}
+TEST_F(TarProcessorTest, PassesThroughFailure) {
+ TarTestClient callback_client;
+ TarProcessor tar_processor(&callback_client);
+ tar_processor.Close(false);
+
+ EXPECT_TRUE(callback_client.closed());
+ EXPECT_FALSE(callback_client.success());
+}
+
} // namespace
diff --git a/o3d/import/cross/targz_generator.cc b/o3d/import/cross/targz_generator.cc
index 02cb5b3..aa75712 100644
--- a/o3d/import/cross/targz_generator.cc
+++ b/o3d/import/cross/targz_generator.cc
@@ -37,12 +37,12 @@ namespace o3d {
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
TarGzGenerator::TarGzGenerator(StreamProcessor *callback_client)
- : gz_compressor_(callback_client), tar_generator_(this) {
+ : gz_compressor_(callback_client), tar_generator_(&gz_compressor_) {
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-void TarGzGenerator::AddFile(const String &file_name, size_t file_size) {
- tar_generator_.AddFile(file_name, file_size);
+bool TarGzGenerator::AddFile(const String &file_name, size_t file_size) {
+ return tar_generator_.AddFile(file_name, file_size);
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -57,16 +57,8 @@ int TarGzGenerator::AddFileBytes(const uint8 *data, size_t n) {
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-void TarGzGenerator::Finalize() {
- tar_generator_.Finalize();
- gz_compressor_.Finalize();
-}
-
-// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-int TarGzGenerator::ProcessBytes(MemoryReadStream *stream,
- size_t bytes_to_process) {
- // pass bytestream from tar generator to the compressor
- return gz_compressor_.ProcessBytes(stream, bytes_to_process);
+void TarGzGenerator::Close(bool success) {
+ tar_generator_.Close(success);
}
} // namespace o3d
diff --git a/o3d/import/cross/targz_generator.h b/o3d/import/cross/targz_generator.h
index fd58c86..b8eb216 100644
--- a/o3d/import/cross/targz_generator.h
+++ b/o3d/import/cross/targz_generator.h
@@ -54,32 +54,26 @@
namespace o3d {
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-class TarGzGenerator : public StreamProcessor, public IArchiveGenerator {
+class TarGzGenerator : public IArchiveGenerator {
public:
// |callback_client| receives the tar.gz byte stream
explicit TarGzGenerator(StreamProcessor *callback_client);
// Call AddFile() for each file entry, followed by calls to AddFileBytes()
// for the file's data
- void AddFile(const String &file_name,
- size_t file_size);
+ virtual bool AddFile(const String &file_name,
+ size_t file_size);
// Call with the file's data (after calling AddFile)
// may be called one time with all the file's data, or multiple times
// until all the data is provided
- int AddFileBytes(MemoryReadStream *stream, size_t n);
+ virtual int AddFileBytes(MemoryReadStream *stream, size_t n);
int AddFileBytes(const uint8 *data, size_t n);
// Must call this after all files and file data have been written
- virtual void Finalize();
+ virtual void Close(bool success);
private:
- // StreamProcessor method:
- // Receives the tar bytestream from the TarGenerator.
- // It then feeds this into the GzCompressor
- virtual int ProcessBytes(MemoryReadStream *stream,
- size_t bytes_to_process);
-
GzCompressor gz_compressor_;
TarGenerator tar_generator_;
diff --git a/o3d/import/cross/targz_generator_test.cc b/o3d/import/cross/targz_generator_test.cc
index 3104890..2e1cf7b 100644
--- a/o3d/import/cross/targz_generator_test.cc
+++ b/o3d/import/cross/targz_generator_test.cc
@@ -54,10 +54,12 @@ class TarGzTestClient : public StreamProcessor {
public:
explicit TarGzTestClient(size_t reference_size)
: compressed_data_(reference_size),
- write_stream_(compressed_data_, reference_size) {
+ write_stream_(compressed_data_, reference_size),
+ closed_(false),
+ success_(false) {
};
- virtual int ProcessBytes(MemoryReadStream *stream,
+ virtual Status ProcessBytes(MemoryReadStream *stream,
size_t bytes_to_process) {
// Simply buffer up the tar.gz bytes
// When we've gotten them all our Validate() method will be called
@@ -69,7 +71,12 @@ class TarGzTestClient : public StreamProcessor {
write_stream_.Write(p, bytes_to_process);
- return 0;
+ return SUCCESS;
+ }
+
+ virtual void Close(bool success) {
+ closed_ = true;
+ success_ = success;
}
// Checks that the data from the reference tar.gz file matches the tar.gz
@@ -95,9 +102,19 @@ class TarGzTestClient : public StreamProcessor {
}
#endif
+ bool closed() const {
+ return closed_;
+ }
+
+ bool success() const {
+ return success_;
+ }
+
private:
MemoryBuffer<uint8> compressed_data_;
MemoryWriteStream write_stream_;
+ bool closed_;
+ bool success_;
DISALLOW_COPY_AND_ASSIGN(TarGzTestClient);
};
@@ -142,7 +159,7 @@ TEST_F(TarGzGeneratorTest, GenerateTarGz) {
targz_generator.AddFile("test/shaders/BumpReflect.fx", shader_size);
targz_generator.AddFileBytes(shader_data, shader_size);
- targz_generator.Finalize();
+ targz_generator.Close(true);
#if defined(GENERATE_GOLDEN)
std::string new_golden_file = *g_program_path +
@@ -163,10 +180,21 @@ TEST_F(TarGzGeneratorTest, GenerateTarGz) {
received_data,
test_client.compressed_data_length()));
+ EXPECT_TRUE(test_client.closed());
+ EXPECT_TRUE(test_client.success());
+
free(targz_data);
free(image_data);
free(audio_data);
free(shader_data);
}
+TEST_F(TarGzGeneratorTest, PassesThroughCloseFailure) {
+ TarGzTestClient test_client(1000);
+ TarGzGenerator targz_generator(&test_client);
+ targz_generator.Close(false);
+
+ EXPECT_TRUE(test_client.closed());
+ EXPECT_FALSE(test_client.success());
+}
} // namespace o3d
diff --git a/o3d/import/cross/targz_processor.cc b/o3d/import/cross/targz_processor.cc
index a2a4ba8..5fb1275 100644
--- a/o3d/import/cross/targz_processor.cc
+++ b/o3d/import/cross/targz_processor.cc
@@ -57,10 +57,13 @@ TarGzProcessor::TarGzProcessor(ArchiveCallbackClient *callback_client)
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
-int TarGzProcessor::ProcessCompressedBytes(MemoryReadStream *stream,
- size_t bytes_to_process) {
- int result = gz_decompressor_.ProcessBytes(stream, bytes_to_process);
- return result;
+StreamProcessor::Status TarGzProcessor::ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process) {
+ return gz_decompressor_.ProcessBytes(stream, bytes_to_process);
+}
+
+void TarGzProcessor::Close(bool success) {
+ gz_decompressor_.Close(success);
}
} // namespace o3d
diff --git a/o3d/import/cross/targz_processor.h b/o3d/import/cross/targz_processor.h
index ef83278..061d66e 100644
--- a/o3d/import/cross/targz_processor.h
+++ b/o3d/import/cross/targz_processor.h
@@ -46,12 +46,14 @@
namespace o3d {
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-class TarGzProcessor : public ArchiveProcessor {
+class TarGzProcessor : public ArchiveProcessor {
public:
explicit TarGzProcessor(ArchiveCallbackClient *callback_client);
- virtual int ProcessCompressedBytes(MemoryReadStream *stream,
- size_t bytes_to_process);
+ virtual Status ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process);
+
+ virtual void Close(bool success);
private:
TarProcessor tar_processor_;
diff --git a/o3d/import/cross/targz_processor_test.cc b/o3d/import/cross/targz_processor_test.cc
index 5fc5914..a351ec8 100644
--- a/o3d/import/cross/targz_processor_test.cc
+++ b/o3d/import/cross/targz_processor_test.cc
@@ -56,17 +56,37 @@ const char *kConcatenatedContents =
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class ArchiveTestClient : public ArchiveCallbackClient {
public:
- explicit ArchiveTestClient() : file_count_(0), index_(0) {}
+ explicit ArchiveTestClient()
+ : file_count_(0),
+ index_(0),
+ closed_(false),
+ success_(false) {}
+
// ArchiveCallbackClient methods
virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info);
virtual bool ReceiveFileData(MemoryReadStream *stream, size_t nbytes);
+ virtual void Close(bool success) {
+ closed_ = true;
+ success_ = success;
+ }
+
int GetFileCount() const { return file_count_; }
size_t GetNumTotalBytesReceived() const { return index_; }
+ bool closed() const {
+ return closed_;
+ }
+
+ bool success() const {
+ return success_;
+ }
+
private:
int file_count_;
int index_;
+ bool closed_;
+ bool success_;
};
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -119,12 +139,17 @@ TEST_F(TarGzProcessorTest, LoadTarGzFile) {
ArchiveTestClient test_callback_client;
TarGzProcessor processor(&test_callback_client);
- int result = processor.ProcessFile(filepath.c_str());
- EXPECT_EQ(Z_OK, result);
+ StreamProcessor::Status status = processor.ProcessFile(filepath.c_str());
+ processor.Close(true);
+
+ EXPECT_EQ(StreamProcessor::SUCCESS, status);
EXPECT_EQ(3, test_callback_client.GetFileCount());
EXPECT_EQ(strlen(kConcatenatedContents),
test_callback_client.GetNumTotalBytesReceived());
+
+ EXPECT_TRUE(test_callback_client.closed());
+ EXPECT_TRUE(test_callback_client.success());
}
// Tries to load something with a tar.gz extension, but which isn't
@@ -136,8 +161,12 @@ TEST_F(TarGzProcessorTest, LoadBogusTarGzFile) {
ArchiveTestClient test_callback_client;
TarGzProcessor processor(&test_callback_client);
- int result = processor.ProcessFile(filepath.c_str());
- EXPECT_TRUE(result != Z_OK);
-}
+ StreamProcessor::Status status = processor.ProcessFile(filepath.c_str());
+ processor.Close(false);
+
+ EXPECT_EQ(StreamProcessor::FAILURE, status);
+ EXPECT_TRUE(test_callback_client.closed());
+ EXPECT_FALSE(test_callback_client.success());
+}
} // namespace
diff --git a/o3d/import/cross/threaded_stream_processor.cc b/o3d/import/cross/threaded_stream_processor.cc
new file mode 100644
index 0000000..c389c54
--- /dev/null
+++ b/o3d/import/cross/threaded_stream_processor.cc
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// ThreadedStreamProcessor forwards data from one thread to
+// another processor that will run on a new thread owned by the
+// ThreadedStreamProcessor.
+
+#include "import/cross/threaded_stream_processor.h"
+#include "base/task.h"
+
+namespace o3d {
+
+ThreadedStreamProcessor::ThreadedStreamProcessor(StreamProcessor *receiver)
+ : receiver_(receiver),
+ thread_("ThreadedStreamProcessor"),
+ status_(IN_PROGRESS) {
+}
+
+ThreadedStreamProcessor::~ThreadedStreamProcessor() {
+ // Wait for the other thread to stop so it does not access this object after
+ // it has been destroyed.
+ StopThread();
+}
+
+void ThreadedStreamProcessor::StartThread() {
+ if (!thread_.IsRunning()) {
+ thread_.Start();
+ }
+}
+
+void ThreadedStreamProcessor::StopThread() {
+ if (thread_.IsRunning()) {
+ thread_.Stop();
+ }
+}
+
+StreamProcessor::Status ThreadedStreamProcessor::ProcessBytes(
+ MemoryReadStream *stream,
+ size_t bytes_to_process) {
+ // Report any error on the other thread.
+ if (status_ == FAILURE) {
+ return status_;
+ }
+
+ StartThread();
+
+ // Copy the bytes. They are deleted by the other thread when it has processed
+ // them.
+ uint8* copy = new uint8[bytes_to_process];
+ stream->Read(copy, bytes_to_process);
+
+ // Post a task to call ForwardBytes on the other thread.
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction(
+ &ThreadedStreamProcessor::ForwardBytes, this, copy, bytes_to_process));
+
+ return IN_PROGRESS;
+}
+
+void ThreadedStreamProcessor::Close(bool success) {
+ StartThread();
+
+ // Post a task to call ForwardClose on the other thread.
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction(
+ &ThreadedStreamProcessor::ForwardClose, this, success));
+}
+
+void ThreadedStreamProcessor::ForwardBytes(
+ ThreadedStreamProcessor* processor, const uint8* bytes, size_t size) {
+ // Do not forward if an error has ocurred. Just delete the buffer.
+ if (processor->status_ == FAILURE) {
+ delete[] bytes;
+ return;
+ }
+
+ // Pass bytes to the receiver.
+ MemoryReadStream stream(bytes, size);
+ processor->status_ = processor->receiver_->ProcessBytes(&stream, size);
+
+ // Delete the buffer once the receiver is finished with them.
+ delete[] bytes;
+}
+
+void ThreadedStreamProcessor::ForwardClose(ThreadedStreamProcessor* processor,
+ bool success) {
+ processor->receiver_->Close(success && (processor->status_ != FAILURE));
+}
+
+} // namespace o3d
diff --git a/o3d/import/cross/threaded_stream_processor.h b/o3d/import/cross/threaded_stream_processor.h
new file mode 100644
index 0000000..f040eb4
--- /dev/null
+++ b/o3d/import/cross/threaded_stream_processor.h
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// ThreadedStreamProcessor forwards data from one thread to
+// another processor that will run on a new thread owned by the
+// ThreadedStreamProcessor.
+
+#ifndef O3D_IMPORT_CROSS_THREADED_STREAM_PROCESSOR_H_
+#define O3D_IMPORT_CROSS_THREADED_STREAM_PROCESSOR_H_
+
+#include "base/basictypes.h"
+#include "base/thread.h"
+#include "import/cross/memory_stream.h"
+
+namespace o3d {
+
+class ThreadedStreamProcessor : public StreamProcessor {
+ public:
+ explicit ThreadedStreamProcessor(StreamProcessor *receiver);
+ virtual ~ThreadedStreamProcessor();
+
+ virtual Status ProcessBytes(MemoryReadStream *stream,
+ size_t bytes_to_process);
+
+ virtual void Close(bool success);
+
+ void StartThread();
+ void StopThread();
+
+ private:
+ static void ForwardBytes(ThreadedStreamProcessor* processor,
+ const uint8* data, size_t size);
+
+ static void ForwardClose(ThreadedStreamProcessor* processor, bool success);
+
+ StreamProcessor* receiver_;
+ ::base::Thread thread_;
+ Status status_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadedStreamProcessor);
+};
+} // namespace o3d
+
+#endif // O3D_IMPORT_CROSS_THREADED_STREAM_PROCESSOR_H_
diff --git a/o3d/import/cross/threaded_stream_processor_test.cc b/o3d/import/cross/threaded_stream_processor_test.cc
new file mode 100644
index 0000000..44698ad3
--- /dev/null
+++ b/o3d/import/cross/threaded_stream_processor_test.cc
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "import/cross/memory_buffer.h"
+#include "import/cross/threaded_stream_processor.h"
+#include "tests/common/win/testing_common.h"
+#include "tests/common/cross/test_utils.h"
+
+using test_utils::ReadFile;
+
+namespace o3d {
+
+class ThreadedStreamProcessorTest : public testing::Test {
+ protected:
+};
+
+class TestReceiver : public StreamProcessor {
+ public:
+ explicit TestReceiver(size_t size) : closed_(false), success_(false) {
+ buffer_.Allocate(size);
+ stream_.Assign(buffer_, size);
+ }
+
+ virtual Status ProcessBytes(MemoryReadStream *input_stream,
+ size_t bytes_to_process) {
+ const uint8 *p = input_stream->GetDirectMemoryPointer();
+ input_stream->Skip(bytes_to_process);
+
+ size_t bytes_written = stream_.Write(p, bytes_to_process);
+ EXPECT_EQ(bytes_written, bytes_to_process);
+
+ return SUCCESS;
+ }
+
+ virtual void Close(bool success) {
+ closed_ = true;
+ success_ = success;
+ }
+
+ uint8 *GetResultBuffer() { return buffer_; }
+ size_t GetResultLength() const { return stream_.GetStreamPosition(); }
+
+ bool closed() const {
+ return closed_;
+ }
+
+ bool success() const {
+ return success_;
+ }
+
+ private:
+ MemoryBuffer<uint8> buffer_;
+ MemoryWriteStream stream_;
+ bool closed_;
+ bool success_;
+};
+
+TEST_F(ThreadedStreamProcessorTest, CanForwardZeroBuffers) {
+ TestReceiver receiver(1000);
+ ThreadedStreamProcessor processor(&receiver);
+ processor.Close(true);
+ processor.StopThread();
+
+ EXPECT_EQ(0, receiver.GetResultLength());
+ EXPECT_TRUE(receiver.closed());
+ EXPECT_TRUE(receiver.success());
+}
+
+TEST_F(ThreadedStreamProcessorTest, CanForwardOneBufferOfZeroBytes) {
+ uint8 buffer[5] = { 1, 2, 3, 4, 5 };
+ TestReceiver receiver(1000);
+
+ ThreadedStreamProcessor processor(&receiver);
+ MemoryReadStream stream(buffer, sizeof(buffer));
+ processor.ProcessBytes(&stream, 0);
+ processor.Close(true);
+ processor.StopThread();
+
+ EXPECT_EQ(0, receiver.GetResultLength());
+ EXPECT_TRUE(receiver.closed());
+ EXPECT_TRUE(receiver.success());
+}
+
+TEST_F(ThreadedStreamProcessorTest, CanForwardOneBufferFullOfBytes) {
+ uint8 buffer[5] = { 1, 2, 3, 4, 5 };
+ TestReceiver receiver(1000);
+
+ ThreadedStreamProcessor processor(&receiver);
+ MemoryReadStream stream(buffer, sizeof(buffer));
+ processor.ProcessBytes(&stream, sizeof(buffer));
+ processor.Close(true);
+ processor.StopThread();
+
+ EXPECT_EQ(sizeof(buffer), receiver.GetResultLength());
+ EXPECT_EQ(0, memcmp(buffer, receiver.GetResultBuffer(), sizeof(buffer)));
+ EXPECT_TRUE(receiver.closed());
+ EXPECT_TRUE(receiver.success());
+}
+
+TEST_F(ThreadedStreamProcessorTest, CanForwardTwoBuffersFullOfBytes) {
+ uint8 buffer[5] = { 1, 2, 3, 4, 5 };
+ TestReceiver receiver(1000);
+
+ ThreadedStreamProcessor processor(&receiver);
+ MemoryReadStream stream(buffer, sizeof(buffer));
+ processor.ProcessBytes(&stream, 2);
+ processor.ProcessBytes(&stream, 3);
+ processor.Close(true);
+ processor.StopThread();
+
+ EXPECT_EQ(sizeof(buffer), receiver.GetResultLength());
+ EXPECT_EQ(0, memcmp(buffer, receiver.GetResultBuffer(), sizeof(buffer)));
+ EXPECT_TRUE(receiver.closed());
+ EXPECT_TRUE(receiver.success());
+}
+
+TEST_F(ThreadedStreamProcessorTest, PassesThroughStreamFailed) {
+ TestReceiver receiver(1000);
+ ThreadedStreamProcessor processor(&receiver);
+ processor.Close(false);
+ processor.StopThread();
+
+ EXPECT_TRUE(receiver.closed());
+ EXPECT_FALSE(receiver.success());
+}
+
+} // namespace o3d