summaryrefslogtreecommitdiffstats
path: root/media
diff options
context:
space:
mode:
authorpwestin@google.com <pwestin@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2013-09-12 00:05:01 +0000
committerpwestin@google.com <pwestin@google.com@0039d316-1c4b-4281-b951-d872f2087c98>2013-09-12 00:05:01 +0000
commit428c39804a10f55c0ae81392b25806bc0117d053 (patch)
tree8a913d600a181cf7d03d01120c3dab83437985ec /media
parente05f7f3d6db7108e73a92fb004ded36fa3e7624d (diff)
downloadchromium_src-428c39804a10f55c0ae81392b25806bc0117d053.zip
chromium_src-428c39804a10f55c0ae81392b25806bc0117d053.tar.gz
chromium_src-428c39804a10f55c0ae81392b25806bc0117d053.tar.bz2
Added cast sender impl and made required modifications to the pacer.
BUG= Review URL: https://chromiumcodereview.appspot.com/24051002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@222666 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media')
-rw-r--r--media/cast/audio_sender/audio_sender.cc4
-rw-r--r--media/cast/audio_sender/audio_sender.h3
-rw-r--r--media/cast/cast_config.h9
-rw-r--r--media/cast/cast_receiver.h17
-rw-r--r--media/cast/cast_sender.gyp9
-rw-r--r--media/cast/cast_sender.h5
-rw-r--r--media/cast/cast_sender_impl.cc176
-rw-r--r--media/cast/cast_sender_impl.h55
-rw-r--r--media/cast/pacing/paced_sender.cc44
-rw-r--r--media/cast/pacing/paced_sender.h29
-rw-r--r--media/cast/pacing/paced_sender_unittest.cc207
-rw-r--r--media/cast/rtcp/rtcp.cc33
-rw-r--r--media/cast/rtcp/rtcp.h2
-rw-r--r--media/cast/rtp_sender/rtp_packetizer/rtp_packetizer.h2
-rw-r--r--media/cast/video_sender/video_sender.cc4
-rw-r--r--media/cast/video_sender/video_sender.h3
16 files changed, 490 insertions, 112 deletions
diff --git a/media/cast/audio_sender/audio_sender.cc b/media/cast/audio_sender/audio_sender.cc
index 70149dc..39fccda 100644
--- a/media/cast/audio_sender/audio_sender.cc
+++ b/media/cast/audio_sender/audio_sender.cc
@@ -141,8 +141,10 @@ void AudioSender::ResendPackets(
rtp_sender_.ResendPackets(missing_frames_and_packets);
}
-void AudioSender::IncomingRtcpPacket(const uint8* packet, int length) {
+void AudioSender::IncomingRtcpPacket(const uint8* packet, int length,
+ const base::Closure callback) {
rtcp_.IncomingRtcpPacket(packet, length);
+ cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, callback);
}
void AudioSender::ScheduleNextRtcpReport() {
diff --git a/media/cast/audio_sender/audio_sender.h b/media/cast/audio_sender/audio_sender.h
index 30d48b4..3d389b3 100644
--- a/media/cast/audio_sender/audio_sender.h
+++ b/media/cast/audio_sender/audio_sender.h
@@ -54,7 +54,8 @@ class AudioSender : public base::NonThreadSafe,
const base::Closure callback);
// Only called from the main cast thread.
- void IncomingRtcpPacket(const uint8* packet, int length);
+ void IncomingRtcpPacket(const uint8* packet, int length,
+ const base::Closure callback);
// Only used for testing.
void set_clock(base::TickClock* clock) {
diff --git a/media/cast/cast_config.h b/media/cast/cast_config.h
index e1280cd..988924a 100644
--- a/media/cast/cast_config.h
+++ b/media/cast/cast_config.h
@@ -9,6 +9,8 @@
#include <vector>
#include "base/basictypes.h"
+#include "base/callback.h"
+#include "base/memory/ref_counted.h"
#include "media/cast/cast_defines.h"
namespace media {
@@ -175,17 +177,16 @@ class PacketSender {
// All packets to be sent to the network will be delivered via this function.
virtual bool SendPacket(const uint8* packet, int length) = 0;
- protected:
virtual ~PacketSender() {}
};
-class PacketReceiver {
+class PacketReceiver : public base::RefCountedThreadSafe<PacketReceiver> {
public:
// All packets received from the network should be delivered via this
// function.
- virtual void ReceivedPacket(const uint8* packet, int length) = 0;
+ virtual void ReceivedPacket(const uint8* packet, int length,
+ const base::Closure callback) = 0;
- protected:
virtual ~PacketReceiver() {}
};
diff --git a/media/cast/cast_receiver.h b/media/cast/cast_receiver.h
index 3dafbe5..fa09721 100644
--- a/media/cast/cast_receiver.h
+++ b/media/cast/cast_receiver.h
@@ -1,6 +1,9 @@
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+//
+// This is the main interface for the cast receiver. All configuration are done
+// at creation.
#ifndef MEDIA_CAST_CAST_RECEIVER_H_
#define MEDIA_CAST_CAST_RECEIVER_H_
@@ -12,8 +15,10 @@
namespace media {
namespace cast {
-class FrameReceiver {
+// This Class is thread safe.
+class FrameReceiver : public base::RefCountedThreadSafe<FrameReceiver>{
public:
+ // TODO(pwestin): These functions must be updated.
virtual bool GetRawVideoFrame(I420VideoFrame* video_frame,
base::TimeTicks* render_time) = 0;
@@ -32,24 +37,24 @@ class FrameReceiver {
virtual void ReleaseCodedAudioFrame(uint8 frame_id) = 0;
-protected:
virtual ~FrameReceiver() {}
};
+// This Class is thread safe.
class CastReceiver {
public:
static CastReceiver* CreateCastReceiver(
+ scoped_refptr<CastThread> cast_thread,
const AudioReceiverConfig& audio_config,
const VideoReceiverConfig& video_config,
PacketSender* const packet_sender);
// All received RTP and RTCP packets for the call should be inserted to this
- // PacketReceiver. The PacketReceiver pointer is valid as long as the
- // CastReceiver instance exists.
- virtual PacketReceiver* packet_receiver() = 0;
+ // PacketReceiver.
+ virtual scoped_refptr<PacketReceiver> packet_receiver() = 0;
// Polling interface to get audio and video frames from the CastReceiver.
- virtual FrameReceiver* frame_receiver() = 0;
+ virtual scoped_refptr<FrameReceiver> frame_receiver() = 0;
virtual ~CastReceiver() {};
};
diff --git a/media/cast/cast_sender.gyp b/media/cast/cast_sender.gyp
index 591a3b7..fe99f80 100644
--- a/media/cast/cast_sender.gyp
+++ b/media/cast/cast_sender.gyp
@@ -12,10 +12,15 @@
{
'target_name': 'cast_sender_impl',
'type': 'static_library',
+ 'include_dirs': [
+ '<(DEPTH)/',
+ '<(DEPTH)/third_party/',
+ '<(DEPTH)/third_party/webrtc/',
+ ],
'sources': [
'cast_sender.h',
-# 'cast_sender_impl.cc',
-# 'cast_sender_impl.h',
+ 'cast_sender_impl.cc',
+ 'cast_sender_impl.h',
], # source
'dependencies': [
'audio_sender',
diff --git a/media/cast/cast_sender.h b/media/cast/cast_sender.h
index 254370b..f4d3653 100644
--- a/media/cast/cast_sender.h
+++ b/media/cast/cast_sender.h
@@ -22,7 +22,7 @@ namespace media {
namespace cast {
// This Class is thread safe.
-class FrameInput {
+class FrameInput : public base::RefCountedThreadSafe<PacketReceiver> {
public:
// The video_frame must be valid until the callback is called.
// The callback is called from the main cast thread as soon as
@@ -56,11 +56,12 @@ class FrameInput {
const base::TimeTicks& recorded_time,
const base::Closure callback) = 0;
- protected:
virtual ~FrameInput() {}
};
// This Class is thread safe.
+// The provided PacketSender object will always be called form the main cast
+// thread.
class CastSender {
public:
static CastSender* CreateCastSender(
diff --git a/media/cast/cast_sender_impl.cc b/media/cast/cast_sender_impl.cc
new file mode 100644
index 0000000..76f2f99
--- /dev/null
+++ b/media/cast/cast_sender_impl.cc
@@ -0,0 +1,176 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "media/cast/cast_sender_impl.h"
+
+#include "base/bind.h"
+#include "base/callback.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+
+namespace media {
+namespace cast {
+
+// The LocalFrameInput class posts all incoming frames; audio and video to the
+// main cast thread for processing.
+// This make the cast sender interface thread safe.
+class LocalFrameInput : public FrameInput {
+ public:
+ LocalFrameInput(scoped_refptr<CastThread> cast_thread,
+ base::WeakPtr<AudioSender> audio_sender,
+ base::WeakPtr<VideoSender> video_sender)
+ : cast_thread_(cast_thread),
+ audio_sender_(audio_sender),
+ video_sender_(video_sender) {}
+
+ virtual void InsertRawVideoFrame(const I420VideoFrame* video_frame,
+ const base::TimeTicks& capture_time,
+ const base::Closure callback) OVERRIDE {
+ cast_thread_->PostTask(CastThread::MAIN, FROM_HERE,
+ base::Bind(&VideoSender::InsertRawVideoFrame, video_sender_,
+ video_frame, capture_time, callback));
+ }
+
+ virtual void InsertCodedVideoFrame(const EncodedVideoFrame* video_frame,
+ const base::TimeTicks& capture_time,
+ const base::Closure callback) OVERRIDE {
+ cast_thread_->PostTask(CastThread::MAIN, FROM_HERE,
+ base::Bind(&VideoSender::InsertCodedVideoFrame, video_sender_,
+ video_frame, capture_time, callback));
+ }
+
+ virtual void InsertRawAudioFrame(const PcmAudioFrame* audio_frame,
+ const base::TimeTicks& recorded_time,
+ const base::Closure callback) OVERRIDE {
+ cast_thread_->PostTask(CastThread::MAIN, FROM_HERE,
+ base::Bind(&AudioSender::InsertRawAudioFrame, audio_sender_,
+ audio_frame, recorded_time, callback));
+ }
+
+ virtual void InsertCodedAudioFrame(const EncodedAudioFrame* audio_frame,
+ const base::TimeTicks& recorded_time,
+ const base::Closure callback) OVERRIDE {
+ cast_thread_->PostTask(CastThread::MAIN, FROM_HERE,
+ base::Bind(&AudioSender::InsertCodedAudioFrame, audio_sender_,
+ audio_frame, recorded_time, callback));
+ }
+
+ private:
+ scoped_refptr<CastThread> cast_thread_;
+ base::WeakPtr<AudioSender> audio_sender_;
+ base::WeakPtr<VideoSender> video_sender_;
+};
+
+// LocalCastSenderPacketReceiver handle the incoming packets to the cast sender
+// it's only expected to receive RTCP feedback packets from the remote cast
+// receiver. The class verifies that that it is a RTCP packet and based on the
+// SSRC of the incoming packet route the packet to the correct sender; audio or
+// video.
+//
+// Definition of SSRC as defined in RFC 3550.
+// Synchronization source (SSRC): The source of a stream of RTP
+// packets, identified by a 32-bit numeric SSRC identifier carried in
+// the RTP header so as not to be dependent upon the network address.
+// All packets from a synchronization source form part of the same
+// timing and sequence number space, so a receiver groups packets by
+// synchronization source for playback. Examples of synchronization
+// sources include the sender of a stream of packets derived from a
+// signal source such as a microphone or a camera, or an RTP mixer
+// (see below). A synchronization source may change its data format,
+// e.g., audio encoding, over time. The SSRC identifier is a
+// randomly chosen value meant to be globally unique within a
+// particular RTP session (see Section 8). A participant need not
+// use the same SSRC identifier for all the RTP sessions in a
+// multimedia session; the binding of the SSRC identifiers is
+// provided through RTCP (see Section 6.5.1). If a participant
+// generates multiple streams in one RTP session, for example from
+// separate video cameras, each MUST be identified as a different
+// SSRC.
+
+class LocalCastSenderPacketReceiver : public PacketReceiver {
+ public:
+ LocalCastSenderPacketReceiver(scoped_refptr<CastThread> cast_thread,
+ base::WeakPtr<AudioSender> audio_sender,
+ base::WeakPtr<VideoSender> video_sender,
+ uint32 ssrc_of_audio_sender,
+ uint32 ssrc_of_video_sender)
+ : cast_thread_(cast_thread),
+ audio_sender_(audio_sender),
+ video_sender_(video_sender),
+ ssrc_of_audio_sender_(ssrc_of_audio_sender),
+ ssrc_of_video_sender_(ssrc_of_video_sender) {}
+
+ virtual ~LocalCastSenderPacketReceiver() {}
+
+ virtual void ReceivedPacket(const uint8* packet,
+ int length,
+ const base::Closure callback) OVERRIDE {
+ if (!Rtcp::IsRtcpPacket(packet, length)) {
+ // We should have no incoming RTP packets.
+ // No action; just log and call the callback informing that we are done
+ // with the packet.
+ VLOG(1) << "Unexpectedly received a RTP packet in the cast sender";
+ cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, callback);
+ return;
+ }
+ uint32 ssrc_of_sender = Rtcp::GetSsrcOfSender(packet, length);
+ if (ssrc_of_sender == ssrc_of_audio_sender_) {
+ cast_thread_->PostTask(CastThread::MAIN, FROM_HERE,
+ base::Bind(&AudioSender::IncomingRtcpPacket, audio_sender_,
+ packet, length, callback));
+ } else if (ssrc_of_sender == ssrc_of_video_sender_) {
+ cast_thread_->PostTask(CastThread::MAIN, FROM_HERE,
+ base::Bind(&VideoSender::IncomingRtcpPacket, video_sender_,
+ packet, length, callback));
+ } else {
+ // No action; just log and call the callback informing that we are done
+ // with the packet.
+ VLOG(1) << "Received a RTCP packet with a non matching sender SSRC "
+ << ssrc_of_sender;
+
+ cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, callback);
+ }
+ }
+
+ private:
+ scoped_refptr<CastThread> cast_thread_;
+ base::WeakPtr<AudioSender> audio_sender_;
+ base::WeakPtr<VideoSender> video_sender_;
+ uint32 ssrc_of_audio_sender_;
+ uint32 ssrc_of_video_sender_;
+};
+
+CastSender* CastSender::CreateCastSender(
+ scoped_refptr<CastThread> cast_thread,
+ const AudioSenderConfig& audio_config,
+ const VideoSenderConfig& video_config,
+ VideoEncoderController* const video_encoder_controller,
+ PacketSender* const packet_sender) {
+ return new CastSenderImpl(cast_thread,
+ audio_config,
+ video_config,
+ video_encoder_controller,
+ packet_sender);
+}
+
+CastSenderImpl::CastSenderImpl(
+ scoped_refptr<CastThread> cast_thread,
+ const AudioSenderConfig& audio_config,
+ const VideoSenderConfig& video_config,
+ VideoEncoderController* const video_encoder_controller,
+ PacketSender* const packet_sender)
+ : pacer_(cast_thread, packet_sender),
+ audio_sender_(cast_thread, audio_config, &pacer_),
+ video_sender_(cast_thread, video_config, video_encoder_controller,
+ &pacer_),
+ frame_input_(new LocalFrameInput(cast_thread, audio_sender_.AsWeakPtr(),
+ video_sender_.AsWeakPtr())),
+ packet_receiver_(new LocalCastSenderPacketReceiver(cast_thread,
+ audio_sender_.AsWeakPtr(), video_sender_.AsWeakPtr(),
+ audio_config.incoming_feedback_ssrc,
+ video_config.incoming_feedback_ssrc)) {}
+
+CastSenderImpl::~CastSenderImpl() {}
+
+} // namespace cast
+} // namespace media
diff --git a/media/cast/cast_sender_impl.h b/media/cast/cast_sender_impl.h
new file mode 100644
index 0000000..eb19caa
--- /dev/null
+++ b/media/cast/cast_sender_impl.h
@@ -0,0 +1,55 @@
+// Copyright 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#ifndef MEDIA_CAST_CAST_SENDER_IMPL_H_
+#define MEDIA_CAST_CAST_SENDER_IMPL_H_
+
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "media/cast/audio_sender/audio_sender.h"
+#include "media/cast/cast_config.h"
+#include "media/cast/cast_sender.h"
+#include "media/cast/cast_thread.h"
+#include "media/cast/pacing/paced_sender.h"
+#include "media/cast/video_sender/video_sender.h"
+
+namespace media {
+namespace cast {
+
+class AudioSender;
+class PacedSender;
+class VideoSender;
+
+// This calls is a pure owner class that group all required sending objects
+// together such as pacer, packet receiver, frame input, audio and video sender.
+class CastSenderImpl : public CastSender {
+ public:
+ CastSenderImpl(scoped_refptr<CastThread> cast_thread,
+ const AudioSenderConfig& audio_config,
+ const VideoSenderConfig& video_config,
+ VideoEncoderController* const video_encoder_controller,
+ PacketSender* const packet_sender);
+
+ virtual ~CastSenderImpl();
+
+ virtual scoped_refptr<FrameInput> frame_input() OVERRIDE {
+ return frame_input_;
+ }
+
+ virtual scoped_refptr<PacketReceiver> packet_receiver() OVERRIDE {
+ return packet_receiver_;
+ }
+
+ private:
+ PacedSender pacer_;
+ AudioSender audio_sender_;
+ VideoSender video_sender_;
+ scoped_refptr<FrameInput> frame_input_;
+ scoped_refptr<PacketReceiver> packet_receiver_;
+};
+
+} // namespace cast
+} // namespace media
+
+#endif // MEDIA_CAST_CAST_SENDER_IMPL_H_
+
diff --git a/media/cast/pacing/paced_sender.cc b/media/cast/pacing/paced_sender.cc
index f89361b..d2935f3 100644
--- a/media/cast/pacing/paced_sender.cc
+++ b/media/cast/pacing/paced_sender.cc
@@ -4,19 +4,25 @@
#include "media/cast/pacing/paced_sender.h"
+#include "base/bind.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+
namespace media {
namespace cast {
static const int64 kPacingIntervalMs = 10;
-static const int64 kPacingMinIntervalMs = 7;
static const int kPacingMaxBurstsPerFrame = 3;
-PacedSender::PacedSender(PacketSender* transport)
- : burst_size_(1),
+PacedSender::PacedSender(scoped_refptr<CastThread> cast_thread,
+ PacketSender* transport)
+ : cast_thread_(cast_thread),
+ burst_size_(1),
packets_sent_in_burst_(0),
transport_(transport),
- default_tick_clock_(new base::DefaultTickClock()),
- clock_(default_tick_clock_.get()) {
+ clock_(&default_tick_clock_),
+ weak_factory_(this) {
+ ScheduleNextSend();
}
PacedSender::~PacedSender() {}
@@ -62,31 +68,29 @@ bool PacedSender::SendRtcpPacket(const std::vector<uint8>& packet) {
return transport_->SendPacket(&(packet[0]), packet.size());
}
-base::TimeTicks PacedSender::TimeNextProcess() {
- return time_last_process_ +
+void PacedSender::ScheduleNextSend() {
+ base::TimeDelta time_to_next = time_last_process_ - clock_->NowTicks() +
base::TimeDelta::FromMilliseconds(kPacingIntervalMs);
-}
-void PacedSender::Process() {
- int packets_to_send = 0;
- base::TimeTicks now = clock_->NowTicks();
+ time_to_next = std::max(time_to_next,
+ base::TimeDelta::FromMilliseconds(0));
- base::TimeDelta min_pacing_interval =
- base::TimeDelta::FromMilliseconds(kPacingMinIntervalMs);
-
- // Have enough time have passed?
- if (now - time_last_process_ < min_pacing_interval) return;
+ cast_thread_->PostDelayedTask(CastThread::MAIN, FROM_HERE,
+ base::Bind(&PacedSender::SendNextPacketBurst, weak_factory_.GetWeakPtr()),
+ time_to_next);
+}
- time_last_process_ = now;
- packets_to_send = burst_size_;
- // Allow new packets to be inserted while we loop over our packets to send.
+void PacedSender::SendNextPacketBurst() {
+ int packets_to_send = burst_size_;
+ time_last_process_ = clock_->NowTicks();
for (int i = 0; i < packets_to_send; ++i) {
SendStoredPacket();
}
+ ScheduleNextSend();
}
void PacedSender::SendStoredPacket() {
- if (packet_list_.empty() && resend_packet_list_.empty()) return;
+ if (packet_list_.empty() && resend_packet_list_.empty()) return;
if (!resend_packet_list_.empty()) {
// Send our re-send packets first.
diff --git a/media/cast/pacing/paced_sender.h b/media/cast/pacing/paced_sender.h
index 45c8dc1..9dcd03e84 100644
--- a/media/cast/pacing/paced_sender.h
+++ b/media/cast/pacing/paced_sender.h
@@ -10,10 +10,13 @@
#include "base/basictypes.h"
#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
+#include "base/threading/non_thread_safe.h"
#include "base/time/default_tick_clock.h"
#include "base/time/tick_clock.h"
#include "base/time/time.h"
#include "media/cast/cast_config.h"
+#include "media/cast/cast_thread.h"
namespace media {
namespace cast {
@@ -29,21 +32,16 @@ class PacedPacketSender {
virtual bool SendRtcpPacket(const std::vector<uint8>& packet) = 0;
- protected:
virtual ~PacedPacketSender() {}
};
-class PacedSender : public PacedPacketSender {
+class PacedSender : public PacedPacketSender,
+ public base::NonThreadSafe,
+ public base::SupportsWeakPtr<PacedSender> {
public:
- explicit PacedSender(PacketSender* transport);
+ PacedSender(scoped_refptr<CastThread> cast_thread, PacketSender* transport);
virtual ~PacedSender();
- // Returns the time when the pacer want a worker thread to call Process.
- base::TimeTicks TimeNextProcess();
-
- // Process any pending packets in the queue(s).
- void Process();
-
virtual bool SendPacket(const std::vector<uint8>& packet,
int num_of_packets) OVERRIDE;
@@ -56,12 +54,21 @@ class PacedSender : public PacedPacketSender {
clock_ = clock;
}
+ protected:
+ // Schedule a delayed task on the main cast thread when it's time to send the
+ // next packet burst.
+ void ScheduleNextSend();
+
+ // Process any pending packets in the queue(s).
+ void SendNextPacketBurst();
+
private:
void SendStoredPacket();
void UpdateBurstSize(int num_of_packets);
typedef std::list<std::vector<uint8> > PacketList;
+ scoped_refptr<CastThread> cast_thread_;
int burst_size_;
int packets_sent_in_burst_;
base::TimeTicks time_last_process_;
@@ -69,9 +76,11 @@ class PacedSender : public PacedPacketSender {
PacketList resend_packet_list_;
PacketSender* transport_;
- scoped_ptr<base::TickClock> default_tick_clock_;
+ base::DefaultTickClock default_tick_clock_;
base::TickClock* clock_;
+ base::WeakPtrFactory<PacedSender> weak_factory_;
+
DISALLOW_COPY_AND_ASSIGN(PacedSender);
};
diff --git a/media/cast/pacing/paced_sender_unittest.cc b/media/cast/pacing/paced_sender_unittest.cc
index 9108965..b731d60 100644
--- a/media/cast/pacing/paced_sender_unittest.cc
+++ b/media/cast/pacing/paced_sender_unittest.cc
@@ -2,16 +2,19 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "base/message_loop/message_loop.h"
+#include "base/run_loop.h"
#include "base/test/simple_test_tick_clock.h"
#include "media/cast/pacing/mock_packet_sender.h"
#include "media/cast/pacing/paced_sender.h"
#include "testing/gmock/include/gmock/gmock.h"
-using testing::_;
-
namespace media {
namespace cast {
+using base::RunLoop;
+using testing::_;
+
static const uint8 kValue = 123;
static const size_t kSize1 = 100;
static const size_t kSize2 = 101;
@@ -22,18 +25,29 @@ static const int64 kStartMillisecond = 123456789;
class PacedSenderTest : public ::testing::Test {
protected:
- PacedSenderTest()
- : paced_sender_(&mock_transport_) {
+ PacedSenderTest() {
testing_clock_.Advance(
base::TimeDelta::FromMilliseconds(kStartMillisecond));
- paced_sender_.set_clock(&testing_clock_);
}
virtual ~PacedSenderTest() {}
+ virtual void SetUp() {
+ // TODO(pwestin): Write a generic message loop that runs with a mock clock.
+ cast_thread_ = new CastThread(MessageLoopProxy::current(),
+ MessageLoopProxy::current(),
+ MessageLoopProxy::current(),
+ MessageLoopProxy::current(),
+ MessageLoopProxy::current());
+ paced_sender_.reset(new PacedSender(cast_thread_, &mock_transport_));
+ paced_sender_->set_clock(&testing_clock_);
+ }
+
+ base::MessageLoop loop_;
MockPacketSender mock_transport_;
- PacedSender paced_sender_;
+ scoped_ptr<PacedSender> paced_sender_;
base::SimpleTestTickClock testing_clock_;
+ scoped_refptr<CastThread> cast_thread_;
};
TEST_F(PacedSenderTest, PassThroughRtcp) {
@@ -42,14 +56,14 @@ TEST_F(PacedSenderTest, PassThroughRtcp) {
std::vector<uint8> packet(kSize1, kValue);
int num_of_packets = 1;
- EXPECT_TRUE(paced_sender_.SendPacket(packet, num_of_packets));
+ EXPECT_TRUE(paced_sender_->SendPacket(packet, num_of_packets));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(0);
- EXPECT_TRUE(paced_sender_.ResendPacket(packet, num_of_packets));
+ EXPECT_TRUE(paced_sender_->ResendPacket(packet, num_of_packets));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(1).WillRepeatedly(
testing::Return(true));
- EXPECT_TRUE(paced_sender_.SendRtcpPacket(packet));
+ EXPECT_TRUE(paced_sender_->SendRtcpPacket(packet));
}
TEST_F(PacedSenderTest, BasicPace) {
@@ -59,30 +73,52 @@ TEST_F(PacedSenderTest, BasicPace) {
EXPECT_CALL(mock_transport_,
SendPacket(_, kSize1)).Times(3).WillRepeatedly(testing::Return(true));
for (int i = 0; i < num_of_packets; ++i) {
- EXPECT_TRUE(paced_sender_.SendPacket(packet, num_of_packets));
+ EXPECT_TRUE(paced_sender_->SendPacket(packet, num_of_packets));
}
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
+ base::TimeDelta timeout = base::TimeDelta::FromMilliseconds(10);
+ testing_clock_.Advance(timeout);
// Check that we get the next burst.
EXPECT_CALL(mock_transport_,
SendPacket(_, kSize1)).Times(3).WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
+
+ // TODO(pwestin): haven't found a way to make the post delayed task to go
+ // faster than a real-time.
+ base::PlatformThread::Sleep(timeout);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
// If we call process too early make sure we don't send any packets.
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(5));
+ timeout = base::TimeDelta::FromMilliseconds(5);
+ testing_clock_.Advance(timeout);
EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(0);
- paced_sender_.Process();
+ base::PlatformThread::Sleep(timeout);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
// Check that we get the next burst.
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(5));
+ testing_clock_.Advance(timeout);
EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(3).WillRepeatedly(
testing::Return(true));
- paced_sender_.Process();
+ base::PlatformThread::Sleep(timeout);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
// Check that we don't get any more packets.
EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(0);
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
- paced_sender_.Process();
+ timeout = base::TimeDelta::FromMilliseconds(10);
+ testing_clock_.Advance(timeout);
+ base::PlatformThread::Sleep(timeout);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
}
TEST_F(PacedSenderTest, PaceWithNack) {
@@ -98,65 +134,95 @@ TEST_F(PacedSenderTest, PaceWithNack) {
EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(3).WillRepeatedly(
testing::Return(true));
for (int i = 0; i < num_of_packets_in_frame; ++i) {
- EXPECT_TRUE(paced_sender_.SendPacket(firts_packet,
+ EXPECT_TRUE(paced_sender_->SendPacket(firts_packet,
num_of_packets_in_frame));
}
// Add first NACK request.
for (int i = 0; i < num_of_packets_in_nack; ++i) {
- EXPECT_TRUE(paced_sender_.ResendPacket(nack_packet,
+ EXPECT_TRUE(paced_sender_->ResendPacket(nack_packet,
num_of_packets_in_nack));
}
// Check that we get the first NACK burst.
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
EXPECT_CALL(mock_transport_, SendPacket(_, kNackSize)).Times(5).
WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
+
+ base::TimeDelta timeout = base::TimeDelta::FromMilliseconds(10);
+ testing_clock_.Advance(timeout);
+ base::PlatformThread::Sleep(timeout);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
// Add second NACK request.
for (int i = 0; i < num_of_packets_in_nack; ++i) {
- EXPECT_TRUE(paced_sender_.ResendPacket(nack_packet,
+ EXPECT_TRUE(paced_sender_->ResendPacket(nack_packet,
num_of_packets_in_nack));
}
// Check that we get the next NACK burst.
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
EXPECT_CALL(mock_transport_, SendPacket(_, kNackSize)).Times(7)
.WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
+
+ testing_clock_.Advance(timeout);
+ base::PlatformThread::Sleep(timeout);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
// End of NACK plus a packet from the oldest frame.
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
EXPECT_CALL(mock_transport_, SendPacket(_, kNackSize)).Times(6)
.WillRepeatedly(testing::Return(true));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(1)
.WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
+
+ testing_clock_.Advance(timeout);
+ base::PlatformThread::Sleep(timeout);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
// Add second frame.
// Make sure we don't delay the second frame due to the previous packets.
for (int i = 0; i < num_of_packets_in_frame; ++i) {
- EXPECT_TRUE(paced_sender_.SendPacket(second_packet,
+ EXPECT_TRUE(paced_sender_->SendPacket(second_packet,
num_of_packets_in_frame));
}
// Last packets of frame 1 and the first packets of frame 2.
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(5).WillRepeatedly(
testing::Return(true));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(2).WillRepeatedly(
testing::Return(true));
- paced_sender_.Process();
+
+ testing_clock_.Advance(timeout);
+ base::PlatformThread::Sleep(timeout);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
// Last packets of frame 2.
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(7).WillRepeatedly(
testing::Return(true));
- paced_sender_.Process();
+
+ testing_clock_.Advance(timeout);
+ base::PlatformThread::Sleep(timeout);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
// No more packets.
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(0);
- paced_sender_.Process();
+ testing_clock_.Advance(timeout);
+ base::PlatformThread::Sleep(timeout);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
}
TEST_F(PacedSenderTest, PaceWith60fps) {
@@ -172,19 +238,24 @@ TEST_F(PacedSenderTest, PaceWith60fps) {
EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(3).WillRepeatedly(
testing::Return(true));
for (int i = 0; i < num_of_packets_in_frame; ++i) {
- EXPECT_TRUE(paced_sender_.SendPacket(firts_packet,
+ EXPECT_TRUE(paced_sender_->SendPacket(firts_packet,
num_of_packets_in_frame));
}
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
+ base::TimeDelta timeout_10ms = base::TimeDelta::FromMilliseconds(10);
EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(3).
WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(6));
+ testing_clock_.Advance(timeout_10ms);
+ base::PlatformThread::Sleep(timeout_10ms);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
+ testing_clock_.Advance(base::TimeDelta::FromMilliseconds(6));
// Add second frame, after 16 ms.
for (int i = 0; i < num_of_packets_in_frame; ++i) {
- EXPECT_TRUE(paced_sender_.SendPacket(second_packet,
+ EXPECT_TRUE(paced_sender_->SendPacket(second_packet,
num_of_packets_in_frame));
}
@@ -193,53 +264,83 @@ TEST_F(PacedSenderTest, PaceWith60fps) {
.WillRepeatedly(testing::Return(true));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(1)
.WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
+
+ base::PlatformThread::Sleep(timeout_10ms);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(4)
.WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
+
+ base::PlatformThread::Sleep(timeout_10ms);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
testing_clock_.Advance(base::TimeDelta::FromMilliseconds(3));
// Add third frame, after 33 ms.
for (int i = 0; i < num_of_packets_in_frame; ++i) {
- EXPECT_TRUE(paced_sender_.SendPacket(third_packet,
+ EXPECT_TRUE(paced_sender_->SendPacket(third_packet,
num_of_packets_in_frame));
}
-
testing_clock_.Advance(base::TimeDelta::FromMilliseconds(7));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(4)
.WillRepeatedly(testing::Return(true));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize3)).Times(1)
.WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
+
+ base::PlatformThread::Sleep(timeout_10ms);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
// Add fourth frame, after 50 ms.
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
for (int i = 0; i < num_of_packets_in_frame; ++i) {
- EXPECT_TRUE(paced_sender_.SendPacket(fourth_packet,
+ EXPECT_TRUE(paced_sender_->SendPacket(fourth_packet,
num_of_packets_in_frame));
}
EXPECT_CALL(mock_transport_, SendPacket(_, kSize3)).Times(6)
.WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
+ testing_clock_.Advance(timeout_10ms);
+ base::PlatformThread::Sleep(timeout_10ms);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize3)).Times(2)
.WillRepeatedly(testing::Return(true));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize4)).Times(4)
.WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
+ testing_clock_.Advance(timeout_10ms);
+ base::PlatformThread::Sleep(timeout_10ms);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize4)).Times(5)
.WillRepeatedly(testing::Return(true));
- paced_sender_.Process();
+ testing_clock_.Advance(timeout_10ms);
+ base::PlatformThread::Sleep(timeout_10ms);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
- testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10));
EXPECT_CALL(mock_transport_, SendPacket(_, kSize4)).Times(0);
- paced_sender_.Process();
+ testing_clock_.Advance(timeout_10ms);
+ base::PlatformThread::Sleep(timeout_10ms);
+ {
+ RunLoop run_loop;
+ run_loop.RunUntilIdle();
+ }
}
} // namespace cast
diff --git a/media/cast/rtcp/rtcp.cc b/media/cast/rtcp/rtcp.cc
index 16956f7..c3e2c8e 100644
--- a/media/cast/rtcp/rtcp.cc
+++ b/media/cast/rtcp/rtcp.cc
@@ -12,6 +12,7 @@
#include "media/cast/rtcp/rtcp_receiver.h"
#include "media/cast/rtcp/rtcp_sender.h"
#include "media/cast/rtcp/rtcp_utility.h"
+#include "net/base/big_endian.h"
namespace media {
namespace cast {
@@ -117,17 +118,7 @@ Rtcp::Rtcp(RtcpSenderFeedback* sender_feedback,
Rtcp::~Rtcp() {}
-base::TimeTicks Rtcp::TimeToSendNextRtcpReport() {
- if (next_time_to_send_rtcp_.is_null()) {
- UpdateNextTimeToSendRtcp();
- }
- return next_time_to_send_rtcp_;
-}
-
-void Rtcp::SetRemoteSSRC(uint32 ssrc) {
- rtcp_receiver_->SetRemoteSSRC(ssrc);
-}
-
+// static
bool Rtcp::IsRtcpPacket(const uint8* packet, int length) {
DCHECK_GE(length, 8) << "Invalid RTCP packet";
if (length < 8) return false;
@@ -139,6 +130,26 @@ bool Rtcp::IsRtcpPacket(const uint8* packet, int length) {
return false;
}
+// static
+uint32 Rtcp::GetSsrcOfSender(const uint8* rtcp_buffer, int length) {
+ uint32 ssrc_of_sender;
+ net::BigEndianReader big_endian_reader(rtcp_buffer, length);
+ big_endian_reader.Skip(4); // Skip header
+ big_endian_reader.ReadU32(&ssrc_of_sender);
+ return ssrc_of_sender;
+}
+
+base::TimeTicks Rtcp::TimeToSendNextRtcpReport() {
+ if (next_time_to_send_rtcp_.is_null()) {
+ UpdateNextTimeToSendRtcp();
+ }
+ return next_time_to_send_rtcp_;
+}
+
+void Rtcp::SetRemoteSSRC(uint32 ssrc) {
+ rtcp_receiver_->SetRemoteSSRC(ssrc);
+}
+
void Rtcp::IncomingRtcpPacket(const uint8* rtcp_buffer, int length) {
RtcpParser rtcp_parser(rtcp_buffer, length);
if (!rtcp_parser.IsValid()) {
diff --git a/media/cast/rtcp/rtcp.h b/media/cast/rtcp/rtcp.h
index 5b06b82..31962a52 100644
--- a/media/cast/rtcp/rtcp.h
+++ b/media/cast/rtcp/rtcp.h
@@ -80,6 +80,8 @@ class Rtcp {
static bool IsRtcpPacket(const uint8* rtcp_buffer, int length);
+ static uint32 GetSsrcOfSender(const uint8* rtcp_buffer, int length);
+
base::TimeTicks TimeToSendNextRtcpReport();
void SendRtcpReport(uint32 media_ssrc);
void SendRtcpPli(uint32 media_ssrc);
diff --git a/media/cast/rtp_sender/rtp_packetizer/rtp_packetizer.h b/media/cast/rtp_sender/rtp_packetizer/rtp_packetizer.h
index 3ada98b..63035d0 100644
--- a/media/cast/rtp_sender/rtp_packetizer/rtp_packetizer.h
+++ b/media/cast/rtp_sender/rtp_packetizer/rtp_packetizer.h
@@ -20,6 +20,8 @@ namespace cast {
class PacedPacketSender;
// This object is only called from the main cast thread.
+// This class break encoded audio and video frames into packets and add an RTP
+// header to each packet.
class RtpPacketizer {
public:
RtpPacketizer(PacedPacketSender* transport,
diff --git a/media/cast/video_sender/video_sender.cc b/media/cast/video_sender/video_sender.cc
index 83f3397..1b42238 100644
--- a/media/cast/video_sender/video_sender.cc
+++ b/media/cast/video_sender/video_sender.cc
@@ -179,8 +179,10 @@ void VideoSender::OnReceivedIntraFrameRequest() {
last_sent_frame_id_ = -1;
}
-void VideoSender::IncomingRtcpPacket(const uint8* packet, int length) {
+void VideoSender::IncomingRtcpPacket(const uint8* packet, int length,
+ const base::Closure callback) {
rtcp_->IncomingRtcpPacket(packet, length);
+ cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, callback);
}
void VideoSender::ScheduleNextRtcpReport() {
diff --git a/media/cast/video_sender/video_sender.h b/media/cast/video_sender/video_sender.h
index ff45326..9098e97 100644
--- a/media/cast/video_sender/video_sender.h
+++ b/media/cast/video_sender/video_sender.h
@@ -62,7 +62,8 @@ class VideoSender : public base::NonThreadSafe,
const base::Closure callback);
// Only called from the main cast thread.
- void IncomingRtcpPacket(const uint8* packet, int length);
+ void IncomingRtcpPacket(const uint8* packet, int length,
+ const base::Closure callback);
void set_clock(base::TickClock* clock) {
clock_ = clock;