diff options
author | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-09-28 20:59:56 +0000 |
---|---|---|
committer | sergeyu@chromium.org <sergeyu@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-09-28 20:59:56 +0000 |
commit | 2fc3b4267fd4f28dd864c2858cea0e84f55cbb0a (patch) | |
tree | 726f9dc89d195927c151dd969c68051b0c135a02 /remoting | |
parent | e393b9fc0267deaf55f3624aa7b0bfb08707dfda (diff) | |
download | chromium_src-2fc3b4267fd4f28dd864c2858cea0e84f55cbb0a.zip chromium_src-2fc3b4267fd4f28dd864c2858cea0e84f55cbb0a.tar.gz chromium_src-2fc3b4267fd4f28dd864c2858cea0e84f55cbb0a.tar.bz2 |
Implemented basic support Chromotocol connection.
New code supports multiple PseudoTCP and UDP channels. Client and host still use old JjngleChannel for connection.
BUG=53986
TEST=None
Review URL: http://codereview.chromium.org/3319021
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@60842 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'remoting')
-rw-r--r-- | remoting/jingle_glue/channel_socket_adapter.cc | 171 | ||||
-rw-r--r-- | remoting/jingle_glue/channel_socket_adapter.h | 67 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_client.cc | 17 | ||||
-rw-r--r-- | remoting/jingle_glue/jingle_client.h | 7 | ||||
-rw-r--r-- | remoting/jingle_glue/ssl_socket_adapter.cc | 47 | ||||
-rw-r--r-- | remoting/jingle_glue/stream_socket_adapter.cc | 195 | ||||
-rw-r--r-- | remoting/jingle_glue/stream_socket_adapter.h | 70 | ||||
-rw-r--r-- | remoting/jingle_glue/utils.cc | 53 | ||||
-rw-r--r-- | remoting/jingle_glue/utils.h | 15 | ||||
-rw-r--r-- | remoting/protocol/chromoting_connection.h | 61 | ||||
-rw-r--r-- | remoting/protocol/chromoting_server.h | 40 | ||||
-rw-r--r-- | remoting/protocol/jingle_chromoting_connection.cc | 233 | ||||
-rw-r--r-- | remoting/protocol/jingle_chromoting_connection.h | 97 | ||||
-rw-r--r-- | remoting/protocol/jingle_chromoting_server.cc | 216 | ||||
-rw-r--r-- | remoting/protocol/jingle_chromoting_server.h | 111 | ||||
-rw-r--r-- | remoting/protocol/protocol_test_client.cc | 380 | ||||
-rw-r--r-- | remoting/remoting.gyp | 38 |
17 files changed, 1771 insertions, 47 deletions
diff --git a/remoting/jingle_glue/channel_socket_adapter.cc b/remoting/jingle_glue/channel_socket_adapter.cc new file mode 100644 index 0000000..583eed1 --- /dev/null +++ b/remoting/jingle_glue/channel_socket_adapter.cc @@ -0,0 +1,171 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/jingle_glue/channel_socket_adapter.h" + +#include <limits> + +#include "base/logging.h" +#include "base/message_loop.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "remoting/jingle_glue/utils.h" +#include "third_party/libjingle/source/talk/p2p/base/transportchannel.h" + +namespace remoting { + +TransportChannelSocketAdapter::TransportChannelSocketAdapter( + cricket::TransportChannel* channel) + : channel_(channel), + read_pending_(false), + write_pending_(false), + closed_error_code_(net::OK) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(channel_); + + channel_->SignalReadPacket.connect( + this, &TransportChannelSocketAdapter::OnNewPacket); + channel_->SignalWritableState.connect( + this, &TransportChannelSocketAdapter::OnWritableState); + channel_->SignalDestroyed.connect( + this, &TransportChannelSocketAdapter::OnChannelDestroyed); +} + +TransportChannelSocketAdapter::~TransportChannelSocketAdapter() { +} + +int TransportChannelSocketAdapter::Read( + net::IOBuffer* buf, int buffer_size, net::CompletionCallback* callback) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(buf); + CHECK(!read_pending_); + + if (!channel_) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + read_callback_ = callback; + read_buffer_ = buf; + read_buffer_size_ = buffer_size; + read_pending_ = true; + + return net::ERR_IO_PENDING; +} + +int TransportChannelSocketAdapter::Write( + net::IOBuffer* buffer, int buffer_size, net::CompletionCallback* callback) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(buffer); + CHECK(!write_pending_); + + if (!channel_) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + int result = channel_->SendPacket(buffer->data(), buffer_size); + if (result < 0) { + result = MapPosixToChromeError(channel_->GetError()); + if (result == net::ERR_IO_PENDING) { + write_pending_ = true; + write_callback_ = callback; + write_buffer_ = buffer; + write_buffer_size_ = buffer_size; + } + } + return result; +} + +bool TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} + +bool TransportChannelSocketAdapter::SetSendBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} + +void TransportChannelSocketAdapter::Close(int error_code) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + + if (!channel_) // Already closed. + return; + + DCHECK(error_code != net::OK); + closed_error_code_ = error_code; + channel_->SignalReadPacket.disconnect(this); + channel_->SignalDestroyed.disconnect(this); + channel_ = NULL; + + if (read_pending_) { + net::CompletionCallback* callback = read_callback_; + read_pending_ = false; + read_buffer_ = NULL; + callback->Run(error_code); + } + + if (write_pending_) { + net::CompletionCallback* callback = write_callback_; + write_pending_ = false; + write_buffer_ = NULL; + callback->Run(error_code); + } +} + +void TransportChannelSocketAdapter::OnNewPacket( + cricket::TransportChannel* channel, const char* data, size_t data_size) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK_EQ(channel, channel_); + if (read_pending_) { + DCHECK(read_buffer_); + CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max())); + + if (read_buffer_size_ < static_cast<int>(data_size)) { + LOG(WARNING) << "Data buffer is smaller than the received packet. " + << "Dropping the data that doesn't fit."; + data_size = read_buffer_size_; + } + + memcpy(read_buffer_->data(), data, data_size); + + net::CompletionCallback* callback = read_callback_; + read_pending_ = false; + read_buffer_ = NULL; + + callback->Run(data_size); + } else { + LOG(WARNING) + << "Data was received without a callback. Dropping the packet."; + } +} + +void TransportChannelSocketAdapter::OnWritableState( + cricket::TransportChannel* channel) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + // Try to send the packet if there is a pending write. + if (write_pending_) { + int result = channel_->SendPacket(write_buffer_->data(), + write_buffer_size_); + if (result < 0) + result = MapPosixToChromeError(channel_->GetError()); + + if (result != net::ERR_IO_PENDING) { + net::CompletionCallback* callback = write_callback_; + write_pending_ = false; + write_buffer_ = NULL; + callback->Run(result); + } + } +} + +void TransportChannelSocketAdapter::OnChannelDestroyed( + cricket::TransportChannel* channel) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK_EQ(channel, channel_); + Close(net::ERR_CONNECTION_ABORTED); +} + +} // namespace remoting diff --git a/remoting/jingle_glue/channel_socket_adapter.h b/remoting/jingle_glue/channel_socket_adapter.h new file mode 100644 index 0000000..1b808f6 --- /dev/null +++ b/remoting/jingle_glue/channel_socket_adapter.h @@ -0,0 +1,67 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_JINGLE_GLUE_CHANNEL_SOCKET_ADAPTER_H_ +#define REMOTING_JINGLE_GLUE_CHANNEL_SOCKET_ADAPTER_H_ + +#include "net/socket/socket.h" +#include "third_party/libjingle/source/talk/base/socketaddress.h" +#include "third_party/libjingle/source/talk/base/sigslot.h" + +namespace cricket { +class TransportChannel; +} // namespace cricket + +namespace remoting { + +// TransportChannelSocketAdapter implements net::Socket interface on +// top of libjingle's TransportChannel. It is used by JingleChromotingConnection +// to provide net::Socket interface for channels. +class TransportChannelSocketAdapter : public net::Socket, + public sigslot::has_slots<> { + public: + // TransportChannel object is always owned by the corresponding session. + explicit TransportChannelSocketAdapter(cricket::TransportChannel* channel); + virtual ~TransportChannelSocketAdapter(); + + // Closes the stream. |error_code| specifies error code that will + // be returned by Read() and Write() after the stream is closed. + // Must be called before the session and the channel are destroyed. + void Close(int error_code); + + // Socket interface. + virtual int Read(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback); + virtual int Write(net::IOBuffer* buf, int buf_len, + net::CompletionCallback* callback); + + virtual bool SetReceiveBufferSize(int32 size); + virtual bool SetSendBufferSize(int32 size); + + private: + void OnNewPacket(cricket::TransportChannel* channel, + const char* data, size_t data_size); + void OnWritableState(cricket::TransportChannel* channel); + void OnChannelDestroyed(cricket::TransportChannel* channel); + + cricket::TransportChannel* channel_; + + bool read_pending_; + net::CompletionCallback* read_callback_; // Not owned. + scoped_refptr<net::IOBuffer> read_buffer_; + int read_buffer_size_; + + bool write_pending_; + net::CompletionCallback* write_callback_; // Not owned. + scoped_refptr<net::IOBuffer> write_buffer_; + int write_buffer_size_; + + int closed_error_code_; + + DISALLOW_COPY_AND_ASSIGN(TransportChannelSocketAdapter); +}; + +} // namespace remoting + +#endif // REMOTING_JINGLE_GLUE_CHANNEL_SOCKET_ADAPTER_H_ diff --git a/remoting/jingle_glue/jingle_client.cc b/remoting/jingle_glue/jingle_client.cc index d3ca67e..c6206f7 100644 --- a/remoting/jingle_glue/jingle_client.cc +++ b/remoting/jingle_glue/jingle_client.cc @@ -200,6 +200,11 @@ MessageLoop* JingleClient::message_loop() { return thread_->message_loop(); } +cricket::SessionManager* JingleClient::session_manager() { + DCHECK_EQ(message_loop(), MessageLoop::current()); + return session_manager_.get(); +} + void JingleClient::OnConnectionStateChanged(buzz::XmppEngine::State state) { switch (state) { case buzz::XmppEngine::STATE_START: @@ -209,10 +214,7 @@ void JingleClient::OnConnectionStateChanged(buzz::XmppEngine::State state) { UpdateState(CONNECTING); break; case buzz::XmppEngine::STATE_OPEN: - { - AutoLock auto_lock(full_jid_lock_); - full_jid_ = client_->jid().Str(); - } + SetFullJid(client_->jid().Str()); UpdateState(CONNECTED); break; case buzz::XmppEngine::STATE_CLOSED: @@ -248,10 +250,17 @@ void JingleClient::OnIncomingTunnel( } } +void JingleClient::SetFullJid(const std::string& full_jid) { + AutoLock auto_lock(full_jid_lock_); + full_jid_ = full_jid; +} + void JingleClient::UpdateState(State new_state) { if (new_state != state_) { state_ = new_state; { + // We have to have the lock held, otherwise we cannot be sure that + // the client hasn't been closed when we call the callback. AutoLock auto_lock(state_lock_); if (!closed_) callback_->OnStateChange(this, new_state); diff --git a/remoting/jingle_glue/jingle_client.h b/remoting/jingle_glue/jingle_client.h index 9d7b844..e9bf84d 100644 --- a/remoting/jingle_glue/jingle_client.h +++ b/remoting/jingle_glue/jingle_client.h @@ -7,7 +7,6 @@ #include <string> -#include "base/waitable_event.h" #include "remoting/jingle_glue/jingle_channel.h" #include "third_party/libjingle/source/talk/xmpp/xmppclient.h" @@ -106,6 +105,10 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>, // Message loop used by this object to execute tasks. MessageLoop* message_loop(); + // The session manager used by this client. Must be called from the + // jingle thread only. Returns NULL if the client is not active. + cricket::SessionManager* session_manager(); + private: friend class HeartbeatSenderTest; friend class JingleClientTest; @@ -127,6 +130,8 @@ class JingleClient : public base::RefCountedThreadSafe<JingleClient>, // Used by Close(). void DoClose(); + void SetFullJid(const std::string& full_jid); + // Updates current state of the connection. Must be called only in // the jingle thread. void UpdateState(State new_state); diff --git a/remoting/jingle_glue/ssl_socket_adapter.cc b/remoting/jingle_glue/ssl_socket_adapter.cc index 3ea404b..6eca04b 100644 --- a/remoting/jingle_glue/ssl_socket_adapter.cc +++ b/remoting/jingle_glue/ssl_socket_adapter.cc @@ -12,47 +12,10 @@ #include "net/base/sys_addrinfo.h" #include "net/socket/client_socket_factory.h" #include "net/url_request/url_request_context.h" +#include "remoting/jingle_glue/utils.h" namespace remoting { -namespace { - -// Convert values from <errno.h> to values from "net/base/net_errors.h" -int MapPosixError(int err) { - // There are numerous posix error codes, but these are the ones we thus far - // find interesting. - switch (err) { - case EAGAIN: -#if EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif - return net::ERR_IO_PENDING; - case ENETDOWN: - return net::ERR_INTERNET_DISCONNECTED; - case ETIMEDOUT: - return net::ERR_TIMED_OUT; - case ECONNRESET: - case ENETRESET: // Related to keep-alive - return net::ERR_CONNECTION_RESET; - case ECONNABORTED: - return net::ERR_CONNECTION_ABORTED; - case ECONNREFUSED: - return net::ERR_CONNECTION_REFUSED; - case EHOSTUNREACH: - case ENETUNREACH: - return net::ERR_ADDRESS_UNREACHABLE; - case EADDRNOTAVAIL: - return net::ERR_ADDRESS_INVALID; - case 0: - return net::OK; - default: - LOG(WARNING) << "Unknown error " << err << " mapped to net::ERR_FAILED"; - return net::ERR_FAILED; - } -} - -} // namespace - SSLSocketAdapter* SSLSocketAdapter::Create(AsyncSocket* socket) { return new SSLSocketAdapter(socket); } @@ -290,7 +253,7 @@ int TransportSocket::Read(net::IOBuffer* buf, int buf_len, DCHECK(!read_buffer_.get()); int result = socket_->Recv(buf->data(), buf_len); if (result < 0) { - result = MapPosixError(socket_->GetError()); + result = MapPosixToChromeError(socket_->GetError()); if (result == net::ERR_IO_PENDING) { read_callback_ = callback; read_buffer_ = buf; @@ -309,7 +272,7 @@ int TransportSocket::Write(net::IOBuffer* buf, int buf_len, DCHECK(!write_buffer_.get()); int result = socket_->Send(buf->data(), buf_len); if (result < 0) { - result = MapPosixError(socket_->GetError()); + result = MapPosixToChromeError(socket_->GetError()); if (result == net::ERR_IO_PENDING) { write_callback_ = callback; write_buffer_ = buf; @@ -344,7 +307,7 @@ void TransportSocket::OnReadEvent(talk_base::AsyncSocket* socket) { int result = socket_->Recv(buffer->data(), buffer_len); if (result < 0) { - result = MapPosixError(socket_->GetError()); + result = MapPosixToChromeError(socket_->GetError()); if (result == net::ERR_IO_PENDING) { read_callback_ = callback; read_buffer_ = buffer; @@ -370,7 +333,7 @@ void TransportSocket::OnWriteEvent(talk_base::AsyncSocket* socket) { int result = socket_->Send(buffer->data(), buffer_len); if (result < 0) { - result = MapPosixError(socket_->GetError()); + result = MapPosixToChromeError(socket_->GetError()); if (result == net::ERR_IO_PENDING) { write_callback_ = callback; write_buffer_ = buffer; diff --git a/remoting/jingle_glue/stream_socket_adapter.cc b/remoting/jingle_glue/stream_socket_adapter.cc new file mode 100644 index 0000000..cb84941 --- /dev/null +++ b/remoting/jingle_glue/stream_socket_adapter.cc @@ -0,0 +1,195 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/jingle_glue/stream_socket_adapter.h" + +#include "base/logging.h" +#include "base/message_loop.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "remoting/jingle_glue/utils.h" +#include "third_party/libjingle/source/talk/base/stream.h" + +namespace remoting { + +StreamSocketAdapter::StreamSocketAdapter(talk_base::StreamInterface* stream) + : stream_(stream), + read_pending_(false), + write_pending_(false), + closed_error_code_(net::OK) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(stream); + stream_->SignalEvent.connect(this, &StreamSocketAdapter::OnStreamEvent); +} + +StreamSocketAdapter::~StreamSocketAdapter() { +} + +int StreamSocketAdapter::Read( + net::IOBuffer* buffer, int buffer_size, net::CompletionCallback* callback) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(buffer); + CHECK(!read_pending_); + + if (!stream_.get()) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + int result = ReadStream(buffer, buffer_size); + if (result == net::ERR_CONNECTION_CLOSED && + stream_->GetState() == talk_base::SS_OPENING) + result = net::ERR_IO_PENDING; + if (result == net::ERR_IO_PENDING) { + read_pending_ = true; + read_callback_ = callback; + read_buffer_ = buffer; + read_buffer_size_ = buffer_size; + } + return result; +} + +int StreamSocketAdapter::Write( + net::IOBuffer* buffer, int buffer_size, net::CompletionCallback* callback) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + DCHECK(buffer); + CHECK(!write_pending_); + + if (!stream_.get()) { + DCHECK(closed_error_code_ != net::OK); + return closed_error_code_; + } + + int result = WriteStream(buffer, buffer_size); + if (result == net::ERR_CONNECTION_CLOSED && + stream_->GetState() == talk_base::SS_OPENING) + result = net::ERR_IO_PENDING; + if (result == net::ERR_IO_PENDING) { + write_pending_ = true; + write_callback_ = callback; + write_buffer_ = buffer; + write_buffer_size_ = buffer_size; + } + return result; +} + +bool StreamSocketAdapter::SetReceiveBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} + +bool StreamSocketAdapter::SetSendBufferSize(int32 size) { + NOTIMPLEMENTED(); + return false; +} + +void StreamSocketAdapter::Close(int error_code) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + + if (!stream_.get()) // Already closed. + return; + + DCHECK(error_code != net::OK); + closed_error_code_ = error_code; + stream_->SignalEvent.disconnect(this); + stream_->Close(); + stream_.reset(NULL); + + if (read_pending_) { + net::CompletionCallback* callback = read_callback_; + read_pending_ = false; + read_buffer_ = NULL; + callback->Run(error_code); + } + + if (write_pending_) { + net::CompletionCallback* callback = write_callback_; + write_pending_ = false; + write_buffer_ = NULL; + callback->Run(error_code); + } +} + +void StreamSocketAdapter::OnStreamEvent( + talk_base::StreamInterface* stream, int events, int error) { + DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()); + + if (events & talk_base::SE_WRITE) + DoWrite(); + + if (events & talk_base::SE_READ) + DoRead(); +} + +void StreamSocketAdapter::DoWrite() { + // Write if there is a pending read. + if (write_buffer_) { + int result = WriteStream(write_buffer_, write_buffer_size_); + if (result != net::ERR_IO_PENDING) { + net::CompletionCallback* callback = write_callback_; + write_pending_ = false; + write_buffer_ = NULL; + callback->Run(result); + } + } +} + +void StreamSocketAdapter::DoRead() { + // Read if there is a pending read. + if (read_pending_) { + int result = ReadStream(read_buffer_, read_buffer_size_); + if (result != net::ERR_IO_PENDING) { + net::CompletionCallback* callback = read_callback_;\ + read_pending_ = false; + read_buffer_ = NULL; + callback->Run(result); + } + } +} + +int StreamSocketAdapter::ReadStream(net::IOBuffer* buffer, int buffer_size) { + size_t bytes_read; + int error; + talk_base::StreamResult result = stream_->Read( + buffer->data(), buffer_size, &bytes_read, &error); + switch (result) { + case talk_base::SR_SUCCESS: + return bytes_read; + + case talk_base::SR_BLOCK: + return net::ERR_IO_PENDING; + + case talk_base::SR_EOS: + return net::ERR_CONNECTION_CLOSED; + + case talk_base::SR_ERROR: + return MapPosixToChromeError(error); + } + NOTREACHED(); + return net::ERR_FAILED; +} + +int StreamSocketAdapter::WriteStream(net::IOBuffer* buffer, int buffer_size) { + size_t bytes_written; + int error; + talk_base::StreamResult result = stream_->Write( + buffer->data(), buffer_size, &bytes_written, &error); + switch (result) { + case talk_base::SR_SUCCESS: + return bytes_written; + + case talk_base::SR_BLOCK: + return net::ERR_IO_PENDING; + + case talk_base::SR_EOS: + return net::ERR_CONNECTION_CLOSED; + + case talk_base::SR_ERROR: + return MapPosixToChromeError(error); + } + NOTREACHED(); + return net::ERR_FAILED; +} + +} // namespace remoting diff --git a/remoting/jingle_glue/stream_socket_adapter.h b/remoting/jingle_glue/stream_socket_adapter.h new file mode 100644 index 0000000..53ac76b --- /dev/null +++ b/remoting/jingle_glue/stream_socket_adapter.h @@ -0,0 +1,70 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_JINGLE_GLUE_STREAM_SOCKET_ADAPTER_H_ +#define REMOTING_JINGLE_GLUE_STREAM_SOCKET_ADAPTER_H_ + +#include "base/scoped_ptr.h" +#include "net/socket/socket.h" +#include "third_party/libjingle/source/talk/base/sigslot.h" + +namespace talk_base { +class StreamInterface; +} // namespace talk_base + +namespace remoting { + +// StreamSocketAdapter implements net::Socket interface on top of +// libjingle's StreamInterface. It is used by JingleChromotingConnection +// to provide net::Socket interface for channels. +class StreamSocketAdapter : public net::Socket, + public sigslot::has_slots<> { + public: + // Ownership of the stream is passed to the adapter. + explicit StreamSocketAdapter(talk_base::StreamInterface* stream); + virtual ~StreamSocketAdapter(); + + // Closes the stream. |error_code| specifies error code that will + // be returned by Read() and Write() after the stream is closed. + void Close(int error_code); + + // Socket interface. + virtual int Read(net::IOBuffer* buffer, int buffer_size, + net::CompletionCallback* callback); + virtual int Write(net::IOBuffer* buffer, int buffer_size, + net::CompletionCallback* callback); + + virtual bool SetReceiveBufferSize(int32 size); + virtual bool SetSendBufferSize(int32 size); + + private: + void OnStreamEvent(talk_base::StreamInterface* stream, + int events, int error); + + void DoWrite(); + void DoRead(); + + int ReadStream(net::IOBuffer* buffer, int buffer_size); + int WriteStream(net::IOBuffer* buffer, int buffer_size); + + scoped_ptr<talk_base::StreamInterface> stream_; + + bool read_pending_; + net::CompletionCallback* read_callback_; + scoped_refptr<net::IOBuffer> read_buffer_; + int read_buffer_size_; + + bool write_pending_; + net::CompletionCallback* write_callback_; + scoped_refptr<net::IOBuffer> write_buffer_; + int write_buffer_size_; + + int closed_error_code_; + + DISALLOW_COPY_AND_ASSIGN(StreamSocketAdapter); +}; + +} // namespace remoting + +#endif // REMOTING_JINGLE_GLUE_STREAM_SOCKET_ADAPTER_H_ diff --git a/remoting/jingle_glue/utils.cc b/remoting/jingle_glue/utils.cc new file mode 100644 index 0000000..b37d734 --- /dev/null +++ b/remoting/jingle_glue/utils.cc @@ -0,0 +1,53 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/jingle_glue/utils.h" + +#include "base/logging.h" +#include "net/base/net_errors.h" +#include "third_party/libjingle/source/talk/base/socket.h" + +namespace remoting { + +// TODO(sergeyu): This is a clone of MapPosixError() from +// net/socket/tcp_client_socket_libevent.cc. Move MapPosixError() to +// net/base/net_errors.cc and use it here. + +// Convert values from <errno.h> to values from "net/base/net_errors.h" +int MapPosixToChromeError(int err) { + // There are numerous posix error codes, but these are the ones we thus far + // find interesting. + switch (err) { + case EAGAIN: +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + return net::ERR_IO_PENDING; + case ENETDOWN: + return net::ERR_INTERNET_DISCONNECTED; + case ETIMEDOUT: + return net::ERR_TIMED_OUT; + case ENOTCONN: + return net::ERR_CONNECTION_CLOSED; + case ECONNRESET: + case ENETRESET: // Related to keep-alive + return net::ERR_CONNECTION_RESET; + case ECONNABORTED: + return net::ERR_CONNECTION_ABORTED; + case ECONNREFUSED: + return net::ERR_CONNECTION_REFUSED; + case EHOSTUNREACH: + case ENETUNREACH: + return net::ERR_ADDRESS_UNREACHABLE; + case EADDRNOTAVAIL: + return net::ERR_ADDRESS_INVALID; + case 0: + return net::OK; + default: + LOG(WARNING) << "Unknown error " << err << " mapped to net::ERR_FAILED"; + return net::ERR_FAILED; + } +} + +} // namespace remoting diff --git a/remoting/jingle_glue/utils.h b/remoting/jingle_glue/utils.h new file mode 100644 index 0000000..044a9ed --- /dev/null +++ b/remoting/jingle_glue/utils.h @@ -0,0 +1,15 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_JINGLE_GLUE_UTILS_H_ +#define REMOTING_JINGLE_GLUE_UTILS_H_ + +namespace remoting { + +// Convert values from <errno.h> to values from "net/base/net_errors.h" +int MapPosixToChromeError(int err); + +} // namespace remoting + +#endif // REMOTING_JINGLE_GLUE_UTILS_H_ diff --git a/remoting/protocol/chromoting_connection.h b/remoting/protocol/chromoting_connection.h new file mode 100644 index 0000000..e9c0f42d --- /dev/null +++ b/remoting/protocol/chromoting_connection.h @@ -0,0 +1,61 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_CHROMOTING_CONNECTION_H_ +#define REMOTING_PROTOCOL_CHROMOTING_CONNECTION_H_ + +#include <string> + +#include "base/callback.h" + +class Task; + +namespace net { +class Socket; +} // namespace net + +namespace remoting { + +// Generic interface for Chromoting connection. +class ChromotingConnection + : public base::RefCountedThreadSafe<ChromotingConnection> { + public: + enum State { + INITIALIZING, + CONNECTING, + CONNECTED, + CLOSED, + FAILED, + }; + + typedef Callback1<State>::Type StateChangeCallback; + + // Set callback that is called when state of the connection is changed. + // Must be called on the jingle thread only. + virtual void SetStateChangeCallback(StateChangeCallback* callback) = 0; + + // Reliable PseudoTCP channels for this connection. + // TODO(sergeyu): Remove VideoChannel, and use RTP channels instead. + virtual net::Socket* GetVideoChannel() = 0; + virtual net::Socket* GetEventsChannel() = 0; + + // Unreliable channels for this connection. + virtual net::Socket* GetVideoRtpChannel() = 0; + virtual net::Socket* GetVideoRtcpChannel() = 0; + + // JID of the other side. + virtual const std::string& jid() = 0; + + // Closed connection. Callbacks are guaranteed not to be called after + // |closed_task| is executed. + virtual void Close(Task* closed_task) = 0; + + protected: + friend class base::RefCountedThreadSafe<ChromotingConnection>; + virtual ~ChromotingConnection() { } +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_CHROMOTING_CONNECTION_H_ diff --git a/remoting/protocol/chromoting_server.h b/remoting/protocol/chromoting_server.h new file mode 100644 index 0000000..5b93534 --- /dev/null +++ b/remoting/protocol/chromoting_server.h @@ -0,0 +1,40 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_CHROMOTING_SERVER_H_ +#define REMOTING_PROTOCOL_CHROMOTING_SERVER_H_ + +#include <string> + +#include "base/callback.h" +#include "base/ref_counted.h" +#include "remoting/protocol/chromoting_connection.h" + +class Task; + +namespace remoting { + +// Generic interface for Chromoting server. +class ChromotingServer : public base::RefCountedThreadSafe<ChromotingServer> { + public: + typedef Callback2<ChromotingConnection*, bool*>::Type NewConnectionCallback; + + // Initializes connection to the host |jid|. + virtual scoped_refptr<ChromotingConnection> Connect( + const std::string& jid, + ChromotingConnection::StateChangeCallback* state_change_callback) = 0; + + // Close connection. |close_task| is executed after the session client + // is actually closed. No callbacks are called after |closed_task| is + // executed. + virtual void Close(Task* closed_task) = 0; + + protected: + friend class base::RefCountedThreadSafe<ChromotingServer>; + virtual ~ChromotingServer() { } +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_CHROMOTING_SERVER_H_ diff --git a/remoting/protocol/jingle_chromoting_connection.cc b/remoting/protocol/jingle_chromoting_connection.cc new file mode 100644 index 0000000..48e3ec0 --- /dev/null +++ b/remoting/protocol/jingle_chromoting_connection.cc @@ -0,0 +1,233 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/jingle_chromoting_connection.h" + +#include "base/message_loop.h" +#include "net/base/net_errors.h" +#include "remoting/jingle_glue/channel_socket_adapter.h" +#include "remoting/jingle_glue/stream_socket_adapter.h" +#include "remoting/protocol/jingle_chromoting_server.h" +#include "third_party/libjingle/source/talk/base/thread.h" +#include "third_party/libjingle/source/talk/p2p/base/session.h" +#include "third_party/libjingle/source/talk/session/tunnel/pseudotcpchannel.h" + +using cricket::BaseSession; +using cricket::PseudoTcpChannel; +using cricket::Session; + +namespace remoting { + +namespace { +const char kVideoChannelName[] = "video"; +const char kVideoRtpChannelName[] = "videortp"; +const char kVideoRtcpChannelName[] = "videortcp"; +const char kEventsChannelName[] = "events"; +} // namespace + +JingleChromotingConnection::JingleChromotingConnection( + JingleChromotingServer* session_client) + : session_client_(session_client), + state_(INITIALIZING), + closed_(false), + session_(NULL), + events_channel_(NULL), + video_channel_(NULL) { +} + +JingleChromotingConnection::~JingleChromotingConnection() { + DCHECK(closed_); +} + +void JingleChromotingConnection::Init(Session* session) { + DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + + session_ = session; + jid_ = session_->remote_name(); + session_->SignalState.connect( + this, &JingleChromotingConnection::OnSessionState); +} + +bool JingleChromotingConnection::HasSession(cricket::Session* session) { + return session_ == session; +} + +Session* JingleChromotingConnection::ReleaseSession() { + DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + + SetState(CLOSED); + Session* session = session_; + if (session_) + session_->SignalState.disconnect(this); + session_ = NULL; + closed_ = true; + return session; +} + +void JingleChromotingConnection::SetStateChangeCallback( + StateChangeCallback* callback) { + DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + DCHECK(callback); + state_change_callback_.reset(callback); +} + +// TODO(sergeyu): Remove this method after we switch to RTP. +net::Socket* JingleChromotingConnection::GetVideoChannel() { + DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + return video_channel_adapter_.get(); +} + +net::Socket* JingleChromotingConnection::GetEventsChannel() { + DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + return events_channel_adapter_.get(); +} + +net::Socket* JingleChromotingConnection::GetVideoRtpChannel() { + DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + return video_rtp_channel_.get(); +} + +net::Socket* JingleChromotingConnection::GetVideoRtcpChannel() { + DCHECK_EQ(session_client_->message_loop(), MessageLoop::current()); + return video_rtcp_channel_.get(); +} + +void JingleChromotingConnection::Close(Task* closed_task) { + if (MessageLoop::current() != session_client_->message_loop()) { + session_client_->message_loop()->PostTask( + FROM_HERE, NewRunnableMethod(this, &JingleChromotingConnection::Close, + closed_task)); + return; + } + + if (!closed_) { + if (events_channel_adapter_.get()) + events_channel_adapter_->Close(net::ERR_CONNECTION_CLOSED); + + if (events_channel_) { + events_channel_->OnSessionTerminate(session_); + events_channel_ = NULL; + } + + if (video_channel_adapter_.get()) + video_channel_adapter_->Close(net::ERR_CONNECTION_CLOSED); + + if (video_channel_) { + video_channel_->OnSessionTerminate(session_); + video_channel_ = NULL; + } + + if (video_rtp_channel_.get()) + video_rtp_channel_->Close(net::ERR_CONNECTION_CLOSED); + if (video_rtcp_channel_.get()) + video_rtcp_channel_->Close(net::ERR_CONNECTION_CLOSED); + + if (session_) + session_->Terminate(); + + closed_ = true; + } + + closed_task->Run(); + delete closed_task; +} + +void JingleChromotingConnection::OnSessionState( + BaseSession* session, BaseSession::State state) { + DCHECK_EQ(session_, session); + + switch (state) { + case Session::STATE_SENTINITIATE: + OnInitiate(false); + break; + + case Session::STATE_RECEIVEDINITIATE: + OnInitiate(true); + break; + + case Session::STATE_SENTACCEPT: + case Session::STATE_RECEIVEDACCEPT: + OnAccept(); + break; + + case Session::STATE_RECEIVEDTERMINATE: + OnTerminate(); + break; + + case Session::STATE_DEINIT: + // Close() must have been called before this. + NOTREACHED(); + break; + + default: + break; + } +} + +void JingleChromotingConnection::OnInitiate(bool incoming) { + jid_ = session_->remote_name(); + if (incoming) + session_client_->AcceptConnection(this, session_); + SetState(CONNECTING); +} + +void JingleChromotingConnection::OnAccept() { + // Create video RTP channels. + video_rtp_channel_.reset(new TransportChannelSocketAdapter( + session_->CreateChannel(kVideoRtpChannelName))); + video_rtcp_channel_.reset(new TransportChannelSocketAdapter( + session_->CreateChannel(kVideoRtcpChannelName))); + + // Create events channel. + events_channel_ = + new PseudoTcpChannel(talk_base::Thread::Current(), session_); + events_channel_->Connect(kEventsChannelName); + events_channel_adapter_.reset(new StreamSocketAdapter( + events_channel_->GetStream())); + + // Create video channel. + // TODO(sergeyu): Remove video channel when we are ready to switch to RTP. + video_channel_ = + new PseudoTcpChannel(talk_base::Thread::Current(), session_); + video_channel_->Connect(kVideoChannelName); + video_channel_adapter_.reset(new StreamSocketAdapter( + video_channel_->GetStream())); + + SetState(CONNECTED); +} + +void JingleChromotingConnection::OnTerminate() { + if (events_channel_adapter_.get()) + events_channel_adapter_->Close(net::ERR_CONNECTION_ABORTED); + if (events_channel_) { + events_channel_->OnSessionTerminate(session_); + events_channel_ = NULL; + } + + if (video_channel_adapter_.get()) + video_channel_adapter_->Close(net::ERR_CONNECTION_ABORTED); + if (video_channel_) { + video_channel_->OnSessionTerminate(session_); + video_channel_ = NULL; + } + + if (video_rtp_channel_.get()) + video_rtp_channel_->Close(net::ERR_CONNECTION_ABORTED); + if (video_rtcp_channel_.get()) + video_rtcp_channel_->Close(net::ERR_CONNECTION_ABORTED); + + SetState(CLOSED); + + closed_ = true; +} + +void JingleChromotingConnection::SetState(State new_state) { + if (new_state != state_) { + state_ = new_state; + if (!closed_ && state_change_callback_.get()) + state_change_callback_->Run(new_state); + } +} + +} // namespace remoting diff --git a/remoting/protocol/jingle_chromoting_connection.h b/remoting/protocol/jingle_chromoting_connection.h new file mode 100644 index 0000000..ada4ead --- /dev/null +++ b/remoting/protocol/jingle_chromoting_connection.h @@ -0,0 +1,97 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_JINGLE_CHROMOTING_CONNECTION_H_ +#define REMOTING_PROTOCOL_JINGLE_CHROMOTING_CONNECTION_H_ + +#include "base/lock.h" +#include "base/ref_counted.h" +#include "remoting/protocol/chromoting_connection.h" +#include "third_party/libjingle/source/talk/base/sigslot.h" +#include "third_party/libjingle/source/talk/p2p/base/session.h" + +namespace cricket { +class PseudoTcpChannel; +} // namespace cricket + +namespace net { +class Socket; +} // namespace net + +namespace remoting { + +class JingleChromotingServer; +class StreamSocketAdapter; +class TransportChannelSocketAdapter; + +// Implements ChromotingConnection that work over libjingle session (the +// cricket::Session object is passed to Init() method). Created +// by JingleChromotingServer for incoming and outgoing connections. +class JingleChromotingConnection : public ChromotingConnection, + public sigslot::has_slots<> { + public: + explicit JingleChromotingConnection(JingleChromotingServer* client); + + // ChromotingConnection interface. + virtual void SetStateChangeCallback(StateChangeCallback* callback) ; + + virtual net::Socket* GetVideoChannel(); + virtual net::Socket* GetEventsChannel(); + virtual net::Socket* GetVideoRtpChannel(); + virtual net::Socket* GetVideoRtcpChannel(); + + // No synchronization is needed because jid_ is not changed + // after new connection is passed to JingleChromotingServer callback. + virtual const std::string& jid() { return jid_; }; + + virtual void Close(Task* closed_task); + + protected: + virtual ~JingleChromotingConnection(); + + private: + friend class JingleChromotingServer; + + // Called by JingleChromotingServer. + void Init(cricket::Session* session); + bool HasSession(cricket::Session* session); + cricket::Session* ReleaseSession(); + + // Used for Session.SignalState sigslot. + void OnSessionState(cricket::BaseSession* session, + cricket::BaseSession::State state); + void OnInitiate(bool incoming); + void OnAccept(); + void OnTerminate(); + + void SetState(State new_state); + + // JingleChromotingServer that created this connection. + scoped_refptr<JingleChromotingServer> session_client_; + + State state_; + scoped_ptr<StateChangeCallback> state_change_callback_; + + bool closed_; + + // JID of the other side. Set when the connection is initialized, + // and never changed after that. + std::string jid_; + + // The corresponding libjingle session. + cricket::Session* session_; + + cricket::PseudoTcpChannel* events_channel_; + scoped_ptr<StreamSocketAdapter> events_channel_adapter_; + cricket::PseudoTcpChannel* video_channel_; + scoped_ptr<StreamSocketAdapter> video_channel_adapter_; + scoped_ptr<TransportChannelSocketAdapter> video_rtp_channel_; + scoped_ptr<TransportChannelSocketAdapter> video_rtcp_channel_; + + DISALLOW_COPY_AND_ASSIGN(JingleChromotingConnection); +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_JINGLE_CHROMOTING_CONNECTION_H_ diff --git a/remoting/protocol/jingle_chromoting_server.cc b/remoting/protocol/jingle_chromoting_server.cc new file mode 100644 index 0000000..851221f --- /dev/null +++ b/remoting/protocol/jingle_chromoting_server.cc @@ -0,0 +1,216 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "remoting/protocol/jingle_chromoting_server.h" + +#include "base/message_loop.h" +#include "remoting/base/constants.h" +#include "third_party/libjingle/source/talk/p2p/base/transport.h" +#include "third_party/libjingle/source/talk/xmllite/xmlelement.h" + +using cricket::ContentDescription; +using cricket::SessionDescription; +using cricket::Session; +using cricket::SessionManager; +using buzz::QName; + +namespace remoting { + +namespace { +const char kDescriptionTag[] = "description"; +const char kTypeTag[] = "type"; +} // namespace + +ChromotingContentDescription::ChromotingContentDescription( + const std::string& description) + : description_(description) { +} + +JingleChromotingServer::JingleChromotingServer( + MessageLoop* message_loop) + : message_loop_(message_loop), + session_manager_(NULL), + allow_local_ips_(false), + closed_(false) { + DCHECK(message_loop_); +} + +void JingleChromotingServer::Init( + const std::string& local_jid, + cricket::SessionManager* session_manager, + NewConnectionCallback* new_connection_callback) { + if (MessageLoop::current() != message_loop()) { + message_loop()->PostTask( + FROM_HERE, NewRunnableMethod( + this, &JingleChromotingServer::Init, + local_jid, session_manager, new_connection_callback)); + return; + } + + DCHECK(session_manager); + DCHECK(new_connection_callback); + + local_jid_ = local_jid; + new_connection_callback_.reset(new_connection_callback); + session_manager_ = session_manager; + session_manager_->AddClient(kChromotingXmlNamespace, this); +} + +JingleChromotingServer::~JingleChromotingServer() { + DCHECK(closed_); +} + +void JingleChromotingServer::Close(Task* closed_task) { + if (MessageLoop::current() != message_loop()) { + message_loop()->PostTask( + FROM_HERE, NewRunnableMethod(this, &JingleChromotingServer::Close, + closed_task)); + return; + } + + if (!closed_) { + // Close all connections. + session_manager_->RemoveClient(kChromotingXmlNamespace); + while (!connections_.empty()) { + Session* session = connections_.front()->ReleaseSession(); + session_manager_->DestroySession(session); + connections_.pop_front(); + } + closed_ = true; + } + + closed_task->Run(); + delete closed_task; +} + +void JingleChromotingServer::set_allow_local_ips(bool allow_local_ips) { + allow_local_ips_ = allow_local_ips; +} + +scoped_refptr<ChromotingConnection> JingleChromotingServer::Connect( + const std::string& jid, + ChromotingConnection::StateChangeCallback* state_change_callback) { + // Can be called from any thread. + scoped_refptr<JingleChromotingConnection> connection = + new JingleChromotingConnection(this); + message_loop()->PostTask( + FROM_HERE, NewRunnableMethod(this, &JingleChromotingServer::DoConnect, + connection, jid, state_change_callback)); + return connection; +} + +void JingleChromotingServer::DoConnect( + scoped_refptr<JingleChromotingConnection> connection, + const std::string& jid, + ChromotingConnection::StateChangeCallback* state_change_callback) { + DCHECK_EQ(message_loop(), MessageLoop::current()); + Session* session = session_manager_->CreateSession( + local_jid_, kChromotingXmlNamespace); + + // Initialize connection object before we send initiate stanza. + connection->SetStateChangeCallback(state_change_callback); + connection->Init(session); + connections_.push_back(connection); + + session->Initiate(jid, CreateSessionDescription()); +} + +MessageLoop* JingleChromotingServer::message_loop() { + return message_loop_; +} + +void JingleChromotingServer::OnSessionCreate( + Session* session, bool incoming) { + DCHECK_EQ(message_loop(), MessageLoop::current()); + + // Allow local connections if neccessary. + session->transport()->set_allow_local_ips(allow_local_ips_); + + // If this is an outcoming session, the the connection object is already + // created. + if (incoming) { + JingleChromotingConnection* connection = + new JingleChromotingConnection(this); + connections_.push_back(connection); + connection->Init(session); + } +} + +void JingleChromotingServer::OnSessionDestroy(Session* session) { + DCHECK_EQ(message_loop(), MessageLoop::current()); + + std::list<scoped_refptr<JingleChromotingConnection> >::iterator it; + for (it = connections_.begin(); it != connections_.end(); ++it) { + if ((*it)->HasSession(session)) { + (*it)->ReleaseSession(); + connections_.erase(it); + return; + } + } +} + +void JingleChromotingServer::AcceptConnection( + JingleChromotingConnection* connection, + Session* session) { + bool accept = false; + // Always reject connection if we are closed or there is no callback. + if (!closed_ && new_connection_callback_.get()) + new_connection_callback_->Run(connection, &accept); + + if (accept) + session->Accept(CreateSessionDescription()); + else + session->Reject(); +} + +bool JingleChromotingServer::ParseContent( + const buzz::XmlElement* element, + const cricket::ContentDescription** content, + cricket::ParseError* error) { + if (element->Name() == QName(kChromotingXmlNamespace, kDescriptionTag)) { + const buzz::XmlElement* type_elem = + element->FirstNamed(QName(kChromotingXmlNamespace, kTypeTag)); + if (type_elem != NULL) { + *content = new ChromotingContentDescription(type_elem->BodyText()); + return true; + } + } + LOG(ERROR) << "Invalid description: " << element->Str(); + return false; +} + +// WriteContent creates content description for chromoting session. The +// description looks as follows: +// <description xmlns="google:remoting"> +// <type>description_text</type> +// </description> +// Currently description_text is always empty. +// +// TODO(sergeyu): Add more information to the content description. E.g. +// protocol version, etc. +bool JingleChromotingServer::WriteContent( + const cricket::ContentDescription* content, + buzz::XmlElement** elem, + cricket::WriteError* error) { + const ChromotingContentDescription* desc = + static_cast<const ChromotingContentDescription*>(content); + + QName desc_tag(kChromotingXmlNamespace, kDescriptionTag); + buzz::XmlElement* root = new buzz::XmlElement(desc_tag, true); + QName type_tag(kChromotingXmlNamespace, kTypeTag); + buzz::XmlElement* type_elem = new buzz::XmlElement(type_tag); + type_elem->SetBodyText(desc->description()); + root->AddElement(type_elem); + *elem = root; + return true; +} + +SessionDescription* JingleChromotingServer::CreateSessionDescription() { + SessionDescription* desc = new SessionDescription(); + desc->AddContent("chromoting", kChromotingXmlNamespace, + new ChromotingContentDescription("")); + return desc; +} + +} // namespace remoting diff --git a/remoting/protocol/jingle_chromoting_server.h b/remoting/protocol/jingle_chromoting_server.h new file mode 100644 index 0000000..7a6fd84 --- /dev/null +++ b/remoting/protocol/jingle_chromoting_server.h @@ -0,0 +1,111 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef REMOTING_PROTOCOL_JINGLE_CHROMOTING_SERVER_H_ +#define REMOTING_PROTOCOL_JINGLE_CHROMOTING_SERVER_H_ + +#include <list> +#include <string> + +#include "base/lock.h" +#include "base/ref_counted.h" +#include "remoting/protocol/chromoting_server.h" +#include "remoting/protocol/jingle_chromoting_connection.h" +#include "third_party/libjingle/source/talk/p2p/base/session.h" +#include "third_party/libjingle/source/talk/p2p/base/sessionclient.h" +#include "third_party/libjingle/source/talk/p2p/base/sessiondescription.h" + +class MessageLoop; + +namespace cricket { +class SessionManager; +} // namespace cricket + +namespace remoting { + +// ContentDescription used for chromoting sessions. +// +// TODO(sergeyu): Do we need host_id or some other information in the content +// description? +class ChromotingContentDescription : public cricket::ContentDescription { + public: + ChromotingContentDescription(const std::string& description); + const std::string& description() const { return description_; } + private: + std::string description_; +}; + +// This class implements SessionClient for Chromoting sessions. It acts as a +// server that accepts chromoting connections and can also make new connections +// to other hosts. +class JingleChromotingServer + : public ChromotingServer, + public cricket::SessionClient { + public: + explicit JingleChromotingServer(MessageLoop* message_loop); + + // Initializes the session client. Doesn't accept ownership of the + // |session_manager|. Close() must be called _before_ the |session_manager| + // is destroyed. + virtual void Init(const std::string& local_jid, + cricket::SessionManager* session_manager, + NewConnectionCallback* new_connection_callback); + + // ChromotingServer interface. + virtual scoped_refptr<ChromotingConnection> Connect( + const std::string& jid, + ChromotingConnection::StateChangeCallback* state_change_callback); + virtual void Close(Task* closed_task); + + void set_allow_local_ips(bool allow_local_ips); + + protected: + virtual ~JingleChromotingServer(); + + private: + friend class JingleChromotingConnection; + + // Message loop used by this session client. + MessageLoop* message_loop(); + + // Called by JingleChromotingConnection when a new connection is initiated. + void AcceptConnection(JingleChromotingConnection* connection, + cricket::Session* session); + + void DoConnect( + scoped_refptr<JingleChromotingConnection> connection, + const std::string& jid, + ChromotingConnection::StateChangeCallback* state_change_callback); + + // Creates outgoing session description for an incoming connection. + cricket::SessionDescription* CreateSessionDescription(); + + // cricket::SessionClient interface. + virtual void OnSessionCreate(cricket::Session* session, + bool received_initiate); + virtual void OnSessionDestroy(cricket::Session* session); + + virtual bool ParseContent(const buzz::XmlElement* elem, + const cricket::ContentDescription** content, + cricket::ParseError* error); + virtual bool WriteContent(const cricket::ContentDescription* content, + buzz::XmlElement** elem, + cricket::WriteError* error); + + std::string local_jid_; + MessageLoop* message_loop_; + cricket::SessionManager* session_manager_; + scoped_ptr<NewConnectionCallback> new_connection_callback_; + bool allow_local_ips_; + + bool closed_; + + std::list<scoped_refptr<JingleChromotingConnection> > connections_; + + DISALLOW_COPY_AND_ASSIGN(JingleChromotingServer); +}; + +} // namespace remoting + +#endif // REMOTING_PROTOCOL_JINGLE_CHROMOTING_SERVER_H_ diff --git a/remoting/protocol/protocol_test_client.cc b/remoting/protocol/protocol_test_client.cc new file mode 100644 index 0000000..804b008 --- /dev/null +++ b/remoting/protocol/protocol_test_client.cc @@ -0,0 +1,380 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "build/build_config.h" + +#if !defined(OS_WIN) +extern "C" { +#include <unistd.h> +} +#endif // !defined(OS_WIN) + +#include <iostream> +#include <list> + +#include "base/at_exit.h" +#include "base/command_line.h" +#include "base/nss_util.h" +#include "base/time.h" +#include "net/base/completion_callback.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/socket/socket.h" +#include "remoting/base/constants.h" +#include "remoting/jingle_glue/jingle_client.h" +#include "remoting/jingle_glue/jingle_thread.h" +#include "remoting/protocol/jingle_chromoting_server.h" + +using remoting::kChromotingTokenServiceName; + +namespace remoting { + +namespace { +const int kBufferSize = 4096; +} // namespace + +class ProtocolTestClient; + +class ProtocolTestConnection + : public base::RefCountedThreadSafe<ProtocolTestConnection> { + public: + ProtocolTestConnection(ProtocolTestClient* client, MessageLoop* message_loop) + : client_(client), + message_loop_(message_loop), + connection_(NULL), + ALLOW_THIS_IN_INITIALIZER_LIST( + write_cb_(this, &ProtocolTestConnection::OnWritten)), + pending_write_(false), + ALLOW_THIS_IN_INITIALIZER_LIST( + read_cb_(this, &ProtocolTestConnection::OnRead)), + closed_event_(true, false) { + } + + void Init(ChromotingConnection* connection); + void Write(const std::string& str); + void Read(); + void Close(); + + // ChromotingConnection::Callback interface. + virtual void OnStateChange(ChromotingConnection::State state); + private: + void DoWrite(scoped_refptr<net::IOBuffer> buf, int size); + void DoRead(); + + void HandleReadResult(int result); + + void OnWritten(int result); + void OnRead(int result); + + void OnFinishedClosing(); + + ProtocolTestClient* client_; + MessageLoop* message_loop_; + scoped_refptr<ChromotingConnection> connection_; + net::CompletionCallbackImpl<ProtocolTestConnection> write_cb_; + bool pending_write_; + net::CompletionCallbackImpl<ProtocolTestConnection> read_cb_; + scoped_refptr<net::IOBuffer> read_buffer_; + base::WaitableEvent closed_event_; +}; + +class ProtocolTestClient + : public JingleClient::Callback, + public base::RefCountedThreadSafe<ProtocolTestClient> { + public: + ProtocolTestClient() + : closed_event_(true, false) { + } + + virtual ~ProtocolTestClient() {} + + void Run(const std::string& username, const std::string& auth_token, + const std::string& host_jid); + + void OnConnectionClosed(ProtocolTestConnection* connection); + + // JingleClient::Callback interface. + virtual void OnStateChange(JingleClient* client, JingleClient::State state); + virtual bool OnAcceptConnection(JingleClient* client, const std::string& jid, + JingleChannel::Callback** callback); + virtual void OnNewConnection(JingleClient* client, + scoped_refptr<JingleChannel> channel); + + // callback for JingleChromotingServer interface. + virtual void OnNewChromotocolConnection(ChromotingConnection* connection, + bool* accept); + + private: + typedef std::list<scoped_refptr<ProtocolTestConnection> > ConnectionsList; + + void OnFinishedClosing(); + void DestroyConnection(scoped_refptr<ProtocolTestConnection> connection); + + std::string host_jid_; + scoped_refptr<JingleClient> client_; + scoped_refptr<JingleChromotingServer> server_; + ConnectionsList connections_; + Lock connections_lock_; + base::WaitableEvent closed_event_; +}; + + +void ProtocolTestConnection::Init(ChromotingConnection* connection) { + connection_ = connection; +} + +void ProtocolTestConnection::Write(const std::string& str) { + if (str.empty()) + return; + + scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(str.length()); + memcpy(buf->data(), str.c_str(), str.length()); + message_loop_->PostTask( + FROM_HERE, NewRunnableMethod( + this, &ProtocolTestConnection::DoWrite, buf, str.length())); +} + +void ProtocolTestConnection::DoWrite( + scoped_refptr<net::IOBuffer> buf, int size) { + if (pending_write_) { + LOG(ERROR) << "Cannot write because there is another pending write."; + return; + } + + net::Socket* channel = connection_->GetEventsChannel(); + if (channel != NULL) { + int result = channel->Write(buf, size, &write_cb_); + if (result < 0) { + if (result == net::ERR_IO_PENDING) + pending_write_ = true; + else + LOG(ERROR) << "Write() returned error " << result; + } + } else { + LOG(ERROR) << "Cannot write because the channel isn't intialized yet."; + } +} + +void ProtocolTestConnection::Read() { + message_loop_->PostTask( + FROM_HERE, NewRunnableMethod( + this, &ProtocolTestConnection::DoRead)); +} + +void ProtocolTestConnection::DoRead() { + read_buffer_ = new net::IOBuffer(kBufferSize); + while (true) { + int result = connection_->GetEventsChannel()->Read( + read_buffer_, kBufferSize, &read_cb_); + if (result < 0) { + if (result != net::ERR_IO_PENDING) + LOG(ERROR) << "Read failed: " << result; + break; + } else { + HandleReadResult(result); + } + } +} + +void ProtocolTestConnection::Close() { + connection_->Close( + NewRunnableMethod(this, &ProtocolTestConnection::OnFinishedClosing)); + closed_event_.Wait(); +} + +void ProtocolTestConnection::OnFinishedClosing() { + closed_event_.Signal(); +} + +void ProtocolTestConnection::OnStateChange( + ChromotingConnection::State state) { + LOG(INFO) << "State of " << connection_->jid() << " changed to " << state; + if (state == ChromotingConnection::CONNECTED) { + // Start reading after we've connected. + Read(); + } else if (state == ChromotingConnection::CLOSED) { + std::cerr << "Connection to " << connection_->jid() + << " closed" << std::endl; + client_->OnConnectionClosed(this); + } +} + +void ProtocolTestConnection::OnWritten(int result) { + pending_write_ = false; + if (result < 0) + LOG(ERROR) << "Write() returned error " << result; +} + +void ProtocolTestConnection::OnRead(int result) { + HandleReadResult(result); + DoRead(); +} + +void ProtocolTestConnection::HandleReadResult(int result) { + if (result > 0) { + std::string str(reinterpret_cast<const char*>(read_buffer_->data()), + result); + std::cout << "(" << connection_->jid() << "): " << str << std::endl; + } else { + LOG(ERROR) << "Read() returned error " << result; + } +} + +void ProtocolTestClient::Run(const std::string& username, + const std::string& auth_token, + const std::string& host_jid) { + remoting::JingleThread jingle_thread; + jingle_thread.Start(); + client_ = new JingleClient(&jingle_thread); + client_->Init(username, auth_token, kChromotingTokenServiceName, this); + + server_ = new JingleChromotingServer(jingle_thread.message_loop()); + + host_jid_ = host_jid; + + while (true) { + std::string line; + std::getline(std::cin, line); + + { + AutoLock auto_lock(connections_lock_); + + // Broadcast message to all clients. + for (ConnectionsList::iterator it = connections_.begin(); + it != connections_.end(); ++it) { + (*it)->Write(line); + } + } + + if (line == "exit") + break; + } + + while (!connections_.empty()) { + connections_.front()->Close(); + connections_.pop_front(); + } + + if (server_) { + server_->Close( + NewRunnableMethod(this, &ProtocolTestClient::OnFinishedClosing)); + closed_event_.Wait(); + } + + client_->Close(); + jingle_thread.Stop(); +} + +void ProtocolTestClient::OnConnectionClosed( + ProtocolTestConnection* connection) { + client_->message_loop()->PostTask( + FROM_HERE, NewRunnableMethod( + this, &ProtocolTestClient::DestroyConnection, + scoped_refptr<ProtocolTestConnection>(connection))); +} + +void ProtocolTestClient::OnStateChange( + JingleClient* client, JingleClient::State state) { + if (state == JingleClient::CONNECTED) { + std::cerr << "Connected as " << client->GetFullJid() << std::endl; + + server_->Init( + client_->GetFullJid(), client_->session_manager(), + NewCallback(this, + &ProtocolTestClient::OnNewChromotocolConnection)); + server_->set_allow_local_ips(true); + + if (host_jid_ != "") { + ProtocolTestConnection* connection = + new ProtocolTestConnection(this, client_->message_loop()); + connection->Init(server_->Connect( + host_jid_, NewCallback(connection, + &ProtocolTestConnection::OnStateChange))); + connections_.push_back(connection); + } + } else if (state == JingleClient::CLOSED) { + std::cerr << "Connection closed" << std::endl; + } +} + +bool ProtocolTestClient::OnAcceptConnection( + JingleClient* client, const std::string& jid, + JingleChannel::Callback** callback) { + return false; +} + +void ProtocolTestClient::OnNewConnection( + JingleClient* client, scoped_refptr<JingleChannel> channel) { + NOTREACHED(); +} + +void ProtocolTestClient::OnNewChromotocolConnection( + ChromotingConnection* connection, bool* accept) { + std::cerr << "Accepting connection from " << connection->jid() << std::endl; + ProtocolTestConnection* test_connection = + new ProtocolTestConnection(this, client_->message_loop()); + connection->SetStateChangeCallback( + NewCallback(test_connection, &ProtocolTestConnection::OnStateChange)); + test_connection->Init(connection); + AutoLock auto_lock(connections_lock_); + connections_.push_back(test_connection); + *accept = true; +} + +void ProtocolTestClient::OnFinishedClosing() { + closed_event_.Signal(); +} + +void ProtocolTestClient::DestroyConnection( + scoped_refptr<ProtocolTestConnection> connection) { + connection->Close(); + AutoLock auto_lock(connections_lock_); + for (ConnectionsList::iterator it = connections_.begin(); + it != connections_.end(); ++it) { + if ((*it) == connection) { + connections_.erase(it); + return; + } + } +} + +} // namespace remoting + +using remoting::ProtocolTestClient; + +void usage(char* command) { + std::cerr << "Usage: " << command << "--username=<username>" << std::endl + << "\t--auth_token=<auth_token>" << std::endl + << "\t[--host_jid=<host_jid>]" << std::endl; + exit(1); +} + +int main(int argc, char** argv) { + CommandLine::Init(argc, argv); + const CommandLine* cmd_line = CommandLine::ForCurrentProcess(); + + if (!cmd_line->args().empty() || cmd_line->HasSwitch("help")) + usage(argv[0]); + + base::AtExitManager exit_manager; + + base::EnsureNSPRInit(); + base::EnsureNSSInit(); + + std::string host_jid(cmd_line->GetSwitchValueASCII("host_jid")); + + if (!cmd_line->HasSwitch("username")) + usage(argv[0]); + std::string username(cmd_line->GetSwitchValueASCII("username")); + + if (!cmd_line->HasSwitch("auth_token")) + usage(argv[0]); + std::string auth_token(cmd_line->GetSwitchValueASCII("auth_token")); + + scoped_refptr<ProtocolTestClient> client = new ProtocolTestClient(); + + client->Run(username, auth_token, host_jid); + + return 0; +} diff --git a/remoting/remoting.gyp b/remoting/remoting.gyp index 9f524ac..203ec43 100644 --- a/remoting/remoting.gyp +++ b/remoting/remoting.gyp @@ -308,6 +308,8 @@ '../third_party/libjingle/libjingle.gyp:libjingle_p2p', ], 'sources': [ + 'jingle_glue/channel_socket_adapter.cc', + 'jingle_glue/channel_socket_adapter.h', 'jingle_glue/iq_request.cc', 'jingle_glue/iq_request.h', 'jingle_glue/jingle_channel.cc', @@ -320,10 +322,14 @@ 'jingle_glue/jingle_thread.h', 'jingle_glue/relay_port_allocator.cc', 'jingle_glue/relay_port_allocator.h', + 'jingle_glue/stream_socket_adapter.cc', + 'jingle_glue/stream_socket_adapter.h', 'jingle_glue/ssl_adapter.h', 'jingle_glue/ssl_adapter.cc', 'jingle_glue/ssl_socket_adapter.cc', 'jingle_glue/ssl_socket_adapter.h', + 'jingle_glue/utils.cc', + 'jingle_glue/utils.h', 'jingle_glue/xmpp_socket_adapter.cc', 'jingle_glue/xmpp_socket_adapter.h', ], @@ -342,6 +348,38 @@ ], }, # end of target 'chromoting_jingle_test_client' + { + 'target_name': 'chromoting_protocol', + 'type': '<(library)', + 'dependencies': [ + 'chromoting_base', + 'chromoting_jingle_glue', + ], + 'export_dependent_settings': [ + 'chromoting_jingle_glue', + ], + 'sources': [ + 'protocol/chromoting_connection.h', + 'protocol/chromoting_server.h', + 'protocol/jingle_chromoting_connection.cc', + 'protocol/jingle_chromoting_connection.h', + 'protocol/jingle_chromoting_server.cc', + 'protocol/jingle_chromoting_server.h', + ], + }, # end of target 'chromoting_protocol' + + { + 'target_name': 'chromotocol_test_client', + 'type': 'executable', + 'dependencies': [ + 'chromoting_base', + 'chromoting_protocol', + ], + 'sources': [ + 'protocol/protocol_test_client.cc', + ], + }, # end of target 'chromotocol_test_client' + # Remoting unit tests { 'target_name': 'remoting_unittests', |