summaryrefslogtreecommitdiffstats
path: root/remoting
diff options
context:
space:
mode:
Diffstat (limited to 'remoting')
-rw-r--r--remoting/protocol/buffered_socket_writer.cc123
-rw-r--r--remoting/protocol/buffered_socket_writer.h66
-rw-r--r--remoting/protocol/rtp_reader.cc37
-rw-r--r--remoting/protocol/rtp_reader.h48
-rw-r--r--remoting/protocol/rtp_utils.cc90
-rw-r--r--remoting/protocol/rtp_utils.h40
-rw-r--r--remoting/protocol/rtp_writer.cc88
-rw-r--r--remoting/protocol/rtp_writer.h40
-rw-r--r--remoting/protocol/socket_reader_base.cc64
-rw-r--r--remoting/protocol/socket_reader_base.h43
-rw-r--r--remoting/protocol/stream_reader.cc59
-rw-r--r--remoting/protocol/stream_reader.h43
-rw-r--r--remoting/remoting.gyp8
13 files changed, 607 insertions, 142 deletions
diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc
index 6c60a4a..0650cbe 100644
--- a/remoting/protocol/buffered_socket_writer.cc
+++ b/remoting/protocol/buffered_socket_writer.cc
@@ -9,63 +9,69 @@
namespace remoting {
-BufferedSocketWriter::BufferedSocketWriter()
- : socket_(NULL),
- message_loop_(NULL),
- buffer_size_(0),
- write_pending_(false),
- ALLOW_THIS_IN_INITIALIZER_LIST(
- written_callback_(this, &BufferedSocketWriter::OnWritten)),
- closed_(false) {
+BufferedSocketWriterBase::BufferedSocketWriterBase()
+ : buffer_size_(0),
+ socket_(NULL),
+ message_loop_(NULL),
+ write_pending_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ written_callback_(this, &BufferedSocketWriterBase::OnWritten)),
+ closed_(false) {
}
-BufferedSocketWriter::~BufferedSocketWriter() { }
+BufferedSocketWriterBase::~BufferedSocketWriterBase() { }
-void BufferedSocketWriter::Init(net::Socket* socket,
+void BufferedSocketWriterBase::Init(net::Socket* socket,
WriteFailedCallback* callback) {
AutoLock auto_lock(lock_);
message_loop_ = MessageLoop::current();
socket_ = socket;
}
-bool BufferedSocketWriter::Write(scoped_refptr<net::IOBufferWithSize> data) {
+bool BufferedSocketWriterBase::Write(
+ scoped_refptr<net::IOBufferWithSize> data) {
AutoLock auto_lock(lock_);
if (!socket_)
return false;
- queue_.push_back(data);
+ queue_.push(data);
buffer_size_ += data->size();
message_loop_->PostTask(
- FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite));
+ FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite));
return true;
}
-void BufferedSocketWriter::DoWrite() {
+void BufferedSocketWriterBase::DoWrite() {
DCHECK_EQ(message_loop_, MessageLoop::current());
DCHECK(socket_);
- // Don't try to write if the writer not initialized or closed, or
- // there is already a write pending.
- if (write_pending_ || closed_)
+ // Don't try to write if there is another write pending.
+ if (write_pending_)
return;
+ // Don't write after Close().
+ {
+ AutoLock auto_lock(lock_);
+ if (closed_)
+ return;
+ }
+
while (true) {
- while (!current_buf_ || current_buf_->BytesRemaining() == 0) {
+ net::IOBuffer* current_packet;
+ int current_packet_size;
+ {
AutoLock auto_lock(lock_);
- if (queue_.empty())
- return; // Nothing to write.
- current_buf_ =
- new net::DrainableIOBuffer(queue_.front(), queue_.front()->size());
- queue_.pop_front();
+ GetNextPacket_Locked(&current_packet, &current_packet_size);
}
- int result = socket_->Write(current_buf_, current_buf_->BytesRemaining(),
+ // Return if the queue is empty.
+ if (!current_packet)
+ return;
+
+ int result = socket_->Write(current_packet, current_packet_size,
&written_callback_);
if (result >= 0) {
- {
- AutoLock auto_lock(lock_);
- buffer_size_ -= result;
- }
- current_buf_->DidConsume(result);
+ AutoLock auto_lock(lock_);
+ AdvanceBufferPosition_Locked(result);
} else {
if (result == net::ERR_IO_PENDING) {
write_pending_ = true;
@@ -79,38 +85,83 @@ void BufferedSocketWriter::DoWrite() {
}
}
-void BufferedSocketWriter::OnWritten(int result) {
+void BufferedSocketWriterBase::OnWritten(int result) {
DCHECK_EQ(message_loop_, MessageLoop::current());
write_pending_ = false;
if (result < 0) {
if (write_failed_callback_.get())
write_failed_callback_->Run(result);
+ return;
}
{
AutoLock auto_lock(lock_);
- buffer_size_ -= result;
+ AdvanceBufferPosition_Locked(result);
}
- current_buf_->DidConsume(result);
+
// Schedule next write.
message_loop_->PostTask(
- FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite));
+ FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite));
}
-int BufferedSocketWriter::GetBufferSize() {
+int BufferedSocketWriterBase::GetBufferSize() {
AutoLock auto_lock(lock_);
return buffer_size_;
}
-int BufferedSocketWriter::GetBufferChunks() {
+int BufferedSocketWriterBase::GetBufferChunks() {
AutoLock auto_lock(lock_);
return queue_.size();
}
-void BufferedSocketWriter::Close() {
+void BufferedSocketWriterBase::Close() {
AutoLock auto_lock(lock_);
closed_ = true;
}
+BufferedSocketWriter::BufferedSocketWriter() { }
+BufferedSocketWriter::~BufferedSocketWriter() { }
+
+void BufferedSocketWriter::GetNextPacket_Locked(
+ net::IOBuffer** buffer, int* size) {
+ while (!current_buf_ || current_buf_->BytesRemaining() == 0) {
+ if (queue_.empty()) {
+ *buffer = NULL;
+ return; // Nothing to write.
+ }
+ current_buf_ =
+ new net::DrainableIOBuffer(queue_.front(), queue_.front()->size());
+ queue_.pop();
+ }
+
+ *buffer = current_buf_;
+ *size = current_buf_->BytesRemaining();
+}
+
+void BufferedSocketWriter::AdvanceBufferPosition_Locked(int written) {
+ buffer_size_ -= written;
+ current_buf_->DidConsume(written);
+}
+
+BufferedDatagramWriter::BufferedDatagramWriter() { }
+BufferedDatagramWriter::~BufferedDatagramWriter() { }
+
+void BufferedDatagramWriter::GetNextPacket_Locked(
+ net::IOBuffer** buffer, int* size) {
+ if (queue_.empty()) {
+ *buffer = NULL;
+ return; // Nothing to write.
+ }
+ *buffer = queue_.front();
+ *size = queue_.front()->size();
+}
+
+void BufferedDatagramWriter::AdvanceBufferPosition_Locked(int written) {
+ DCHECK_EQ(written, queue_.front()->size());
+ buffer_size_ -= queue_.front()->size();
+ queue_.pop();
+}
+
+
} // namespace remoting
diff --git a/remoting/protocol/buffered_socket_writer.h b/remoting/protocol/buffered_socket_writer.h
index 3d4709d..dd1cde8 100644
--- a/remoting/protocol/buffered_socket_writer.h
+++ b/remoting/protocol/buffered_socket_writer.h
@@ -5,7 +5,7 @@
#ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
#define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
-#include <deque>
+#include <queue>
#include "base/lock.h"
#include "base/ref_counted.h"
@@ -20,13 +20,22 @@ class Socket;
namespace remoting {
-class BufferedSocketWriter
- : public base::RefCountedThreadSafe<BufferedSocketWriter> {
+// BufferedSocketWriter and BufferedDatagramWriter implement write data queue
+// for stream and datagram sockets. BufferedSocketWriterBase is a base class
+// that implements base functionality common for streams and datagrams.
+// These classes are particularly useful when data comes from a thread
+// that doesn't own the socket, as Write() can be called from any thread.
+// Whenever new data is written it is just put in the queue, and then written
+// on the thread that owns the socket. GetBufferChunks() and GetBufferSize()
+// can be used to throttle writes.
+
+class BufferedSocketWriterBase
+ : public base::RefCountedThreadSafe<BufferedSocketWriterBase> {
public:
typedef Callback1<int>::Type WriteFailedCallback;
- BufferedSocketWriter();
- virtual ~BufferedSocketWriter();
+ explicit BufferedSocketWriterBase();
+ virtual ~BufferedSocketWriterBase();
// Initializes the writer. Must be called on the thread that will be used
// to access the socket in the future. |callback| will be called after each
@@ -47,28 +56,59 @@ class BufferedSocketWriter
// Stops writing and drops current buffers.
void Close();
- private:
- typedef std::deque<scoped_refptr<net::IOBufferWithSize> > DataQueue;
+ protected:
+ typedef std::queue<scoped_refptr<net::IOBufferWithSize> > DataQueue;
+
+ DataQueue queue_;
+ int buffer_size_;
+
+ // Following two methods must be implemented in child classes.
+ // GetNextPacket() returns next packet that needs to be written to the
+ // socket. |buffer| must be set to NULL if there is nothing left in the queue.
+ virtual void GetNextPacket_Locked(net::IOBuffer** buffer, int* size) = 0;
+ virtual void AdvanceBufferPosition_Locked(int written) = 0;
+ private:
void DoWrite();
void OnWritten(int result);
+ // Must be locked when accessing |socket_|, |queue_| and |buffer_size_|;
+ Lock lock_;
+
net::Socket* socket_;
MessageLoop* message_loop_;
scoped_ptr<WriteFailedCallback> write_failed_callback_;
- Lock lock_;
-
- DataQueue queue_;
- int buffer_size_;
- scoped_refptr<net::DrainableIOBuffer> current_buf_;
bool write_pending_;
- net::CompletionCallbackImpl<BufferedSocketWriter> written_callback_;
+ net::CompletionCallbackImpl<BufferedSocketWriterBase> written_callback_;
bool closed_;
};
+class BufferedSocketWriter : public BufferedSocketWriterBase {
+ public:
+ BufferedSocketWriter();
+ virtual ~BufferedSocketWriter();
+
+ protected:
+ virtual void GetNextPacket_Locked(net::IOBuffer** buffer, int* size);
+ virtual void AdvanceBufferPosition_Locked(int written);
+
+ private:
+ scoped_refptr<net::DrainableIOBuffer> current_buf_;
+};
+
+class BufferedDatagramWriter : public BufferedSocketWriterBase {
+ public:
+ BufferedDatagramWriter();
+ virtual ~BufferedDatagramWriter();
+
+ protected:
+ virtual void GetNextPacket_Locked(net::IOBuffer** buffer, int* size);
+ virtual void AdvanceBufferPosition_Locked(int written);
+};
+
} // namespace remoting
#endif // REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
diff --git a/remoting/protocol/rtp_reader.cc b/remoting/protocol/rtp_reader.cc
new file mode 100644
index 0000000..4bc0b6c
--- /dev/null
+++ b/remoting/protocol/rtp_reader.cc
@@ -0,0 +1,37 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/rtp_reader.h"
+
+#include "net/base/completion_callback.h"
+#include "net/base/io_buffer.h"
+
+namespace remoting {
+
+// RtpReader class.
+RtpReader::RtpReader() { }
+RtpReader::~RtpReader() { }
+
+void RtpReader::Init(net::Socket* socket,
+ OnMessageCallback* on_message_callback) {
+ on_message_callback_.reset(on_message_callback);
+ SocketReaderBase::Init(socket);
+}
+
+void RtpReader::OnDataReceived(net::IOBuffer* buffer, int data_size) {
+ RtpPacket packet;
+ int header_size = UnpackRtpHeader(reinterpret_cast<uint8*>(buffer->data()),
+ data_size, &packet.header);
+ if (header_size < 0) {
+ LOG(WARNING) << "Received invalid RTP packet.";
+ return;
+ }
+ packet.data = buffer;
+ packet.payload = buffer->data() + header_size;
+ packet.payload_size = data_size - header_size;
+
+ on_message_callback_->Run(&packet);
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/rtp_reader.h b/remoting/protocol/rtp_reader.h
new file mode 100644
index 0000000..4533756
--- /dev/null
+++ b/remoting/protocol/rtp_reader.h
@@ -0,0 +1,48 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_RTP_READER_H_
+#define REMOTING_PROTOCOL_RTP_READER_H_
+
+#include "base/scoped_ptr.h"
+#include "remoting/protocol/rtp_utils.h"
+#include "remoting/protocol/socket_reader_base.h"
+
+namespace remoting {
+
+struct RtpPacket {
+ RtpHeader header;
+ scoped_refptr<net::IOBuffer> data;
+ char* payload;
+ int payload_size;
+};
+
+class RtpReader : public SocketReaderBase {
+ public:
+ RtpReader();
+ ~RtpReader();
+
+ // The OnMessageCallback is called whenever a new message is received.
+ // Ownership of the message is passed the callback.
+ typedef Callback1<RtpPacket*>::Type OnMessageCallback;
+
+ // Initialize the reader and start reading. Must be called on the thread
+ // |socket| belongs to. The callback will be called when a new message is
+ // received. RtpReader owns |on_message_callback|, doesn't own
+ // |socket|.
+ void Init(net::Socket* socket, OnMessageCallback* on_message_callback);
+
+ protected:
+ virtual void OnDataReceived(net::IOBuffer* buffer, int data_size);
+
+ private:
+ scoped_ptr<OnMessageCallback> on_message_callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(RtpReader);
+};
+
+} // namespace remoting
+
+
+#endif // REMOTING_PROTOCOL_RTP_READER_H_
diff --git a/remoting/protocol/rtp_utils.cc b/remoting/protocol/rtp_utils.cc
new file mode 100644
index 0000000..df10af2
--- /dev/null
+++ b/remoting/protocol/rtp_utils.cc
@@ -0,0 +1,90 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/rtp_utils.h"
+
+#include "base/logging.h"
+#include "third_party/libjingle/source/talk/base/byteorder.h"
+
+using talk_base::GetBE16;
+using talk_base::GetBE32;
+using talk_base::SetBE16;
+using talk_base::SetBE32;
+
+namespace remoting {
+
+namespace {
+const int kRtpBaseHeaderSize = 12;
+const uint8 kRtpVersionNumber = 2;
+const int kRtpMaxSources = 16;
+} // namespace
+
+int GetRtpHeaderSize(int sources) {
+ DCHECK_GE(sources, 0);
+ DCHECK_LT(sources, kRtpMaxSources);
+ return kRtpBaseHeaderSize + sources * 4;
+}
+
+void PackRtpHeader(uint8* buffer, int buffer_size,
+ const RtpHeader& header) {
+ DCHECK_LT(header.sources, kRtpMaxSources);
+ DCHECK_LT(header.payload_type, 1 << 7);
+ CHECK_GT(buffer_size, GetRtpHeaderSize(header.sources));
+
+ buffer[0] = (kRtpVersionNumber << 6) +
+ ((uint8)header.padding << 5) +
+ ((uint8)header.extension << 4) +
+ header.sources;
+ buffer[1] = ((uint8)header.marker << 7) +
+ header.payload_type;
+ SetBE16(buffer + 2, header.sequence_number);
+ SetBE32(buffer + 4, header.timestamp);
+ SetBE32(buffer + 8, header.sync_source_id);
+
+ for (int i = 0; i < header.sources; i++) {
+ SetBE32(buffer + i * 4 + 12, header.source_id[i]);
+ }
+}
+
+static inline uint8 ExtractBits(uint8 byte, int bits_count, int shift) {
+ return (byte >> shift) && ((1 << bits_count) - 1);
+}
+
+int UnpackRtpHeader(const uint8* buffer, int buffer_size, RtpHeader* header) {
+ DCHECK_LT(header->sources, 1 << 4);
+ DCHECK_LT(header->payload_type, 1 << 7);
+
+ if (buffer_size < kRtpBaseHeaderSize) {
+ return -1;
+ }
+
+ int version = ExtractBits(buffer[0], 2, 6);
+ if (version != kRtpVersionNumber) {
+ return -1;
+ }
+
+ header->padding = ExtractBits(buffer[0], 1, 5) != 0;
+ header->extension = ExtractBits(buffer[0], 1, 4) != 0;
+ header->sources = ExtractBits(buffer[0], 4, 0);
+
+ header->marker = ExtractBits(buffer[1], 1, 7) != 0;
+ header->sources = ExtractBits(buffer[1], 7, 0);
+
+ header->sequence_number = GetBE16(buffer + 2);
+ header->timestamp = GetBE32(buffer + 4);
+ header->sync_source_id = GetBE32(buffer + 8);
+
+ DCHECK_LE(header->sources, 16);
+
+ if (buffer_size < GetRtpHeaderSize(header->sources)) {
+ return -1;
+ }
+ for (int i = 0; i < header->sources; i++) {
+ header->source_id[i] = GetBE32(buffer + i * 4 + 12);
+ }
+
+ return GetRtpHeaderSize(header->sources);
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/rtp_utils.h b/remoting/protocol/rtp_utils.h
new file mode 100644
index 0000000..43f6c66
--- /dev/null
+++ b/remoting/protocol/rtp_utils.h
@@ -0,0 +1,40 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_RTP_UTILS_H_
+#define REMOTING_PROTOCOL_RTP_UTILS_H_
+
+#include "base/basictypes.h"
+
+namespace remoting {
+
+struct RtpHeader {
+ // RTP version is always set to 2.
+ // version = 2
+ bool padding;
+ bool extension;
+ uint8 sources;
+ bool marker;
+ uint8 payload_type;
+ uint16 sequence_number;
+ uint32 timestamp;
+ uint32 sync_source_id;
+ uint32 source_id[15];
+};
+
+// Returns size of RTP header for the specified number of sources.
+int GetRtpHeaderSize(int sources);
+
+// Packs RTP header into the buffer.
+void PackRtpHeader(uint8* buffer, int buffer_size,
+ const RtpHeader& header);
+
+// Unpacks RTP header and stores unpacked values in |header|. If the header
+// is not valid returns -1, otherwise returns size of the header.
+int UnpackRtpHeader(const uint8* buffer, int buffer_size,
+ RtpHeader* header);
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_RTP_UTILS_H_
diff --git a/remoting/protocol/rtp_writer.cc b/remoting/protocol/rtp_writer.cc
new file mode 100644
index 0000000..a9df798
--- /dev/null
+++ b/remoting/protocol/rtp_writer.cc
@@ -0,0 +1,88 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/rtp_writer.h"
+
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "remoting/protocol/rtp_utils.h"
+
+namespace remoting {
+
+namespace {
+const int kMtu = 1200;
+const uint8 kRtpPayloadTypePrivate = 96;
+}
+
+RtpWriter::RtpWriter()
+ : rtp_socket_(NULL),
+ rtcp_socket_(NULL),
+ last_packet_number_(0) {
+}
+
+RtpWriter::~RtpWriter() { }
+
+// Initializes the writer. Must be called on the thread the sockets belong
+// to.
+void RtpWriter::Init(net::Socket* rtp_socket, net::Socket* rtcp_socket) {
+ buffered_rtp_writer_ = new BufferedDatagramWriter();
+ buffered_rtp_writer_->Init(rtp_socket, NULL);
+ rtp_socket_ = rtp_socket;
+ rtcp_socket_ = rtcp_socket;
+}
+
+void RtpWriter::SendPacket(const char* data, int packet_size,
+ uint32 timestamp) {
+ RtpHeader header;
+ header.padding = false;
+ header.extension = false;
+ header.sources = 0;
+ header.payload_type = kRtpPayloadTypePrivate;
+ header.timestamp = timestamp;
+
+ // TODO(sergeyu): RTP requires that SSRC is chosen randomly by each
+ // participant. There are only two participants in chromoting session,
+ // so SSRC isn't useful. Implement it in future if neccessary.
+ header.sync_source_id = 0;
+
+ // TODO(sergeyu): Add VP8 payload header.
+
+ int position = 0;
+ while (position < packet_size) {
+ // Allocate buffer.
+ int size = std::max(kMtu, packet_size - position);
+ int header_size = GetRtpHeaderSize(header.sources) + size;
+ int total_size = size + header_size;
+ net::IOBufferWithSize* buffer = new net::IOBufferWithSize(total_size);
+
+ // Set marker if this is the last frame.
+ header.marker = (position + size) == packet_size;
+
+ // TODO(sergeyu): Handle sequence number wrapping.
+ header.sequence_number = last_packet_number_;
+ ++last_packet_number_;
+
+ // Generate header.
+ PackRtpHeader(reinterpret_cast<uint8*>(buffer->data()), buffer->size(),
+ header);
+
+ // Copy data to the buffer.
+ memcpy(buffer->data() + header_size, data + position, size);
+
+ // Send it.
+ buffered_rtp_writer_->Write(buffer);
+
+ position += size;
+ }
+
+ DCHECK_EQ(position, packet_size);
+}
+
+// Stop writing and drop pending data. Must be called from the same thread as
+// Init().
+void RtpWriter::Close() {
+ buffered_rtp_writer_->Close();
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/rtp_writer.h b/remoting/protocol/rtp_writer.h
new file mode 100644
index 0000000..5ba7501
--- /dev/null
+++ b/remoting/protocol/rtp_writer.h
@@ -0,0 +1,40 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_RTP_WRITER_H_
+#define REMOTING_PROTOCOL_RTP_WRITER_H_
+
+#include "net/socket/socket.h"
+#include "remoting/protocol/buffered_socket_writer.h"
+
+namespace remoting {
+
+class RtpWriter {
+ public:
+ RtpWriter();
+ virtual ~RtpWriter();
+
+ // Initializes the writer. Must be called on the thread the sockets belong
+ // to.
+ void Init(net::Socket* rtp_socket, net::Socket* rtcp_socket);
+
+ void SendPacket(const char* buffer, int packet_size,
+ uint32 timestamp);
+
+ // Stop writing and drop pending data. Must be called from the same thread as
+ // Init().
+ void Close();
+
+ private:
+ net::Socket* rtp_socket_;
+ net::Socket* rtcp_socket_;
+
+ uint32 last_packet_number_;
+
+ scoped_refptr<BufferedDatagramWriter> buffered_rtp_writer_;
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_RTP_WRITER_H_
diff --git a/remoting/protocol/socket_reader_base.cc b/remoting/protocol/socket_reader_base.cc
new file mode 100644
index 0000000..41648bc
--- /dev/null
+++ b/remoting/protocol/socket_reader_base.cc
@@ -0,0 +1,64 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "remoting/protocol/socket_reader_base.h"
+
+#include "net/base/completion_callback.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/socket/socket.h"
+
+namespace remoting {
+
+namespace {
+int kReadBufferSize = 4096;
+} // namespace
+
+SocketReaderBase::SocketReaderBase()
+ : socket_(NULL),
+ closed_(false),
+ ALLOW_THIS_IN_INITIALIZER_LIST(
+ read_callback_(this, &SocketReaderBase::OnRead)) {
+}
+
+SocketReaderBase::~SocketReaderBase() { }
+
+void SocketReaderBase::Close() {
+ closed_ = true;
+}
+
+void SocketReaderBase::Init(net::Socket* socket) {
+ DCHECK(socket);
+ socket_ = socket;
+ DoRead();
+}
+
+void SocketReaderBase::DoRead() {
+ while (true) {
+ read_buffer_ = new net::IOBuffer(kReadBufferSize);
+ int result = socket_->Read(
+ read_buffer_, kReadBufferSize, &read_callback_);
+ HandleReadResult(result);
+ if (result < 0)
+ break;
+ }
+}
+
+void SocketReaderBase::OnRead(int result) {
+ if (!closed_) {
+ HandleReadResult(result);
+ DoRead();
+ }
+}
+
+void SocketReaderBase::HandleReadResult(int result) {
+ if (result > 0) {
+ OnDataReceived(read_buffer_, result);
+ } else {
+ if (result != net::ERR_IO_PENDING)
+ LOG(ERROR) << "Read() returned error " << result;
+ }
+}
+
+} // namespace remoting
diff --git a/remoting/protocol/socket_reader_base.h b/remoting/protocol/socket_reader_base.h
new file mode 100644
index 0000000..9c3d805
--- /dev/null
+++ b/remoting/protocol/socket_reader_base.h
@@ -0,0 +1,43 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef REMOTING_PROTOCOL_SOCKET_READER_BASE_H_
+#define REMOTING_PROTOCOL_SOCKET_READER_BASE_H_
+
+#include "base/ref_counted.h"
+#include "net/base/completion_callback.h"
+#include "remoting/protocol/messages_decoder.h"
+
+namespace net {
+class Socket;
+} // namespace net
+
+namespace remoting {
+
+class SocketReaderBase {
+ public:
+ SocketReaderBase();
+ virtual ~SocketReaderBase();
+
+ // Stops reading. Must be called on the same thread as Init().
+ void Close();
+
+ protected:
+ void Init(net::Socket* socket);
+ virtual void OnDataReceived(net::IOBuffer* buffer, int data_size) = 0;
+
+ private:
+ void DoRead();
+ void OnRead(int result);
+ void HandleReadResult(int result);
+
+ net::Socket* socket_;
+ bool closed_;
+ scoped_refptr<net::IOBuffer> read_buffer_;
+ net::CompletionCallbackImpl<SocketReaderBase> read_callback_;
+};
+
+} // namespace remoting
+
+#endif // REMOTING_PROTOCOL_SOCKET_READER_BASE_H_
diff --git a/remoting/protocol/stream_reader.cc b/remoting/protocol/stream_reader.cc
index 63786e4..be6a237 100644
--- a/remoting/protocol/stream_reader.cc
+++ b/remoting/protocol/stream_reader.cc
@@ -4,64 +4,11 @@
#include "remoting/protocol/stream_reader.h"
-#include "base/message_loop.h"
+#include "net/base/completion_callback.h"
#include "net/base/io_buffer.h"
-#include "net/base/net_errors.h"
-#include "net/socket/socket.h"
-#include "remoting/protocol/chromoting_connection.h"
namespace remoting {
-namespace {
-int kReadBufferSize = 4096;
-} // namespace
-
-StreamReaderBase::StreamReaderBase()
- : socket_(NULL),
- closed_(false),
- ALLOW_THIS_IN_INITIALIZER_LIST(
- read_callback_(this, &StreamReaderBase::OnRead)) {
-}
-
-StreamReaderBase::~StreamReaderBase() { }
-
-void StreamReaderBase::Close() {
- closed_ = true;
-}
-
-void StreamReaderBase::Init(net::Socket* socket) {
- DCHECK(socket);
- socket_ = socket;
- DoRead();
-}
-
-void StreamReaderBase::DoRead() {
- while (true) {
- read_buffer_ = new net::IOBuffer(kReadBufferSize);
- int result = socket_->Read(
- read_buffer_, kReadBufferSize, &read_callback_);
- HandleReadResult(result);
- if (result < 0)
- break;
- }
-}
-
-void StreamReaderBase::OnRead(int result) {
- if (!closed_) {
- HandleReadResult(result);
- DoRead();
- }
-}
-
-void StreamReaderBase::HandleReadResult(int result) {
- if (result > 0) {
- OnDataReceived(read_buffer_, result);
- } else {
- if (result != net::ERR_IO_PENDING)
- LOG(ERROR) << "Read() returned error " << result;
- }
-}
-
// EventsStreamReader class.
EventsStreamReader::EventsStreamReader() { }
EventsStreamReader::~EventsStreamReader() { }
@@ -69,7 +16,7 @@ EventsStreamReader::~EventsStreamReader() { }
void EventsStreamReader::Init(net::Socket* socket,
OnMessageCallback* on_message_callback) {
on_message_callback_.reset(on_message_callback);
- StreamReaderBase::Init(socket);
+ SocketReaderBase::Init(socket);
}
void EventsStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) {
@@ -88,7 +35,7 @@ VideoStreamReader::~VideoStreamReader() { }
void VideoStreamReader::Init(net::Socket* socket,
OnMessageCallback* on_message_callback) {
on_message_callback_.reset(on_message_callback);
- StreamReaderBase::Init(socket);
+ SocketReaderBase::Init(socket);
}
void VideoStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) {
diff --git a/remoting/protocol/stream_reader.h b/remoting/protocol/stream_reader.h
index 6eac586..21c7bda 100644
--- a/remoting/protocol/stream_reader.h
+++ b/remoting/protocol/stream_reader.h
@@ -2,47 +2,16 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#ifndef REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_
-#define REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_
+#ifndef REMOTING_PROTOCOL_STREAM_READER_H_
+#define REMOTING_PROTOCOL_STREAM_READER_H_
#include "base/callback.h"
-#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
-#include "net/base/completion_callback.h"
-#include "remoting/protocol/messages_decoder.h"
-
-namespace net {
-class Socket;
-} // namespace net
+#include "remoting/protocol/socket_reader_base.h"
namespace remoting {
-class ChromotingConnection;
-
-class StreamReaderBase {
- public:
- StreamReaderBase();
- virtual ~StreamReaderBase();
-
- // Stops reading. Must be called on the same thread as Init().
- void Close();
-
- protected:
- void Init(net::Socket* socket);
- virtual void OnDataReceived(net::IOBuffer* buffer, int data_size) = 0;
-
- private:
- void DoRead();
- void OnRead(int result);
- void HandleReadResult(int result);
-
- net::Socket* socket_;
- bool closed_;
- scoped_refptr<net::IOBuffer> read_buffer_;
- net::CompletionCallbackImpl<StreamReaderBase> read_callback_;
-};
-
-class EventsStreamReader : public StreamReaderBase {
+class EventsStreamReader : public SocketReaderBase {
public:
EventsStreamReader();
~EventsStreamReader();
@@ -67,7 +36,7 @@ class EventsStreamReader : public StreamReaderBase {
DISALLOW_COPY_AND_ASSIGN(EventsStreamReader);
};
-class VideoStreamReader : public StreamReaderBase {
+class VideoStreamReader : public SocketReaderBase {
public:
VideoStreamReader();
~VideoStreamReader();
@@ -94,4 +63,4 @@ class VideoStreamReader : public StreamReaderBase {
} // namespace remoting
-#endif // REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_
+#endif // REMOTING_PROTOCOL_STREAM_READER_H_
diff --git a/remoting/remoting.gyp b/remoting/remoting.gyp
index 916fb93..0dc310d 100644
--- a/remoting/remoting.gyp
+++ b/remoting/remoting.gyp
@@ -359,6 +359,14 @@
'protocol/jingle_chromoting_connection.h',
'protocol/jingle_chromoting_server.cc',
'protocol/jingle_chromoting_server.h',
+ 'protocol/rtp_reader.cc',
+ 'protocol/rtp_reader.h',
+ 'protocol/rtp_utils.cc',
+ 'protocol/rtp_utils.h',
+ 'protocol/rtp_writer.cc',
+ 'protocol/rtp_writer.h',
+ 'protocol/socket_reader_base.cc',
+ 'protocol/socket_reader_base.h',
'protocol/stream_reader.cc',
'protocol/stream_reader.h',
'protocol/stream_writer.cc',