diff options
author | pwestin@google.com <pwestin@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-09-12 00:05:01 +0000 |
---|---|---|
committer | pwestin@google.com <pwestin@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-09-12 00:05:01 +0000 |
commit | 428c39804a10f55c0ae81392b25806bc0117d053 (patch) | |
tree | 8a913d600a181cf7d03d01120c3dab83437985ec /media | |
parent | e05f7f3d6db7108e73a92fb004ded36fa3e7624d (diff) | |
download | chromium_src-428c39804a10f55c0ae81392b25806bc0117d053.zip chromium_src-428c39804a10f55c0ae81392b25806bc0117d053.tar.gz chromium_src-428c39804a10f55c0ae81392b25806bc0117d053.tar.bz2 |
Added cast sender impl and made required modifications to the pacer.
BUG=
Review URL: https://chromiumcodereview.appspot.com/24051002
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@222666 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media')
-rw-r--r-- | media/cast/audio_sender/audio_sender.cc | 4 | ||||
-rw-r--r-- | media/cast/audio_sender/audio_sender.h | 3 | ||||
-rw-r--r-- | media/cast/cast_config.h | 9 | ||||
-rw-r--r-- | media/cast/cast_receiver.h | 17 | ||||
-rw-r--r-- | media/cast/cast_sender.gyp | 9 | ||||
-rw-r--r-- | media/cast/cast_sender.h | 5 | ||||
-rw-r--r-- | media/cast/cast_sender_impl.cc | 176 | ||||
-rw-r--r-- | media/cast/cast_sender_impl.h | 55 | ||||
-rw-r--r-- | media/cast/pacing/paced_sender.cc | 44 | ||||
-rw-r--r-- | media/cast/pacing/paced_sender.h | 29 | ||||
-rw-r--r-- | media/cast/pacing/paced_sender_unittest.cc | 207 | ||||
-rw-r--r-- | media/cast/rtcp/rtcp.cc | 33 | ||||
-rw-r--r-- | media/cast/rtcp/rtcp.h | 2 | ||||
-rw-r--r-- | media/cast/rtp_sender/rtp_packetizer/rtp_packetizer.h | 2 | ||||
-rw-r--r-- | media/cast/video_sender/video_sender.cc | 4 | ||||
-rw-r--r-- | media/cast/video_sender/video_sender.h | 3 |
16 files changed, 490 insertions, 112 deletions
diff --git a/media/cast/audio_sender/audio_sender.cc b/media/cast/audio_sender/audio_sender.cc index 70149dc..39fccda 100644 --- a/media/cast/audio_sender/audio_sender.cc +++ b/media/cast/audio_sender/audio_sender.cc @@ -141,8 +141,10 @@ void AudioSender::ResendPackets( rtp_sender_.ResendPackets(missing_frames_and_packets); } -void AudioSender::IncomingRtcpPacket(const uint8* packet, int length) { +void AudioSender::IncomingRtcpPacket(const uint8* packet, int length, + const base::Closure callback) { rtcp_.IncomingRtcpPacket(packet, length); + cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, callback); } void AudioSender::ScheduleNextRtcpReport() { diff --git a/media/cast/audio_sender/audio_sender.h b/media/cast/audio_sender/audio_sender.h index 30d48b4..3d389b3 100644 --- a/media/cast/audio_sender/audio_sender.h +++ b/media/cast/audio_sender/audio_sender.h @@ -54,7 +54,8 @@ class AudioSender : public base::NonThreadSafe, const base::Closure callback); // Only called from the main cast thread. - void IncomingRtcpPacket(const uint8* packet, int length); + void IncomingRtcpPacket(const uint8* packet, int length, + const base::Closure callback); // Only used for testing. void set_clock(base::TickClock* clock) { diff --git a/media/cast/cast_config.h b/media/cast/cast_config.h index e1280cd..988924a 100644 --- a/media/cast/cast_config.h +++ b/media/cast/cast_config.h @@ -9,6 +9,8 @@ #include <vector> #include "base/basictypes.h" +#include "base/callback.h" +#include "base/memory/ref_counted.h" #include "media/cast/cast_defines.h" namespace media { @@ -175,17 +177,16 @@ class PacketSender { // All packets to be sent to the network will be delivered via this function. virtual bool SendPacket(const uint8* packet, int length) = 0; - protected: virtual ~PacketSender() {} }; -class PacketReceiver { +class PacketReceiver : public base::RefCountedThreadSafe<PacketReceiver> { public: // All packets received from the network should be delivered via this // function. - virtual void ReceivedPacket(const uint8* packet, int length) = 0; + virtual void ReceivedPacket(const uint8* packet, int length, + const base::Closure callback) = 0; - protected: virtual ~PacketReceiver() {} }; diff --git a/media/cast/cast_receiver.h b/media/cast/cast_receiver.h index 3dafbe5..fa09721 100644 --- a/media/cast/cast_receiver.h +++ b/media/cast/cast_receiver.h @@ -1,6 +1,9 @@ // Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +// +// This is the main interface for the cast receiver. All configuration are done +// at creation. #ifndef MEDIA_CAST_CAST_RECEIVER_H_ #define MEDIA_CAST_CAST_RECEIVER_H_ @@ -12,8 +15,10 @@ namespace media { namespace cast { -class FrameReceiver { +// This Class is thread safe. +class FrameReceiver : public base::RefCountedThreadSafe<FrameReceiver>{ public: + // TODO(pwestin): These functions must be updated. virtual bool GetRawVideoFrame(I420VideoFrame* video_frame, base::TimeTicks* render_time) = 0; @@ -32,24 +37,24 @@ class FrameReceiver { virtual void ReleaseCodedAudioFrame(uint8 frame_id) = 0; -protected: virtual ~FrameReceiver() {} }; +// This Class is thread safe. class CastReceiver { public: static CastReceiver* CreateCastReceiver( + scoped_refptr<CastThread> cast_thread, const AudioReceiverConfig& audio_config, const VideoReceiverConfig& video_config, PacketSender* const packet_sender); // All received RTP and RTCP packets for the call should be inserted to this - // PacketReceiver. The PacketReceiver pointer is valid as long as the - // CastReceiver instance exists. - virtual PacketReceiver* packet_receiver() = 0; + // PacketReceiver. + virtual scoped_refptr<PacketReceiver> packet_receiver() = 0; // Polling interface to get audio and video frames from the CastReceiver. - virtual FrameReceiver* frame_receiver() = 0; + virtual scoped_refptr<FrameReceiver> frame_receiver() = 0; virtual ~CastReceiver() {}; }; diff --git a/media/cast/cast_sender.gyp b/media/cast/cast_sender.gyp index 591a3b7..fe99f80 100644 --- a/media/cast/cast_sender.gyp +++ b/media/cast/cast_sender.gyp @@ -12,10 +12,15 @@ { 'target_name': 'cast_sender_impl', 'type': 'static_library', + 'include_dirs': [ + '<(DEPTH)/', + '<(DEPTH)/third_party/', + '<(DEPTH)/third_party/webrtc/', + ], 'sources': [ 'cast_sender.h', -# 'cast_sender_impl.cc', -# 'cast_sender_impl.h', + 'cast_sender_impl.cc', + 'cast_sender_impl.h', ], # source 'dependencies': [ 'audio_sender', diff --git a/media/cast/cast_sender.h b/media/cast/cast_sender.h index 254370b..f4d3653 100644 --- a/media/cast/cast_sender.h +++ b/media/cast/cast_sender.h @@ -22,7 +22,7 @@ namespace media { namespace cast { // This Class is thread safe. -class FrameInput { +class FrameInput : public base::RefCountedThreadSafe<PacketReceiver> { public: // The video_frame must be valid until the callback is called. // The callback is called from the main cast thread as soon as @@ -56,11 +56,12 @@ class FrameInput { const base::TimeTicks& recorded_time, const base::Closure callback) = 0; - protected: virtual ~FrameInput() {} }; // This Class is thread safe. +// The provided PacketSender object will always be called form the main cast +// thread. class CastSender { public: static CastSender* CreateCastSender( diff --git a/media/cast/cast_sender_impl.cc b/media/cast/cast_sender_impl.cc new file mode 100644 index 0000000..76f2f99 --- /dev/null +++ b/media/cast/cast_sender_impl.cc @@ -0,0 +1,176 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +#include "media/cast/cast_sender_impl.h" + +#include "base/bind.h" +#include "base/callback.h" +#include "base/logging.h" +#include "base/message_loop/message_loop.h" + +namespace media { +namespace cast { + +// The LocalFrameInput class posts all incoming frames; audio and video to the +// main cast thread for processing. +// This make the cast sender interface thread safe. +class LocalFrameInput : public FrameInput { + public: + LocalFrameInput(scoped_refptr<CastThread> cast_thread, + base::WeakPtr<AudioSender> audio_sender, + base::WeakPtr<VideoSender> video_sender) + : cast_thread_(cast_thread), + audio_sender_(audio_sender), + video_sender_(video_sender) {} + + virtual void InsertRawVideoFrame(const I420VideoFrame* video_frame, + const base::TimeTicks& capture_time, + const base::Closure callback) OVERRIDE { + cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, + base::Bind(&VideoSender::InsertRawVideoFrame, video_sender_, + video_frame, capture_time, callback)); + } + + virtual void InsertCodedVideoFrame(const EncodedVideoFrame* video_frame, + const base::TimeTicks& capture_time, + const base::Closure callback) OVERRIDE { + cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, + base::Bind(&VideoSender::InsertCodedVideoFrame, video_sender_, + video_frame, capture_time, callback)); + } + + virtual void InsertRawAudioFrame(const PcmAudioFrame* audio_frame, + const base::TimeTicks& recorded_time, + const base::Closure callback) OVERRIDE { + cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, + base::Bind(&AudioSender::InsertRawAudioFrame, audio_sender_, + audio_frame, recorded_time, callback)); + } + + virtual void InsertCodedAudioFrame(const EncodedAudioFrame* audio_frame, + const base::TimeTicks& recorded_time, + const base::Closure callback) OVERRIDE { + cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, + base::Bind(&AudioSender::InsertCodedAudioFrame, audio_sender_, + audio_frame, recorded_time, callback)); + } + + private: + scoped_refptr<CastThread> cast_thread_; + base::WeakPtr<AudioSender> audio_sender_; + base::WeakPtr<VideoSender> video_sender_; +}; + +// LocalCastSenderPacketReceiver handle the incoming packets to the cast sender +// it's only expected to receive RTCP feedback packets from the remote cast +// receiver. The class verifies that that it is a RTCP packet and based on the +// SSRC of the incoming packet route the packet to the correct sender; audio or +// video. +// +// Definition of SSRC as defined in RFC 3550. +// Synchronization source (SSRC): The source of a stream of RTP +// packets, identified by a 32-bit numeric SSRC identifier carried in +// the RTP header so as not to be dependent upon the network address. +// All packets from a synchronization source form part of the same +// timing and sequence number space, so a receiver groups packets by +// synchronization source for playback. Examples of synchronization +// sources include the sender of a stream of packets derived from a +// signal source such as a microphone or a camera, or an RTP mixer +// (see below). A synchronization source may change its data format, +// e.g., audio encoding, over time. The SSRC identifier is a +// randomly chosen value meant to be globally unique within a +// particular RTP session (see Section 8). A participant need not +// use the same SSRC identifier for all the RTP sessions in a +// multimedia session; the binding of the SSRC identifiers is +// provided through RTCP (see Section 6.5.1). If a participant +// generates multiple streams in one RTP session, for example from +// separate video cameras, each MUST be identified as a different +// SSRC. + +class LocalCastSenderPacketReceiver : public PacketReceiver { + public: + LocalCastSenderPacketReceiver(scoped_refptr<CastThread> cast_thread, + base::WeakPtr<AudioSender> audio_sender, + base::WeakPtr<VideoSender> video_sender, + uint32 ssrc_of_audio_sender, + uint32 ssrc_of_video_sender) + : cast_thread_(cast_thread), + audio_sender_(audio_sender), + video_sender_(video_sender), + ssrc_of_audio_sender_(ssrc_of_audio_sender), + ssrc_of_video_sender_(ssrc_of_video_sender) {} + + virtual ~LocalCastSenderPacketReceiver() {} + + virtual void ReceivedPacket(const uint8* packet, + int length, + const base::Closure callback) OVERRIDE { + if (!Rtcp::IsRtcpPacket(packet, length)) { + // We should have no incoming RTP packets. + // No action; just log and call the callback informing that we are done + // with the packet. + VLOG(1) << "Unexpectedly received a RTP packet in the cast sender"; + cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, callback); + return; + } + uint32 ssrc_of_sender = Rtcp::GetSsrcOfSender(packet, length); + if (ssrc_of_sender == ssrc_of_audio_sender_) { + cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, + base::Bind(&AudioSender::IncomingRtcpPacket, audio_sender_, + packet, length, callback)); + } else if (ssrc_of_sender == ssrc_of_video_sender_) { + cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, + base::Bind(&VideoSender::IncomingRtcpPacket, video_sender_, + packet, length, callback)); + } else { + // No action; just log and call the callback informing that we are done + // with the packet. + VLOG(1) << "Received a RTCP packet with a non matching sender SSRC " + << ssrc_of_sender; + + cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, callback); + } + } + + private: + scoped_refptr<CastThread> cast_thread_; + base::WeakPtr<AudioSender> audio_sender_; + base::WeakPtr<VideoSender> video_sender_; + uint32 ssrc_of_audio_sender_; + uint32 ssrc_of_video_sender_; +}; + +CastSender* CastSender::CreateCastSender( + scoped_refptr<CastThread> cast_thread, + const AudioSenderConfig& audio_config, + const VideoSenderConfig& video_config, + VideoEncoderController* const video_encoder_controller, + PacketSender* const packet_sender) { + return new CastSenderImpl(cast_thread, + audio_config, + video_config, + video_encoder_controller, + packet_sender); +} + +CastSenderImpl::CastSenderImpl( + scoped_refptr<CastThread> cast_thread, + const AudioSenderConfig& audio_config, + const VideoSenderConfig& video_config, + VideoEncoderController* const video_encoder_controller, + PacketSender* const packet_sender) + : pacer_(cast_thread, packet_sender), + audio_sender_(cast_thread, audio_config, &pacer_), + video_sender_(cast_thread, video_config, video_encoder_controller, + &pacer_), + frame_input_(new LocalFrameInput(cast_thread, audio_sender_.AsWeakPtr(), + video_sender_.AsWeakPtr())), + packet_receiver_(new LocalCastSenderPacketReceiver(cast_thread, + audio_sender_.AsWeakPtr(), video_sender_.AsWeakPtr(), + audio_config.incoming_feedback_ssrc, + video_config.incoming_feedback_ssrc)) {} + +CastSenderImpl::~CastSenderImpl() {} + +} // namespace cast +} // namespace media diff --git a/media/cast/cast_sender_impl.h b/media/cast/cast_sender_impl.h new file mode 100644 index 0000000..eb19caa --- /dev/null +++ b/media/cast/cast_sender_impl.h @@ -0,0 +1,55 @@ +// Copyright 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +#ifndef MEDIA_CAST_CAST_SENDER_IMPL_H_ +#define MEDIA_CAST_CAST_SENDER_IMPL_H_ + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "media/cast/audio_sender/audio_sender.h" +#include "media/cast/cast_config.h" +#include "media/cast/cast_sender.h" +#include "media/cast/cast_thread.h" +#include "media/cast/pacing/paced_sender.h" +#include "media/cast/video_sender/video_sender.h" + +namespace media { +namespace cast { + +class AudioSender; +class PacedSender; +class VideoSender; + +// This calls is a pure owner class that group all required sending objects +// together such as pacer, packet receiver, frame input, audio and video sender. +class CastSenderImpl : public CastSender { + public: + CastSenderImpl(scoped_refptr<CastThread> cast_thread, + const AudioSenderConfig& audio_config, + const VideoSenderConfig& video_config, + VideoEncoderController* const video_encoder_controller, + PacketSender* const packet_sender); + + virtual ~CastSenderImpl(); + + virtual scoped_refptr<FrameInput> frame_input() OVERRIDE { + return frame_input_; + } + + virtual scoped_refptr<PacketReceiver> packet_receiver() OVERRIDE { + return packet_receiver_; + } + + private: + PacedSender pacer_; + AudioSender audio_sender_; + VideoSender video_sender_; + scoped_refptr<FrameInput> frame_input_; + scoped_refptr<PacketReceiver> packet_receiver_; +}; + +} // namespace cast +} // namespace media + +#endif // MEDIA_CAST_CAST_SENDER_IMPL_H_ + diff --git a/media/cast/pacing/paced_sender.cc b/media/cast/pacing/paced_sender.cc index f89361b..d2935f3 100644 --- a/media/cast/pacing/paced_sender.cc +++ b/media/cast/pacing/paced_sender.cc @@ -4,19 +4,25 @@ #include "media/cast/pacing/paced_sender.h" +#include "base/bind.h" +#include "base/logging.h" +#include "base/message_loop/message_loop.h" + namespace media { namespace cast { static const int64 kPacingIntervalMs = 10; -static const int64 kPacingMinIntervalMs = 7; static const int kPacingMaxBurstsPerFrame = 3; -PacedSender::PacedSender(PacketSender* transport) - : burst_size_(1), +PacedSender::PacedSender(scoped_refptr<CastThread> cast_thread, + PacketSender* transport) + : cast_thread_(cast_thread), + burst_size_(1), packets_sent_in_burst_(0), transport_(transport), - default_tick_clock_(new base::DefaultTickClock()), - clock_(default_tick_clock_.get()) { + clock_(&default_tick_clock_), + weak_factory_(this) { + ScheduleNextSend(); } PacedSender::~PacedSender() {} @@ -62,31 +68,29 @@ bool PacedSender::SendRtcpPacket(const std::vector<uint8>& packet) { return transport_->SendPacket(&(packet[0]), packet.size()); } -base::TimeTicks PacedSender::TimeNextProcess() { - return time_last_process_ + +void PacedSender::ScheduleNextSend() { + base::TimeDelta time_to_next = time_last_process_ - clock_->NowTicks() + base::TimeDelta::FromMilliseconds(kPacingIntervalMs); -} -void PacedSender::Process() { - int packets_to_send = 0; - base::TimeTicks now = clock_->NowTicks(); + time_to_next = std::max(time_to_next, + base::TimeDelta::FromMilliseconds(0)); - base::TimeDelta min_pacing_interval = - base::TimeDelta::FromMilliseconds(kPacingMinIntervalMs); - - // Have enough time have passed? - if (now - time_last_process_ < min_pacing_interval) return; + cast_thread_->PostDelayedTask(CastThread::MAIN, FROM_HERE, + base::Bind(&PacedSender::SendNextPacketBurst, weak_factory_.GetWeakPtr()), + time_to_next); +} - time_last_process_ = now; - packets_to_send = burst_size_; - // Allow new packets to be inserted while we loop over our packets to send. +void PacedSender::SendNextPacketBurst() { + int packets_to_send = burst_size_; + time_last_process_ = clock_->NowTicks(); for (int i = 0; i < packets_to_send; ++i) { SendStoredPacket(); } + ScheduleNextSend(); } void PacedSender::SendStoredPacket() { - if (packet_list_.empty() && resend_packet_list_.empty()) return; + if (packet_list_.empty() && resend_packet_list_.empty()) return; if (!resend_packet_list_.empty()) { // Send our re-send packets first. diff --git a/media/cast/pacing/paced_sender.h b/media/cast/pacing/paced_sender.h index 45c8dc1..9dcd03e84 100644 --- a/media/cast/pacing/paced_sender.h +++ b/media/cast/pacing/paced_sender.h @@ -10,10 +10,13 @@ #include "base/basictypes.h" #include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" +#include "base/threading/non_thread_safe.h" #include "base/time/default_tick_clock.h" #include "base/time/tick_clock.h" #include "base/time/time.h" #include "media/cast/cast_config.h" +#include "media/cast/cast_thread.h" namespace media { namespace cast { @@ -29,21 +32,16 @@ class PacedPacketSender { virtual bool SendRtcpPacket(const std::vector<uint8>& packet) = 0; - protected: virtual ~PacedPacketSender() {} }; -class PacedSender : public PacedPacketSender { +class PacedSender : public PacedPacketSender, + public base::NonThreadSafe, + public base::SupportsWeakPtr<PacedSender> { public: - explicit PacedSender(PacketSender* transport); + PacedSender(scoped_refptr<CastThread> cast_thread, PacketSender* transport); virtual ~PacedSender(); - // Returns the time when the pacer want a worker thread to call Process. - base::TimeTicks TimeNextProcess(); - - // Process any pending packets in the queue(s). - void Process(); - virtual bool SendPacket(const std::vector<uint8>& packet, int num_of_packets) OVERRIDE; @@ -56,12 +54,21 @@ class PacedSender : public PacedPacketSender { clock_ = clock; } + protected: + // Schedule a delayed task on the main cast thread when it's time to send the + // next packet burst. + void ScheduleNextSend(); + + // Process any pending packets in the queue(s). + void SendNextPacketBurst(); + private: void SendStoredPacket(); void UpdateBurstSize(int num_of_packets); typedef std::list<std::vector<uint8> > PacketList; + scoped_refptr<CastThread> cast_thread_; int burst_size_; int packets_sent_in_burst_; base::TimeTicks time_last_process_; @@ -69,9 +76,11 @@ class PacedSender : public PacedPacketSender { PacketList resend_packet_list_; PacketSender* transport_; - scoped_ptr<base::TickClock> default_tick_clock_; + base::DefaultTickClock default_tick_clock_; base::TickClock* clock_; + base::WeakPtrFactory<PacedSender> weak_factory_; + DISALLOW_COPY_AND_ASSIGN(PacedSender); }; diff --git a/media/cast/pacing/paced_sender_unittest.cc b/media/cast/pacing/paced_sender_unittest.cc index 9108965..b731d60 100644 --- a/media/cast/pacing/paced_sender_unittest.cc +++ b/media/cast/pacing/paced_sender_unittest.cc @@ -2,16 +2,19 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/message_loop/message_loop.h" +#include "base/run_loop.h" #include "base/test/simple_test_tick_clock.h" #include "media/cast/pacing/mock_packet_sender.h" #include "media/cast/pacing/paced_sender.h" #include "testing/gmock/include/gmock/gmock.h" -using testing::_; - namespace media { namespace cast { +using base::RunLoop; +using testing::_; + static const uint8 kValue = 123; static const size_t kSize1 = 100; static const size_t kSize2 = 101; @@ -22,18 +25,29 @@ static const int64 kStartMillisecond = 123456789; class PacedSenderTest : public ::testing::Test { protected: - PacedSenderTest() - : paced_sender_(&mock_transport_) { + PacedSenderTest() { testing_clock_.Advance( base::TimeDelta::FromMilliseconds(kStartMillisecond)); - paced_sender_.set_clock(&testing_clock_); } virtual ~PacedSenderTest() {} + virtual void SetUp() { + // TODO(pwestin): Write a generic message loop that runs with a mock clock. + cast_thread_ = new CastThread(MessageLoopProxy::current(), + MessageLoopProxy::current(), + MessageLoopProxy::current(), + MessageLoopProxy::current(), + MessageLoopProxy::current()); + paced_sender_.reset(new PacedSender(cast_thread_, &mock_transport_)); + paced_sender_->set_clock(&testing_clock_); + } + + base::MessageLoop loop_; MockPacketSender mock_transport_; - PacedSender paced_sender_; + scoped_ptr<PacedSender> paced_sender_; base::SimpleTestTickClock testing_clock_; + scoped_refptr<CastThread> cast_thread_; }; TEST_F(PacedSenderTest, PassThroughRtcp) { @@ -42,14 +56,14 @@ TEST_F(PacedSenderTest, PassThroughRtcp) { std::vector<uint8> packet(kSize1, kValue); int num_of_packets = 1; - EXPECT_TRUE(paced_sender_.SendPacket(packet, num_of_packets)); + EXPECT_TRUE(paced_sender_->SendPacket(packet, num_of_packets)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(0); - EXPECT_TRUE(paced_sender_.ResendPacket(packet, num_of_packets)); + EXPECT_TRUE(paced_sender_->ResendPacket(packet, num_of_packets)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(1).WillRepeatedly( testing::Return(true)); - EXPECT_TRUE(paced_sender_.SendRtcpPacket(packet)); + EXPECT_TRUE(paced_sender_->SendRtcpPacket(packet)); } TEST_F(PacedSenderTest, BasicPace) { @@ -59,30 +73,52 @@ TEST_F(PacedSenderTest, BasicPace) { EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(3).WillRepeatedly(testing::Return(true)); for (int i = 0; i < num_of_packets; ++i) { - EXPECT_TRUE(paced_sender_.SendPacket(packet, num_of_packets)); + EXPECT_TRUE(paced_sender_->SendPacket(packet, num_of_packets)); } - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); + base::TimeDelta timeout = base::TimeDelta::FromMilliseconds(10); + testing_clock_.Advance(timeout); // Check that we get the next burst. EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(3).WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); + + // TODO(pwestin): haven't found a way to make the post delayed task to go + // faster than a real-time. + base::PlatformThread::Sleep(timeout); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } // If we call process too early make sure we don't send any packets. - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(5)); + timeout = base::TimeDelta::FromMilliseconds(5); + testing_clock_.Advance(timeout); EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(0); - paced_sender_.Process(); + base::PlatformThread::Sleep(timeout); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } // Check that we get the next burst. - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(5)); + testing_clock_.Advance(timeout); EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(3).WillRepeatedly( testing::Return(true)); - paced_sender_.Process(); + base::PlatformThread::Sleep(timeout); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } // Check that we don't get any more packets. EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(0); - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); - paced_sender_.Process(); + timeout = base::TimeDelta::FromMilliseconds(10); + testing_clock_.Advance(timeout); + base::PlatformThread::Sleep(timeout); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } } TEST_F(PacedSenderTest, PaceWithNack) { @@ -98,65 +134,95 @@ TEST_F(PacedSenderTest, PaceWithNack) { EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(3).WillRepeatedly( testing::Return(true)); for (int i = 0; i < num_of_packets_in_frame; ++i) { - EXPECT_TRUE(paced_sender_.SendPacket(firts_packet, + EXPECT_TRUE(paced_sender_->SendPacket(firts_packet, num_of_packets_in_frame)); } // Add first NACK request. for (int i = 0; i < num_of_packets_in_nack; ++i) { - EXPECT_TRUE(paced_sender_.ResendPacket(nack_packet, + EXPECT_TRUE(paced_sender_->ResendPacket(nack_packet, num_of_packets_in_nack)); } // Check that we get the first NACK burst. - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); EXPECT_CALL(mock_transport_, SendPacket(_, kNackSize)).Times(5). WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); + + base::TimeDelta timeout = base::TimeDelta::FromMilliseconds(10); + testing_clock_.Advance(timeout); + base::PlatformThread::Sleep(timeout); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } // Add second NACK request. for (int i = 0; i < num_of_packets_in_nack; ++i) { - EXPECT_TRUE(paced_sender_.ResendPacket(nack_packet, + EXPECT_TRUE(paced_sender_->ResendPacket(nack_packet, num_of_packets_in_nack)); } // Check that we get the next NACK burst. - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); EXPECT_CALL(mock_transport_, SendPacket(_, kNackSize)).Times(7) .WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); + + testing_clock_.Advance(timeout); + base::PlatformThread::Sleep(timeout); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } // End of NACK plus a packet from the oldest frame. - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); EXPECT_CALL(mock_transport_, SendPacket(_, kNackSize)).Times(6) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(1) .WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); + + testing_clock_.Advance(timeout); + base::PlatformThread::Sleep(timeout); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } // Add second frame. // Make sure we don't delay the second frame due to the previous packets. for (int i = 0; i < num_of_packets_in_frame; ++i) { - EXPECT_TRUE(paced_sender_.SendPacket(second_packet, + EXPECT_TRUE(paced_sender_->SendPacket(second_packet, num_of_packets_in_frame)); } // Last packets of frame 1 and the first packets of frame 2. - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(5).WillRepeatedly( testing::Return(true)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(2).WillRepeatedly( testing::Return(true)); - paced_sender_.Process(); + + testing_clock_.Advance(timeout); + base::PlatformThread::Sleep(timeout); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } // Last packets of frame 2. - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(7).WillRepeatedly( testing::Return(true)); - paced_sender_.Process(); + + testing_clock_.Advance(timeout); + base::PlatformThread::Sleep(timeout); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } // No more packets. - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(0); - paced_sender_.Process(); + testing_clock_.Advance(timeout); + base::PlatformThread::Sleep(timeout); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } } TEST_F(PacedSenderTest, PaceWith60fps) { @@ -172,19 +238,24 @@ TEST_F(PacedSenderTest, PaceWith60fps) { EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(3).WillRepeatedly( testing::Return(true)); for (int i = 0; i < num_of_packets_in_frame; ++i) { - EXPECT_TRUE(paced_sender_.SendPacket(firts_packet, + EXPECT_TRUE(paced_sender_->SendPacket(firts_packet, num_of_packets_in_frame)); } - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); + base::TimeDelta timeout_10ms = base::TimeDelta::FromMilliseconds(10); EXPECT_CALL(mock_transport_, SendPacket(_, kSize1)).Times(3). WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(6)); + testing_clock_.Advance(timeout_10ms); + base::PlatformThread::Sleep(timeout_10ms); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } + testing_clock_.Advance(base::TimeDelta::FromMilliseconds(6)); // Add second frame, after 16 ms. for (int i = 0; i < num_of_packets_in_frame; ++i) { - EXPECT_TRUE(paced_sender_.SendPacket(second_packet, + EXPECT_TRUE(paced_sender_->SendPacket(second_packet, num_of_packets_in_frame)); } @@ -193,53 +264,83 @@ TEST_F(PacedSenderTest, PaceWith60fps) { .WillRepeatedly(testing::Return(true)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(1) .WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); + + base::PlatformThread::Sleep(timeout_10ms); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(4) .WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); + + base::PlatformThread::Sleep(timeout_10ms); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } testing_clock_.Advance(base::TimeDelta::FromMilliseconds(3)); // Add third frame, after 33 ms. for (int i = 0; i < num_of_packets_in_frame; ++i) { - EXPECT_TRUE(paced_sender_.SendPacket(third_packet, + EXPECT_TRUE(paced_sender_->SendPacket(third_packet, num_of_packets_in_frame)); } - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(7)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize2)).Times(4) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize3)).Times(1) .WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); + + base::PlatformThread::Sleep(timeout_10ms); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } // Add fourth frame, after 50 ms. - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); for (int i = 0; i < num_of_packets_in_frame; ++i) { - EXPECT_TRUE(paced_sender_.SendPacket(fourth_packet, + EXPECT_TRUE(paced_sender_->SendPacket(fourth_packet, num_of_packets_in_frame)); } EXPECT_CALL(mock_transport_, SendPacket(_, kSize3)).Times(6) .WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); + testing_clock_.Advance(timeout_10ms); + base::PlatformThread::Sleep(timeout_10ms); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize3)).Times(2) .WillRepeatedly(testing::Return(true)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize4)).Times(4) .WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); + testing_clock_.Advance(timeout_10ms); + base::PlatformThread::Sleep(timeout_10ms); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize4)).Times(5) .WillRepeatedly(testing::Return(true)); - paced_sender_.Process(); + testing_clock_.Advance(timeout_10ms); + base::PlatformThread::Sleep(timeout_10ms); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } - testing_clock_.Advance(base::TimeDelta::FromMilliseconds(10)); EXPECT_CALL(mock_transport_, SendPacket(_, kSize4)).Times(0); - paced_sender_.Process(); + testing_clock_.Advance(timeout_10ms); + base::PlatformThread::Sleep(timeout_10ms); + { + RunLoop run_loop; + run_loop.RunUntilIdle(); + } } } // namespace cast diff --git a/media/cast/rtcp/rtcp.cc b/media/cast/rtcp/rtcp.cc index 16956f7..c3e2c8e 100644 --- a/media/cast/rtcp/rtcp.cc +++ b/media/cast/rtcp/rtcp.cc @@ -12,6 +12,7 @@ #include "media/cast/rtcp/rtcp_receiver.h" #include "media/cast/rtcp/rtcp_sender.h" #include "media/cast/rtcp/rtcp_utility.h" +#include "net/base/big_endian.h" namespace media { namespace cast { @@ -117,17 +118,7 @@ Rtcp::Rtcp(RtcpSenderFeedback* sender_feedback, Rtcp::~Rtcp() {} -base::TimeTicks Rtcp::TimeToSendNextRtcpReport() { - if (next_time_to_send_rtcp_.is_null()) { - UpdateNextTimeToSendRtcp(); - } - return next_time_to_send_rtcp_; -} - -void Rtcp::SetRemoteSSRC(uint32 ssrc) { - rtcp_receiver_->SetRemoteSSRC(ssrc); -} - +// static bool Rtcp::IsRtcpPacket(const uint8* packet, int length) { DCHECK_GE(length, 8) << "Invalid RTCP packet"; if (length < 8) return false; @@ -139,6 +130,26 @@ bool Rtcp::IsRtcpPacket(const uint8* packet, int length) { return false; } +// static +uint32 Rtcp::GetSsrcOfSender(const uint8* rtcp_buffer, int length) { + uint32 ssrc_of_sender; + net::BigEndianReader big_endian_reader(rtcp_buffer, length); + big_endian_reader.Skip(4); // Skip header + big_endian_reader.ReadU32(&ssrc_of_sender); + return ssrc_of_sender; +} + +base::TimeTicks Rtcp::TimeToSendNextRtcpReport() { + if (next_time_to_send_rtcp_.is_null()) { + UpdateNextTimeToSendRtcp(); + } + return next_time_to_send_rtcp_; +} + +void Rtcp::SetRemoteSSRC(uint32 ssrc) { + rtcp_receiver_->SetRemoteSSRC(ssrc); +} + void Rtcp::IncomingRtcpPacket(const uint8* rtcp_buffer, int length) { RtcpParser rtcp_parser(rtcp_buffer, length); if (!rtcp_parser.IsValid()) { diff --git a/media/cast/rtcp/rtcp.h b/media/cast/rtcp/rtcp.h index 5b06b82..31962a52 100644 --- a/media/cast/rtcp/rtcp.h +++ b/media/cast/rtcp/rtcp.h @@ -80,6 +80,8 @@ class Rtcp { static bool IsRtcpPacket(const uint8* rtcp_buffer, int length); + static uint32 GetSsrcOfSender(const uint8* rtcp_buffer, int length); + base::TimeTicks TimeToSendNextRtcpReport(); void SendRtcpReport(uint32 media_ssrc); void SendRtcpPli(uint32 media_ssrc); diff --git a/media/cast/rtp_sender/rtp_packetizer/rtp_packetizer.h b/media/cast/rtp_sender/rtp_packetizer/rtp_packetizer.h index 3ada98b..63035d0 100644 --- a/media/cast/rtp_sender/rtp_packetizer/rtp_packetizer.h +++ b/media/cast/rtp_sender/rtp_packetizer/rtp_packetizer.h @@ -20,6 +20,8 @@ namespace cast { class PacedPacketSender; // This object is only called from the main cast thread. +// This class break encoded audio and video frames into packets and add an RTP +// header to each packet. class RtpPacketizer { public: RtpPacketizer(PacedPacketSender* transport, diff --git a/media/cast/video_sender/video_sender.cc b/media/cast/video_sender/video_sender.cc index 83f3397..1b42238 100644 --- a/media/cast/video_sender/video_sender.cc +++ b/media/cast/video_sender/video_sender.cc @@ -179,8 +179,10 @@ void VideoSender::OnReceivedIntraFrameRequest() { last_sent_frame_id_ = -1; } -void VideoSender::IncomingRtcpPacket(const uint8* packet, int length) { +void VideoSender::IncomingRtcpPacket(const uint8* packet, int length, + const base::Closure callback) { rtcp_->IncomingRtcpPacket(packet, length); + cast_thread_->PostTask(CastThread::MAIN, FROM_HERE, callback); } void VideoSender::ScheduleNextRtcpReport() { diff --git a/media/cast/video_sender/video_sender.h b/media/cast/video_sender/video_sender.h index ff45326..9098e97 100644 --- a/media/cast/video_sender/video_sender.h +++ b/media/cast/video_sender/video_sender.h @@ -62,7 +62,8 @@ class VideoSender : public base::NonThreadSafe, const base::Closure callback); // Only called from the main cast thread. - void IncomingRtcpPacket(const uint8* packet, int length); + void IncomingRtcpPacket(const uint8* packet, int length, + const base::Closure callback); void set_clock(base::TickClock* clock) { clock_ = clock; |