summaryrefslogtreecommitdiffstats
path: root/remoting/base
diff options
context:
space:
mode:
authorsergeyu <sergeyu@chromium.org>2015-07-20 12:41:13 -0700
committerCommit bot <commit-bot@chromium.org>2015-07-20 19:42:14 +0000
commitaa22c0858c81fe75f72c1706cf052af85a82b364 (patch)
tree70f6d5f799424f2717e3ba8f2be5e49920760cbf /remoting/base
parent2f8fcec9046f09f299be21aca1145b82bf41cde3 (diff)
downloadchromium_src-aa22c0858c81fe75f72c1706cf052af85a82b364.zip
chromium_src-aa22c0858c81fe75f72c1706cf052af85a82b364.tar.gz
chromium_src-aa22c0858c81fe75f72c1706cf052af85a82b364.tar.bz2
Add P2PDatagramSocket and P2PStreamSocket interfaces.
Previously remoting code was using net::Socket and net::StreamSocket for datagram and stream socket. Problem is that net::StreamSocket interface contains a lot of methods that are not relevant for peer-to-peer connections in remoting. Added P2PDatagramSocket and P2PStreamSocket interfaces independent of net::Socket. This allowed to remove a lot of the redundant code needed for net::StreamSocket implementations. There are two new adapters required in SslHmacChannelAuthenticator for the SSL layer, but these won't be necessary after we migrate to QUIC. Review URL: https://codereview.chromium.org/1197853003 Cr-Commit-Position: refs/heads/master@{#339489}
Diffstat (limited to 'remoting/base')
-rw-r--r--remoting/base/buffered_socket_writer.cc244
-rw-r--r--remoting/base/buffered_socket_writer.h111
-rw-r--r--remoting/base/buffered_socket_writer_unittest.cc11
3 files changed, 116 insertions, 250 deletions
diff --git a/remoting/base/buffered_socket_writer.cc b/remoting/base/buffered_socket_writer.cc
index 8213f40..c8a8302 100644
--- a/remoting/base/buffered_socket_writer.cc
+++ b/remoting/base/buffered_socket_writer.cc
@@ -5,218 +5,130 @@
#include "remoting/base/buffered_socket_writer.h"
#include "base/bind.h"
-#include "base/location.h"
-#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
-#include "base/thread_task_runner_handle.h"
+#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
+#include "net/socket/socket.h"
namespace remoting {
-struct BufferedSocketWriterBase::PendingPacket {
- PendingPacket(scoped_refptr<net::IOBufferWithSize> data,
+namespace {
+
+int WriteNetSocket(net::Socket* socket,
+ const scoped_refptr<net::IOBuffer>& buf,
+ int buf_len,
+ const net::CompletionCallback& callback) {
+ return socket->Write(buf.get(), buf_len, callback);
+}
+
+} // namespace
+
+struct BufferedSocketWriter::PendingPacket {
+ PendingPacket(scoped_refptr<net::DrainableIOBuffer> data,
const base::Closure& done_task)
: data(data),
done_task(done_task) {
}
- scoped_refptr<net::IOBufferWithSize> data;
+ scoped_refptr<net::DrainableIOBuffer> data;
base::Closure done_task;
};
-BufferedSocketWriterBase::BufferedSocketWriterBase()
- : socket_(nullptr),
- write_pending_(false),
- closed_(false),
- destroyed_flag_(nullptr) {
+// static
+scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket(
+ net::Socket* socket,
+ const WriteFailedCallback& write_failed_callback) {
+ scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter());
+ result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback);
+ return result.Pass();
}
-void BufferedSocketWriterBase::Init(net::Socket* socket,
- const WriteFailedCallback& callback) {
- DCHECK(CalledOnValidThread());
- DCHECK(socket);
- socket_ = socket;
- write_failed_callback_ = callback;
+BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) {
+}
+
+BufferedSocketWriter::~BufferedSocketWriter() {
+ STLDeleteElements(&queue_);
}
-bool BufferedSocketWriterBase::Write(
- scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) {
- DCHECK(CalledOnValidThread());
- DCHECK(socket_);
+void BufferedSocketWriter::Init(
+ const WriteCallback& write_callback,
+ const WriteFailedCallback& write_failed_callback) {
+ write_callback_ = write_callback;
+ write_failed_callback_ = write_failed_callback;
+}
+
+bool BufferedSocketWriter::Write(
+ const scoped_refptr<net::IOBufferWithSize>& data,
+ const base::Closure& done_task) {
+ DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(data.get());
- // Don't write after Close().
- if (closed_)
+ // Don't write after error.
+ if (is_closed())
return false;
- queue_.push_back(new PendingPacket(data, done_task));
+ queue_.push_back(new PendingPacket(
+ new net::DrainableIOBuffer(data.get(), data->size()), done_task));
DoWrite();
- // DoWrite() may trigger OnWriteError() to be called.
- return !closed_;
+ return !is_closed();
}
-void BufferedSocketWriterBase::DoWrite() {
- DCHECK(CalledOnValidThread());
- DCHECK(socket_);
-
- // Don't try to write if there is another write pending.
- if (write_pending_)
- return;
+bool BufferedSocketWriter::is_closed() {
+ return write_callback_.is_null();
+}
- // Don't write after Close().
- if (closed_)
- return;
+void BufferedSocketWriter::DoWrite() {
+ DCHECK(thread_checker_.CalledOnValidThread());
- while (true) {
- net::IOBuffer* current_packet;
- int current_packet_size;
- GetNextPacket(&current_packet, &current_packet_size);
-
- // Return if the queue is empty.
- if (!current_packet)
- return;
-
- int result = socket_->Write(
- current_packet, current_packet_size,
- base::Bind(&BufferedSocketWriterBase::OnWritten,
- base::Unretained(this)));
- bool write_again = false;
- HandleWriteResult(result, &write_again);
- if (!write_again)
- return;
+ base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
+ while (self && !write_pending_ && !is_closed() && !queue_.empty()) {
+ int result = write_callback_.Run(
+ queue_.front()->data.get(), queue_.front()->data->BytesRemaining(),
+ base::Bind(&BufferedSocketWriter::OnWritten,
+ weak_factory_.GetWeakPtr()));
+ HandleWriteResult(result);
}
}
-void BufferedSocketWriterBase::HandleWriteResult(int result,
- bool* write_again) {
- *write_again = false;
+void BufferedSocketWriter::HandleWriteResult(int result) {
if (result < 0) {
if (result == net::ERR_IO_PENDING) {
write_pending_ = true;
} else {
- HandleError(result);
- if (!write_failed_callback_.is_null())
- write_failed_callback_.Run(result);
+ write_callback_.Reset();
+ if (!write_failed_callback_.is_null()) {
+ WriteFailedCallback callback = write_failed_callback_;
+ callback.Run(result);
+ }
}
return;
}
- base::Closure done_task = AdvanceBufferPosition(result);
- if (!done_task.is_null()) {
- bool destroyed = false;
- destroyed_flag_ = &destroyed;
- done_task.Run();
- if (destroyed) {
- // Stop doing anything if we've been destroyed by the callback.
- return;
- }
- destroyed_flag_ = nullptr;
- }
-
- *write_again = true;
-}
-
-void BufferedSocketWriterBase::OnWritten(int result) {
- DCHECK(CalledOnValidThread());
- DCHECK(write_pending_);
- write_pending_ = false;
-
- bool write_again;
- HandleWriteResult(result, &write_again);
- if (write_again)
- DoWrite();
-}
-
-void BufferedSocketWriterBase::HandleError(int result) {
- DCHECK(CalledOnValidThread());
-
- closed_ = true;
-
- STLDeleteElements(&queue_);
-
- // Notify subclass that an error is received.
- OnError(result);
-}
-
-void BufferedSocketWriterBase::Close() {
- DCHECK(CalledOnValidThread());
- closed_ = true;
-}
-
-BufferedSocketWriterBase::~BufferedSocketWriterBase() {
- if (destroyed_flag_)
- *destroyed_flag_ = true;
-
- STLDeleteElements(&queue_);
-}
-
-base::Closure BufferedSocketWriterBase::PopQueue() {
- base::Closure result = queue_.front()->done_task;
- delete queue_.front();
- queue_.pop_front();
- return result;
-}
-
-BufferedSocketWriter::BufferedSocketWriter() {
-}
-
-void BufferedSocketWriter::GetNextPacket(
- net::IOBuffer** buffer, int* size) {
- if (!current_buf_.get()) {
- if (queue_.empty()) {
- *buffer = nullptr;
- return; // Nothing to write.
- }
- current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(),
- queue_.front()->data->size());
- }
-
- *buffer = current_buf_.get();
- *size = current_buf_->BytesRemaining();
-}
-
-base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
- current_buf_->DidConsume(written);
-
- if (current_buf_->BytesRemaining() == 0) {
- current_buf_ = nullptr;
- return PopQueue();
- }
- return base::Closure();
-}
-
-void BufferedSocketWriter::OnError(int result) {
- current_buf_ = nullptr;
-}
+ DCHECK(!queue_.empty());
-BufferedSocketWriter::~BufferedSocketWriter() {
-}
+ queue_.front()->data->DidConsume(result);
-BufferedDatagramWriter::BufferedDatagramWriter() {
-}
+ if (queue_.front()->data->BytesRemaining() == 0) {
+ base::Closure done_task = queue_.front()->done_task;
+ delete queue_.front();
+ queue_.pop_front();
-void BufferedDatagramWriter::GetNextPacket(
- net::IOBuffer** buffer, int* size) {
- if (queue_.empty()) {
- *buffer = nullptr;
- return; // Nothing to write.
+ if (!done_task.is_null())
+ done_task.Run();
}
- *buffer = queue_.front()->data.get();
- *size = queue_.front()->data->size();
-}
-
-base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
- DCHECK_EQ(written, queue_.front()->data->size());
- return PopQueue();
}
-void BufferedDatagramWriter::OnError(int result) {
- // Nothing to do here.
-}
+void BufferedSocketWriter::OnWritten(int result) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK(write_pending_);
+ write_pending_ = false;
-BufferedDatagramWriter::~BufferedDatagramWriter() {
+ base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
+ HandleWriteResult(result);
+ if (self)
+ DoWrite();
}
} // namespace remoting
diff --git a/remoting/base/buffered_socket_writer.h b/remoting/base/buffered_socket_writer.h
index c6012e3..255b2e5 100644
--- a/remoting/base/buffered_socket_writer.h
+++ b/remoting/base/buffered_socket_writer.h
@@ -8,10 +8,11 @@
#include <list>
#include "base/callback.h"
+#include "base/memory/weak_ptr.h"
#include "base/synchronization/lock.h"
-#include "base/threading/non_thread_safe.h"
+#include "base/threading/thread_checker.h"
+#include "net/base/completion_callback.h"
#include "net/base/io_buffer.h"
-#include "net/socket/socket.h"
namespace net {
class Socket;
@@ -19,105 +20,57 @@ class Socket;
namespace remoting {
-// BufferedSocketWriter and BufferedDatagramWriter implement write data queue
-// for stream and datagram sockets. BufferedSocketWriterBase is a base class
-// that implements base functionality common for streams and datagrams.
-// These classes are particularly useful when data comes from a thread
-// that doesn't own the socket, as Write() can be called from any thread.
-// Whenever new data is written it is just put in the queue, and then written
-// on the thread that owns the socket. GetBufferChunks() and GetBufferSize()
-// can be used to throttle writes.
-
-class BufferedSocketWriterBase : public base::NonThreadSafe {
+// BufferedSocketWriter implement write data queue for stream sockets.
+class BufferedSocketWriter {
public:
+ typedef base::Callback<int(const scoped_refptr<net::IOBuffer>& buf,
+ int buf_len,
+ const net::CompletionCallback& callback)>
+ WriteCallback;
typedef base::Callback<void(int)> WriteFailedCallback;
- BufferedSocketWriterBase();
- virtual ~BufferedSocketWriterBase();
+ static scoped_ptr<BufferedSocketWriter> CreateForSocket(
+ net::Socket* socket,
+ const WriteFailedCallback& write_failed_callback);
+
+ BufferedSocketWriter();
+ virtual ~BufferedSocketWriter();
- // Initializes the writer. Must be called on the thread that will be used
- // to access the socket in the future. |callback| will be called after each
- // failed write. Caller retains ownership of |socket|.
- // TODO(sergeyu): Change it so that it take ownership of |socket|.
- void Init(net::Socket* socket, const WriteFailedCallback& callback);
+ // Initializes the writer. |write_callback| is called to write data to the
+ // socket. |write_failed_callback| is called when write operation fails.
+ // Writing stops after the first failed write.
+ void Init(const WriteCallback& write_callback,
+ const WriteFailedCallback& write_failed_callback);
- // Puts a new data chunk in the buffer. Returns false and doesn't enqueue
- // the data if called before Init(). Can be called on any thread.
- bool Write(scoped_refptr<net::IOBufferWithSize> buffer,
+ // Puts a new data chunk in the buffer. Returns false if writing has stopped
+ // because of an error.
+ bool Write(const scoped_refptr<net::IOBufferWithSize>& buffer,
const base::Closure& done_task);
// Returns true when there is data waiting to be written.
bool has_data_pending() { return !queue_.empty(); }
- // Stops writing and drops current buffers. Must be called on the
- // network thread.
- void Close();
-
- protected:
+ private:
struct PendingPacket;
typedef std::list<PendingPacket*> DataQueue;
- DataQueue queue_;
-
- // Removes element from the front of the queue and returns |done_task| for
- // that element. Called from AdvanceBufferPosition() implementation, which
- // then returns result of this function to its caller.
- base::Closure PopQueue();
-
- // Following three methods must be implemented in child classes.
-
- // Returns next packet that needs to be written to the socket. Implementation
- // must set |*buffer| to nullptr if there is nothing left in the queue.
- virtual void GetNextPacket(net::IOBuffer** buffer, int* size) = 0;
+ // Returns true if the writer is closed due to an error.
+ bool is_closed();
- // Returns closure that must be executed or null closure if the last write
- // didn't complete any messages.
- virtual base::Closure AdvanceBufferPosition(int written) = 0;
-
- // This method is called whenever there is an error writing to the socket.
- virtual void OnError(int result) = 0;
-
- private:
void DoWrite();
- void HandleWriteResult(int result, bool* write_again);
+ void HandleWriteResult(int result);
void OnWritten(int result);
- // This method is called when an error is encountered.
- void HandleError(int result);
+ base::ThreadChecker thread_checker_;
- net::Socket* socket_;
+ WriteCallback write_callback_;
WriteFailedCallback write_failed_callback_;
- bool write_pending_;
-
- bool closed_;
-
- bool* destroyed_flag_;
-};
-
-class BufferedSocketWriter : public BufferedSocketWriterBase {
- public:
- BufferedSocketWriter();
- ~BufferedSocketWriter() override;
-
- protected:
- void GetNextPacket(net::IOBuffer** buffer, int* size) override;
- base::Closure AdvanceBufferPosition(int written) override;
- void OnError(int result) override;
-
- private:
- scoped_refptr<net::DrainableIOBuffer> current_buf_;
-};
+ DataQueue queue_;
-class BufferedDatagramWriter : public BufferedSocketWriterBase {
- public:
- BufferedDatagramWriter();
- ~BufferedDatagramWriter() override;
+ bool write_pending_ = false;
- protected:
- void GetNextPacket(net::IOBuffer** buffer, int* size) override;
- base::Closure AdvanceBufferPosition(int written) override;
- void OnError(int result) override;
+ base::WeakPtrFactory<BufferedSocketWriter> weak_factory_;
};
} // namespace remoting
diff --git a/remoting/base/buffered_socket_writer_unittest.cc b/remoting/base/buffered_socket_writer_unittest.cc
index 07bb2ec..8e5a1a5 100644
--- a/remoting/base/buffered_socket_writer_unittest.cc
+++ b/remoting/base/buffered_socket_writer_unittest.cc
@@ -18,7 +18,8 @@
namespace remoting {
namespace {
-const int kTestBufferSize = 10 * 1024; // 10k;
+
+const int kTestBufferSize = 10000;
const size_t kWriteChunkSize = 1024U;
class SocketDataProvider: public net::SocketDataProvider {
@@ -93,9 +94,9 @@ class BufferedSocketWriterTest : public testing::Test {
net::MockConnect(net::SYNCHRONOUS, net::OK));
EXPECT_EQ(net::OK, socket_->Connect(net::CompletionCallback()));
- writer_.reset(new BufferedSocketWriter());
- writer_->Init(socket_.get(), base::Bind(
- &BufferedSocketWriterTest::OnWriteFailed, base::Unretained(this)));
+ writer_ = BufferedSocketWriter::CreateForSocket(
+ socket_.get(), base::Bind(&BufferedSocketWriterTest::OnWriteFailed,
+ base::Unretained(this)));
test_buffer_ = new net::IOBufferWithSize(kTestBufferSize);
test_buffer_2_ = new net::IOBufferWithSize(kTestBufferSize);
for (int i = 0; i< kTestBufferSize; ++i) {
@@ -126,7 +127,7 @@ class BufferedSocketWriterTest : public testing::Test {
void TestAppendInCallback() {
writer_->Write(test_buffer_, base::Bind(
- base::IgnoreResult(&BufferedSocketWriterBase::Write),
+ base::IgnoreResult(&BufferedSocketWriter::Write),
base::Unretained(writer_.get()), test_buffer_2_,
base::Closure()));
base::RunLoop().RunUntilIdle();