diff options
Diffstat (limited to 'remoting')
-rw-r--r-- | remoting/protocol/buffered_socket_writer.cc | 123 | ||||
-rw-r--r-- | remoting/protocol/buffered_socket_writer.h | 66 | ||||
-rw-r--r-- | remoting/protocol/rtp_reader.cc | 37 | ||||
-rw-r--r-- | remoting/protocol/rtp_reader.h | 48 | ||||
-rw-r--r-- | remoting/protocol/rtp_utils.cc | 90 | ||||
-rw-r--r-- | remoting/protocol/rtp_utils.h | 40 | ||||
-rw-r--r-- | remoting/protocol/rtp_writer.cc | 88 | ||||
-rw-r--r-- | remoting/protocol/rtp_writer.h | 40 | ||||
-rw-r--r-- | remoting/protocol/socket_reader_base.cc | 64 | ||||
-rw-r--r-- | remoting/protocol/socket_reader_base.h | 43 | ||||
-rw-r--r-- | remoting/protocol/stream_reader.cc | 59 | ||||
-rw-r--r-- | remoting/protocol/stream_reader.h | 43 | ||||
-rw-r--r-- | remoting/remoting.gyp | 8 |
13 files changed, 607 insertions, 142 deletions
diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc index 6c60a4a..0650cbe 100644 --- a/remoting/protocol/buffered_socket_writer.cc +++ b/remoting/protocol/buffered_socket_writer.cc @@ -9,63 +9,69 @@ namespace remoting { -BufferedSocketWriter::BufferedSocketWriter() - : socket_(NULL), - message_loop_(NULL), - buffer_size_(0), - write_pending_(false), - ALLOW_THIS_IN_INITIALIZER_LIST( - written_callback_(this, &BufferedSocketWriter::OnWritten)), - closed_(false) { +BufferedSocketWriterBase::BufferedSocketWriterBase() + : buffer_size_(0), + socket_(NULL), + message_loop_(NULL), + write_pending_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + written_callback_(this, &BufferedSocketWriterBase::OnWritten)), + closed_(false) { } -BufferedSocketWriter::~BufferedSocketWriter() { } +BufferedSocketWriterBase::~BufferedSocketWriterBase() { } -void BufferedSocketWriter::Init(net::Socket* socket, +void BufferedSocketWriterBase::Init(net::Socket* socket, WriteFailedCallback* callback) { AutoLock auto_lock(lock_); message_loop_ = MessageLoop::current(); socket_ = socket; } -bool BufferedSocketWriter::Write(scoped_refptr<net::IOBufferWithSize> data) { +bool BufferedSocketWriterBase::Write( + scoped_refptr<net::IOBufferWithSize> data) { AutoLock auto_lock(lock_); if (!socket_) return false; - queue_.push_back(data); + queue_.push(data); buffer_size_ += data->size(); message_loop_->PostTask( - FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite)); + FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite)); return true; } -void BufferedSocketWriter::DoWrite() { +void BufferedSocketWriterBase::DoWrite() { DCHECK_EQ(message_loop_, MessageLoop::current()); DCHECK(socket_); - // Don't try to write if the writer not initialized or closed, or - // there is already a write pending. - if (write_pending_ || closed_) + // Don't try to write if there is another write pending. + if (write_pending_) return; + // Don't write after Close(). + { + AutoLock auto_lock(lock_); + if (closed_) + return; + } + while (true) { - while (!current_buf_ || current_buf_->BytesRemaining() == 0) { + net::IOBuffer* current_packet; + int current_packet_size; + { AutoLock auto_lock(lock_); - if (queue_.empty()) - return; // Nothing to write. - current_buf_ = - new net::DrainableIOBuffer(queue_.front(), queue_.front()->size()); - queue_.pop_front(); + GetNextPacket_Locked(¤t_packet, ¤t_packet_size); } - int result = socket_->Write(current_buf_, current_buf_->BytesRemaining(), + // Return if the queue is empty. + if (!current_packet) + return; + + int result = socket_->Write(current_packet, current_packet_size, &written_callback_); if (result >= 0) { - { - AutoLock auto_lock(lock_); - buffer_size_ -= result; - } - current_buf_->DidConsume(result); + AutoLock auto_lock(lock_); + AdvanceBufferPosition_Locked(result); } else { if (result == net::ERR_IO_PENDING) { write_pending_ = true; @@ -79,38 +85,83 @@ void BufferedSocketWriter::DoWrite() { } } -void BufferedSocketWriter::OnWritten(int result) { +void BufferedSocketWriterBase::OnWritten(int result) { DCHECK_EQ(message_loop_, MessageLoop::current()); write_pending_ = false; if (result < 0) { if (write_failed_callback_.get()) write_failed_callback_->Run(result); + return; } { AutoLock auto_lock(lock_); - buffer_size_ -= result; + AdvanceBufferPosition_Locked(result); } - current_buf_->DidConsume(result); + // Schedule next write. message_loop_->PostTask( - FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriter::DoWrite)); + FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite)); } -int BufferedSocketWriter::GetBufferSize() { +int BufferedSocketWriterBase::GetBufferSize() { AutoLock auto_lock(lock_); return buffer_size_; } -int BufferedSocketWriter::GetBufferChunks() { +int BufferedSocketWriterBase::GetBufferChunks() { AutoLock auto_lock(lock_); return queue_.size(); } -void BufferedSocketWriter::Close() { +void BufferedSocketWriterBase::Close() { AutoLock auto_lock(lock_); closed_ = true; } +BufferedSocketWriter::BufferedSocketWriter() { } +BufferedSocketWriter::~BufferedSocketWriter() { } + +void BufferedSocketWriter::GetNextPacket_Locked( + net::IOBuffer** buffer, int* size) { + while (!current_buf_ || current_buf_->BytesRemaining() == 0) { + if (queue_.empty()) { + *buffer = NULL; + return; // Nothing to write. + } + current_buf_ = + new net::DrainableIOBuffer(queue_.front(), queue_.front()->size()); + queue_.pop(); + } + + *buffer = current_buf_; + *size = current_buf_->BytesRemaining(); +} + +void BufferedSocketWriter::AdvanceBufferPosition_Locked(int written) { + buffer_size_ -= written; + current_buf_->DidConsume(written); +} + +BufferedDatagramWriter::BufferedDatagramWriter() { } +BufferedDatagramWriter::~BufferedDatagramWriter() { } + +void BufferedDatagramWriter::GetNextPacket_Locked( + net::IOBuffer** buffer, int* size) { + if (queue_.empty()) { + *buffer = NULL; + return; // Nothing to write. + } + *buffer = queue_.front(); + *size = queue_.front()->size(); +} + +void BufferedDatagramWriter::AdvanceBufferPosition_Locked(int written) { + DCHECK_EQ(written, queue_.front()->size()); + buffer_size_ -= queue_.front()->size(); + queue_.pop(); +} + + } // namespace remoting diff --git a/remoting/protocol/buffered_socket_writer.h b/remoting/protocol/buffered_socket_writer.h index 3d4709d..dd1cde8 100644 --- a/remoting/protocol/buffered_socket_writer.h +++ b/remoting/protocol/buffered_socket_writer.h @@ -5,7 +5,7 @@ #ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ #define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ -#include <deque> +#include <queue> #include "base/lock.h" #include "base/ref_counted.h" @@ -20,13 +20,22 @@ class Socket; namespace remoting { -class BufferedSocketWriter - : public base::RefCountedThreadSafe<BufferedSocketWriter> { +// BufferedSocketWriter and BufferedDatagramWriter implement write data queue +// for stream and datagram sockets. BufferedSocketWriterBase is a base class +// that implements base functionality common for streams and datagrams. +// These classes are particularly useful when data comes from a thread +// that doesn't own the socket, as Write() can be called from any thread. +// Whenever new data is written it is just put in the queue, and then written +// on the thread that owns the socket. GetBufferChunks() and GetBufferSize() +// can be used to throttle writes. + +class BufferedSocketWriterBase + : public base::RefCountedThreadSafe<BufferedSocketWriterBase> { public: typedef Callback1<int>::Type WriteFailedCallback; - BufferedSocketWriter(); - virtual ~BufferedSocketWriter(); + explicit BufferedSocketWriterBase(); + virtual ~BufferedSocketWriterBase(); // Initializes the writer. Must be called on the thread that will be used // to access the socket in the future. |callback| will be called after each @@ -47,28 +56,59 @@ class BufferedSocketWriter // Stops writing and drops current buffers. void Close(); - private: - typedef std::deque<scoped_refptr<net::IOBufferWithSize> > DataQueue; + protected: + typedef std::queue<scoped_refptr<net::IOBufferWithSize> > DataQueue; + + DataQueue queue_; + int buffer_size_; + + // Following two methods must be implemented in child classes. + // GetNextPacket() returns next packet that needs to be written to the + // socket. |buffer| must be set to NULL if there is nothing left in the queue. + virtual void GetNextPacket_Locked(net::IOBuffer** buffer, int* size) = 0; + virtual void AdvanceBufferPosition_Locked(int written) = 0; + private: void DoWrite(); void OnWritten(int result); + // Must be locked when accessing |socket_|, |queue_| and |buffer_size_|; + Lock lock_; + net::Socket* socket_; MessageLoop* message_loop_; scoped_ptr<WriteFailedCallback> write_failed_callback_; - Lock lock_; - - DataQueue queue_; - int buffer_size_; - scoped_refptr<net::DrainableIOBuffer> current_buf_; bool write_pending_; - net::CompletionCallbackImpl<BufferedSocketWriter> written_callback_; + net::CompletionCallbackImpl<BufferedSocketWriterBase> written_callback_; bool closed_; }; +class BufferedSocketWriter : public BufferedSocketWriterBase { + public: + BufferedSocketWriter(); + virtual ~BufferedSocketWriter(); + + protected: + virtual void GetNextPacket_Locked(net::IOBuffer** buffer, int* size); + virtual void AdvanceBufferPosition_Locked(int written); + + private: + scoped_refptr<net::DrainableIOBuffer> current_buf_; +}; + +class BufferedDatagramWriter : public BufferedSocketWriterBase { + public: + BufferedDatagramWriter(); + virtual ~BufferedDatagramWriter(); + + protected: + virtual void GetNextPacket_Locked(net::IOBuffer** buffer, int* size); + virtual void AdvanceBufferPosition_Locked(int written); +}; + } // namespace remoting #endif // REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ diff --git a/remoting/protocol/rtp_reader.cc b/remoting/protocol/rtp_reader.cc new file mode 100644 index 0000000..4bc0b6c --- /dev/null +++ b/remoting/protocol/rtp_reader.cc @@ -0,0 +1,37 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/rtp_reader.h" + +#include "net/base/completion_callback.h" +#include "net/base/io_buffer.h" + +namespace remoting { + +// RtpReader class. +RtpReader::RtpReader() { } +RtpReader::~RtpReader() { } + +void RtpReader::Init(net::Socket* socket, + OnMessageCallback* on_message_callback) { + on_message_callback_.reset(on_message_callback); + SocketReaderBase::Init(socket); +} + +void RtpReader::OnDataReceived(net::IOBuffer* buffer, int data_size) { + RtpPacket packet; + int header_size = UnpackRtpHeader(reinterpret_cast<uint8*>(buffer->data()), + data_size, &packet.header); + if (header_size < 0) { + LOG(WARNING) << "Received invalid RTP packet."; + return; + } + packet.data = buffer; + packet.payload = buffer->data() + header_size; + packet.payload_size = data_size - header_size; + + on_message_callback_->Run(&packet); +} + +} // namespace remoting diff --git a/remoting/protocol/rtp_reader.h b/remoting/protocol/rtp_reader.h new file mode 100644 index 0000000..4533756 --- /dev/null +++ b/remoting/protocol/rtp_reader.h @@ -0,0 +1,48 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_RTP_READER_H_ +#define REMOTING_PROTOCOL_RTP_READER_H_ + +#include "base/scoped_ptr.h" +#include "remoting/protocol/rtp_utils.h" +#include "remoting/protocol/socket_reader_base.h" + +namespace remoting { + +struct RtpPacket { + RtpHeader header; + scoped_refptr<net::IOBuffer> data; + char* payload; + int payload_size; +}; + +class RtpReader : public SocketReaderBase { + public: + RtpReader(); + ~RtpReader(); + + // The OnMessageCallback is called whenever a new message is received. + // Ownership of the message is passed the callback. + typedef Callback1<RtpPacket*>::Type OnMessageCallback; + + // Initialize the reader and start reading. Must be called on the thread + // |socket| belongs to. The callback will be called when a new message is + // received. RtpReader owns |on_message_callback|, doesn't own + // |socket|. + void Init(net::Socket* socket, OnMessageCallback* on_message_callback); + + protected: + virtual void OnDataReceived(net::IOBuffer* buffer, int data_size); + + private: + scoped_ptr<OnMessageCallback> on_message_callback_; + + DISALLOW_COPY_AND_ASSIGN(RtpReader); +}; + +} // namespace remoting + + +#endif // REMOTING_PROTOCOL_RTP_READER_H_ diff --git a/remoting/protocol/rtp_utils.cc b/remoting/protocol/rtp_utils.cc new file mode 100644 index 0000000..df10af2 --- /dev/null +++ b/remoting/protocol/rtp_utils.cc @@ -0,0 +1,90 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/rtp_utils.h" + +#include "base/logging.h" +#include "third_party/libjingle/source/talk/base/byteorder.h" + +using talk_base::GetBE16; +using talk_base::GetBE32; +using talk_base::SetBE16; +using talk_base::SetBE32; + +namespace remoting { + +namespace { +const int kRtpBaseHeaderSize = 12; +const uint8 kRtpVersionNumber = 2; +const int kRtpMaxSources = 16; +} // namespace + +int GetRtpHeaderSize(int sources) { + DCHECK_GE(sources, 0); + DCHECK_LT(sources, kRtpMaxSources); + return kRtpBaseHeaderSize + sources * 4; +} + +void PackRtpHeader(uint8* buffer, int buffer_size, + const RtpHeader& header) { + DCHECK_LT(header.sources, kRtpMaxSources); + DCHECK_LT(header.payload_type, 1 << 7); + CHECK_GT(buffer_size, GetRtpHeaderSize(header.sources)); + + buffer[0] = (kRtpVersionNumber << 6) + + ((uint8)header.padding << 5) + + ((uint8)header.extension << 4) + + header.sources; + buffer[1] = ((uint8)header.marker << 7) + + header.payload_type; + SetBE16(buffer + 2, header.sequence_number); + SetBE32(buffer + 4, header.timestamp); + SetBE32(buffer + 8, header.sync_source_id); + + for (int i = 0; i < header.sources; i++) { + SetBE32(buffer + i * 4 + 12, header.source_id[i]); + } +} + +static inline uint8 ExtractBits(uint8 byte, int bits_count, int shift) { + return (byte >> shift) && ((1 << bits_count) - 1); +} + +int UnpackRtpHeader(const uint8* buffer, int buffer_size, RtpHeader* header) { + DCHECK_LT(header->sources, 1 << 4); + DCHECK_LT(header->payload_type, 1 << 7); + + if (buffer_size < kRtpBaseHeaderSize) { + return -1; + } + + int version = ExtractBits(buffer[0], 2, 6); + if (version != kRtpVersionNumber) { + return -1; + } + + header->padding = ExtractBits(buffer[0], 1, 5) != 0; + header->extension = ExtractBits(buffer[0], 1, 4) != 0; + header->sources = ExtractBits(buffer[0], 4, 0); + + header->marker = ExtractBits(buffer[1], 1, 7) != 0; + header->sources = ExtractBits(buffer[1], 7, 0); + + header->sequence_number = GetBE16(buffer + 2); + header->timestamp = GetBE32(buffer + 4); + header->sync_source_id = GetBE32(buffer + 8); + + DCHECK_LE(header->sources, 16); + + if (buffer_size < GetRtpHeaderSize(header->sources)) { + return -1; + } + for (int i = 0; i < header->sources; i++) { + header->source_id[i] = GetBE32(buffer + i * 4 + 12); + } + + return GetRtpHeaderSize(header->sources); +} + +} // namespace remoting diff --git a/remoting/protocol/rtp_utils.h b/remoting/protocol/rtp_utils.h new file mode 100644 index 0000000..43f6c66 --- /dev/null +++ b/remoting/protocol/rtp_utils.h @@ -0,0 +1,40 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_RTP_UTILS_H_ +#define REMOTING_PROTOCOL_RTP_UTILS_H_ + +#include "base/basictypes.h" + +namespace remoting { + +struct RtpHeader { + // RTP version is always set to 2. + // version = 2 + bool padding; + bool extension; + uint8 sources; + bool marker; + uint8 payload_type; + uint16 sequence_number; + uint32 timestamp; + uint32 sync_source_id; + uint32 source_id[15]; +}; + +// Returns size of RTP header for the specified number of sources. +int GetRtpHeaderSize(int sources); + +// Packs RTP header into the buffer. +void PackRtpHeader(uint8* buffer, int buffer_size, + const RtpHeader& header); + +// Unpacks RTP header and stores unpacked values in |header|. If the header +// is not valid returns -1, otherwise returns size of the header. +int UnpackRtpHeader(const uint8* buffer, int buffer_size, + RtpHeader* header); + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_RTP_UTILS_H_ diff --git a/remoting/protocol/rtp_writer.cc b/remoting/protocol/rtp_writer.cc new file mode 100644 index 0000000..a9df798 --- /dev/null +++ b/remoting/protocol/rtp_writer.cc @@ -0,0 +1,88 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/rtp_writer.h" + +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "remoting/protocol/rtp_utils.h" + +namespace remoting { + +namespace { +const int kMtu = 1200; +const uint8 kRtpPayloadTypePrivate = 96; +} + +RtpWriter::RtpWriter() + : rtp_socket_(NULL), + rtcp_socket_(NULL), + last_packet_number_(0) { +} + +RtpWriter::~RtpWriter() { } + +// Initializes the writer. Must be called on the thread the sockets belong +// to. +void RtpWriter::Init(net::Socket* rtp_socket, net::Socket* rtcp_socket) { + buffered_rtp_writer_ = new BufferedDatagramWriter(); + buffered_rtp_writer_->Init(rtp_socket, NULL); + rtp_socket_ = rtp_socket; + rtcp_socket_ = rtcp_socket; +} + +void RtpWriter::SendPacket(const char* data, int packet_size, + uint32 timestamp) { + RtpHeader header; + header.padding = false; + header.extension = false; + header.sources = 0; + header.payload_type = kRtpPayloadTypePrivate; + header.timestamp = timestamp; + + // TODO(sergeyu): RTP requires that SSRC is chosen randomly by each + // participant. There are only two participants in chromoting session, + // so SSRC isn't useful. Implement it in future if neccessary. + header.sync_source_id = 0; + + // TODO(sergeyu): Add VP8 payload header. + + int position = 0; + while (position < packet_size) { + // Allocate buffer. + int size = std::max(kMtu, packet_size - position); + int header_size = GetRtpHeaderSize(header.sources) + size; + int total_size = size + header_size; + net::IOBufferWithSize* buffer = new net::IOBufferWithSize(total_size); + + // Set marker if this is the last frame. + header.marker = (position + size) == packet_size; + + // TODO(sergeyu): Handle sequence number wrapping. + header.sequence_number = last_packet_number_; + ++last_packet_number_; + + // Generate header. + PackRtpHeader(reinterpret_cast<uint8*>(buffer->data()), buffer->size(), + header); + + // Copy data to the buffer. + memcpy(buffer->data() + header_size, data + position, size); + + // Send it. + buffered_rtp_writer_->Write(buffer); + + position += size; + } + + DCHECK_EQ(position, packet_size); +} + +// Stop writing and drop pending data. Must be called from the same thread as +// Init(). +void RtpWriter::Close() { + buffered_rtp_writer_->Close(); +} + +} // namespace remoting diff --git a/remoting/protocol/rtp_writer.h b/remoting/protocol/rtp_writer.h new file mode 100644 index 0000000..5ba7501 --- /dev/null +++ b/remoting/protocol/rtp_writer.h @@ -0,0 +1,40 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_RTP_WRITER_H_ +#define REMOTING_PROTOCOL_RTP_WRITER_H_ + +#include "net/socket/socket.h" +#include "remoting/protocol/buffered_socket_writer.h" + +namespace remoting { + +class RtpWriter { + public: + RtpWriter(); + virtual ~RtpWriter(); + + // Initializes the writer. Must be called on the thread the sockets belong + // to. + void Init(net::Socket* rtp_socket, net::Socket* rtcp_socket); + + void SendPacket(const char* buffer, int packet_size, + uint32 timestamp); + + // Stop writing and drop pending data. Must be called from the same thread as + // Init(). + void Close(); + + private: + net::Socket* rtp_socket_; + net::Socket* rtcp_socket_; + + uint32 last_packet_number_; + + scoped_refptr<BufferedDatagramWriter> buffered_rtp_writer_; +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_RTP_WRITER_H_ diff --git a/remoting/protocol/socket_reader_base.cc b/remoting/protocol/socket_reader_base.cc new file mode 100644 index 0000000..41648bc --- /dev/null +++ b/remoting/protocol/socket_reader_base.cc @@ -0,0 +1,64 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/socket_reader_base.h" + +#include "net/base/completion_callback.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/socket/socket.h" + +namespace remoting { + +namespace { +int kReadBufferSize = 4096; +} // namespace + +SocketReaderBase::SocketReaderBase() + : socket_(NULL), + closed_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_callback_(this, &SocketReaderBase::OnRead)) { +} + +SocketReaderBase::~SocketReaderBase() { } + +void SocketReaderBase::Close() { + closed_ = true; +} + +void SocketReaderBase::Init(net::Socket* socket) { + DCHECK(socket); + socket_ = socket; + DoRead(); +} + +void SocketReaderBase::DoRead() { + while (true) { + read_buffer_ = new net::IOBuffer(kReadBufferSize); + int result = socket_->Read( + read_buffer_, kReadBufferSize, &read_callback_); + HandleReadResult(result); + if (result < 0) + break; + } +} + +void SocketReaderBase::OnRead(int result) { + if (!closed_) { + HandleReadResult(result); + DoRead(); + } +} + +void SocketReaderBase::HandleReadResult(int result) { + if (result > 0) { + OnDataReceived(read_buffer_, result); + } else { + if (result != net::ERR_IO_PENDING) + LOG(ERROR) << "Read() returned error " << result; + } +} + +} // namespace remoting diff --git a/remoting/protocol/socket_reader_base.h b/remoting/protocol/socket_reader_base.h new file mode 100644 index 0000000..9c3d805 --- /dev/null +++ b/remoting/protocol/socket_reader_base.h @@ -0,0 +1,43 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_SOCKET_READER_BASE_H_ +#define REMOTING_PROTOCOL_SOCKET_READER_BASE_H_ + +#include "base/ref_counted.h" +#include "net/base/completion_callback.h" +#include "remoting/protocol/messages_decoder.h" + +namespace net { +class Socket; +} // namespace net + +namespace remoting { + +class SocketReaderBase { + public: + SocketReaderBase(); + virtual ~SocketReaderBase(); + + // Stops reading. Must be called on the same thread as Init(). + void Close(); + + protected: + void Init(net::Socket* socket); + virtual void OnDataReceived(net::IOBuffer* buffer, int data_size) = 0; + + private: + void DoRead(); + void OnRead(int result); + void HandleReadResult(int result); + + net::Socket* socket_; + bool closed_; + scoped_refptr<net::IOBuffer> read_buffer_; + net::CompletionCallbackImpl<SocketReaderBase> read_callback_; +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_SOCKET_READER_BASE_H_ diff --git a/remoting/protocol/stream_reader.cc b/remoting/protocol/stream_reader.cc index 63786e4..be6a237 100644 --- a/remoting/protocol/stream_reader.cc +++ b/remoting/protocol/stream_reader.cc @@ -4,64 +4,11 @@ #include "remoting/protocol/stream_reader.h" -#include "base/message_loop.h" +#include "net/base/completion_callback.h" #include "net/base/io_buffer.h" -#include "net/base/net_errors.h" -#include "net/socket/socket.h" -#include "remoting/protocol/chromoting_connection.h" namespace remoting { -namespace { -int kReadBufferSize = 4096; -} // namespace - -StreamReaderBase::StreamReaderBase() - : socket_(NULL), - closed_(false), - ALLOW_THIS_IN_INITIALIZER_LIST( - read_callback_(this, &StreamReaderBase::OnRead)) { -} - -StreamReaderBase::~StreamReaderBase() { } - -void StreamReaderBase::Close() { - closed_ = true; -} - -void StreamReaderBase::Init(net::Socket* socket) { - DCHECK(socket); - socket_ = socket; - DoRead(); -} - -void StreamReaderBase::DoRead() { - while (true) { - read_buffer_ = new net::IOBuffer(kReadBufferSize); - int result = socket_->Read( - read_buffer_, kReadBufferSize, &read_callback_); - HandleReadResult(result); - if (result < 0) - break; - } -} - -void StreamReaderBase::OnRead(int result) { - if (!closed_) { - HandleReadResult(result); - DoRead(); - } -} - -void StreamReaderBase::HandleReadResult(int result) { - if (result > 0) { - OnDataReceived(read_buffer_, result); - } else { - if (result != net::ERR_IO_PENDING) - LOG(ERROR) << "Read() returned error " << result; - } -} - // EventsStreamReader class. EventsStreamReader::EventsStreamReader() { } EventsStreamReader::~EventsStreamReader() { } @@ -69,7 +16,7 @@ EventsStreamReader::~EventsStreamReader() { } void EventsStreamReader::Init(net::Socket* socket, OnMessageCallback* on_message_callback) { on_message_callback_.reset(on_message_callback); - StreamReaderBase::Init(socket); + SocketReaderBase::Init(socket); } void EventsStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) { @@ -88,7 +35,7 @@ VideoStreamReader::~VideoStreamReader() { } void VideoStreamReader::Init(net::Socket* socket, OnMessageCallback* on_message_callback) { on_message_callback_.reset(on_message_callback); - StreamReaderBase::Init(socket); + SocketReaderBase::Init(socket); } void VideoStreamReader::OnDataReceived(net::IOBuffer* buffer, int data_size) { diff --git a/remoting/protocol/stream_reader.h b/remoting/protocol/stream_reader.h index 6eac586..21c7bda 100644 --- a/remoting/protocol/stream_reader.h +++ b/remoting/protocol/stream_reader.h @@ -2,47 +2,16 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#ifndef REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_ -#define REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_ +#ifndef REMOTING_PROTOCOL_STREAM_READER_H_ +#define REMOTING_PROTOCOL_STREAM_READER_H_ #include "base/callback.h" -#include "base/ref_counted.h" #include "base/scoped_ptr.h" -#include "net/base/completion_callback.h" -#include "remoting/protocol/messages_decoder.h" - -namespace net { -class Socket; -} // namespace net +#include "remoting/protocol/socket_reader_base.h" namespace remoting { -class ChromotingConnection; - -class StreamReaderBase { - public: - StreamReaderBase(); - virtual ~StreamReaderBase(); - - // Stops reading. Must be called on the same thread as Init(). - void Close(); - - protected: - void Init(net::Socket* socket); - virtual void OnDataReceived(net::IOBuffer* buffer, int data_size) = 0; - - private: - void DoRead(); - void OnRead(int result); - void HandleReadResult(int result); - - net::Socket* socket_; - bool closed_; - scoped_refptr<net::IOBuffer> read_buffer_; - net::CompletionCallbackImpl<StreamReaderBase> read_callback_; -}; - -class EventsStreamReader : public StreamReaderBase { +class EventsStreamReader : public SocketReaderBase { public: EventsStreamReader(); ~EventsStreamReader(); @@ -67,7 +36,7 @@ class EventsStreamReader : public StreamReaderBase { DISALLOW_COPY_AND_ASSIGN(EventsStreamReader); }; -class VideoStreamReader : public StreamReaderBase { +class VideoStreamReader : public SocketReaderBase { public: VideoStreamReader(); ~VideoStreamReader(); @@ -94,4 +63,4 @@ class VideoStreamReader : public StreamReaderBase { } // namespace remoting -#endif // REMOTING_PROTOCOL_MESSAGES_STREAM_READER_H_ +#endif // REMOTING_PROTOCOL_STREAM_READER_H_ diff --git a/remoting/remoting.gyp b/remoting/remoting.gyp index 916fb93..0dc310d 100644 --- a/remoting/remoting.gyp +++ b/remoting/remoting.gyp @@ -359,6 +359,14 @@ 'protocol/jingle_chromoting_connection.h', 'protocol/jingle_chromoting_server.cc', 'protocol/jingle_chromoting_server.h', + 'protocol/rtp_reader.cc', + 'protocol/rtp_reader.h', + 'protocol/rtp_utils.cc', + 'protocol/rtp_utils.h', + 'protocol/rtp_writer.cc', + 'protocol/rtp_writer.h', + 'protocol/socket_reader_base.cc', + 'protocol/socket_reader_base.h', 'protocol/stream_reader.cc', 'protocol/stream_reader.h', 'protocol/stream_writer.cc', |