// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "remoting/jingle_glue/jingle_channel.h" #include "base/lock.h" #include "base/logging.h" #include "base/message_loop.h" #include "base/waitable_event.h" #include "media/base/data_buffer.h" #include "remoting/jingle_glue/jingle_thread.h" #include "third_party/libjingle/source/talk/base/stream.h" using media::DataBuffer; namespace remoting { // Size of a read buffer chunk in bytes. const size_t kReadBufferSize = 4096; JingleChannel::JingleChannel(Callback* callback) : state_(INITIALIZING), callback_(callback), closed_(false), event_handler_(this), write_buffer_size_(0), current_write_buf_pos_(0) { DCHECK(callback_ != NULL); } // This constructor is only used in unit test. JingleChannel::JingleChannel() : state_(CLOSED), closed_(false), write_buffer_size_(0), current_write_buf_pos_(0) { } JingleChannel::~JingleChannel() { DCHECK(closed_ || stream_ == NULL); } void JingleChannel::Init(JingleThread* thread, talk_base::StreamInterface* stream, const std::string& jid) { thread_ = thread; stream_.reset(stream); stream_->SignalEvent.connect(&event_handler_, &EventHandler::OnStreamEvent); jid_ = jid; // Initialize |state_|. switch (stream->GetState()) { case talk_base::SS_CLOSED: SetState(CLOSED); break; case talk_base::SS_OPENING: SetState(CONNECTING); break; case talk_base::SS_OPEN: SetState(OPEN); // Try to read in case there is something in the stream. thread_->message_loop()->PostTask( FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoRead)); break; default: NOTREACHED(); } } void JingleChannel::Write(scoped_refptr data) { // Discard empty packets. if (data->GetDataSize() != 0) { AutoLock auto_lock(write_lock_); write_queue_.push_back(data); write_buffer_size_ += data->GetDataSize(); // Post event so that the data gets written in the tunnel thread. thread_->message_loop()->PostTask( FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoWrite)); } } void JingleChannel::DoRead() { while (true) { size_t bytes_to_read; if (stream_->GetAvailable(&bytes_to_read)) { // Return immediately if we know there is nothing to read. if (bytes_to_read == 0) return; } else { // Try to read kReadBufferSize if the stream doesn't support // GetAvailable(). bytes_to_read = kReadBufferSize; } scoped_refptr buffer( new DataBuffer(new uint8[bytes_to_read], kReadBufferSize)); size_t bytes_read; int error; talk_base::StreamResult result = stream_->Read( buffer->GetWritableData(), bytes_to_read, &bytes_read, &error); switch (result) { case talk_base::SR_SUCCESS: { DCHECK_GT(bytes_read, 0U); buffer->SetDataSize(bytes_read); { AutoLock auto_lock(state_lock_); // Drop received data if the channel is already closed. if (!closed_) callback_->OnPacketReceived(this, buffer); } break; } case talk_base::SR_BLOCK: { return; } case talk_base::SR_EOS: { SetState(CLOSED); return; } case talk_base::SR_ERROR: { SetState(FAILED); return; } } } } void JingleChannel::DoWrite() { while (true) { if (!current_write_buf_) { AutoLock auto_lock(write_lock_); if (write_queue_.empty()) break; current_write_buf_ = write_queue_.front(); current_write_buf_pos_ = 0; write_queue_.pop_front(); } size_t bytes_written; int error; talk_base::StreamResult result = stream_->Write( current_write_buf_->GetData() + current_write_buf_pos_, current_write_buf_->GetDataSize() - current_write_buf_pos_, &bytes_written, &error); switch (result) { case talk_base::SR_SUCCESS: { current_write_buf_pos_ += bytes_written; if (current_write_buf_pos_ >= current_write_buf_->GetDataSize()) current_write_buf_ = NULL; { AutoLock auto_lock(write_lock_); write_buffer_size_ -= bytes_written; } break; } case talk_base::SR_BLOCK: { return; } case talk_base::SR_EOS: { SetState(CLOSED); return; } case talk_base::SR_ERROR: { SetState(FAILED); return; } } } } void JingleChannel::OnStreamEvent(talk_base::StreamInterface* stream, int events, int error) { if (events & talk_base::SE_OPEN) SetState(OPEN); if (state_ == OPEN && (events & talk_base::SE_WRITE)) DoWrite(); if (state_ == OPEN && (events & talk_base::SE_READ)) DoRead(); if (events & talk_base::SE_CLOSE) SetState(CLOSED); } void JingleChannel::SetState(State new_state) { if (new_state != state_) { state_ = new_state; { AutoLock auto_lock(state_lock_); if (!closed_) callback_->OnStateChange(this, new_state); } } } void JingleChannel::Close() { Close(NULL); } void JingleChannel::Close(Task* closed_task) { { AutoLock auto_lock(state_lock_); if (closed_) { // We are already closed. if (closed_task) thread_->message_loop()->PostTask(FROM_HERE, closed_task); return; } closed_ = true; if (closed_task) closed_task_.reset(closed_task); } thread_->message_loop()->PostTask( FROM_HERE, NewRunnableMethod(this, &JingleChannel::DoClose)); } void JingleChannel::DoClose() { DCHECK(closed_); stream_->Close(); stream_.reset(); // TODO(sergeyu): Even though we have called Close() for the stream, it // doesn't mean that the p2p sessions has been closed. I.e. |closed_task_| // is called too early. If the client is closed right after that the other // side will not receive notification that the channel was closed. if (closed_task_.get()) { closed_task_->Run(); closed_task_.reset(); } } size_t JingleChannel::write_buffer_size() { AutoLock auto_lock(write_lock_); return write_buffer_size_; } } // namespace remoting