diff options
author | apatrick@google.com <apatrick@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-08-03 19:49:35 +0000 |
---|---|---|
committer | apatrick@google.com <apatrick@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-08-03 19:49:35 +0000 |
commit | 91240c947358b33cc19c0923a06d116ab36737dd (patch) | |
tree | 7897c99ad9b41d7735c7a42e0ea75fe6f76760b3 /o3d/import | |
parent | 2e25a9f4152a90a179ad37206bff79e1198379d7 (diff) | |
download | chromium_src-91240c947358b33cc19c0923a06d116ab36737dd.zip chromium_src-91240c947358b33cc19c0923a06d116ab36737dd.tar.gz chromium_src-91240c947358b33cc19c0923a06d116ab36737dd.tar.bz2 |
Asynchronous tick now uses NPN_PluginAsyncCall.URL streaming callbacks are now also asynchronous.Implemented NPN_PluginAsyncCall for IE.Allowed WM_PAINT handler to be reentered because it no longer calls into the browser (except to schedule an asynchronous tick if none is pending).Fixed a bug where the EventManager would crash if an event callback called cleanUp on the client.Cleanup destroys all the packs. Doing this in NPP_Destroy seems to make Chrome timeout and fail to load the next page.Tar and GZ decoding happens on a new thread.
Review URL: http://codereview.chromium.org/155733
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@22305 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'o3d/import')
34 files changed, 1228 insertions, 250 deletions
diff --git a/o3d/import/archive.gyp b/o3d/import/archive.gyp index 73e12fb..b2e2fc6 100644 --- a/o3d/import/archive.gyp +++ b/o3d/import/archive.gyp @@ -37,6 +37,8 @@ 'cross/memory_buffer.h', 'cross/memory_stream.cc', 'cross/memory_stream.h', + 'cross/main_thread_archive_callback_client.cc', + 'cross/main_thread_archive_callback_client.h', 'cross/raw_data.cc', 'cross/raw_data.h', 'cross/tar_processor.cc', @@ -44,6 +46,8 @@ 'cross/targz_generator.h', 'cross/targz_processor.cc', 'cross/targz_processor.h', + 'cross/threaded_stream_processor.cc', + 'cross/threaded_stream_processor.h', ], }, { @@ -61,6 +65,7 @@ 'cross/raw_data_test.cc', 'cross/tar_processor_test.cc', 'cross/targz_processor_test.cc', + 'thread_stream_processor_test.cc', ], }, 'copies': [ diff --git a/o3d/import/build.scons b/o3d/import/build.scons index c0789032..8aa5a88 100644 --- a/o3d/import/build.scons +++ b/o3d/import/build.scons @@ -84,9 +84,11 @@ archive_inputs = [ 'cross/archive_request.cc', 'cross/gz_decompressor.cc', 'cross/memory_stream.cc', + 'cross/main_thread_archive_callback_client.cc', 'cross/raw_data.cc', 'cross/tar_processor.cc', 'cross/targz_processor.cc', + 'cross/threaded_stream_processor.cc', ] conditioner_inputs = ['cross/collada_conditioner.cc'] diff --git a/o3d/import/cross/archive_processor.cc b/o3d/import/cross/archive_processor.cc index 8601695..a10af1b 100644 --- a/o3d/import/cross/archive_processor.cc +++ b/o3d/import/cross/archive_processor.cc @@ -41,72 +41,30 @@ const int kChunkSize = 16384; namespace o3d { -#ifdef _DEBUG // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// For debugging only, report a zlib or i/o error -void zerr(int ret) { - LOG(ERROR) << "ArchiveProcessor: "; - - switch (ret) { - case Z_ERRNO: - if (ferror(stdin)) - LOG(ERROR) << "error reading stdin\n"; - if (ferror(stdout)) - LOG(ERROR) << "error writing stdout\n"; - break; - case Z_STREAM_ERROR: - LOG(ERROR) << "invalid compression level\n"; - break; - case Z_DATA_ERROR: - LOG(ERROR) << "invalid or incomplete deflate data\n"; - break; - case Z_MEM_ERROR: - LOG(ERROR) << "out of memory\n"; - break; - case Z_VERSION_ERROR: - LOG(ERROR) << "zlib version mismatch!\n"; - break; - } -} -#endif // _DEBUG - -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -int ArchiveProcessor::ProcessEntireStream(MemoryReadStream *stream) { - int result = Z_OK; - bool has_error = false; +StreamProcessor::Status ArchiveProcessor::ProcessEntireStream( + MemoryReadStream *stream) { + Status status; // decompress until deflate stream ends or error do { int remaining = stream->GetRemainingByteCount(); int process_this_time = remaining < kChunkSize ? remaining : kChunkSize; - result = ProcessCompressedBytes(stream, process_this_time); - - has_error = (result != Z_OK && result != Z_STREAM_END); + status = ProcessBytes(stream, process_this_time); + } while (status == IN_PROGRESS); -#ifdef _DEBUG - if (has_error) { - zerr(result); - } -#endif - } while (result != Z_STREAM_END && !has_error); - - if (result == Z_STREAM_END) { - // if we got to the end of stream, then we're good... - result = Z_OK; - } - - return result; + return status; } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -int ArchiveProcessor::ProcessFile(const char *filename) { +StreamProcessor::Status ArchiveProcessor::ProcessFile(const char *filename) { struct stat file_info; int result = stat(filename, &file_info); - if (result != 0) return -1; + if (result != 0) return FAILURE; int file_length = file_info.st_size; - if (file_length == 0) return -1; + if (file_length == 0) return FAILURE; MemoryBuffer<uint8> buffer; buffer.Allocate(file_length); @@ -115,20 +73,13 @@ int ArchiveProcessor::ProcessFile(const char *filename) { // Test by reading in a tar.gz file and sending through the // progressive streaming system FILE *fp = fopen(filename, "rb"); - if (!fp) return -1; // can't open file! + if (!fp) return FAILURE; // can't open file! fread(p, sizeof(uint8), file_length, fp); fclose(fp); MemoryReadStream stream(p, file_length); - result = ProcessEntireStream(&stream); - - if (result == Z_STREAM_END) { - // if we got to the end of stream, then we're good... - result = Z_OK; - } - - return result; + return ProcessEntireStream(&stream); } } // namespace o3d diff --git a/o3d/import/cross/archive_processor.h b/o3d/import/cross/archive_processor.h index 8380c37..d404249 100644 --- a/o3d/import/cross/archive_processor.h +++ b/o3d/import/cross/archive_processor.h @@ -53,8 +53,6 @@ class ArchiveFileInfo { private: std::string filename_; int file_size_; - - DISALLOW_COPY_AND_ASSIGN(ArchiveFileInfo); }; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -63,10 +61,11 @@ class ArchiveCallbackClient { virtual ~ArchiveCallbackClient() {} virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info) = 0; virtual bool ReceiveFileData(MemoryReadStream *stream, size_t nbytes) = 0; + virtual void Close(bool success) = 0; }; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -class ArchiveProcessor { +class ArchiveProcessor: public StreamProcessor { public: explicit ArchiveProcessor(ArchiveCallbackClient *callback_client) {} @@ -75,25 +74,16 @@ class ArchiveProcessor { // Call to "push" bytes into the processor. They will be decompressed and // the appropriate callbacks on |callback_client| will happen // as files come in... - // - // Return values (using zlib error codes): - // Z_OK : Processing was successful - but not yet done - // Z_STREAM_END : We're done - archive is completely/successfully processed - // any other value indicates an error condition - // - // Note: even archive formats not based on zlib should use these codes - // (Z_OK, Z_STREAM_END) - // - virtual int ProcessCompressedBytes(MemoryReadStream *stream, - size_t bytes_to_process) = 0; + virtual Status ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process) = 0; // Decompresses the complete file archive, making file callbacks as the files // come in... - virtual int ProcessFile(const char *filename); + virtual Status ProcessFile(const char *filename); // Decompresses the complete archive from memory, // making file callbacks as the files come in... - virtual int ProcessEntireStream(MemoryReadStream *stream); + virtual Status ProcessEntireStream(MemoryReadStream *stream); protected: DISALLOW_COPY_AND_ASSIGN(ArchiveProcessor); diff --git a/o3d/import/cross/archive_request.cc b/o3d/import/cross/archive_request.cc index 84448e4..c02e1a5 100644 --- a/o3d/import/cross/archive_request.cc +++ b/o3d/import/cross/archive_request.cc @@ -33,8 +33,11 @@ #include "import/cross/archive_request.h" -#include "import/cross/targz_processor.h" #include "core/cross/pack.h" +#include "core/cross/imain_thread_task_poster.h" +#include "import/cross/targz_processor.h" +#include "import/cross/main_thread_archive_callback_client.h" +#include "import/cross/threaded_stream_processor.h" #define DEBUG_ARCHIVE_CALLBACKS 0 @@ -61,12 +64,25 @@ ArchiveRequest::ArchiveRequest(ServiceLocator* service_locator, ready_state_(0), stream_length_(0), bytes_received_(0) { - archive_processor_ = new TarGzProcessor(this); + IMainThreadTaskPoster* main_thread_task_poster = + service_locator->GetService<IMainThreadTaskPoster>(); + if (main_thread_task_poster->IsSupported()) { + main_thread_archive_callback_client_ = new MainThreadArchiveCallbackClient( + service_locator, this); + extra_processor_ = new TarGzProcessor(main_thread_archive_callback_client_); + archive_processor_ = new ThreadedStreamProcessor(extra_processor_); + } else { + main_thread_archive_callback_client_ = NULL; + extra_processor_ = NULL; + archive_processor_ = new TarGzProcessor(this); + } } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ArchiveRequest::~ArchiveRequest() { delete archive_processor_; + delete extra_processor_; + delete main_thread_archive_callback_client_; } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -85,7 +101,7 @@ void ArchiveRequest::NewStreamCallback(DownloadStream *stream) { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ int32 ArchiveRequest::WriteReadyCallback(DownloadStream *stream) { // Setting this too high causes Firefox to timeout in the Write callback. - return 1024; + return 128 * 1024; } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -99,10 +115,10 @@ int32 ArchiveRequest::WriteCallback(DownloadStream *stream, MemoryReadStream memory_stream(reinterpret_cast<uint8*>(data), length); // Progressively decompress the bytes we've just been given - int result = - archive_processor_->ProcessCompressedBytes(&memory_stream, length); + StreamProcessor::Status status = + archive_processor_->ProcessBytes(&memory_stream, length); - if (result != Z_OK && result != Z_STREAM_END) { + if (status == StreamProcessor::FAILURE) { set_success(false); set_error("Invalid gzipped tar file"); stream->Cancel(); // tell the browser to stop downloading @@ -120,21 +136,7 @@ void ArchiveRequest::FinishedCallback(DownloadStream *stream, bool success, const std::string &filename, const std::string &mime_type) { - set_ready_state(ArchiveRequest::STATE_LOADED); - - // Since the standard codes only go far enough to tell us that the download - // succeeded, we set the success [and implicitly the done] flags to give the - // rest of the story. - set_success(success); - if (!success) { - // I have no idea if an error is already set here but one MUST be set - // so let's check. - if (error().empty()) { - set_error(String("Could not download archive: ") + uri()); - } - } - if (onreadystatechange()) - onreadystatechange()->Run(); + archive_processor_->Close(success); } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -212,7 +214,7 @@ bool ArchiveRequest::ReceiveFileData(MemoryReadStream *input_stream, return false; } } else { - onfileavailable()->Run(); + onfileavailable()->Run(raw_data_); } // If data hasn't been discarded (inside callback) then writes out to @@ -227,4 +229,22 @@ bool ArchiveRequest::ReceiveFileData(MemoryReadStream *input_stream, return true; } +void ArchiveRequest::Close(bool success) { + set_ready_state(ArchiveRequest::STATE_LOADED); + + // Since the standard codes only go far enough to tell us that the download + // succeeded, we set the success [and implicitly the done] flags to give the + // rest of the story. + set_success(success); + if (!success) { + // I have no idea if an error is already set here but one MUST be set + // so let's check. + if (error().empty()) { + set_error(String("Could not download archive: ") + uri()); + } + } + if (onreadystatechange()) + onreadystatechange()->Run(); +} + } // namespace o3d diff --git a/o3d/import/cross/archive_request.h b/o3d/import/cross/archive_request.h index 0a73864..65158bb 100644 --- a/o3d/import/cross/archive_request.h +++ b/o3d/import/cross/archive_request.h @@ -50,7 +50,8 @@ namespace o3d { -typedef Closure ArchiveRequestCallback; +typedef Closure ArchiveReadyStateChangeCallback; +typedef Callback1<RawData*> ArchiveFileAvailableCallback; // An ArchiveRequest object is used to carry out an asynchronous request // for a file to be loaded. @@ -104,22 +105,24 @@ class ArchiveRequest : public ObjectBase, public ArchiveCallbackClient { // ArchiveCallbackClient methods virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info); virtual bool ReceiveFileData(MemoryReadStream *stream, size_t nbytes); + virtual void Close(bool success); Pack *pack() { return pack_.Get(); // Set at creation time and never changed. } - ArchiveRequestCallback *onfileavailable() { + ArchiveFileAvailableCallback *onfileavailable() { return onfileavailable_.get(); } - void set_onfileavailable(ArchiveRequestCallback *onfileavailable) { + void set_onfileavailable(ArchiveFileAvailableCallback *onfileavailable) { onfileavailable_.reset(onfileavailable); } - ArchiveRequestCallback *onreadystatechange() { + ArchiveReadyStateChangeCallback *onreadystatechange() { return onreadystatechange_.get(); } - void set_onreadystatechange(ArchiveRequestCallback *onreadystatechange) { + void set_onreadystatechange( + ArchiveReadyStateChangeCallback *onreadystatechange) { onreadystatechange_.reset(onreadystatechange); } @@ -174,8 +177,8 @@ class ArchiveRequest : public ObjectBase, public ArchiveCallbackClient { ArchiveRequest(ServiceLocator* service_locator, Pack *pack); Pack::Ref pack_; - scoped_ptr<ArchiveRequestCallback> onreadystatechange_; - scoped_ptr<ArchiveRequestCallback> onfileavailable_; + scoped_ptr<ArchiveReadyStateChangeCallback> onreadystatechange_; + scoped_ptr<ArchiveFileAvailableCallback> onfileavailable_; String uri_; // Request state @@ -184,7 +187,9 @@ class ArchiveRequest : public ObjectBase, public ArchiveCallbackClient { int ready_state_; // Like the XMLHttpRequest variable of the same name. String error_; // Set after completion on failure. - TarGzProcessor *archive_processor_; + StreamProcessor *archive_processor_; + StreamProcessor *extra_processor_; + ArchiveCallbackClient* main_thread_archive_callback_client_; std::vector<RawData::Ref> raw_data_list_; RawData::Ref raw_data_; MemoryBuffer<uint8> temp_buffer_; diff --git a/o3d/import/cross/file_output_stream_processor.cc b/o3d/import/cross/file_output_stream_processor.cc index 3983faa..eab955b 100644 --- a/o3d/import/cross/file_output_stream_processor.cc +++ b/o3d/import/cross/file_output_stream_processor.cc @@ -41,12 +41,20 @@ FileOutputStreamProcessor::FileOutputStreamProcessor(FILE* file) DCHECK(file != NULL); } -int FileOutputStreamProcessor::ProcessBytes(MemoryReadStream *stream, - size_t bytes_to_process) { +StreamProcessor::Status FileOutputStreamProcessor::ProcessBytes( + MemoryReadStream *stream, + size_t bytes_to_process) { + DCHECK(file_ != NULL); size_t num_written = fwrite(stream->GetDirectMemoryPointer(), 1, bytes_to_process, file_); - return num_written == bytes_to_process ? 0 : -1; + return num_written == bytes_to_process ? IN_PROGRESS : FAILURE; } + +void FileOutputStreamProcessor::Close(bool success) { + fclose(file_); + file_ = NULL; +} + } // namespace o3d diff --git a/o3d/import/cross/file_output_stream_processor.h b/o3d/import/cross/file_output_stream_processor.h index 3c04562..ff3b24e 100644 --- a/o3d/import/cross/file_output_stream_processor.h +++ b/o3d/import/cross/file_output_stream_processor.h @@ -44,8 +44,10 @@ class FileOutputStreamProcessor : public StreamProcessor { public: explicit FileOutputStreamProcessor(FILE* file); - virtual int ProcessBytes(MemoryReadStream *stream, - size_t bytes_to_process); + virtual Status ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process); + virtual void Close(bool success); + private: FILE* file_; DISALLOW_COPY_AND_ASSIGN(FileOutputStreamProcessor); diff --git a/o3d/import/cross/gz_compressor.cc b/o3d/import/cross/gz_compressor.cc index e42f2e7..d6a98cf 100644 --- a/o3d/import/cross/gz_compressor.cc +++ b/o3d/import/cross/gz_compressor.cc @@ -54,17 +54,19 @@ GzCompressor::GzCompressor(StreamProcessor *callback_client) strm_.opaque = Z_NULL; // Store this, so we can later check if it's OK to start processing - init_result_ = deflateInit2( + int result = deflateInit2( &strm_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, // 16 means write out gzip header/trailer DEF_MEM_LEVEL, Z_DEFAULT_STRATEGY); + + initialized_ = result == Z_OK; } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -void GzCompressor::Finalize() { +void GzCompressor::Close(bool success) { if (!stream_is_closed_) { // Flush the compression stream MemoryReadStream stream(NULL, 0); @@ -74,39 +76,38 @@ void GzCompressor::Finalize() { deflateEnd(&strm_); stream_is_closed_ = true; } + + callback_client_->Close(success); } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ GzCompressor::~GzCompressor() { - // Finalize() turns out to be a "nop" if the user has already called it - Finalize(); } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -int GzCompressor::ProcessBytes(MemoryReadStream *stream, - size_t bytes_to_process) { +StreamProcessor::Status GzCompressor::ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process) { // Basic sanity check if (stream->GetDirectMemoryPointer() == NULL || bytes_to_process == 0) { - return -1; + return FAILURE; } return CompressBytes(stream, bytes_to_process, false); } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -int GzCompressor::CompressBytes(MemoryReadStream *stream, - size_t bytes_to_process, - bool flush) { +StreamProcessor::Status GzCompressor::CompressBytes(MemoryReadStream *stream, + size_t bytes_to_process, + bool flush) { // Don't even bother trying if we didn't get initialized properly - if (init_result_ != Z_OK) return init_result_; + if (!initialized_) return FAILURE; uint8 out[kChunkSize]; - int result = Z_OK; // Don't try to read more than our stream has size_t remaining = stream->GetRemainingByteCount(); if (bytes_to_process > remaining) { - return Z_STREAM_ERROR; + return FAILURE; } // Use direct memory access on the MemoryStream object @@ -120,30 +121,29 @@ int GzCompressor::CompressBytes(MemoryReadStream *stream, // We need to flush the stream when we reach the end int flush_code = flush ? Z_FINISH : Z_NO_FLUSH; - // Run inflate() on input until output buffer not full + // Run deflate() on input until output buffer not full + int result; do { strm_.avail_out = kChunkSize; strm_.next_out = out; result = deflate(&strm_, flush_code); - - // error check here - return error codes if necessary - assert(result != Z_STREAM_ERROR); // state not clobbered + if (result == Z_STREAM_ERROR) + return FAILURE; size_t have = kChunkSize - strm_.avail_out; // Callback with the compressed byte stream MemoryReadStream decompressed_stream(out, have); if (have > 0 && callback_client_) { - int client_result = - callback_client_->ProcessBytes(&decompressed_stream, have); - if (client_result != 0) { - return client_result; // propagate callback errors + if (callback_client_->ProcessBytes(&decompressed_stream, + have) == FAILURE) { + return FAILURE; } } } while (strm_.avail_out == 0); - return result; + return result == Z_OK ? IN_PROGRESS : FAILURE; } } // namespace o3d diff --git a/o3d/import/cross/gz_compressor.h b/o3d/import/cross/gz_compressor.h index bdf13a1..43dbf0f 100644 --- a/o3d/import/cross/gz_compressor.h +++ b/o3d/import/cross/gz_compressor.h @@ -49,18 +49,19 @@ class GzCompressor : public StreamProcessor { explicit GzCompressor(StreamProcessor *callback_client); virtual ~GzCompressor(); - virtual int ProcessBytes(MemoryReadStream *stream, size_t bytes_to_process); + virtual Status ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process); // Must call when all bytes to compress have been sent (with ProcessBytes) - void Finalize(); + virtual void Close(bool success); private: - int CompressBytes(MemoryReadStream *stream, - size_t bytes_to_process, - bool flush); + Status CompressBytes(MemoryReadStream *stream, + size_t bytes_to_process, + bool flush); z_stream strm_; // low-level zlib state - int init_result_; + bool initialized_; bool stream_is_closed_; StreamProcessor *callback_client_; diff --git a/o3d/import/cross/gz_compressor_test.cc b/o3d/import/cross/gz_compressor_test.cc index 787ab06..4cebb004 100644 --- a/o3d/import/cross/gz_compressor_test.cc +++ b/o3d/import/cross/gz_compressor_test.cc @@ -56,10 +56,12 @@ class DecompressorClient : public o3d::StreamProcessor { public: explicit DecompressorClient(size_t uncompressed_byte_size) : buffer_(uncompressed_byte_size), - write_stream_(buffer_, uncompressed_byte_size) {} + write_stream_(buffer_, uncompressed_byte_size), + closed_(false), + success_(false) {} - virtual int ProcessBytes(MemoryReadStream *stream, - size_t bytes_to_process) { + virtual Status ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process) { // Make sure the output stream isn't full yet EXPECT_TRUE(write_stream_.GetRemainingByteCount() >= bytes_to_process); @@ -68,7 +70,12 @@ class DecompressorClient : public o3d::StreamProcessor { stream->Skip(bytes_to_process); write_stream_.Write(p, bytes_to_process); - return 0; // return OK + return SUCCESS; + } + + virtual void Close(bool success) { + closed_ = true; + success_ = success; } void VerifyDecompressedOutput(uint8 *original_data) { @@ -80,10 +87,20 @@ class DecompressorClient : public o3d::StreamProcessor { EXPECT_EQ(0, memcmp(original_data, buffer_, buffer_.GetLength())); } + bool closed() const { + return closed_; + } + + bool success() const { + return success_; + } + private: size_t uncompressed_byte_size_; MemoryBuffer<uint8> buffer_; MemoryWriteStream write_stream_; + bool closed_; + bool success_; }; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -92,24 +109,27 @@ class CompressorClient : public o3d::StreamProcessor { public: explicit CompressorClient(size_t uncompressed_byte_size) : decompressor_client_(uncompressed_byte_size), - decompressor_(&decompressor_client_) { + decompressor_(&decompressor_client_), + closed_(false), + success_(false) { }; - virtual int ProcessBytes(MemoryReadStream *stream, + virtual Status ProcessBytes(MemoryReadStream *stream, size_t bytes_to_process) { // We're receiving compressed bytes here, so feed them back into // the decompressor. Since we're making a compression / decompression // round trip, we should end up with the same (initial) byte stream // we can verify this at the end - int result = decompressor_.ProcessBytes(stream, bytes_to_process); + Status status = decompressor_.ProcessBytes(stream, bytes_to_process); - // Verify the result code: - // zlib FAQ says Z_BUF_ERROR is OK - // and may occur in the middle of decompressing the stream - EXPECT_TRUE(result == Z_OK || result == Z_STREAM_END || - result == Z_BUF_ERROR); + EXPECT_TRUE(status == IN_PROGRESS || status == SUCCESS); + + return SUCCESS; + } - return 0; // no error + virtual void Close(bool success) { + closed_ = true; + success_ = success; } void VerifyDecompressedOutput(uint8 *original_data) { @@ -117,15 +137,24 @@ class CompressorClient : public o3d::StreamProcessor { decompressor_client_.VerifyDecompressedOutput(original_data); } + bool closed() const { + return closed_; + } + + bool success() const { + return success_; + } + private: DecompressorClient decompressor_client_; o3d::GzDecompressor decompressor_; + bool closed_; + bool success_; }; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// int main (int argc, const char * argv[]) { TEST_F(GzCompressorTest, RoundTripCompressionDecompression) { // We'll compress this file String filepath = *g_program_path + "/archive_files/keyboard.jpg"; @@ -148,14 +177,27 @@ TEST_F(GzCompressorTest, RoundTripCompressionDecompression) { // Since we're making a compression / decompression // round trip, we should end up with the same (initial) byte stream // we can verify this at the end by calling VerifyDecompressedOutput() - int result = compressor.ProcessBytes(&input_file_stream, file_length); - EXPECT_TRUE(result == Z_OK || result == Z_STREAM_END); + StreamProcessor::Status status = + compressor.ProcessBytes(&input_file_stream, file_length); + EXPECT_NE(StreamProcessor::FAILURE, status); - compressor.Finalize(); + compressor.Close(true); compressor_client.VerifyDecompressedOutput(p); + EXPECT_TRUE(compressor_client.closed()); + EXPECT_TRUE(compressor_client.success()); free(p); } +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +TEST_F(GzCompressorTest, PassesFailureThroughToClient) { + CompressorClient compressor_client(1000); + GzCompressor compressor(&compressor_client); + compressor.Close(false); + + EXPECT_TRUE(compressor_client.closed()); + EXPECT_FALSE(compressor_client.success()); +} + } // namespace o3d diff --git a/o3d/import/cross/gz_decompressor.cc b/o3d/import/cross/gz_decompressor.cc index 7fa7fee..5499958 100644 --- a/o3d/import/cross/gz_decompressor.cc +++ b/o3d/import/cross/gz_decompressor.cc @@ -54,7 +54,7 @@ GzDecompressor::GzDecompressor(StreamProcessor *callback_client) strm_.next_in = Z_NULL; // Store this, so we can later check if it's OK to start processing - init_result_ = inflateInit2(&strm_, 31); + initialized_ = inflateInit2(&strm_, 31) == Z_OK; } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -64,10 +64,10 @@ GzDecompressor::~GzDecompressor() { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // -int GzDecompressor::ProcessBytes(MemoryReadStream *stream, - size_t bytes_to_process) { +StreamProcessor::Status GzDecompressor::ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process) { // Don't even bother trying if we didn't get initialized properly - if (init_result_ != Z_OK) return init_result_; + if (!initialized_) return FAILURE; uint8 out[kChunkSize]; int result; @@ -76,7 +76,7 @@ int GzDecompressor::ProcessBytes(MemoryReadStream *stream, // Don't try to read more than our stream has size_t remaining = stream->GetRemainingByteCount(); if (bytes_to_process > remaining) { - return Z_STREAM_ERROR; // could have our own error code, but good enough... + return FAILURE; } // Use direct memory access on the MemoryStream object @@ -95,11 +95,9 @@ int GzDecompressor::ProcessBytes(MemoryReadStream *stream, assert(result != Z_STREAM_ERROR); /* state not clobbered */ switch (result) { case Z_NEED_DICT: - result = Z_DATA_ERROR; /* and fall through */ - case Z_DATA_ERROR: case Z_MEM_ERROR: - return result; + return FAILURE; } have = kChunkSize - strm_.avail_out; @@ -107,15 +105,26 @@ int GzDecompressor::ProcessBytes(MemoryReadStream *stream, // Callback with the decompressed byte stream MemoryReadStream decompressed_stream(out, have); if (callback_client_) { - int client_result = - callback_client_->ProcessBytes(&decompressed_stream, have); - if (client_result != 0) { - return client_result; // propagate callback errors + if (callback_client_->ProcessBytes(&decompressed_stream, + have) == FAILURE) { + return FAILURE; } } } while (strm_.avail_out == 0); - return result; + switch (result) { + case Z_OK: + case Z_BUF_ERROR: // Zlib docs say this is expected. + return IN_PROGRESS; + case Z_STREAM_END: + return SUCCESS; + default: + return FAILURE; + } +} + +void GzDecompressor::Close(bool success) { + callback_client_->Close(success); } } // namespace o3d diff --git a/o3d/import/cross/gz_decompressor.h b/o3d/import/cross/gz_decompressor.h index bd56687..02d2c7a 100644 --- a/o3d/import/cross/gz_decompressor.h +++ b/o3d/import/cross/gz_decompressor.h @@ -49,12 +49,13 @@ class GzDecompressor : public StreamProcessor { explicit GzDecompressor(StreamProcessor *callback_client); virtual ~GzDecompressor(); - virtual int ProcessBytes(MemoryReadStream *stream, + virtual Status ProcessBytes(MemoryReadStream *stream, size_t bytes_to_process); + virtual void Close(bool success); private: z_stream strm_; // low-level zlib state - int init_result_; + bool initialized_; StreamProcessor *callback_client_; DISALLOW_COPY_AND_ASSIGN(GzDecompressor); diff --git a/o3d/import/cross/gz_decompressor_test.cc b/o3d/import/cross/gz_decompressor_test.cc index af45ecd..4bb71d6 100644 --- a/o3d/import/cross/gz_decompressor_test.cc +++ b/o3d/import/cross/gz_decompressor_test.cc @@ -51,12 +51,14 @@ class GzDecompressorTest : public testing::Test { // class GzTestClient : public StreamProcessor { public: - explicit GzTestClient(size_t uncompressed_size) { + explicit GzTestClient(size_t uncompressed_size) + : closed_(false), + success_(false) { buffer_.Allocate(uncompressed_size); stream_.Assign(buffer_, uncompressed_size); } - virtual int ProcessBytes(MemoryReadStream *input_stream, + virtual Status ProcessBytes(MemoryReadStream *input_stream, size_t bytes_to_process) { // Buffer the uncompressed bytes we're given const uint8 *p = input_stream->GetDirectMemoryPointer(); @@ -65,16 +67,31 @@ class GzTestClient : public StreamProcessor { size_t bytes_written = stream_.Write(p, bytes_to_process); EXPECT_EQ(bytes_written, bytes_to_process); - return Z_OK; + return SUCCESS; + } + + virtual void Close(bool success) { + closed_ = true; + success_ = success; } // When we're done decompressing, we can check the results here uint8 *GetResultBuffer() { return buffer_; } size_t GetResultLength() const { return stream_.GetStreamPosition(); } + bool closed() const { + return closed_; + } + + bool success() const { + return success_; + } + private: MemoryBuffer<uint8> buffer_; MemoryWriteStream stream_; + bool closed_; + bool success_; }; // Loads a tar.gz file, runs it through the processor. @@ -109,19 +126,20 @@ TEST_F(GzDecompressorTest, LoadGzFile) { MemoryReadStream compressed_stream(compressed_data, compressed_size); size_t bytes_to_process = compressed_size; - int result = Z_OK; + StreamProcessor::Status status = StreamProcessor::SUCCESS; while (bytes_to_process > 0) { size_t bytes_this_time = bytes_to_process < kChunkSize ? bytes_to_process : kChunkSize; - result = decompressor.ProcessBytes(&compressed_stream, bytes_this_time); - EXPECT_TRUE(result == Z_OK || result == Z_STREAM_END); + status = decompressor.ProcessBytes(&compressed_stream, bytes_this_time); + EXPECT_TRUE(status != StreamProcessor::FAILURE); bytes_to_process -= bytes_this_time; } - // When the decompressor has finished it should return Z_STREAM_END - EXPECT_TRUE(result == Z_STREAM_END); + decompressor.Close(true); + + EXPECT_TRUE(status == StreamProcessor::SUCCESS); // Now let's verify that what we just decompressed matches exactly // what's in the reference file... @@ -130,13 +148,25 @@ TEST_F(GzDecompressorTest, LoadGzFile) { EXPECT_EQ(decompressor_client.GetResultLength(), uncompressed_size); // Now check the data - result = memcmp(decompressor_client.GetResultBuffer(), - expected_uncompressed_data, - uncompressed_size); + int result = memcmp(decompressor_client.GetResultBuffer(), + expected_uncompressed_data, + uncompressed_size); EXPECT_EQ(0, result); + EXPECT_TRUE(decompressor_client.closed()); + EXPECT_TRUE(decompressor_client.success()); + free(compressed_data); free(expected_uncompressed_data); } +TEST_F(GzDecompressorTest, PassesFailureThroughToClient) { + GzTestClient decompressor_client(1000); + GzDecompressor decompressor(&decompressor_client); + decompressor.Close(false); + + EXPECT_TRUE(decompressor_client.closed()); + EXPECT_FALSE(decompressor_client.success()); +} + } // namespace o3d diff --git a/o3d/import/cross/iarchive_generator.h b/o3d/import/cross/iarchive_generator.h index 4f662da..04f3a46 100644 --- a/o3d/import/cross/iarchive_generator.h +++ b/o3d/import/cross/iarchive_generator.h @@ -48,13 +48,16 @@ class IArchiveGenerator { // Call AddFile() for each file entry, followed by calls to AddFileBytes() // for the file's data - virtual void AddFile(const String& file_name, + virtual bool AddFile(const String& file_name, size_t file_size) = 0; // Call with the file's data (after calling AddFile) // may be called one time will all the file's data, or multiple times // until all the data is provided virtual int AddFileBytes(MemoryReadStream* stream, size_t n) = 0; + + // Must be called to finish the archiving operation. + virtual void Close(bool success) = 0; }; } // namespace o3d diff --git a/o3d/import/cross/main_thread_archive_callback_client.cc b/o3d/import/cross/main_thread_archive_callback_client.cc new file mode 100644 index 0000000..2fbbaaa --- /dev/null +++ b/o3d/import/cross/main_thread_archive_callback_client.cc @@ -0,0 +1,138 @@ +/* + * Copyright 2009, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// MainThreadArchiveCallbackClient forwards data from one thread to +// another client that will run on the main thread. + +#include "core/cross/imain_thread_task_poster.h" +#include "core/cross/service_locator.h" +#include "import/cross/main_thread_archive_callback_client.h" +#include "base/task.h" + +namespace o3d { + +// Called on main thread. +MainThreadArchiveCallbackClient::MainThreadArchiveCallbackClient( + ServiceLocator* service_locator, + ArchiveCallbackClient *receiver) + : main_thread_task_poster_( + service_locator->GetService<IMainThreadTaskPoster>()), + receiver_(receiver), + success_(new Success) { + success_->value = true; +} + +// Called on main thread. +MainThreadArchiveCallbackClient::~MainThreadArchiveCallbackClient() { + // Signal pending tasks on main thread not to call the receiver. They are not + // running concurrently because this destructor is also called on the main + // thread. + success_->value = false; +} + +// Called on other thread. +void MainThreadArchiveCallbackClient::ReceiveFileHeader( + const ArchiveFileInfo &file_info) { + main_thread_task_poster_->PostTask(NewRunnableFunction( + &MainThreadArchiveCallbackClient::ForwardReceiveFileHeader, + success_, + receiver_, + file_info)); +} + +// Called on other thread. +bool MainThreadArchiveCallbackClient::ReceiveFileData( + MemoryReadStream *stream, size_t size) { + // Copy stream into temporary buffer. Deleted after it is received by the main + // thread. + uint8* buffer = new uint8[size]; + stream->Read(buffer, size); + + main_thread_task_poster_->PostTask(NewRunnableFunction( + &MainThreadArchiveCallbackClient::ForwardReceiveFileData, + success_, + receiver_, + buffer, + size)); + + return success_->value; +} + +void MainThreadArchiveCallbackClient::Close(bool success) { + main_thread_task_poster_->PostTask(NewRunnableFunction( + &MainThreadArchiveCallbackClient::ForwardClose, + success_, + receiver_, + success)); +} + +// Called on main thread. +void MainThreadArchiveCallbackClient::ForwardReceiveFileHeader( + SuccessPtr success, + ArchiveCallbackClient* client, + ArchiveFileInfo file_info) { + if (success->value) { + client->ReceiveFileHeader(file_info); + } +} + +// Called on main thread. +void MainThreadArchiveCallbackClient::ForwardReceiveFileData( + SuccessPtr success, + ArchiveCallbackClient* client, + uint8* bytes, size_t size) { + // Just delete the buffer if there was a previous failure. + if (!success->value) { + delete[] bytes; + return; + } + + MemoryReadStream stream(bytes, size); + if (!client->ReceiveFileData(&stream, size)) { + // Do not set this to true on success. That might overwrite a false written + // by the other thread intended to indicate that the receiver is no + // longer valid. + success->value = false; + } + + // Delete temporary buffer allocated by other thread when the main thread is + // finished with it. + delete[] bytes; +} + +void MainThreadArchiveCallbackClient::ForwardClose( + SuccessPtr success_ptr, + ArchiveCallbackClient* client, + bool success) { + client->Close(success && success_ptr->value); +} + +} // namespace o3d diff --git a/o3d/import/cross/main_thread_archive_callback_client.h b/o3d/import/cross/main_thread_archive_callback_client.h new file mode 100644 index 0000000..7da39f7 --- /dev/null +++ b/o3d/import/cross/main_thread_archive_callback_client.h @@ -0,0 +1,85 @@ +/* + * Copyright 2009, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// MainThreadArchiveCallbackClient forwards data from one thread to +// another client that will run on the main thread. + +#ifndef O3D_IMPORT_CROSS_MAIN_THREAD_ARCHIVE_CALLBACK_CLIENT_H_ +#define O3D_IMPORT_CROSS_MAIN_THREAD_ARCHIVE_CALLBACK_CLIENT_H_ + +#include "base/basictypes.h" +#include "base/thread.h" +#include "import/cross/archive_processor.h" +#include "import/cross/memory_stream.h" + +namespace o3d { + +class IMainThreadTaskPoster; +class ServiceLocator; + +class MainThreadArchiveCallbackClient : public ArchiveCallbackClient { + public: + MainThreadArchiveCallbackClient(ServiceLocator* service_locator, + ArchiveCallbackClient *receiver); + virtual ~MainThreadArchiveCallbackClient(); + + virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info); + virtual bool ReceiveFileData(MemoryReadStream *stream, size_t size); + virtual void Close(bool success); + + private: + struct Success : ::base::RefCountedThreadSafe< Success > { + bool value; + }; + + typedef scoped_refptr<Success> SuccessPtr; + + static void ForwardReceiveFileHeader(SuccessPtr success, + ArchiveCallbackClient* client, + ArchiveFileInfo file_info); + + static void ForwardReceiveFileData(SuccessPtr success, + ArchiveCallbackClient* client, + uint8* bytes, size_t size); + + static void ForwardClose(SuccessPtr success_ptr, + ArchiveCallbackClient* client, + bool success); + + IMainThreadTaskPoster* main_thread_task_poster_; + ArchiveCallbackClient* receiver_; + SuccessPtr success_; + + DISALLOW_COPY_AND_ASSIGN(MainThreadArchiveCallbackClient); +}; +} // namespace o3d + +#endif // O3D_IMPORT_CROSS_MAIN_THREAD_ARCHIVE_CALLBACK_CLIENT_H_ diff --git a/o3d/import/cross/main_thread_archive_callback_client_test.cc b/o3d/import/cross/main_thread_archive_callback_client_test.cc new file mode 100644 index 0000000..ac3243a --- /dev/null +++ b/o3d/import/cross/main_thread_archive_callback_client_test.cc @@ -0,0 +1,219 @@ +/* + * Copyright 2009, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +// Tests functionality of the MainThreadArchiveCallbackClient class + +#include "tests/common/win/testing_common.h" +#include "base/task.h" +#include "core/cross/imain_thread_task_poster.h" +#include "core/cross/service_implementation.h" +#include "import/cross/main_thread_archive_callback_client.h" + +namespace o3d { + +// Simple standin for IMainThreadTaskPoster that invokes tasks asynchronously. +class TestMainThreadTaskPoster : IMainThreadTaskPoster { + public: + TestMainThreadTaskPoster(ServiceLocator* service_locator) + : service_(service_locator, this) { + } + + virtual bool IsSupported() { + return true; + } + + virtual void PostTask(Task* task) { + tasks_.push_back(task); + } + + void Flush() { + for (size_t i = 0; i < tasks_.size(); ++i) { + tasks_[i]->Run(); + delete tasks_[i]; + } + tasks_.clear(); + } + + private: + ServiceImplementation<IMainThreadTaskPoster> service_; + std::vector<Task*> tasks_; +}; + +class MockArchiveCallbackClient : public ArchiveCallbackClient { + public: + MockArchiveCallbackClient() + : receive_file_header_calls_(0), + receive_file_header_calls_file_info_("", 0), + receive_file_data_calls_(0), + receive_file_data_stream_(NULL), + receive_file_data_size_(0), + receive_file_data_result_(false), + close_calls_(0), + close_success_(false) { + } + + virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info) { + ++receive_file_header_calls_; + receive_file_header_calls_file_info_ = file_info; + } + + virtual bool ReceiveFileData(MemoryReadStream* stream, size_t size) { + ++receive_file_data_calls_; + receive_file_data_stream_.reset(new uint8[size]); + stream->Read(receive_file_data_stream_.get(), size); + receive_file_data_size_ = size; + return receive_file_data_result_; + } + + virtual void Close(bool success) { + ++close_calls_; + close_success_ = success; + } + + int receive_file_header_calls_; + ArchiveFileInfo receive_file_header_calls_file_info_; + + int receive_file_data_calls_; + scoped_array<uint8> receive_file_data_stream_; + size_t receive_file_data_size_; + bool receive_file_data_result_; + + int close_calls_; + bool close_success_; +}; + +class MainThreadArchiveCallbackClientTest : public testing::Test { + public: + MainThreadArchiveCallbackClientTest() + : main_thread_task_poster_(&service_locator_) { + } + + protected: + virtual void SetUp() { + receiver_client_ = new MockArchiveCallbackClient; + main_thread_client_ = new MainThreadArchiveCallbackClient( + &service_locator_, receiver_client_); + } + + virtual void TearDown() { + delete receiver_client_; + delete main_thread_client_; + } + + ServiceLocator service_locator_; + TestMainThreadTaskPoster main_thread_task_poster_; + MockArchiveCallbackClient* receiver_client_; + MainThreadArchiveCallbackClient* main_thread_client_; +}; + +TEST_F(MainThreadArchiveCallbackClientTest, + ReceiveFileHeaderForwardsToReceiver) { + ArchiveFileInfo info("hello", 7); + main_thread_client_->ReceiveFileHeader(info); + main_thread_task_poster_.Flush(); + + EXPECT_EQ(1, receiver_client_->receive_file_header_calls_); + EXPECT_EQ("hello", + receiver_client_->receive_file_header_calls_file_info_.GetFileName()); + EXPECT_EQ(7, + receiver_client_->receive_file_header_calls_file_info_.GetFileSize()); +} + +TEST_F(MainThreadArchiveCallbackClientTest, + ReceiveFileDataForwardsToReceiverAndReturnsTrueFirstTime) { + uint8 buffer[] = {1, 2, 3}; + MemoryReadStream stream(buffer, sizeof(buffer)); + receiver_client_->receive_file_data_result_ = true; + EXPECT_TRUE(main_thread_client_->ReceiveFileData(&stream, sizeof(buffer))); + main_thread_task_poster_.Flush(); + + EXPECT_EQ(1, receiver_client_->receive_file_data_calls_); + EXPECT_EQ(0, memcmp(receiver_client_->receive_file_data_stream_.get(), + buffer, sizeof(buffer))); + EXPECT_EQ(sizeof(buffer), receiver_client_->receive_file_data_size_); +} + +TEST_F(MainThreadArchiveCallbackClientTest, + ReceiveFileDataReportsFailureInSubsequentCall) { + receiver_client_->receive_file_data_result_ = true; + + uint8 buffer[] = {1, 2, 3, 4, 5, 6}; + MemoryReadStream stream(buffer, sizeof(buffer)); + + receiver_client_->receive_file_data_result_ = false; + EXPECT_TRUE(main_thread_client_->ReceiveFileData(&stream, 3)); + main_thread_task_poster_.Flush(); + EXPECT_EQ(1, receiver_client_->receive_file_data_calls_); + + + receiver_client_->receive_file_data_result_ = false; + EXPECT_FALSE(main_thread_client_->ReceiveFileData(&stream, 3)); + main_thread_task_poster_.Flush(); + EXPECT_EQ(1, receiver_client_->receive_file_data_calls_); +} + +TEST_F(MainThreadArchiveCallbackClientTest, + ReceiveFileHeaderForwardsUnsuccessfulClose) { + main_thread_client_->Close(false); + main_thread_task_poster_.Flush(); + + EXPECT_EQ(1, receiver_client_->close_calls_); + EXPECT_FALSE(receiver_client_->close_success_); +} + +TEST_F(MainThreadArchiveCallbackClientTest, + ReceiveFileHeaderForwardsSuccessfulClose) { + main_thread_client_->Close(true); + main_thread_task_poster_.Flush(); + + EXPECT_EQ(1, receiver_client_->close_calls_); + EXPECT_TRUE(receiver_client_->close_success_); +} + +TEST_F(MainThreadArchiveCallbackClientTest, + CloseForwardsFailureOfPreviousUnreportedCall) { + receiver_client_->receive_file_data_result_ = true; + + uint8 buffer[] = {1, 2, 3}; + MemoryReadStream stream(buffer, sizeof(buffer)); + + receiver_client_->receive_file_data_result_ = false; + EXPECT_TRUE(main_thread_client_->ReceiveFileData(&stream, sizeof(buffer))); + main_thread_task_poster_.Flush(); + EXPECT_EQ(1, receiver_client_->receive_file_data_calls_); + + main_thread_client_->Close(true); + main_thread_task_poster_.Flush(); + EXPECT_FALSE(receiver_client_->close_success_); +} + +} // namespace o3d diff --git a/o3d/import/cross/memory_stream.h b/o3d/import/cross/memory_stream.h index e71379a..2801b2b 100644 --- a/o3d/import/cross/memory_stream.h +++ b/o3d/import/cross/memory_stream.h @@ -266,9 +266,21 @@ class MemoryWriteStream { // Abstract interface to process a memory stream class StreamProcessor { public: + +// Status is defined in Linux +#ifdef Status +#undef Status +#endif + enum Status { + IN_PROGRESS, + SUCCESS, + FAILURE, + }; + virtual ~StreamProcessor() {} - virtual int ProcessBytes(MemoryReadStream *stream, - size_t bytes_to_process) = 0; + virtual Status ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process) = 0; + virtual void Close(bool success) = 0; }; } // namespace o3d diff --git a/o3d/import/cross/tar_generator.cc b/o3d/import/cross/tar_generator.cc index ed72e05..cbfab24 100644 --- a/o3d/import/cross/tar_generator.cc +++ b/o3d/import/cross/tar_generator.cc @@ -234,8 +234,9 @@ void TarGenerator::FlushDataBuffer(bool flush_padding_zeroes) { } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -void TarGenerator::Finalize() { +void TarGenerator::Close(bool success) { FlushDataBuffer(true); + callback_client_->Close(success); } } // namespace o3d diff --git a/o3d/import/cross/tar_generator.h b/o3d/import/cross/tar_generator.h index 21bc733..7bb0ac4 100644 --- a/o3d/import/cross/tar_generator.h +++ b/o3d/import/cross/tar_generator.h @@ -50,21 +50,20 @@ #include <string> #include "base/basictypes.h" #include "core/cross/types.h" +#include "import/cross/iarchive_generator.h" #include "import/cross/memory_buffer.h" #include "import/cross/memory_stream.h" namespace o3d { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -class TarGenerator { +class TarGenerator : public IArchiveGenerator { public: explicit TarGenerator(StreamProcessor *callback_client) : callback_client_(callback_client), data_block_buffer_(TAR_BLOCK_SIZE), // initialized to zeroes data_buffer_stream_(data_block_buffer_, TAR_BLOCK_SIZE) {} - virtual ~TarGenerator() { Finalize(); } - // Call AddFile() for each file entry, followed by calls to AddFileBytes() // for the file's data. Returns true on success. virtual bool AddFile(const String &file_name, size_t file_size); @@ -75,7 +74,7 @@ class TarGenerator { virtual int AddFileBytes(MemoryReadStream *stream, size_t n); // Must call this after all files and file data have been written - virtual void Finalize(); + virtual void Close(bool success); private: // Returns true on success. diff --git a/o3d/import/cross/tar_generator_test.cc b/o3d/import/cross/tar_generator_test.cc index 9403d62..0e6dccb 100644 --- a/o3d/import/cross/tar_generator_test.cc +++ b/o3d/import/cross/tar_generator_test.cc @@ -132,15 +132,30 @@ class CallbackClient : public StreamProcessor { : state_(VALIDATE_DIRECTORY_HEADER1), total_bytes_received_(0), memory_block_(kBlockSize), - write_stream_(memory_block_, kBlockSize) { + write_stream_(memory_block_, kBlockSize), + closed_(false), + success_(false) { } - virtual int ProcessBytes(MemoryReadStream *stream, + virtual Status ProcessBytes(MemoryReadStream *stream, size_t bytes_to_process); + virtual void Close(bool success) { + closed_ = true; + success_ = success; + } + size_t GetTotalBytesReceived() { return total_bytes_received_; } int GetState() { return state_; } + bool closed() const { + return closed_; + } + + bool success() const { + return success_; + } + private: bool IsOctalDigit(uint8 c); bool IsOctalString(uint8 *p); @@ -159,11 +174,13 @@ class CallbackClient : public StreamProcessor { size_t total_bytes_received_; MemoryBuffer<uint8> memory_block_; MemoryWriteStream write_stream_; + bool closed_; + bool success_; }; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -int CallbackClient::ProcessBytes(MemoryReadStream *stream, - size_t bytes_to_process) { +StreamProcessor::Status CallbackClient::ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process) { total_bytes_received_ += bytes_to_process; while (bytes_to_process > 0) { @@ -254,7 +271,7 @@ int CallbackClient::ProcessBytes(MemoryReadStream *stream, bytes_to_process -= bytes_this_time; } - return 0; + return IN_PROGRESS; } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -403,7 +420,7 @@ TEST_F(TarGeneratorTest, CreateSimpleArchive) { kFileLength3); generator.AddFileBytes(&file4_stream, kFileLength3); - generator.Finalize(); + generator.Close(true); // Verify that the tar byte stream produced is exactly divisible by // the block size @@ -412,6 +429,18 @@ TEST_F(TarGeneratorTest, CreateSimpleArchive) { // Make sure the state machine is in the expected state EXPECT_EQ(CallbackClient::FINISHED, client.GetState()); + + EXPECT_TRUE(client.closed()); + EXPECT_TRUE(client.success()); +} + +TEST_F(TarGeneratorTest, PassesThroughFailure) { + CallbackClient client; + TarGenerator generator(&client); + generator.Close(false); + + EXPECT_TRUE(client.closed()); + EXPECT_FALSE(client.success()); } } // namespace diff --git a/o3d/import/cross/tar_processor.cc b/o3d/import/cross/tar_processor.cc index 7b14c22..b4c268e 100644 --- a/o3d/import/cross/tar_processor.cc +++ b/o3d/import/cross/tar_processor.cc @@ -41,7 +41,8 @@ static const int kFileSizeOffset = 124; static const int kLinkFlagOffset = 156; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) { +StreamProcessor::Status TarProcessor::ProcessBytes(MemoryReadStream *stream, + size_t n) { // Keep processing the byte-stream until we've consumed all we're given // size_t bytes_to_consume = n; @@ -56,7 +57,7 @@ int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) { stream->Read(reinterpret_cast<uint8*>(header_ + header_bytes_read_), bytes_to_read); if (bytes_read != bytes_to_read) { - return -1; + return FAILURE; } header_bytes_read_ += bytes_to_read; @@ -73,7 +74,7 @@ int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) { getting_filename_ = true; // We should pick some size that's too large. if (file_size > 1024) { - return -1; + return FAILURE; } } else { getting_filename_ = false; @@ -92,10 +93,10 @@ int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) { ArchiveFileInfo info(filename, file_size); callback_client_->ReceiveFileHeader(info); } else if (header_[0] == 0) { - // If filename is NULL due to zero-padding then file size - // should also be NULL - // TODO(gman): Won't this crash the plugin if I make a bad tar? - assert(file_size == 0); + // If filename is empty due to zero-padding then file size + // should also be zero. + if (file_size != 0) + return FAILURE; } } @@ -134,7 +135,7 @@ int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) { } else { if (!callback_client_->ReceiveFileData(&client_read_stream, client_bytes_this_time)) { - return -1; + return FAILURE; } } @@ -166,7 +167,11 @@ int TarProcessor::ProcessBytes(MemoryReadStream *stream, size_t n) { } } - return 0; + return IN_PROGRESS; +} + +void TarProcessor::Close(bool success) { + callback_client_->Close(success); } } // namespace o3d diff --git a/o3d/import/cross/tar_processor.h b/o3d/import/cross/tar_processor.h index 76976c3..66d4b4aa 100644 --- a/o3d/import/cross/tar_processor.h +++ b/o3d/import/cross/tar_processor.h @@ -66,7 +66,9 @@ class TarProcessor : public StreamProcessor { // Call to "push" bytes to be processed - the appropriate callback will get // called when we have enough data - virtual int ProcessBytes(MemoryReadStream *stream, size_t n); + virtual Status ProcessBytes(MemoryReadStream *stream, size_t n); + + virtual void Close(bool success); private: enum {TAR_HEADER_SIZE = 512}; diff --git a/o3d/import/cross/tar_processor_test.cc b/o3d/import/cross/tar_processor_test.cc index 1317042..77a2522 100644 --- a/o3d/import/cross/tar_processor_test.cc +++ b/o3d/import/cross/tar_processor_test.cc @@ -67,12 +67,27 @@ class TarTestClient : public ArchiveCallbackClient { virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info); virtual bool ReceiveFileData(MemoryReadStream *stream, size_t nbytes); + virtual void Close(bool success) { + closed_ = true; + success_ = success; + } + int GetFileCount() const { return file_count_; } size_t GetNumTotalBytesReceived() const { return index_; } + bool closed() const { + return closed_; + } + + bool success() const { + return success_; + } + private: int file_count_; int index_; + bool closed_; + bool success_; }; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -142,18 +157,31 @@ TEST_F(TarProcessorTest, LoadTarFile) { MemoryReadStream tar_stream(tar_data, file_size); size_t bytes_to_process = file_size; - int result = Z_OK; while (bytes_to_process > 0) { size_t bytes_this_time = bytes_to_process < kChunkSize ? bytes_to_process : kChunkSize; - result = tar_processor.ProcessBytes(&tar_stream, bytes_this_time); - EXPECT_TRUE(result == Z_OK); + StreamProcessor::Status status = tar_processor.ProcessBytes( + &tar_stream, bytes_this_time); + EXPECT_TRUE(status != StreamProcessor::FAILURE); bytes_to_process -= bytes_this_time; } + tar_processor.Close(true); + + EXPECT_TRUE(callback_client.closed()); + EXPECT_TRUE(callback_client.success()); free(tar_data); } +TEST_F(TarProcessorTest, PassesThroughFailure) { + TarTestClient callback_client; + TarProcessor tar_processor(&callback_client); + tar_processor.Close(false); + + EXPECT_TRUE(callback_client.closed()); + EXPECT_FALSE(callback_client.success()); +} + } // namespace diff --git a/o3d/import/cross/targz_generator.cc b/o3d/import/cross/targz_generator.cc index 02cb5b3..aa75712 100644 --- a/o3d/import/cross/targz_generator.cc +++ b/o3d/import/cross/targz_generator.cc @@ -37,12 +37,12 @@ namespace o3d { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ TarGzGenerator::TarGzGenerator(StreamProcessor *callback_client) - : gz_compressor_(callback_client), tar_generator_(this) { + : gz_compressor_(callback_client), tar_generator_(&gz_compressor_) { } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -void TarGzGenerator::AddFile(const String &file_name, size_t file_size) { - tar_generator_.AddFile(file_name, file_size); +bool TarGzGenerator::AddFile(const String &file_name, size_t file_size) { + return tar_generator_.AddFile(file_name, file_size); } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -57,16 +57,8 @@ int TarGzGenerator::AddFileBytes(const uint8 *data, size_t n) { } // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -void TarGzGenerator::Finalize() { - tar_generator_.Finalize(); - gz_compressor_.Finalize(); -} - -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -int TarGzGenerator::ProcessBytes(MemoryReadStream *stream, - size_t bytes_to_process) { - // pass bytestream from tar generator to the compressor - return gz_compressor_.ProcessBytes(stream, bytes_to_process); +void TarGzGenerator::Close(bool success) { + tar_generator_.Close(success); } } // namespace o3d diff --git a/o3d/import/cross/targz_generator.h b/o3d/import/cross/targz_generator.h index fd58c86..b8eb216 100644 --- a/o3d/import/cross/targz_generator.h +++ b/o3d/import/cross/targz_generator.h @@ -54,32 +54,26 @@ namespace o3d { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -class TarGzGenerator : public StreamProcessor, public IArchiveGenerator { +class TarGzGenerator : public IArchiveGenerator { public: // |callback_client| receives the tar.gz byte stream explicit TarGzGenerator(StreamProcessor *callback_client); // Call AddFile() for each file entry, followed by calls to AddFileBytes() // for the file's data - void AddFile(const String &file_name, - size_t file_size); + virtual bool AddFile(const String &file_name, + size_t file_size); // Call with the file's data (after calling AddFile) // may be called one time with all the file's data, or multiple times // until all the data is provided - int AddFileBytes(MemoryReadStream *stream, size_t n); + virtual int AddFileBytes(MemoryReadStream *stream, size_t n); int AddFileBytes(const uint8 *data, size_t n); // Must call this after all files and file data have been written - virtual void Finalize(); + virtual void Close(bool success); private: - // StreamProcessor method: - // Receives the tar bytestream from the TarGenerator. - // It then feeds this into the GzCompressor - virtual int ProcessBytes(MemoryReadStream *stream, - size_t bytes_to_process); - GzCompressor gz_compressor_; TarGenerator tar_generator_; diff --git a/o3d/import/cross/targz_generator_test.cc b/o3d/import/cross/targz_generator_test.cc index 3104890..2e1cf7b 100644 --- a/o3d/import/cross/targz_generator_test.cc +++ b/o3d/import/cross/targz_generator_test.cc @@ -54,10 +54,12 @@ class TarGzTestClient : public StreamProcessor { public: explicit TarGzTestClient(size_t reference_size) : compressed_data_(reference_size), - write_stream_(compressed_data_, reference_size) { + write_stream_(compressed_data_, reference_size), + closed_(false), + success_(false) { }; - virtual int ProcessBytes(MemoryReadStream *stream, + virtual Status ProcessBytes(MemoryReadStream *stream, size_t bytes_to_process) { // Simply buffer up the tar.gz bytes // When we've gotten them all our Validate() method will be called @@ -69,7 +71,12 @@ class TarGzTestClient : public StreamProcessor { write_stream_.Write(p, bytes_to_process); - return 0; + return SUCCESS; + } + + virtual void Close(bool success) { + closed_ = true; + success_ = success; } // Checks that the data from the reference tar.gz file matches the tar.gz @@ -95,9 +102,19 @@ class TarGzTestClient : public StreamProcessor { } #endif + bool closed() const { + return closed_; + } + + bool success() const { + return success_; + } + private: MemoryBuffer<uint8> compressed_data_; MemoryWriteStream write_stream_; + bool closed_; + bool success_; DISALLOW_COPY_AND_ASSIGN(TarGzTestClient); }; @@ -142,7 +159,7 @@ TEST_F(TarGzGeneratorTest, GenerateTarGz) { targz_generator.AddFile("test/shaders/BumpReflect.fx", shader_size); targz_generator.AddFileBytes(shader_data, shader_size); - targz_generator.Finalize(); + targz_generator.Close(true); #if defined(GENERATE_GOLDEN) std::string new_golden_file = *g_program_path + @@ -163,10 +180,21 @@ TEST_F(TarGzGeneratorTest, GenerateTarGz) { received_data, test_client.compressed_data_length())); + EXPECT_TRUE(test_client.closed()); + EXPECT_TRUE(test_client.success()); + free(targz_data); free(image_data); free(audio_data); free(shader_data); } +TEST_F(TarGzGeneratorTest, PassesThroughCloseFailure) { + TarGzTestClient test_client(1000); + TarGzGenerator targz_generator(&test_client); + targz_generator.Close(false); + + EXPECT_TRUE(test_client.closed()); + EXPECT_FALSE(test_client.success()); +} } // namespace o3d diff --git a/o3d/import/cross/targz_processor.cc b/o3d/import/cross/targz_processor.cc index a2a4ba8..5fb1275 100644 --- a/o3d/import/cross/targz_processor.cc +++ b/o3d/import/cross/targz_processor.cc @@ -57,10 +57,13 @@ TarGzProcessor::TarGzProcessor(ArchiveCallbackClient *callback_client) // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // -int TarGzProcessor::ProcessCompressedBytes(MemoryReadStream *stream, - size_t bytes_to_process) { - int result = gz_decompressor_.ProcessBytes(stream, bytes_to_process); - return result; +StreamProcessor::Status TarGzProcessor::ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process) { + return gz_decompressor_.ProcessBytes(stream, bytes_to_process); +} + +void TarGzProcessor::Close(bool success) { + gz_decompressor_.Close(success); } } // namespace o3d diff --git a/o3d/import/cross/targz_processor.h b/o3d/import/cross/targz_processor.h index ef83278..061d66e 100644 --- a/o3d/import/cross/targz_processor.h +++ b/o3d/import/cross/targz_processor.h @@ -46,12 +46,14 @@ namespace o3d { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -class TarGzProcessor : public ArchiveProcessor { +class TarGzProcessor : public ArchiveProcessor { public: explicit TarGzProcessor(ArchiveCallbackClient *callback_client); - virtual int ProcessCompressedBytes(MemoryReadStream *stream, - size_t bytes_to_process); + virtual Status ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process); + + virtual void Close(bool success); private: TarProcessor tar_processor_; diff --git a/o3d/import/cross/targz_processor_test.cc b/o3d/import/cross/targz_processor_test.cc index 5fc5914..a351ec8 100644 --- a/o3d/import/cross/targz_processor_test.cc +++ b/o3d/import/cross/targz_processor_test.cc @@ -56,17 +56,37 @@ const char *kConcatenatedContents = // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ class ArchiveTestClient : public ArchiveCallbackClient { public: - explicit ArchiveTestClient() : file_count_(0), index_(0) {} + explicit ArchiveTestClient() + : file_count_(0), + index_(0), + closed_(false), + success_(false) {} + // ArchiveCallbackClient methods virtual void ReceiveFileHeader(const ArchiveFileInfo &file_info); virtual bool ReceiveFileData(MemoryReadStream *stream, size_t nbytes); + virtual void Close(bool success) { + closed_ = true; + success_ = success; + } + int GetFileCount() const { return file_count_; } size_t GetNumTotalBytesReceived() const { return index_; } + bool closed() const { + return closed_; + } + + bool success() const { + return success_; + } + private: int file_count_; int index_; + bool closed_; + bool success_; }; // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -119,12 +139,17 @@ TEST_F(TarGzProcessorTest, LoadTarGzFile) { ArchiveTestClient test_callback_client; TarGzProcessor processor(&test_callback_client); - int result = processor.ProcessFile(filepath.c_str()); - EXPECT_EQ(Z_OK, result); + StreamProcessor::Status status = processor.ProcessFile(filepath.c_str()); + processor.Close(true); + + EXPECT_EQ(StreamProcessor::SUCCESS, status); EXPECT_EQ(3, test_callback_client.GetFileCount()); EXPECT_EQ(strlen(kConcatenatedContents), test_callback_client.GetNumTotalBytesReceived()); + + EXPECT_TRUE(test_callback_client.closed()); + EXPECT_TRUE(test_callback_client.success()); } // Tries to load something with a tar.gz extension, but which isn't @@ -136,8 +161,12 @@ TEST_F(TarGzProcessorTest, LoadBogusTarGzFile) { ArchiveTestClient test_callback_client; TarGzProcessor processor(&test_callback_client); - int result = processor.ProcessFile(filepath.c_str()); - EXPECT_TRUE(result != Z_OK); -} + StreamProcessor::Status status = processor.ProcessFile(filepath.c_str()); + processor.Close(false); + + EXPECT_EQ(StreamProcessor::FAILURE, status); + EXPECT_TRUE(test_callback_client.closed()); + EXPECT_FALSE(test_callback_client.success()); +} } // namespace diff --git a/o3d/import/cross/threaded_stream_processor.cc b/o3d/import/cross/threaded_stream_processor.cc new file mode 100644 index 0000000..c389c54 --- /dev/null +++ b/o3d/import/cross/threaded_stream_processor.cc @@ -0,0 +1,116 @@ +/* + * Copyright 2009, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// ThreadedStreamProcessor forwards data from one thread to +// another processor that will run on a new thread owned by the +// ThreadedStreamProcessor. + +#include "import/cross/threaded_stream_processor.h" +#include "base/task.h" + +namespace o3d { + +ThreadedStreamProcessor::ThreadedStreamProcessor(StreamProcessor *receiver) + : receiver_(receiver), + thread_("ThreadedStreamProcessor"), + status_(IN_PROGRESS) { +} + +ThreadedStreamProcessor::~ThreadedStreamProcessor() { + // Wait for the other thread to stop so it does not access this object after + // it has been destroyed. + StopThread(); +} + +void ThreadedStreamProcessor::StartThread() { + if (!thread_.IsRunning()) { + thread_.Start(); + } +} + +void ThreadedStreamProcessor::StopThread() { + if (thread_.IsRunning()) { + thread_.Stop(); + } +} + +StreamProcessor::Status ThreadedStreamProcessor::ProcessBytes( + MemoryReadStream *stream, + size_t bytes_to_process) { + // Report any error on the other thread. + if (status_ == FAILURE) { + return status_; + } + + StartThread(); + + // Copy the bytes. They are deleted by the other thread when it has processed + // them. + uint8* copy = new uint8[bytes_to_process]; + stream->Read(copy, bytes_to_process); + + // Post a task to call ForwardBytes on the other thread. + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction( + &ThreadedStreamProcessor::ForwardBytes, this, copy, bytes_to_process)); + + return IN_PROGRESS; +} + +void ThreadedStreamProcessor::Close(bool success) { + StartThread(); + + // Post a task to call ForwardClose on the other thread. + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableFunction( + &ThreadedStreamProcessor::ForwardClose, this, success)); +} + +void ThreadedStreamProcessor::ForwardBytes( + ThreadedStreamProcessor* processor, const uint8* bytes, size_t size) { + // Do not forward if an error has ocurred. Just delete the buffer. + if (processor->status_ == FAILURE) { + delete[] bytes; + return; + } + + // Pass bytes to the receiver. + MemoryReadStream stream(bytes, size); + processor->status_ = processor->receiver_->ProcessBytes(&stream, size); + + // Delete the buffer once the receiver is finished with them. + delete[] bytes; +} + +void ThreadedStreamProcessor::ForwardClose(ThreadedStreamProcessor* processor, + bool success) { + processor->receiver_->Close(success && (processor->status_ != FAILURE)); +} + +} // namespace o3d diff --git a/o3d/import/cross/threaded_stream_processor.h b/o3d/import/cross/threaded_stream_processor.h new file mode 100644 index 0000000..f040eb4 --- /dev/null +++ b/o3d/import/cross/threaded_stream_processor.h @@ -0,0 +1,72 @@ +/* + * Copyright 2009, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +// ThreadedStreamProcessor forwards data from one thread to +// another processor that will run on a new thread owned by the +// ThreadedStreamProcessor. + +#ifndef O3D_IMPORT_CROSS_THREADED_STREAM_PROCESSOR_H_ +#define O3D_IMPORT_CROSS_THREADED_STREAM_PROCESSOR_H_ + +#include "base/basictypes.h" +#include "base/thread.h" +#include "import/cross/memory_stream.h" + +namespace o3d { + +class ThreadedStreamProcessor : public StreamProcessor { + public: + explicit ThreadedStreamProcessor(StreamProcessor *receiver); + virtual ~ThreadedStreamProcessor(); + + virtual Status ProcessBytes(MemoryReadStream *stream, + size_t bytes_to_process); + + virtual void Close(bool success); + + void StartThread(); + void StopThread(); + + private: + static void ForwardBytes(ThreadedStreamProcessor* processor, + const uint8* data, size_t size); + + static void ForwardClose(ThreadedStreamProcessor* processor, bool success); + + StreamProcessor* receiver_; + ::base::Thread thread_; + Status status_; + + DISALLOW_COPY_AND_ASSIGN(ThreadedStreamProcessor); +}; +} // namespace o3d + +#endif // O3D_IMPORT_CROSS_THREADED_STREAM_PROCESSOR_H_ diff --git a/o3d/import/cross/threaded_stream_processor_test.cc b/o3d/import/cross/threaded_stream_processor_test.cc new file mode 100644 index 0000000..44698ad3 --- /dev/null +++ b/o3d/import/cross/threaded_stream_processor_test.cc @@ -0,0 +1,155 @@ +/* + * Copyright 2009, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "import/cross/memory_buffer.h" +#include "import/cross/threaded_stream_processor.h" +#include "tests/common/win/testing_common.h" +#include "tests/common/cross/test_utils.h" + +using test_utils::ReadFile; + +namespace o3d { + +class ThreadedStreamProcessorTest : public testing::Test { + protected: +}; + +class TestReceiver : public StreamProcessor { + public: + explicit TestReceiver(size_t size) : closed_(false), success_(false) { + buffer_.Allocate(size); + stream_.Assign(buffer_, size); + } + + virtual Status ProcessBytes(MemoryReadStream *input_stream, + size_t bytes_to_process) { + const uint8 *p = input_stream->GetDirectMemoryPointer(); + input_stream->Skip(bytes_to_process); + + size_t bytes_written = stream_.Write(p, bytes_to_process); + EXPECT_EQ(bytes_written, bytes_to_process); + + return SUCCESS; + } + + virtual void Close(bool success) { + closed_ = true; + success_ = success; + } + + uint8 *GetResultBuffer() { return buffer_; } + size_t GetResultLength() const { return stream_.GetStreamPosition(); } + + bool closed() const { + return closed_; + } + + bool success() const { + return success_; + } + + private: + MemoryBuffer<uint8> buffer_; + MemoryWriteStream stream_; + bool closed_; + bool success_; +}; + +TEST_F(ThreadedStreamProcessorTest, CanForwardZeroBuffers) { + TestReceiver receiver(1000); + ThreadedStreamProcessor processor(&receiver); + processor.Close(true); + processor.StopThread(); + + EXPECT_EQ(0, receiver.GetResultLength()); + EXPECT_TRUE(receiver.closed()); + EXPECT_TRUE(receiver.success()); +} + +TEST_F(ThreadedStreamProcessorTest, CanForwardOneBufferOfZeroBytes) { + uint8 buffer[5] = { 1, 2, 3, 4, 5 }; + TestReceiver receiver(1000); + + ThreadedStreamProcessor processor(&receiver); + MemoryReadStream stream(buffer, sizeof(buffer)); + processor.ProcessBytes(&stream, 0); + processor.Close(true); + processor.StopThread(); + + EXPECT_EQ(0, receiver.GetResultLength()); + EXPECT_TRUE(receiver.closed()); + EXPECT_TRUE(receiver.success()); +} + +TEST_F(ThreadedStreamProcessorTest, CanForwardOneBufferFullOfBytes) { + uint8 buffer[5] = { 1, 2, 3, 4, 5 }; + TestReceiver receiver(1000); + + ThreadedStreamProcessor processor(&receiver); + MemoryReadStream stream(buffer, sizeof(buffer)); + processor.ProcessBytes(&stream, sizeof(buffer)); + processor.Close(true); + processor.StopThread(); + + EXPECT_EQ(sizeof(buffer), receiver.GetResultLength()); + EXPECT_EQ(0, memcmp(buffer, receiver.GetResultBuffer(), sizeof(buffer))); + EXPECT_TRUE(receiver.closed()); + EXPECT_TRUE(receiver.success()); +} + +TEST_F(ThreadedStreamProcessorTest, CanForwardTwoBuffersFullOfBytes) { + uint8 buffer[5] = { 1, 2, 3, 4, 5 }; + TestReceiver receiver(1000); + + ThreadedStreamProcessor processor(&receiver); + MemoryReadStream stream(buffer, sizeof(buffer)); + processor.ProcessBytes(&stream, 2); + processor.ProcessBytes(&stream, 3); + processor.Close(true); + processor.StopThread(); + + EXPECT_EQ(sizeof(buffer), receiver.GetResultLength()); + EXPECT_EQ(0, memcmp(buffer, receiver.GetResultBuffer(), sizeof(buffer))); + EXPECT_TRUE(receiver.closed()); + EXPECT_TRUE(receiver.success()); +} + +TEST_F(ThreadedStreamProcessorTest, PassesThroughStreamFailed) { + TestReceiver receiver(1000); + ThreadedStreamProcessor processor(&receiver); + processor.Close(false); + processor.StopThread(); + + EXPECT_TRUE(receiver.closed()); + EXPECT_FALSE(receiver.success()); +} + +} // namespace o3d |