summaryrefslogtreecommitdiffstats
path: root/third_party/libjingle/source/talk/p2p/base/transport.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libjingle/source/talk/p2p/base/transport.cc')
-rw-r--r--third_party/libjingle/source/talk/p2p/base/transport.cc443
1 files changed, 443 insertions, 0 deletions
diff --git a/third_party/libjingle/source/talk/p2p/base/transport.cc b/third_party/libjingle/source/talk/p2p/base/transport.cc
new file mode 100644
index 0000000..6be6d6e
--- /dev/null
+++ b/third_party/libjingle/source/talk/p2p/base/transport.cc
@@ -0,0 +1,443 @@
+/*
+ * libjingle
+ * Copyright 2004--2005, Google Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+ * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "talk/p2p/base/transport.h"
+
+#include "talk/base/common.h"
+#include "talk/p2p/base/candidate.h"
+#include "talk/p2p/base/constants.h"
+#include "talk/p2p/base/sessionmanager.h"
+#include "talk/p2p/base/parsing.h"
+#include "talk/p2p/base/transportchannelimpl.h"
+#include "talk/xmllite/xmlelement.h"
+#include "talk/xmpp/constants.h"
+
+namespace cricket {
+
+struct ChannelParams {
+ ChannelParams() : channel(NULL), candidate(NULL) {}
+ explicit ChannelParams(const std::string& name)
+ : name(name), channel(NULL), candidate(NULL) {}
+ ChannelParams(const std::string& name,
+ const std::string& session_type)
+ : name(name), session_type(session_type),
+ channel(NULL), candidate(NULL) {}
+ explicit ChannelParams(cricket::Candidate* candidate) :
+ channel(NULL), candidate(candidate) {
+ name = candidate->name();
+ }
+
+ ~ChannelParams() {
+ delete candidate;
+ }
+
+ std::string name;
+ std::string session_type;
+ cricket::TransportChannelImpl* channel;
+ cricket::Candidate* candidate;
+};
+typedef talk_base::TypedMessageData<ChannelParams*> ChannelMessage;
+
+enum {
+ MSG_CREATECHANNEL = 1,
+ MSG_DESTROYCHANNEL = 2,
+ MSG_DESTROYALLCHANNELS = 3,
+ MSG_CONNECTCHANNELS = 4,
+ MSG_RESETCHANNELS = 5,
+ MSG_ONSIGNALINGREADY = 6,
+ MSG_ONREMOTECANDIDATE = 7,
+ MSG_READSTATE = 8,
+ MSG_WRITESTATE = 9,
+ MSG_REQUESTSIGNALING = 10,
+ MSG_ONCHANNELCANDIDATEREADY = 11,
+ MSG_CONNECTING = 12,
+};
+
+Transport::Transport(talk_base::Thread* worker_thread, const std::string& name,
+ PortAllocator* allocator)
+ : signaling_thread_(talk_base::Thread::Current()),
+ worker_thread_(worker_thread), name_(name), allocator_(allocator),
+ destroyed_(false), readable_(false), writable_(false),
+ connect_requested_(false), allow_local_ips_(false) {
+}
+
+Transport::~Transport() {
+ ASSERT(signaling_thread_->IsCurrent());
+ ASSERT(destroyed_);
+}
+
+TransportChannelImpl* Transport::CreateChannel(
+ const std::string& name, const std::string& session_type) {
+ ChannelParams params(name, session_type);
+ ChannelMessage msg(&params);
+ worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);
+ return msg.data()->channel;
+}
+
+TransportChannelImpl* Transport::CreateChannel_w(
+ const std::string& name, const std::string& session_type) {
+ ASSERT(worker_thread()->IsCurrent());
+
+ TransportChannelImpl* impl = CreateTransportChannel(name, session_type);
+ impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
+ impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
+ impl->SignalRequestSignaling.connect(
+ this, &Transport::OnChannelRequestSignaling);
+ impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
+
+ talk_base::CritScope cs(&crit_);
+ ASSERT(channels_.find(name) == channels_.end());
+ channels_[name] = impl;
+ destroyed_ = false;
+ if (connect_requested_) {
+ impl->Connect();
+ if (channels_.size() == 1) {
+ // If this is the first channel, then indicate that we have started
+ // connecting.
+ signaling_thread()->Post(this, MSG_CONNECTING, NULL);
+ }
+ }
+ return impl;
+}
+
+TransportChannelImpl* Transport::GetChannel(const std::string& name) {
+ talk_base::CritScope cs(&crit_);
+ ChannelMap::iterator iter = channels_.find(name);
+ return (iter != channels_.end()) ? iter->second : NULL;
+}
+
+bool Transport::HasChannels() {
+ talk_base::CritScope cs(&crit_);
+ return !channels_.empty();
+}
+
+void Transport::DestroyChannel(const std::string& name) {
+ ChannelParams params(name);
+ ChannelMessage msg(&params);
+ worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);
+}
+
+void Transport::DestroyChannel_w(const std::string& name) {
+ ASSERT(worker_thread()->IsCurrent());
+ TransportChannelImpl* impl = NULL;
+ {
+ talk_base::CritScope cs(&crit_);
+ ChannelMap::iterator iter = channels_.find(name);
+ ASSERT(iter != channels_.end());
+ impl = iter->second;
+ channels_.erase(iter);
+ }
+
+ if (connect_requested_ && channels_.empty()) {
+ // We're not longer attempting to connect.
+ signaling_thread()->Post(this, MSG_CONNECTING, NULL);
+ }
+
+ if (impl)
+ DestroyTransportChannel(impl);
+}
+
+void Transport::ConnectChannels() {
+ ASSERT(signaling_thread()->IsCurrent());
+ worker_thread()->Post(this, MSG_CONNECTCHANNELS, NULL);
+}
+
+void Transport::ConnectChannels_w() {
+ ASSERT(worker_thread()->IsCurrent());
+ if (connect_requested_)
+ return;
+ connect_requested_ = true;
+ signaling_thread()->Post(
+ this, MSG_ONCHANNELCANDIDATEREADY, NULL);
+ CallChannels_w(&TransportChannelImpl::Connect);
+ if (!channels_.empty()) {
+ signaling_thread()->Post(this, MSG_CONNECTING, NULL);
+ }
+}
+
+void Transport::OnConnecting_s() {
+ ASSERT(signaling_thread()->IsCurrent());
+ SignalConnecting(this);
+}
+
+void Transport::DestroyAllChannels() {
+ ASSERT(signaling_thread()->IsCurrent());
+ worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);
+ destroyed_ = true;
+}
+
+void Transport::DestroyAllChannels_w() {
+ ASSERT(worker_thread()->IsCurrent());
+ std::vector<TransportChannelImpl*> impls;
+ {
+ talk_base::CritScope cs(&crit_);
+ for (ChannelMap::iterator iter = channels_.begin();
+ iter != channels_.end();
+ ++iter) {
+ impls.push_back(iter->second);
+ }
+ channels_.clear();
+ }
+
+ for (size_t i = 0; i < impls.size(); ++i)
+ DestroyTransportChannel(impls[i]);
+}
+
+void Transport::ResetChannels() {
+ ASSERT(signaling_thread()->IsCurrent());
+ worker_thread()->Post(this, MSG_RESETCHANNELS, NULL);
+}
+
+void Transport::ResetChannels_w() {
+ ASSERT(worker_thread()->IsCurrent());
+
+ // We are no longer attempting to connect
+ connect_requested_ = false;
+
+ // Clear out the old messages, they aren't relevant
+ talk_base::CritScope cs(&crit_);
+ ready_candidates_.clear();
+
+ // Reset all of the channels
+ CallChannels_w(&TransportChannelImpl::Reset);
+}
+
+void Transport::OnSignalingReady() {
+ ASSERT(signaling_thread()->IsCurrent());
+ worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
+
+ // Notify the subclass.
+ OnTransportSignalingReady();
+}
+
+void Transport::CallChannels_w(TransportChannelFunc func) {
+ ASSERT(worker_thread()->IsCurrent());
+ talk_base::CritScope cs(&crit_);
+ for (ChannelMap::iterator iter = channels_.begin();
+ iter != channels_.end();
+ ++iter) {
+ ((iter->second)->*func)();
+ }
+}
+
+void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
+ for (std::vector<Candidate>::const_iterator iter = candidates.begin();
+ iter != candidates.end();
+ ++iter) {
+ OnRemoteCandidate(*iter);
+ }
+}
+
+void Transport::OnRemoteCandidate(const Candidate& candidate) {
+ ASSERT(signaling_thread()->IsCurrent());
+ ASSERT(HasChannel(candidate.name()));
+ // new candidate deleted when params is deleted
+ ChannelParams* params = new ChannelParams(new Candidate(candidate));
+ ChannelMessage* msg = new ChannelMessage(params);
+ worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg);
+}
+
+void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
+ ASSERT(worker_thread()->IsCurrent());
+ ChannelMap::iterator iter = channels_.find(candidate.name());
+ // It's ok for a channel to go away while this message is in transit.
+ if (iter != channels_.end()) {
+ iter->second->OnCandidate(candidate);
+ }
+}
+
+void Transport::OnChannelReadableState(TransportChannel* channel) {
+ ASSERT(worker_thread()->IsCurrent());
+ signaling_thread()->Post(this, MSG_READSTATE, NULL);
+}
+
+void Transport::OnChannelReadableState_s() {
+ ASSERT(signaling_thread()->IsCurrent());
+ bool readable = GetTransportState_s(true);
+ if (readable_ != readable) {
+ readable_ = readable;
+ SignalReadableState(this);
+ }
+}
+
+void Transport::OnChannelWritableState(TransportChannel* channel) {
+ ASSERT(worker_thread()->IsCurrent());
+ signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
+}
+
+void Transport::OnChannelWritableState_s() {
+ ASSERT(signaling_thread()->IsCurrent());
+ bool writable = GetTransportState_s(false);
+ if (writable_ != writable) {
+ writable_ = writable;
+ SignalWritableState(this);
+ }
+}
+
+bool Transport::GetTransportState_s(bool read) {
+ ASSERT(signaling_thread()->IsCurrent());
+ bool result = false;
+ talk_base::CritScope cs(&crit_);
+ for (ChannelMap::iterator iter = channels_.begin();
+ iter != channels_.end();
+ ++iter) {
+ bool b = (read ? iter->second->readable() : iter->second->writable());
+ result = result || b;
+ }
+ return result;
+}
+
+void Transport::OnChannelRequestSignaling() {
+ ASSERT(worker_thread()->IsCurrent());
+ signaling_thread()->Post(this, MSG_REQUESTSIGNALING, NULL);
+}
+
+void Transport::OnChannelRequestSignaling_s() {
+ ASSERT(signaling_thread()->IsCurrent());
+ SignalRequestSignaling(this);
+}
+
+void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
+ const Candidate& candidate) {
+ ASSERT(worker_thread()->IsCurrent());
+ talk_base::CritScope cs(&crit_);
+ ready_candidates_.push_back(candidate);
+
+ // We hold any messages until the client lets us connect.
+ if (connect_requested_) {
+ signaling_thread()->Post(
+ this, MSG_ONCHANNELCANDIDATEREADY, NULL);
+ }
+}
+
+void Transport::OnChannelCandidateReady_s() {
+ ASSERT(signaling_thread()->IsCurrent());
+ ASSERT(connect_requested_);
+
+ std::vector<Candidate> candidates;
+ {
+ talk_base::CritScope cs(&crit_);
+ candidates.swap(ready_candidates_);
+ }
+
+ // we do the deleting of Candidate* here to keep the new above and
+ // delete below close to each other
+ if (!candidates.empty()) {
+ SignalCandidatesReady(this, candidates);
+ }
+}
+
+void Transport::OnMessage(talk_base::Message* msg) {
+ switch (msg->message_id) {
+ case MSG_CREATECHANNEL:
+ {
+ ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
+ params->channel = CreateChannel_w(params->name, params->session_type);
+ }
+ break;
+ case MSG_DESTROYCHANNEL:
+ {
+ ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
+ DestroyChannel_w(params->name);
+ }
+ break;
+ case MSG_CONNECTCHANNELS:
+ ConnectChannels_w();
+ break;
+ case MSG_RESETCHANNELS:
+ ResetChannels_w();
+ break;
+ case MSG_DESTROYALLCHANNELS:
+ DestroyAllChannels_w();
+ break;
+ case MSG_ONSIGNALINGREADY:
+ CallChannels_w(&TransportChannelImpl::OnSignalingReady);
+ break;
+ case MSG_ONREMOTECANDIDATE:
+ {
+ ChannelParams* params = static_cast<ChannelMessage*>(msg->pdata)->data();
+ OnRemoteCandidate_w(*(params->candidate));
+ delete params;
+ }
+ break;
+ case MSG_CONNECTING:
+ OnConnecting_s();
+ break;
+ case MSG_READSTATE:
+ OnChannelReadableState_s();
+ break;
+ case MSG_WRITESTATE:
+ OnChannelWritableState_s();
+ break;
+ case MSG_REQUESTSIGNALING:
+ OnChannelRequestSignaling_s();
+ break;
+ case MSG_ONCHANNELCANDIDATEREADY:
+ OnChannelCandidateReady_s();
+ break;
+ }
+}
+
+bool Transport::ParseAddress(const buzz::XmlElement* elem,
+ const buzz::QName& address_name,
+ const buzz::QName& port_name,
+ talk_base::SocketAddress* address,
+ ParseError* error) {
+ ASSERT(elem->HasAttr(address_name));
+ ASSERT(elem->HasAttr(port_name));
+
+ // Record the parts of the address.
+ address->SetIP(elem->Attr(address_name));
+ std::istringstream ist(elem->Attr(port_name));
+ int port;
+ ist >> port;
+ address->SetPort(port);
+
+ // No address zero.
+ if (address->IsAny()) {
+ return BadParse("candidate has address of zero", error);
+ }
+
+ // Always disallow addresses that refer to the local host.
+ if (address->IsLocalIP() && !allow_local_ips_)
+ return BadParse("candidate has local IP address", error);
+
+ // Disallow all ports below 1024, except for 80 and 443 on public addresses.
+ if (port < 1024) {
+ if ((port != 80) && (port != 443))
+ return BadParse(
+ "candidate has port below 1024, but not 80 or 443", error);
+ if (address->IsPrivateIP()) {
+ return BadParse(
+ "candidate has port of 80 or 443 with private IP address", error);
+ }
+ }
+
+ return true;
+}
+
+} // namespace cricket