diff options
author | kinuko@chromium.org <kinuko@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-04-11 08:02:17 +0000 |
---|---|---|
committer | kinuko@chromium.org <kinuko@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-04-11 08:02:17 +0000 |
commit | cf02541b6dd5a26ce0f9a5664dec0fce365372d2 (patch) | |
tree | a3c55f0583fea567acc4764c8ff170c285f2f917 /net | |
parent | 5c40dce570edf4658a077faff2323e2b1e5936e7 (diff) | |
download | chromium_src-cf02541b6dd5a26ce0f9a5664dec0fce365372d2.zip chromium_src-cf02541b6dd5a26ce0f9a5664dec0fce365372d2.tar.gz chromium_src-cf02541b6dd5a26ce0f9a5664dec0fce365372d2.tar.bz2 |
Make FileStream::Seek async and add FileStream::SeekSync for sync operation
BUG=75548,113300
TEST=existing tests should pass
Review URL: https://chromiumcodereview.appspot.com/9949011
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@131732 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/base/completion_callback.h | 4 | ||||
-rw-r--r-- | net/base/file_stream.cc | 9 | ||||
-rw-r--r-- | net/base/file_stream.h | 19 | ||||
-rw-r--r-- | net/base/file_stream_posix.cc | 153 | ||||
-rw-r--r-- | net/base/file_stream_posix.h | 20 | ||||
-rw-r--r-- | net/base/file_stream_unittest.cc | 81 | ||||
-rw-r--r-- | net/base/file_stream_win.cc | 155 | ||||
-rw-r--r-- | net/base/file_stream_win.h | 24 | ||||
-rw-r--r-- | net/base/mock_file_stream.cc | 9 | ||||
-rw-r--r-- | net/base/mock_file_stream.h | 4 | ||||
-rw-r--r-- | net/base/test_completion_callback.cc | 36 | ||||
-rw-r--r-- | net/base/test_completion_callback.h | 68 | ||||
-rw-r--r-- | net/base/upload_data.cc | 2 | ||||
-rw-r--r-- | net/disk_cache/entry_unittest.cc | 2 | ||||
-rw-r--r-- | net/http/http_cache_unittest.cc | 2 | ||||
-rw-r--r-- | net/url_request/url_request_file_job.cc | 2 |
16 files changed, 410 insertions, 180 deletions
diff --git a/net/base/completion_callback.h b/net/base/completion_callback.h index 1d75bce..a519491 100644 --- a/net/base/completion_callback.h +++ b/net/base/completion_callback.h @@ -15,6 +15,10 @@ namespace net { // used to report a byte count or network error code. typedef base::Callback<void(int)> CompletionCallback; +// 64bit version of callback specialization that takes a single int64 parameter. +// Usually this is used to report a file offset, size or network error code. +typedef base::Callback<void(int64)> Int64CompletionCallback; + typedef base::CancelableCallback<void(int)> CancelableCompletionCallback; } // namespace net diff --git a/net/base/file_stream.cc b/net/base/file_stream.cc index 818fb2e..1707fb7 100644 --- a/net/base/file_stream.cc +++ b/net/base/file_stream.cc @@ -39,8 +39,13 @@ bool FileStream::IsOpen() const { return impl_.IsOpen(); } -int64 FileStream::Seek(Whence whence, int64 offset) { - return impl_.Seek(whence, offset); +int FileStream::Seek(Whence whence, int64 offset, + const Int64CompletionCallback& callback) { + return impl_.Seek(whence, offset, callback); +} + +int64 FileStream::SeekSync(Whence whence, int64 offset) { + return impl_.SeekSync(whence, offset); } int64 FileStream::Available() { diff --git a/net/base/file_stream.h b/net/base/file_stream.h index c0db903..4b84ec0 100644 --- a/net/base/file_stream.h +++ b/net/base/file_stream.h @@ -94,11 +94,20 @@ class NET_EXPORT FileStream { // Returns true if Open succeeded and Close has not been called. virtual bool IsOpen() const; - // Adjust the position from where data is read. Upon success, the stream - // position relative to the start of the file is returned. Otherwise, an - // error code is returned. It is not valid to call Seek while a Read call - // has a pending completion. - virtual int64 Seek(Whence whence, int64 offset); + // Adjust the position from where data is read asynchronously. + // Upon success, ERR_IO_PENDING is returned and |callback| will be run + // on the thread where Seek() was called with the the stream position + // relative to the start of the file. Otherwise, an error code is returned. + // It is invalid to request any asynchronous operations while there is an + // in-flight asynchronous operation. + virtual int Seek(Whence whence, int64 offset, + const Int64CompletionCallback& callback); + + // Adjust the position from where data is read synchronously. + // Upon success, the stream position relative to the start of the file is + // returned. Otherwise, an error code is returned. It is not valid to + // call SeekSync while a Read call has a pending completion. + virtual int64 SeekSync(Whence whence, int64 offset); // Returns the number of bytes available to read from the current stream // position until the end of the file. Otherwise, an error code is returned. diff --git a/net/base/file_stream_posix.cc b/net/base/file_stream_posix.cc index 6c9b135..0fe801d 100644 --- a/net/base/file_stream_posix.cc +++ b/net/base/file_stream_posix.cc @@ -125,6 +125,37 @@ void CloseFileAndSignal(base::PlatformFile* file, on_io_complete->Signal(); } +// Adjusts the position from where the data is read. +void SeekFile(base::PlatformFile file, + Whence whence, + int64 offset, + int64* result, + bool record_uma, + const net::BoundNetLog& bound_net_log) { + off_t res = lseek(file, static_cast<off_t>(offset), + static_cast<int>(whence)); + if (res == static_cast<off_t>(-1)) { + *result = RecordAndMapError(errno, + FILE_ERROR_SOURCE_SEEK, + record_uma, + bound_net_log); + return; + } + *result = res; +} + +// Seeks a file by calling SeekSync() and signals the completion. +void SeekFileAndSignal(base::PlatformFile file, + Whence whence, + int64 offset, + int64* result, + bool record_uma, + base::WaitableEvent* on_io_complete, + const net::BoundNetLog& bound_net_log) { + SeekFile(file, whence, offset, result, record_uma, bound_net_log); + on_io_complete->Signal(); +} + // ReadFile() is a simple wrapper around read() that handles EINTR signals and // calls MapSystemError() to map errno to net error codes. void ReadFile(base::PlatformFile file, @@ -203,6 +234,20 @@ int FlushFile(base::PlatformFile file, return res; } +// Called when Read(), Write() or Seek() is completed. +// |result| contains the result or a network error code. +template <typename R> +void OnIOComplete(const base::WeakPtr<FileStreamPosix>& stream, + const base::Callback<void(R)>& callback, + R* result) { + if (!stream.get()) + return; + + // Reset this before Run() as Run() may issue a new async operation. + stream->ResetOnIOComplete(); + callback.Run(*result); +} + } // namespace // FileStreamPosix ------------------------------------------------------------ @@ -257,10 +302,8 @@ FileStreamPosix::~FileStreamPosix() { void FileStreamPosix::Close(const CompletionCallback& callback) { DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); - DCHECK(callback_.is_null()); - - callback_ = callback; + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); DCHECK(!on_io_complete_.get()); on_io_complete_.reset(new base::WaitableEvent( false /* manual_reset */, false /* initially_signaled */)); @@ -273,7 +316,8 @@ void FileStreamPosix::Close(const CompletionCallback& callback) { base::Bind(&CloseFileAndSignal, &file_, on_io_complete_.get(), bound_net_log_), base::Bind(&FileStreamPosix::OnClosed, - weak_ptr_factory_.GetWeakPtr()), + weak_ptr_factory_.GetWeakPtr(), + callback), true /* task_is_slow */); DCHECK(posted); @@ -301,12 +345,10 @@ int FileStreamPosix::Open(const FilePath& path, int open_flags, return ERR_UNEXPECTED; } - DCHECK(callback_.is_null()); - callback_ = callback; - open_flags_ = open_flags; DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); DCHECK(!on_io_complete_.get()); on_io_complete_.reset(new base::WaitableEvent( false /* manual_reset */, false /* initially_signaled */)); @@ -320,9 +362,8 @@ int FileStreamPosix::Open(const FilePath& path, int open_flags, base::Bind(&OpenFileAndSignal, path, open_flags, record_uma_, &file_, result, on_io_complete_.get(), bound_net_log_), - base::Bind(&FileStreamPosix::OnIOComplete, - weak_ptr_factory_.GetWeakPtr(), - base::Owned(result)), + base::Bind(&OnIOComplete<int>, weak_ptr_factory_.GetWeakPtr(), + callback, base::Owned(result)), true /* task_is_slow */); DCHECK(posted); return ERR_IO_PENDING; @@ -349,7 +390,33 @@ bool FileStreamPosix::IsOpen() const { return file_ != base::kInvalidPlatformFileValue; } -int64 FileStreamPosix::Seek(Whence whence, int64 offset) { +int FileStreamPosix::Seek(Whence whence, int64 offset, + const Int64CompletionCallback& callback) { + if (!IsOpen()) + return ERR_UNEXPECTED; + + // Make sure we're async and we have no other in-flight async operations. + DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); + DCHECK(!on_io_complete_.get()); + + on_io_complete_.reset(new base::WaitableEvent( + false /* manual_reset */, false /* initially_signaled */)); + + int64* result = new int64(-1); + const bool posted = base::WorkerPool::PostTaskAndReply( + FROM_HERE, + base::Bind(&SeekFileAndSignal, file_, whence, offset, result, + record_uma_, on_io_complete_.get(), bound_net_log_), + base::Bind(&OnIOComplete<int64>, + weak_ptr_factory_.GetWeakPtr(), + callback, base::Owned(result)), + true /* task is slow */); + DCHECK(posted); + return ERR_IO_PENDING; +} + +int64 FileStreamPosix::SeekSync(Whence whence, int64 offset) { base::ThreadRestrictions::AssertIOAllowed(); if (!IsOpen()) @@ -357,18 +424,11 @@ int64 FileStreamPosix::Seek(Whence whence, int64 offset) { // If we're in async, make sure we don't have a request in flight. DCHECK(!(open_flags_ & base::PLATFORM_FILE_ASYNC) || - callback_.is_null()); - - off_t res = lseek(file_, static_cast<off_t>(offset), - static_cast<int>(whence)); - if (res == static_cast<off_t>(-1)) { - return RecordAndMapError(errno, - FILE_ERROR_SOURCE_SEEK, - record_uma_, - bound_net_log_); - } + !on_io_complete_.get()); - return res; + off_t result = -1; + SeekFile(file_, whence, offset, &result, record_uma_, bound_net_log_); + return result; } int64 FileStreamPosix::Available() { @@ -377,7 +437,7 @@ int64 FileStreamPosix::Available() { if (!IsOpen()) return ERR_UNEXPECTED; - int64 cur_pos = Seek(FROM_CURRENT, 0); + int64 cur_pos = SeekSync(FROM_CURRENT, 0); if (cur_pos < 0) return cur_pos; @@ -404,22 +464,23 @@ int FileStreamPosix::Read( DCHECK_GT(buf_len, 0); DCHECK(open_flags_ & base::PLATFORM_FILE_READ); DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); - // Make sure we don't have a request in flight. - DCHECK(callback_.is_null()); - callback_ = callback; - int* result = new int(OK); - scoped_refptr<IOBuffer> buf = in_buf; + // Make sure we don't have a request in flight. + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); DCHECK(!on_io_complete_.get()); + on_io_complete_.reset(new base::WaitableEvent( false /* manual_reset */, false /* initially_signaled */)); + + int* result = new int(OK); + scoped_refptr<IOBuffer> buf = in_buf; const bool posted = base::WorkerPool::PostTaskAndReply( FROM_HERE, base::Bind(&ReadFileAndSignal, file_, buf, buf_len, record_uma_, result, on_io_complete_.get(), bound_net_log_), - base::Bind(&FileStreamPosix::OnIOComplete, + base::Bind(&OnIOComplete<int>, weak_ptr_factory_.GetWeakPtr(), - base::Owned(result)), + callback, base::Owned(result)), true /* task is slow */); DCHECK(posted); return ERR_IO_PENDING; @@ -469,22 +530,22 @@ int FileStreamPosix::Write( DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); // write(..., 0) will return 0, which indicates end-of-file. DCHECK_GT(buf_len, 0); - // Make sure we don't have a request in flight. - DCHECK(callback_.is_null()); - callback_ = callback; - int* result = new int(OK); - scoped_refptr<IOBuffer> buf = in_buf; + // Make sure we don't have a request in flight. + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); DCHECK(!on_io_complete_.get()); on_io_complete_.reset(new base::WaitableEvent( false /* manual_reset */, false /* initially_signaled */)); + + int* result = new int(OK); + scoped_refptr<IOBuffer> buf = in_buf; const bool posted = base::WorkerPool::PostTaskAndReply( FROM_HERE, base::Bind(&WriteFileAndSignal, file_, buf, buf_len, record_uma_, result, on_io_complete_.get(), bound_net_log_), - base::Bind(&FileStreamPosix::OnIOComplete, + base::Bind(&OnIOComplete<int>, weak_ptr_factory_.GetWeakPtr(), - base::Owned(result)), + callback, base::Owned(result)), true /* task is slow */); DCHECK(posted); return ERR_IO_PENDING; @@ -515,7 +576,7 @@ int64 FileStreamPosix::Truncate(int64 bytes) { DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); // Seek to the position to truncate from. - int64 seek_position = Seek(FROM_BEGIN, bytes); + int64 seek_position = SeekSync(FROM_BEGIN, bytes); if (seek_position != bytes) return ERR_UNEXPECTED; @@ -569,19 +630,17 @@ base::PlatformFile FileStreamPosix::GetPlatformFileForTesting() { return file_; } -void FileStreamPosix::OnClosed() { - file_ = base::kInvalidPlatformFileValue; - int result = OK; - OnIOComplete(&result); +void FileStreamPosix::ResetOnIOComplete() { + on_io_complete_.reset(); + weak_ptr_factory_.InvalidateWeakPtrs(); } -void FileStreamPosix::OnIOComplete(int* result) { - CompletionCallback temp_callback = callback_; - callback_.Reset(); +void FileStreamPosix::OnClosed(const CompletionCallback& callback) { + file_ = base::kInvalidPlatformFileValue; // Reset this before Run() as Run() may issue a new async operation. - on_io_complete_.reset(); - temp_callback.Run(*result); + ResetOnIOComplete(); + callback.Run(OK); } void FileStreamPosix::WaitForIOCompletion() { diff --git a/net/base/file_stream_posix.h b/net/base/file_stream_posix.h index a8b919f..8187bad 100644 --- a/net/base/file_stream_posix.h +++ b/net/base/file_stream_posix.h @@ -10,7 +10,6 @@ #include "base/memory/scoped_ptr.h" #include "base/memory/weak_ptr.h" -#include "base/synchronization/waitable_event.h" #include "base/platform_file.h" #include "net/base/completion_callback.h" #include "net/base/file_stream_whence.h" @@ -19,6 +18,10 @@ class FilePath; +namespace base { +class WaitableEvent; +} + namespace net { class IOBuffer; @@ -36,7 +39,9 @@ class NET_EXPORT FileStreamPosix { const CompletionCallback& callback); int OpenSync(const FilePath& path, int open_flags); bool IsOpen() const; - int64 Seek(Whence whence, int64 offset); + int Seek(Whence whence, int64 offset, + const Int64CompletionCallback& callback); + int64 SeekSync(Whence whence, int64 offset); int64 Available(); int Read(IOBuffer* buf, int buf_len, const CompletionCallback& callback); int ReadSync(char* buf, int buf_len); @@ -50,13 +55,13 @@ class NET_EXPORT FileStreamPosix { const net::BoundNetLog& owner_bound_net_log); base::PlatformFile GetPlatformFileForTesting(); + // Resets on_io_complete_ and WeakPtr's. + // Called when Read() or Write() is completed. + void ResetOnIOComplete(); + private: // Called when the file_ is closed asynchronously. - void OnClosed(); - - // Called when Read() or Write() is completed (used only for POSIX). - // |result| contains the result as a network error code. - void OnIOComplete(int* result); + void OnClosed(const CompletionCallback& callback); // Waits until the in-flight async open/close/read/write operation is // complete. @@ -68,7 +73,6 @@ class NET_EXPORT FileStreamPosix { bool record_uma_; net::BoundNetLog bound_net_log_; base::WeakPtrFactory<FileStreamPosix> weak_ptr_factory_; - CompletionCallback callback_; scoped_ptr<base::WaitableEvent> on_io_complete_; DISALLOW_COPY_AND_ASSIGN(FileStreamPosix); diff --git a/net/base/file_stream_unittest.cc b/net/base/file_stream_unittest.cc index b55e22e..d3853ca 100644 --- a/net/base/file_stream_unittest.cc +++ b/net/base/file_stream_unittest.cc @@ -155,7 +155,7 @@ TEST_F(FileStreamTest, UseFileHandle) { // Seek to the beginning of the file and read. FileStream read_stream(file, flags, NULL); - ASSERT_EQ(0, read_stream.Seek(FROM_BEGIN, 0)); + ASSERT_EQ(0, read_stream.SeekSync(FROM_BEGIN, 0)); ASSERT_EQ(kTestDataSize, read_stream.Available()); // Read into buffer and compare. char buffer[kTestDataSize]; @@ -170,7 +170,7 @@ TEST_F(FileStreamTest, UseFileHandle) { file = base::CreatePlatformFile(temp_file_path(), flags, &created, NULL); FileStream write_stream(file, flags, NULL); - ASSERT_EQ(0, write_stream.Seek(FROM_BEGIN, 0)); + ASSERT_EQ(0, write_stream.SeekSync(FROM_BEGIN, 0)); ASSERT_EQ(kTestDataSize, write_stream.WriteSync(kTestData, kTestDataSize)); write_stream.CloseSync(); @@ -187,7 +187,7 @@ TEST_F(FileStreamTest, UseClosedStream) { EXPECT_FALSE(stream.IsOpen()); // Try seeking... - int64 new_offset = stream.Seek(FROM_BEGIN, 5); + int64 new_offset = stream.SeekSync(FROM_BEGIN, 5); EXPECT_EQ(ERR_UNEXPECTED, new_offset); // Try available... @@ -380,7 +380,7 @@ TEST_F(FileStreamTest, BasicRead_FromOffset) { EXPECT_EQ(OK, rv); const int64 kOffset = 3; - int64 new_offset = stream.Seek(FROM_BEGIN, kOffset); + int64 new_offset = stream.SeekSync(FROM_BEGIN, kOffset); EXPECT_EQ(kOffset, new_offset); int64 total_bytes_avail = stream.Available(); @@ -415,8 +415,11 @@ TEST_F(FileStreamTest, AsyncRead_FromOffset) { int rv = stream.OpenSync(temp_file_path(), flags); EXPECT_EQ(OK, rv); + TestInt64CompletionCallback callback64; const int64 kOffset = 3; - int64 new_offset = stream.Seek(FROM_BEGIN, kOffset); + rv = stream.Seek(FROM_BEGIN, kOffset, callback64.callback()); + ASSERT_EQ(ERR_IO_PENDING, rv); + int64 new_offset = callback64.WaitForResult(); EXPECT_EQ(kOffset, new_offset); int64 total_bytes_avail = stream.Available(); @@ -450,18 +453,52 @@ TEST_F(FileStreamTest, SeekAround) { EXPECT_EQ(OK, rv); const int64 kOffset = 3; - int64 new_offset = stream.Seek(FROM_BEGIN, kOffset); + int64 new_offset = stream.SeekSync(FROM_BEGIN, kOffset); EXPECT_EQ(kOffset, new_offset); - new_offset = stream.Seek(FROM_CURRENT, kOffset); + new_offset = stream.SeekSync(FROM_CURRENT, kOffset); EXPECT_EQ(2 * kOffset, new_offset); - new_offset = stream.Seek(FROM_CURRENT, -kOffset); + new_offset = stream.SeekSync(FROM_CURRENT, -kOffset); EXPECT_EQ(kOffset, new_offset); const int kTestDataLen = arraysize(kTestData) - 1; - new_offset = stream.Seek(FROM_END, -kTestDataLen); + new_offset = stream.SeekSync(FROM_END, -kTestDataLen); + EXPECT_EQ(0, new_offset); +} + +TEST_F(FileStreamTest, AsyncSeekAround) { + FileStream stream(NULL); + int flags = base::PLATFORM_FILE_OPEN | + base::PLATFORM_FILE_ASYNC | + base::PLATFORM_FILE_READ; + int rv = stream.OpenSync(temp_file_path(), flags); + EXPECT_EQ(OK, rv); + + TestInt64CompletionCallback callback; + + const int64 kOffset = 3; + rv = stream.Seek(FROM_BEGIN, kOffset, callback.callback()); + ASSERT_EQ(ERR_IO_PENDING, rv); + int64 new_offset = callback.WaitForResult(); + EXPECT_EQ(kOffset, new_offset); + + rv = stream.Seek(FROM_CURRENT, kOffset, callback.callback()); + ASSERT_EQ(ERR_IO_PENDING, rv); + new_offset = callback.WaitForResult(); + EXPECT_EQ(2 * kOffset, new_offset); + + rv = stream.Seek(FROM_CURRENT, -kOffset, callback.callback()); + ASSERT_EQ(ERR_IO_PENDING, rv); + new_offset = callback.WaitForResult(); + EXPECT_EQ(kOffset, new_offset); + + const int kTestDataLen = arraysize(kTestData) - 1; + + rv = stream.Seek(FROM_END, -kTestDataLen, callback.callback()); + ASSERT_EQ(ERR_IO_PENDING, rv); + new_offset = callback.WaitForResult(); EXPECT_EQ(0, new_offset); } @@ -564,7 +601,7 @@ TEST_F(FileStreamTest, BasicWrite_FromOffset) { EXPECT_EQ(kTestDataSize, file_size); const int64 kOffset = 0; - int64 new_offset = stream.Seek(FROM_END, kOffset); + int64 new_offset = stream.SeekSync(FROM_END, kOffset); EXPECT_EQ(kTestDataSize, new_offset); rv = stream.WriteSync(kTestData, kTestDataSize); @@ -588,8 +625,11 @@ TEST_F(FileStreamTest, AsyncWrite_FromOffset) { int rv = stream.OpenSync(temp_file_path(), flags); EXPECT_EQ(OK, rv); + TestInt64CompletionCallback callback64; const int64 kOffset = 0; - int64 new_offset = stream.Seek(FROM_END, kOffset); + rv = stream.Seek(FROM_END, kOffset, callback64.callback()); + ASSERT_EQ(ERR_IO_PENDING, rv); + int64 new_offset = callback64.WaitForResult(); EXPECT_EQ(kTestDataSize, new_offset); TestCompletionCallback callback; @@ -668,13 +708,13 @@ TEST_F(FileStreamTest, BasicWriteRead) { int64 total_bytes_avail = stream.Available(); EXPECT_EQ(file_size, total_bytes_avail); - int64 offset = stream.Seek(FROM_END, 0); + int64 offset = stream.SeekSync(FROM_END, 0); EXPECT_EQ(offset, file_size); rv = stream.WriteSync(kTestData, kTestDataSize); EXPECT_EQ(kTestDataSize, rv); - offset = stream.Seek(FROM_BEGIN, 0); + offset = stream.SeekSync(FROM_BEGIN, 0); EXPECT_EQ(0, offset); int64 total_bytes_read = 0; @@ -775,7 +815,10 @@ TEST_F(FileStreamTest, BasicAsyncWriteRead) { int64 total_bytes_avail = stream.Available(); EXPECT_EQ(file_size, total_bytes_avail); - int64 offset = stream.Seek(FROM_END, 0); + TestInt64CompletionCallback callback64; + rv = stream.Seek(FROM_END, 0, callback64.callback()); + ASSERT_EQ(ERR_IO_PENDING, rv); + int64 offset = callback64.WaitForResult(); EXPECT_EQ(offset, file_size); TestCompletionCallback callback; @@ -798,7 +841,9 @@ TEST_F(FileStreamTest, BasicAsyncWriteRead) { EXPECT_EQ(kTestDataSize, total_bytes_written); - offset = stream.Seek(FROM_BEGIN, 0); + rv = stream.Seek(FROM_BEGIN, 0, callback64.callback()); + ASSERT_EQ(ERR_IO_PENDING, rv); + offset = callback64.WaitForResult(); EXPECT_EQ(0, offset); int total_bytes_read = 0; @@ -882,7 +927,7 @@ class TestWriteReadCompletionCallback { *total_bytes_read_ += total_bytes_read; *data_read_ += data_read; } else { // We're done writing all data. Start reading the data. - stream_->Seek(FROM_BEGIN, 0); + stream_->SeekSync(FROM_BEGIN, 0); TestCompletionCallback callback; for (;;) { @@ -936,7 +981,7 @@ TEST_F(FileStreamTest, AsyncWriteRead) { int64 total_bytes_avail = stream.Available(); EXPECT_EQ(file_size, total_bytes_avail); - int64 offset = stream.Seek(FROM_END, 0); + int64 offset = stream.SeekSync(FROM_END, 0); EXPECT_EQ(offset, file_size); int total_bytes_written = 0; @@ -1046,7 +1091,7 @@ TEST_F(FileStreamTest, AsyncWriteClose) { int64 total_bytes_avail = stream.Available(); EXPECT_EQ(file_size, total_bytes_avail); - int64 offset = stream.Seek(FROM_END, 0); + int64 offset = stream.SeekSync(FROM_END, 0); EXPECT_EQ(offset, file_size); int total_bytes_written = 0; diff --git a/net/base/file_stream_win.cc b/net/base/file_stream_win.cc index 713e488..97b1975 100644 --- a/net/base/file_stream_win.cc +++ b/net/base/file_stream_win.cc @@ -10,6 +10,7 @@ #include "base/logging.h" #include "base/message_loop.h" #include "base/metrics/histogram.h" +#include "base/synchronization/waitable_event.h" #include "base/threading/thread_restrictions.h" #include "base/threading/worker_pool.h" #include "net/base/file_stream_metrics.h" @@ -24,12 +25,14 @@ COMPILE_ASSERT(FROM_BEGIN == FILE_BEGIN, bad_whence_begin); COMPILE_ASSERT(FROM_CURRENT == FILE_CURRENT, bad_whence_current); COMPILE_ASSERT(FROM_END == FILE_END, bad_whence_end); -static void SetOffset(OVERLAPPED* overlapped, const LARGE_INTEGER& offset) { +namespace { + +void SetOffset(OVERLAPPED* overlapped, const LARGE_INTEGER& offset) { overlapped->Offset = offset.LowPart; overlapped->OffsetHigh = offset.HighPart; } -static void IncrementOffset(OVERLAPPED* overlapped, DWORD count) { +void IncrementOffset(OVERLAPPED* overlapped, DWORD count) { LARGE_INTEGER offset; offset.LowPart = overlapped->Offset; offset.HighPart = overlapped->OffsetHigh; @@ -37,8 +40,6 @@ static void IncrementOffset(OVERLAPPED* overlapped, DWORD count) { SetOffset(overlapped, offset); } -namespace { - int RecordAndMapError(int error, FileErrorSource source, bool record_uma, @@ -84,18 +85,6 @@ void OpenFile(const FilePath& path, } } -// Opens a file using OpenFile() and signals the completion. -void OpenFileAndSignal(const FilePath& path, - int open_flags, - bool record_uma, - base::PlatformFile* file, - int* result, - base::WaitableEvent* on_io_complete, - const net::BoundNetLog& bound_net_log) { - OpenFile(path, open_flags, record_uma, file, result, bound_net_log); - on_io_complete->Signal(); -} - // Closes a file with some network logging. void CloseFile(base::PlatformFile file, const net::BoundNetLog& bound_net_log) { @@ -119,6 +108,13 @@ void CloseFileAndSignal(base::PlatformFile* file, on_io_complete->Signal(); } +// Invokes a given closure and signals the completion. +void InvokeAndSignal(const base::Closure& closure, + base::WaitableEvent* on_io_complete) { + closure.Run(); + on_io_complete->Signal(); +} + } // namespace // FileStreamWin::AsyncContext ---------------------------------------------- @@ -270,11 +266,8 @@ FileStreamWin::~FileStreamWin() { } void FileStreamWin::Close(const CompletionCallback& callback) { - DCHECK(callback_.is_null()); - callback_ = callback; - DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); - + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); DCHECK(!on_io_complete_.get()); on_io_complete_.reset(new base::WaitableEvent( false /* manual_reset */, false /* initially_signaled */)); @@ -286,7 +279,9 @@ void FileStreamWin::Close(const CompletionCallback& callback) { FROM_HERE, base::Bind(&CloseFileAndSignal, &file_, on_io_complete_.get(), bound_net_log_), - base::Bind(&FileStreamWin::OnClosed, weak_ptr_factory_.GetWeakPtr()), + base::Bind(&FileStreamWin::OnClosed, + weak_ptr_factory_.GetWeakPtr(), + callback), true /* task_is_slow */); DCHECK(posted); } @@ -323,12 +318,9 @@ int FileStreamWin::Open(const FilePath& path, int open_flags, return ERR_UNEXPECTED; } - DCHECK(callback_.is_null()); - callback_ = callback; - open_flags_ = open_flags; DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); - + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); DCHECK(!on_io_complete_.get()); on_io_complete_.reset(new base::WaitableEvent( false /* manual_reset */, false /* initially_signaled */)); @@ -339,12 +331,13 @@ int FileStreamWin::Open(const FilePath& path, int open_flags, int* result = new int(OK); const bool posted = base::WorkerPool::PostTaskAndReply( FROM_HERE, - base::Bind(&OpenFileAndSignal, - path, open_flags, record_uma_, &file_, result, - on_io_complete_.get(), bound_net_log_), + base::Bind(&InvokeAndSignal, + base::Bind(&OpenFile, path, open_flags, record_uma_, &file_, + result, bound_net_log_), + on_io_complete_.get()), base::Bind(&FileStreamWin::OnOpened, weak_ptr_factory_.GetWeakPtr(), - base::Owned(result)), + callback, base::Owned(result)), true /* task_is_slow */); DCHECK(posted); return ERR_IO_PENDING; @@ -380,28 +373,44 @@ bool FileStreamWin::IsOpen() const { return file_ != base::kInvalidPlatformFileValue; } -int64 FileStreamWin::Seek(Whence whence, int64 offset) { +int FileStreamWin::Seek(Whence whence, int64 offset, + const Int64CompletionCallback& callback) { if (!IsOpen()) return ERR_UNEXPECTED; - DCHECK(!async_context_.get() || async_context_->callback().is_null()); + // Make sure we're async and we have no other in-flight async operations. + DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC); + DCHECK(!weak_ptr_factory_.HasWeakPtrs()); + DCHECK(!on_io_complete_.get()); - LARGE_INTEGER distance, result; - distance.QuadPart = offset; - DWORD move_method = static_cast<DWORD>(whence); - if (!SetFilePointerEx(file_, distance, &result, move_method)) { - DWORD error = GetLastError(); - LOG(WARNING) << "SetFilePointerEx failed: " << error; - return RecordAndMapError(error, - FILE_ERROR_SOURCE_SEEK, - record_uma_, - bound_net_log_); - } - if (async_context_.get()) { - async_context_->set_error_source(FILE_ERROR_SOURCE_SEEK); - SetOffset(async_context_->overlapped(), result); - } - return result.QuadPart; + int64* result = new int64(-1); + on_io_complete_.reset(new base::WaitableEvent( + false /* manual_reset */, false /* initially_signaled */)); + + const bool posted = base::WorkerPool::PostTaskAndReply( + FROM_HERE, + base::Bind(&InvokeAndSignal, + // Unretained should be fine as we wait for a signal on + // on_io_complete_ at the destructor. + base::Bind(&FileStreamWin::SeekFile, base::Unretained(this), + whence, offset, result), + on_io_complete_.get()), + base::Bind(&FileStreamWin::OnSeeked, + weak_ptr_factory_.GetWeakPtr(), + callback, base::Owned(result)), + true /* task is slow */); + DCHECK(posted); + return ERR_IO_PENDING; +} + +int64 FileStreamWin::SeekSync(Whence whence, int64 offset) { + if (!IsOpen()) + return ERR_UNEXPECTED; + + DCHECK(!async_context_.get() || async_context_->callback().is_null()); + int64 result = -1; + SeekFile(whence, offset, &result); + return result; } int64 FileStreamWin::Available() { @@ -410,7 +419,7 @@ int64 FileStreamWin::Available() { if (!IsOpen()) return ERR_UNEXPECTED; - int64 cur_pos = Seek(FROM_CURRENT, 0); + int64 cur_pos = SeekSync(FROM_CURRENT, 0); if (cur_pos < 0) return cur_pos; @@ -608,7 +617,7 @@ int64 FileStreamWin::Truncate(int64 bytes) { DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE); // Seek to the position to truncate from. - int64 seek_position = Seek(FROM_BEGIN, bytes); + int64 seek_position = SeekSync(FROM_BEGIN, bytes); if (seek_position != bytes) return ERR_UNEXPECTED; @@ -662,19 +671,35 @@ base::PlatformFile FileStreamWin::GetPlatformFileForTesting() { return file_; } -void FileStreamWin::OnClosed() { +void FileStreamWin::OnClosed(const CompletionCallback& callback) { file_ = base::kInvalidPlatformFileValue; - CompletionCallback temp = callback_; - callback_.Reset(); + // Reset this before Run() as Run() may issue a new async operation. + ResetOnIOComplete(); + callback.Run(OK); +} - // Reset this before Run(). Run() should not issue a new async operation - // here, but just to keep it consistent with OnOpened(). - on_io_complete_.reset(); - temp.Run(OK); +void FileStreamWin::SeekFile(Whence whence, int64 offset, int64* result) { + LARGE_INTEGER distance, res; + distance.QuadPart = offset; + DWORD move_method = static_cast<DWORD>(whence); + if (!SetFilePointerEx(file_, distance, &res, move_method)) { + DWORD error = GetLastError(); + LOG(WARNING) << "SetFilePointerEx failed: " << error; + *result = RecordAndMapError(error, + FILE_ERROR_SOURCE_SEEK, + record_uma_, + bound_net_log_); + return; + } + if (async_context_.get()) { + async_context_->set_error_source(FILE_ERROR_SOURCE_SEEK); + SetOffset(async_context_->overlapped(), res); + } + *result = res.QuadPart; } -void FileStreamWin::OnOpened(int* result) { +void FileStreamWin::OnOpened(const CompletionCallback& callback, int* result) { if (*result == OK) { async_context_.reset(new AsyncContext(bound_net_log_)); if (record_uma_) @@ -683,12 +708,22 @@ void FileStreamWin::OnOpened(int* result) { async_context_.get()); } - CompletionCallback temp = callback_; - callback_.Reset(); + // Reset this before Run() as Run() may issue a new async operation. + ResetOnIOComplete(); + callback.Run(*result); +} +void FileStreamWin::OnSeeked( + const Int64CompletionCallback& callback, + int64* result) { // Reset this before Run() as Run() may issue a new async operation. + ResetOnIOComplete(); + callback.Run(*result); +} + +void FileStreamWin::ResetOnIOComplete() { on_io_complete_.reset(); - temp.Run(*result); + weak_ptr_factory_.InvalidateWeakPtrs(); } void FileStreamWin::WaitForIOCompletion() { diff --git a/net/base/file_stream_win.h b/net/base/file_stream_win.h index 5b4dd7e..8ccf5f2 100644 --- a/net/base/file_stream_win.h +++ b/net/base/file_stream_win.h @@ -19,6 +19,10 @@ class FilePath; +namespace base { +class WaitableEvent; +} + namespace net { class IOBuffer; @@ -36,7 +40,9 @@ class NET_EXPORT FileStreamWin { const CompletionCallback& callback); int OpenSync(const FilePath& path, int open_flags); bool IsOpen() const; - int64 Seek(Whence whence, int64 offset); + int Seek(Whence whence, int64 offset, + const Int64CompletionCallback& callback); + int64 SeekSync(Whence whence, int64 offset); int64 Available(); int Read(IOBuffer* buf, int buf_len, const CompletionCallback& callback); int ReadSync(char* buf, int buf_len); @@ -51,14 +57,23 @@ class NET_EXPORT FileStreamWin { private: class AsyncContext; - friend class AsyncContext; + + // A helper method for Seek. + void SeekFile(Whence whence, int64 offset, int64* result); // Called when the file_ is opened asynchronously. |result| contains the // result as a network error code. - void OnOpened(int* result); + void OnOpened(const CompletionCallback& callback, int* result); // Called when the file_ is closed asynchronously. - void OnClosed(); + void OnClosed(const CompletionCallback& callback); + + // Called when the file_ is seeked asynchronously. + void OnSeeked(const Int64CompletionCallback& callback, int64* result); + + // Resets on_io_complete_ and WeakPtr's. + // Called in OnOpened, OnClosed and OnSeeked. + void ResetOnIOComplete(); // Waits until the in-flight async open/close operation is complete. void WaitForIOCompletion(); @@ -73,7 +88,6 @@ class NET_EXPORT FileStreamWin { bool record_uma_; net::BoundNetLog bound_net_log_; base::WeakPtrFactory<FileStreamWin> weak_ptr_factory_; - CompletionCallback callback_; scoped_ptr<base::WaitableEvent> on_io_complete_; DISALLOW_COPY_AND_ASSIGN(FileStreamWin); diff --git a/net/base/mock_file_stream.cc b/net/base/mock_file_stream.cc index 419f65f..42c23cb 100644 --- a/net/base/mock_file_stream.cc +++ b/net/base/mock_file_stream.cc @@ -13,8 +13,13 @@ int MockFileStream::OpenSync(const FilePath& path, int open_flags) { return ReturnError(FileStream::OpenSync(path, open_flags)); } -int64 MockFileStream::Seek(Whence whence, int64 offset) { - return ReturnError64(FileStream::Seek(whence, offset)); +int MockFileStream::Seek(Whence whence, int64 offset, + const Int64CompletionCallback& callback) { + return ReturnError(FileStream::Seek(whence, offset, callback)); +} + +int64 MockFileStream::SeekSync(Whence whence, int64 offset) { + return ReturnError64(FileStream::SeekSync(whence, offset)); } int64 MockFileStream::Available() { diff --git a/net/base/mock_file_stream.h b/net/base/mock_file_stream.h index 61c9db8..020dc9f 100644 --- a/net/base/mock_file_stream.h +++ b/net/base/mock_file_stream.h @@ -30,7 +30,9 @@ class MockFileStream : public net::FileStream { // FileStream methods. virtual int OpenSync(const FilePath& path, int open_flags) OVERRIDE; - virtual int64 Seek(net::Whence whence, int64 offset) OVERRIDE; + virtual int Seek(net::Whence whence, int64 offset, + const Int64CompletionCallback& callback) OVERRIDE; + virtual int64 SeekSync(net::Whence whence, int64 offset) OVERRIDE; virtual int64 Available() OVERRIDE; virtual int Read(IOBuffer* buf, int buf_len, diff --git a/net/base/test_completion_callback.cc b/net/base/test_completion_callback.cc index d68678b..c226478 100644 --- a/net/base/test_completion_callback.cc +++ b/net/base/test_completion_callback.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -8,42 +8,33 @@ #include "base/bind_helpers.h" #include "base/compiler_specific.h" #include "base/message_loop.h" -#include "net/base/net_errors.h" -void TestCompletionCallbackBase::SetResult(int result) { - result_ = result; +namespace net { + +namespace internal { + +void TestCompletionCallbackBaseInternal::DidSetResult() { have_result_ = true; if (waiting_for_result_) MessageLoop::current()->Quit(); } -int TestCompletionCallbackBase::WaitForResult() { +void TestCompletionCallbackBaseInternal::WaitForResult() { DCHECK(!waiting_for_result_); - while (!have_result_) { waiting_for_result_ = true; MessageLoop::current()->Run(); waiting_for_result_ = false; } - have_result_ = false; // Auto-reset for next callback. - return result_; } -int TestCompletionCallbackBase::GetResult(int result) { - if (net::ERR_IO_PENDING != result) - return result; - - return WaitForResult(); -} - -TestCompletionCallbackBase::TestCompletionCallbackBase() - : result_(0), - have_result_(false), +TestCompletionCallbackBaseInternal::TestCompletionCallbackBaseInternal() + : have_result_(false), waiting_for_result_(false) { } -namespace net { +} // namespace internal TestCompletionCallback::TestCompletionCallback() : ALLOW_THIS_IN_INITIALIZER_LIST(callback_( @@ -53,5 +44,12 @@ TestCompletionCallback::TestCompletionCallback() TestCompletionCallback::~TestCompletionCallback() {} +TestInt64CompletionCallback::TestInt64CompletionCallback() + : ALLOW_THIS_IN_INITIALIZER_LIST(callback_( + base::Bind(&TestInt64CompletionCallback::SetResult, + base::Unretained(this)))) { +} + +TestInt64CompletionCallback::~TestInt64CompletionCallback() {} } // namespace net diff --git a/net/base/test_completion_callback.h b/net/base/test_completion_callback.h index 1519e10..25c9676 100644 --- a/net/base/test_completion_callback.h +++ b/net/base/test_completion_callback.h @@ -9,6 +9,7 @@ #include "base/compiler_specific.h" #include "base/tuple.h" #include "net/base/completion_callback.h" +#include "net/base/net_errors.h" //----------------------------------------------------------------------------- // completion callback helper @@ -22,26 +23,62 @@ // reason, this class is probably not ideal for a general application. // -// Base class overridden by custom implementations of TestCompletionCallback. -class TestCompletionCallbackBase { +namespace net { + +namespace internal { + +class TestCompletionCallbackBaseInternal { public: - void SetResult(int result); - int WaitForResult(); - int GetResult(int result); bool have_result() const { return have_result_; } protected: - TestCompletionCallbackBase(); + TestCompletionCallbackBaseInternal(); + void DidSetResult(); + void WaitForResult(); - int result_; bool have_result_; bool waiting_for_result_; private: - DISALLOW_COPY_AND_ASSIGN(TestCompletionCallbackBase); + DISALLOW_COPY_AND_ASSIGN(TestCompletionCallbackBaseInternal); }; -namespace net { +template <typename R> +class TestCompletionCallbackTemplate + : public TestCompletionCallbackBaseInternal { + public: + void SetResult(R result) { + result_ = result; + DidSetResult(); + } + + R WaitForResult() { + TestCompletionCallbackBaseInternal::WaitForResult(); + return result_; + } + + R GetResult(R result) { + if (net::ERR_IO_PENDING != result) + return result; + return WaitForResult(); + } + + protected: + TestCompletionCallbackTemplate() : result_(R()) {} + R result_; + + private: + DISALLOW_COPY_AND_ASSIGN(TestCompletionCallbackTemplate); +}; + +} // namespace internal + +// Base class overridden by custom implementations of TestCompletionCallback. +typedef internal::TestCompletionCallbackTemplate<int> + TestCompletionCallbackBase; + +typedef internal::TestCompletionCallbackTemplate<int64> + TestInt64CompletionCallbackBase; class TestCompletionCallback : public TestCompletionCallbackBase { public: @@ -56,6 +93,19 @@ class TestCompletionCallback : public TestCompletionCallbackBase { DISALLOW_COPY_AND_ASSIGN(TestCompletionCallback); }; +class TestInt64CompletionCallback : public TestInt64CompletionCallbackBase { + public: + TestInt64CompletionCallback(); + ~TestInt64CompletionCallback(); + + const Int64CompletionCallback& callback() const { return callback_; } + + private: + const Int64CompletionCallback callback_; + + DISALLOW_COPY_AND_ASSIGN(TestInt64CompletionCallback); +}; + } // namespace net #endif // NET_BASE_TEST_COMPLETION_CALLBACK_H_ diff --git a/net/base/upload_data.cc b/net/base/upload_data.cc index 40d600c..71d77ae 100644 --- a/net/base/upload_data.cc +++ b/net/base/upload_data.cc @@ -126,7 +126,7 @@ FileStream* UploadData::Element::OpenFileStream() { return NULL; } if (file_range_offset_) { - rv = file->Seek(FROM_BEGIN, file_range_offset_); + rv = file->SeekSync(FROM_BEGIN, file_range_offset_); if (rv < 0) { DLOG(WARNING) << "Failed to seek \"" << file_path_.value() << "\" to offset: " << file_range_offset_ << " (" << rv diff --git a/net/disk_cache/entry_unittest.cc b/net/disk_cache/entry_unittest.cc index 21a5bb1..8168ae8 100644 --- a/net/disk_cache/entry_unittest.cc +++ b/net/disk_cache/entry_unittest.cc @@ -1786,7 +1786,7 @@ TEST_F(DiskCacheEntryTest, MemoryOnlyDoomSparseEntry) { // The way an CompletionCallback works means that all tasks (even new ones) // are executed by the message loop before returning to the caller so the only // way to simulate a race is to execute what we want on the callback. -class SparseTestCompletionCallback: public TestCompletionCallbackBase { +class SparseTestCompletionCallback: public net::TestCompletionCallbackBase { public: explicit SparseTestCompletionCallback(disk_cache::Backend* cache) : cache_(cache), diff --git a/net/http/http_cache_unittest.cc b/net/http/http_cache_unittest.cc index 0f4b8e7..c0b6b36 100644 --- a/net/http/http_cache_unittest.cc +++ b/net/http/http_cache_unittest.cc @@ -33,7 +33,7 @@ using base::Time; namespace { -class DeleteCacheCompletionCallback : public TestCompletionCallbackBase { +class DeleteCacheCompletionCallback : public net::TestCompletionCallbackBase { public: explicit DeleteCacheCompletionCallback(MockHttpCache* cache) : cache_(cache), diff --git a/net/url_request/url_request_file_job.cc b/net/url_request/url_request_file_job.cc index 1788d98..3c8deae 100644 --- a/net/url_request/url_request_file_job.cc +++ b/net/url_request/url_request_file_job.cc @@ -340,7 +340,7 @@ void URLRequestFileJob::DidResolve( if (remaining_bytes_ > 0 && byte_range_.first_byte_position() != 0 && byte_range_.first_byte_position() != - stream_.Seek(FROM_BEGIN, byte_range_.first_byte_position())) { + stream_.SeekSync(FROM_BEGIN, byte_range_.first_byte_position())) { NotifyDone(URLRequestStatus(URLRequestStatus::FAILED, ERR_REQUEST_RANGE_NOT_SATISFIABLE)); return; |