// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "components/devtools_bridge/session_dependency_factory.h" #include "base/bind.h" #include "base/location.h" #include "base/task_runner.h" #include "base/threading/thread.h" #include "components/devtools_bridge/abstract_data_channel.h" #include "components/devtools_bridge/abstract_peer_connection.h" #include "components/devtools_bridge/rtc_configuration.h" #include "third_party/libjingle/source/talk/app/webrtc/mediaconstraintsinterface.h" #include "third_party/libjingle/source/talk/app/webrtc/peerconnectioninterface.h" #include "third_party/webrtc/base/bind.h" #include "third_party/webrtc/base/messagehandler.h" #include "third_party/webrtc/base/messagequeue.h" #include "third_party/webrtc/base/ssladapter.h" #include "third_party/webrtc/base/thread.h" namespace devtools_bridge { class RTCConfiguration::Impl : public RTCConfiguration, public webrtc::PeerConnectionInterface::RTCConfiguration { public: void AddIceServer(const std::string& uri, const std::string& username, const std::string& credential) override { webrtc::PeerConnectionInterface::IceServer server; server.uri = uri; server.username = username; server.password = credential; servers.push_back(server); } const Impl& impl() const override { return *this; } private: webrtc::PeerConnectionInterface::RTCConfiguration base_; }; namespace { template void CheckedRelease(rtc::scoped_refptr* ptr) { CHECK_EQ(0, ptr->release()->Release()); } class MediaConstraints : public webrtc::MediaConstraintsInterface { public: ~MediaConstraints() override {} const Constraints& GetMandatory() const override { return mandatory_; } const Constraints& GetOptional() const override { return optional_; } void AddMandatory(const std::string& key, const std::string& value) { mandatory_.push_back(Constraint(key, value)); } private: Constraints mandatory_; Constraints optional_; }; /** * Posts tasks on signaling thread. If stopped (when SesseionDependencyFactry * is destroying) ignores posted tasks. */ class SignalingThreadTaskRunner : public base::TaskRunner, private rtc::MessageHandler { public: explicit SignalingThreadTaskRunner(rtc::Thread* thread) : thread_(thread) {} bool PostDelayedTask(const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay) override { DCHECK(delay.ToInternalValue() == 0); rtc::CritScope scope(&critical_section_); if (thread_) thread_->Send(this, 0, new Task(task)); return true; } bool RunsTasksOnCurrentThread() const override { rtc::CritScope scope(&critical_section_); return thread_ != NULL && thread_->IsCurrent(); } void Stop() { rtc::CritScope scope(&critical_section_); thread_ = NULL; } private: typedef rtc::TypedMessageData Task; ~SignalingThreadTaskRunner() override {} void OnMessage(rtc::Message* msg) override { static_cast(msg->pdata)->data().Run(); } mutable rtc::CriticalSection critical_section_; rtc::Thread* thread_; // Guarded by |critical_section_|. }; class DataChannelObserverImpl : public webrtc::DataChannelObserver { public: DataChannelObserverImpl( webrtc::DataChannelInterface* data_channel, scoped_ptr observer) : data_channel_(data_channel), observer_(observer.Pass()) { } void InitState() { open_ = data_channel_->state() == webrtc::DataChannelInterface::kOpen; } void OnStateChange() override { bool open = data_channel_->state() == webrtc::DataChannelInterface::kOpen; if (open == open_) return; open_ = open; if (open) { observer_->OnOpen(); } else { observer_->OnClose(); } } void OnMessage(const webrtc::DataBuffer& buffer) override { observer_->OnMessage(buffer.data.data(), buffer.size()); } private: webrtc::DataChannelInterface* const data_channel_; scoped_ptr const observer_; bool open_; }; /** * Thread-safe view on AbstractDataChannel. */ class DataChannelProxyImpl : public AbstractDataChannel::Proxy { public: DataChannelProxyImpl( SessionDependencyFactory* factory, rtc::scoped_refptr data_channel) : data_channel_(data_channel), signaling_thread_task_runner_( factory->signaling_thread_task_runner()) { } void StopOnSignalingThread() { data_channel_ = NULL; } void SendBinaryMessage(const void* data, size_t length) override { auto buffer = make_scoped_ptr(new webrtc::DataBuffer(rtc::Buffer(), true)); buffer->data.SetData(static_cast(data), length); signaling_thread_task_runner_->PostTask( FROM_HERE, base::Bind( &DataChannelProxyImpl::SendMessageOnSignalingThread, this, base::Passed(&buffer))); } void Close() override { signaling_thread_task_runner_->PostTask( FROM_HERE, base::Bind(&DataChannelProxyImpl::CloseOnSignalingThread, this)); } private: ~DataChannelProxyImpl() override {} void SendMessageOnSignalingThread(scoped_ptr message) { if (data_channel_ != NULL) data_channel_->Send(*message); } void CloseOnSignalingThread() { if (data_channel_ != NULL) data_channel_->Close(); } // Accessed on signaling thread. rtc::scoped_refptr data_channel_; const scoped_refptr signaling_thread_task_runner_; }; class DataChannelImpl : public AbstractDataChannel { public: DataChannelImpl( SessionDependencyFactory* factory, rtc::Thread* const signaling_thread, rtc::scoped_refptr impl) : factory_(factory), signaling_thread_(signaling_thread), impl_(impl) { } ~DataChannelImpl() override { if (proxy_.get()) { signaling_thread_->Invoke(rtc::Bind( &DataChannelProxyImpl::StopOnSignalingThread, proxy_.get())); } } void RegisterObserver(scoped_ptr observer) override { observer_.reset(new DataChannelObserverImpl(impl_.get(), observer.Pass())); signaling_thread_->Invoke(rtc::Bind( &DataChannelImpl::RegisterObserverOnSignalingThread, this)); } void UnregisterObserver() override { DCHECK(observer_.get() != NULL); impl_->UnregisterObserver(); observer_.reset(); } void SendBinaryMessage(void* data, size_t length) override { SendMessage(data, length, true); } void SendTextMessage(void* data, size_t length) override { SendMessage(data, length, false); } void SendMessage(void* data, size_t length, bool is_binary) { impl_->Send(webrtc::DataBuffer( rtc::Buffer(static_cast(data), length), is_binary)); } void Close() override { impl_->Close(); } scoped_refptr proxy() override { if (!proxy_.get()) proxy_ = new DataChannelProxyImpl(factory_, impl_); return proxy_; } private: void RegisterObserverOnSignalingThread() { // State initialization and observer registration happen atomically // if done on the signaling thread (see rtc::Thread::Send). observer_->InitState(); impl_->RegisterObserver(observer_.get()); } SessionDependencyFactory* const factory_; scoped_refptr proxy_; rtc::Thread* const signaling_thread_; scoped_ptr observer_; const rtc::scoped_refptr impl_; }; class PeerConnectionObserverImpl : public webrtc::PeerConnectionObserver { public: PeerConnectionObserverImpl(AbstractPeerConnection::Delegate* delegate) : delegate_(delegate), connected_(false) { } void OnAddStream(webrtc::MediaStreamInterface* stream) override {} void OnRemoveStream(webrtc::MediaStreamInterface* stream) override {} void OnDataChannel(webrtc::DataChannelInterface* data_channel) override {} void OnRenegotiationNeeded() override {} void OnSignalingChange( webrtc::PeerConnectionInterface::SignalingState new_state) override {} void OnIceConnectionChange( webrtc::PeerConnectionInterface::IceConnectionState new_state) override { bool connected = new_state == webrtc::PeerConnectionInterface::kIceConnectionConnected || new_state == webrtc::PeerConnectionInterface::kIceConnectionCompleted; if (connected != connected_) { connected_ = connected; delegate_->OnIceConnectionChange(connected_); } } void OnIceCandidate(const webrtc::IceCandidateInterface* candidate) override { std::string sdp; candidate->ToString(&sdp); delegate_->OnIceCandidate( candidate->sdp_mid(), candidate->sdp_mline_index(), sdp); } private: AbstractPeerConnection::Delegate* const delegate_; bool connected_; }; /** * Helper object which may outlive PeerConnectionImpl. Provides access * to the connection and the delegate to operaion callback objects * in a safe way. Always accessible on the signaling thread. */ class PeerConnectionHolder : public rtc::RefCountInterface { public: PeerConnectionHolder( rtc::Thread* signaling_thread, webrtc::PeerConnectionInterface* connection, AbstractPeerConnection::Delegate* delegate) : signaling_thread_(signaling_thread), connection_(connection), delegate_(delegate), disposed_(false) { } ~PeerConnectionHolder() override { DCHECK(disposed_); } void Dispose() { DCHECK(!IsDisposed()); disposed_ = true; } webrtc::PeerConnectionInterface* connection() { DCHECK(!IsDisposed()); return connection_; } AbstractPeerConnection::Delegate* delegate() { DCHECK(!IsDisposed()); return delegate_; } bool IsDisposed() { DCHECK(signaling_thread_->IsCurrent()); return disposed_; } private: rtc::Thread* const signaling_thread_; webrtc::PeerConnectionInterface* const connection_; AbstractPeerConnection::Delegate* const delegate_; bool disposed_; }; class CreateAndSetHandler : public webrtc::CreateSessionDescriptionObserver, public webrtc::SetSessionDescriptionObserver { public: explicit CreateAndSetHandler( rtc::scoped_refptr holder) : holder_(holder) { } void OnSuccess(webrtc::SessionDescriptionInterface* desc) override { if (holder_->IsDisposed()) return; type_ = desc->type(); if (desc->ToString(&description_)) { holder_->connection()->SetLocalDescription(this, desc); } else { OnFailure("Can't serialize session description"); } } void OnSuccess() override { if (holder_->IsDisposed()) return; if (type_ == webrtc::SessionDescriptionInterface::kOffer) { holder_->delegate()->OnLocalOfferCreatedAndSetSet(description_); } else { DCHECK_EQ(webrtc::SessionDescriptionInterface::kAnswer, type_); holder_->delegate()->OnLocalAnswerCreatedAndSetSet(description_); } } void OnFailure(const std::string& error) override { if (holder_->IsDisposed()) return; holder_->delegate()->OnFailure(error); } private: const rtc::scoped_refptr holder_; std::string type_; std::string description_; }; class SetRemoteDescriptionHandler : public webrtc::SetSessionDescriptionObserver { public: SetRemoteDescriptionHandler( rtc::scoped_refptr holder) : holder_(holder) { } void OnSuccess() override { if (holder_->IsDisposed()) return; holder_->delegate()->OnRemoteDescriptionSet(); } void OnFailure(const std::string& error) override { if (holder_->IsDisposed()) return; holder_->delegate()->OnFailure(error); } private: const rtc::scoped_refptr holder_; }; class PeerConnectionImpl : public AbstractPeerConnection { public: PeerConnectionImpl( SessionDependencyFactory* const factory, rtc::Thread* signaling_thread, rtc::scoped_refptr connection, scoped_ptr observer, scoped_ptr delegate) : factory_(factory), holder_(new rtc::RefCountedObject( signaling_thread, connection.get(), delegate.get())), signaling_thread_(signaling_thread), connection_(connection), observer_(observer.Pass()), delegate_(delegate.Pass()) { } ~PeerConnectionImpl() override { signaling_thread_->Invoke(rtc::Bind( &PeerConnectionImpl::DisposeOnSignalingThread, this)); } void CreateAndSetLocalOffer() override { connection_->CreateOffer(MakeCreateAndSetHandler(), NULL); } void CreateAndSetLocalAnswer() override { connection_->CreateAnswer(MakeCreateAndSetHandler(), NULL); } void SetRemoteOffer(const std::string& description) override { SetRemoteDescription( webrtc::SessionDescriptionInterface::kOffer, description); } void SetRemoteAnswer(const std::string& description) override { SetRemoteDescription( webrtc::SessionDescriptionInterface::kAnswer, description); } void SetRemoteDescription( const std::string& type, const std::string& description) { webrtc::SdpParseError error; scoped_ptr value( webrtc::CreateSessionDescription(type, description, &error)); if (value == NULL) { OnParseError(error); return; } // Takes ownership on |value|. connection_->SetRemoteDescription( new rtc::RefCountedObject(holder_), value.release()); } void AddIceCandidate(const std::string& sdp_mid, int sdp_mline_index, const std::string& sdp) override { webrtc::SdpParseError error; auto candidate = webrtc::CreateIceCandidate( sdp_mid, sdp_mline_index, sdp, &error); if (candidate == NULL) { OnParseError(error); return; } // Doesn't takes ownership. connection_->AddIceCandidate(candidate); delete candidate; } scoped_ptr CreateDataChannel(int channelId) override { webrtc::DataChannelInit init; init.ordered = true; init.negotiated = true; init.id = channelId; return make_scoped_ptr(new DataChannelImpl( factory_, signaling_thread_, connection_->CreateDataChannel("", &init))); } private: webrtc::CreateSessionDescriptionObserver* MakeCreateAndSetHandler() { return new rtc::RefCountedObject(holder_); } void DisposeOnSignalingThread() { DCHECK(signaling_thread_->IsCurrent()); CheckedRelease(&connection_); holder_->Dispose(); } void OnParseError(const webrtc::SdpParseError& error) { // TODO(serya): Send on signaling thread. } SessionDependencyFactory* const factory_; const rtc::scoped_refptr holder_; rtc::Thread* const signaling_thread_; rtc::scoped_refptr connection_; const scoped_ptr observer_; const scoped_ptr delegate_; }; class SessionDependencyFactoryImpl : public SessionDependencyFactory { public: SessionDependencyFactoryImpl( const base::Closure& cleanup_on_signaling_thread) : cleanup_on_signaling_thread_(cleanup_on_signaling_thread) { signaling_thread_.SetName("signaling_thread", NULL); signaling_thread_.Start(); worker_thread_.SetName("worker_thread", NULL); worker_thread_.Start(); factory_ = webrtc::CreatePeerConnectionFactory( &worker_thread_, &signaling_thread_, NULL, NULL, NULL); } ~SessionDependencyFactoryImpl() override { if (signaling_thread_task_runner_.get()) signaling_thread_task_runner_->Stop(); signaling_thread_.Invoke(rtc::Bind( &SessionDependencyFactoryImpl::DisposeOnSignalingThread, this)); } scoped_ptr CreatePeerConnection( scoped_ptr config, scoped_ptr delegate) override { auto observer = make_scoped_ptr( new PeerConnectionObserverImpl(delegate.get())); MediaConstraints constraints; constraints.AddMandatory( MediaConstraints::kEnableDtlsSrtp, MediaConstraints::kValueTrue); auto connection = factory_->CreatePeerConnection( config->impl(), &constraints, NULL, NULL, observer.get()); return make_scoped_ptr(new PeerConnectionImpl( this, &signaling_thread_, connection, observer.Pass(), delegate.Pass())); } scoped_refptr signaling_thread_task_runner() override { if (!signaling_thread_task_runner_.get()) { signaling_thread_task_runner_ = new SignalingThreadTaskRunner(&signaling_thread_); } return signaling_thread_task_runner_; } scoped_refptr io_thread_task_runner() override { if (!io_thread_.get()) { io_thread_.reset(new base::Thread("devtools bridge IO thread")); base::Thread::Options options; options.message_loop_type = base::MessageLoop::TYPE_IO; CHECK(io_thread_->StartWithOptions(options)); } return io_thread_->task_runner(); } private: void DisposeOnSignalingThread() { DCHECK(signaling_thread_.IsCurrent()); CheckedRelease(&factory_); if (!cleanup_on_signaling_thread_.is_null()) cleanup_on_signaling_thread_.Run(); } scoped_ptr io_thread_; scoped_refptr signaling_thread_task_runner_; base::Closure cleanup_on_signaling_thread_; rtc::Thread signaling_thread_; rtc::Thread worker_thread_; rtc::scoped_refptr factory_; }; } // namespace // RTCCOnfiguration // static scoped_ptr RTCConfiguration::CreateInstance() { return make_scoped_ptr(new RTCConfiguration::Impl()); } // SessionDependencyFactory // static bool SessionDependencyFactory::InitializeSSL() { return rtc::InitializeSSL(); } // static bool SessionDependencyFactory::CleanupSSL() { return rtc::CleanupSSL(); } // static scoped_ptr SessionDependencyFactory::CreateInstance( const base::Closure& cleanup_on_signaling_thread) { return make_scoped_ptr(new SessionDependencyFactoryImpl( cleanup_on_signaling_thread)); } } // namespace devtools_bridge