diff options
author | hubbe <hubbe@chromium.org> | 2014-12-10 17:05:57 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2014-12-11 01:06:23 +0000 |
commit | 8029778b0e060e4a22c59fa40ca95e12c437c1f7 (patch) | |
tree | 1ee59bf03b03d4c0e803a1b79026a4d90d51bf6c /media/cast | |
parent | c571107b9edb9d666d7bcfa7d371ba366b4a4afb (diff) | |
download | chromium_src-8029778b0e060e4a22c59fa40ca95e12c437c1f7.zip chromium_src-8029778b0e060e4a22c59fa40ca95e12c437c1f7.tar.gz chromium_src-8029778b0e060e4a22c59fa40ca95e12c437c1f7.tar.bz2 |
Cast: Make receiver use cast_transport
Also, fix rtcp_builder to not be responsible for logging event redundancy.
Instead, we make the rtcp_event_subscriber do that.
Review URL: https://codereview.chromium.org/765643006
Cr-Commit-Position: refs/heads/master@{#307814}
Diffstat (limited to 'media/cast')
40 files changed, 629 insertions, 309 deletions
diff --git a/media/cast/BUILD.gn b/media/cast/BUILD.gn index 488afb1..745b9919 100644 --- a/media/cast/BUILD.gn +++ b/media/cast/BUILD.gn @@ -259,6 +259,8 @@ test("cast_unittests") { "logging/simple_event_subscriber_unittest.cc", "logging/stats_event_subscriber_unittest.cc", "net/cast_transport_sender_impl_unittest.cc", + "net/mock_cast_transport_sender.cc", + "net/mock_cast_transport_sender.h", "net/pacing/mock_paced_packet_sender.cc", "net/pacing/mock_paced_packet_sender.h", "net/pacing/paced_sender_unittest.cc", diff --git a/media/cast/cast.gyp b/media/cast/cast.gyp index a20fde6..8962e76 100644 --- a/media/cast/cast.gyp +++ b/media/cast/cast.gyp @@ -115,8 +115,6 @@ 'net/rtp/framer.h', 'net/rtp/receiver_stats.cc', 'net/rtp/receiver_stats.h', - 'net/rtp/rtp_parser.cc', - 'net/rtp/rtp_parser.h', 'net/rtp/rtp_receiver_defines.cc', 'net/rtp/rtp_receiver_defines.h', ], # source @@ -235,6 +233,8 @@ 'net/rtp/packet_storage.h', 'net/rtp/rtp_packetizer.cc', 'net/rtp/rtp_packetizer.h', + 'net/rtp/rtp_parser.cc', + 'net/rtp/rtp_parser.h', 'net/rtp/rtp_sender.cc', 'net/rtp/rtp_sender.h', 'net/udp_transport.cc', diff --git a/media/cast/cast_receiver.h b/media/cast/cast_receiver.h index f579422..a2668c7 100644 --- a/media/cast/cast_receiver.h +++ b/media/cast/cast_receiver.h @@ -16,6 +16,7 @@ #include "media/base/audio_bus.h" #include "media/cast/cast_config.h" #include "media/cast/cast_environment.h" +#include "media/cast/net/cast_transport_sender.h" namespace media { class VideoFrame; @@ -51,13 +52,11 @@ class CastReceiver { scoped_refptr<CastEnvironment> cast_environment, const FrameReceiverConfig& audio_config, const FrameReceiverConfig& video_config, - PacketSender* const packet_sender); + CastTransportSender* const transport); // All received RTP and RTCP packets for the call should be sent to this // PacketReceiver. Can be called from any thread. - // TODO(hubbe): Replace with: - // virtual void ReceivePacket(scoped_ptr<Packet> packet) = 0; - virtual PacketReceiverCallback packet_receiver() = 0; + virtual void ReceivePacket(scoped_ptr<Packet> packet) = 0; // Polling interface to get audio and video frames from the CastReceiver. The // the RequestDecodedXXXXXFrame() methods utilize internal software-based diff --git a/media/cast/cast_testing.gypi b/media/cast/cast_testing.gypi index 0474eef..6a554e0 100644 --- a/media/cast/cast_testing.gypi +++ b/media/cast/cast_testing.gypi @@ -83,6 +83,8 @@ 'logging/stats_event_subscriber_unittest.cc', 'net/cast_transport_sender_impl_unittest.cc', 'net/frame_id_wrap_helper_test.cc', + 'net/mock_cast_transport_sender.cc', + 'net/mock_cast_transport_sender.h', 'net/pacing/mock_paced_packet_sender.cc', 'net/pacing/mock_paced_packet_sender.h', 'net/pacing/paced_sender_unittest.cc', diff --git a/media/cast/net/cast_transport_config.h b/media/cast/net/cast_transport_config.h index 0f101d8..f73d0a7 100644 --- a/media/cast/net/cast_transport_config.h +++ b/media/cast/net/cast_transport_config.h @@ -127,6 +127,8 @@ typedef scoped_refptr<base::RefCountedData<Packet> > PacketRef; typedef std::vector<PacketRef> PacketList; typedef base::Callback<void(scoped_ptr<Packet> packet)> PacketReceiverCallback; +typedef base::Callback<bool(scoped_ptr<Packet> packet)> + PacketReceiverCallbackWithStatus; class PacketSender { public: diff --git a/media/cast/net/cast_transport_sender.h b/media/cast/net/cast_transport_sender.h index 46031a8..fc88a5c 100644 --- a/media/cast/net/cast_transport_sender.h +++ b/media/cast/net/cast_transport_sender.h @@ -40,6 +40,8 @@ class NetLog; namespace media { namespace cast { +struct RtpReceiverStatistics; +struct RtcpTimeData; // Following the initialization of either audio or video an initialization // status will be sent via this callback. @@ -56,11 +58,13 @@ class CastTransportSender : public base::NonThreadSafe { static scoped_ptr<CastTransportSender> Create( net::NetLog* net_log, base::TickClock* clock, + const net::IPEndPoint& local_end_point, const net::IPEndPoint& remote_end_point, scoped_ptr<base::DictionaryValue> options, const CastTransportStatusCallback& status_callback, const BulkRawEventsCallback& raw_events_callback, base::TimeDelta raw_events_callback_interval, + const PacketReceiverCallback& packet_callback, const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner); virtual ~CastTransportSender() {} @@ -100,6 +104,23 @@ class CastTransportSender : public base::NonThreadSafe { // Returns a callback for receiving packets for testing purposes. virtual PacketReceiverCallback PacketReceiverForTesting(); + + // The following functions are needed for receving. + + // Add a valid SSRC. This is used to verify that incoming packets + // come from the right sender. Without valid SSRCs, the return address cannot + // be automatically established. + virtual void AddValidSsrc(uint32 ssrc) = 0; + + // Send an RTCP message from receiver to sender. + virtual void SendRtcpFromRtpReceiver( + uint32 ssrc, + uint32 sender_ssrc, + const RtcpTimeData& time_data, + const RtcpCastMessage* cast_message, + base::TimeDelta target_delay, + const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events, + const RtpReceiverStatistics* rtp_receiver_statistics) = 0; }; } // namespace cast diff --git a/media/cast/net/cast_transport_sender_impl.cc b/media/cast/net/cast_transport_sender_impl.cc index 390180f..4ef3017 100644 --- a/media/cast/net/cast_transport_sender_impl.cc +++ b/media/cast/net/cast_transport_sender_impl.cc @@ -52,21 +52,25 @@ int32 GetTransportSendBufferSize(const base::DictionaryValue& options) { scoped_ptr<CastTransportSender> CastTransportSender::Create( net::NetLog* net_log, base::TickClock* clock, + const net::IPEndPoint& local_end_point, const net::IPEndPoint& remote_end_point, scoped_ptr<base::DictionaryValue> options, const CastTransportStatusCallback& status_callback, const BulkRawEventsCallback& raw_events_callback, base::TimeDelta raw_events_callback_interval, + const PacketReceiverCallback& packet_callback, const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner) { return scoped_ptr<CastTransportSender>( new CastTransportSenderImpl(net_log, clock, + local_end_point, remote_end_point, options.Pass(), status_callback, raw_events_callback, raw_events_callback_interval, transport_task_runner.get(), + packet_callback, NULL)); } @@ -77,12 +81,14 @@ PacketReceiverCallback CastTransportSender::PacketReceiverForTesting() { CastTransportSenderImpl::CastTransportSenderImpl( net::NetLog* net_log, base::TickClock* clock, + const net::IPEndPoint& local_end_point, const net::IPEndPoint& remote_end_point, scoped_ptr<base::DictionaryValue> options, const CastTransportStatusCallback& status_callback, const BulkRawEventsCallback& raw_events_callback, base::TimeDelta raw_events_callback_interval, const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner, + const PacketReceiverCallback& packet_callback, PacketSender* external_transport) : clock_(clock), status_callback_(status_callback), @@ -92,7 +98,7 @@ CastTransportSenderImpl::CastTransportSenderImpl( NULL : new UdpTransport(net_log, transport_task_runner, - net::IPEndPoint(), + local_end_point, remote_end_point, GetTransportSendBufferSize(*options), status_callback)), @@ -109,6 +115,7 @@ CastTransportSenderImpl::CastTransportSenderImpl( raw_events_callback_(raw_events_callback), raw_events_callback_interval_(raw_events_callback_interval), last_byte_acked_for_audio_(0), + packet_callback_(packet_callback), weak_factory_(this) { DCHECK(clock_); if (!raw_events_callback_.is_null()) { @@ -129,7 +136,7 @@ CastTransportSenderImpl::CastTransportSenderImpl( } transport_->StartReceiving( base::Bind(&CastTransportSenderImpl::OnReceivedPacket, - weak_factory_.GetWeakPtr())); + base::Unretained(this))); int wifi_options = 0; if (options->HasKey(kOptionWifiDisableScan)) { wifi_options |= net::WIFI_OPTIONS_DISABLE_SCAN; @@ -144,6 +151,9 @@ CastTransportSenderImpl::CastTransportSenderImpl( } CastTransportSenderImpl::~CastTransportSenderImpl() { + if (transport_) { + transport_->StopReceiving(); + } if (event_subscriber_.get()) logging_.RemoveRawEventSubscriber(event_subscriber_.get()); } @@ -183,6 +193,7 @@ void CastTransportSenderImpl::InitializeAudio( config.ssrc, config.feedback_ssrc)); pacer_.RegisterAudioSsrc(config.ssrc); + AddValidSsrc(config.feedback_ssrc); status_callback_.Run(TRANSPORT_AUDIO_INITIALIZED); } @@ -216,6 +227,7 @@ void CastTransportSenderImpl::InitializeVideo( config.ssrc, config.feedback_ssrc)); pacer_.RegisterVideoSsrc(config.ssrc); + AddValidSsrc(config.feedback_ssrc); status_callback_.Run(TRANSPORT_VIDEO_INITIALIZED); } @@ -314,8 +326,9 @@ void CastTransportSenderImpl::ResendPackets( } PacketReceiverCallback CastTransportSenderImpl::PacketReceiverForTesting() { - return base::Bind(&CastTransportSenderImpl::OnReceivedPacket, - weak_factory_.GetWeakPtr()); + return base::Bind( + base::IgnoreResult(&CastTransportSenderImpl::OnReceivedPacket), + weak_factory_.GetWeakPtr()); } void CastTransportSenderImpl::SendRawEvents() { @@ -334,18 +347,35 @@ void CastTransportSenderImpl::SendRawEvents() { raw_events_callback_interval_); } -void CastTransportSenderImpl::OnReceivedPacket(scoped_ptr<Packet> packet) { +bool CastTransportSenderImpl::OnReceivedPacket(scoped_ptr<Packet> packet) { + const uint8_t* const data = &packet->front(); + const size_t length = packet->size(); + uint32 ssrc; + if (Rtcp::IsRtcpPacket(data, length)) { + ssrc = Rtcp::GetSsrcOfSender(data, length); + } else if (!RtpParser::ParseSsrc(data, length, &ssrc)) { + VLOG(1) << "Invalid RTP packet."; + return false; + } + if (valid_ssrcs_.find(ssrc) == valid_ssrcs_.end()) { + VLOG(1) << "Stale packet received."; + return false; + } + if (audio_rtcp_session_ && - audio_rtcp_session_->IncomingRtcpPacket(&packet->front(), - packet->size())) { - return; + audio_rtcp_session_->IncomingRtcpPacket(data, length)) { + return true; } if (video_rtcp_session_ && - video_rtcp_session_->IncomingRtcpPacket(&packet->front(), - packet->size())) { - return; + video_rtcp_session_->IncomingRtcpPacket(data, length)) { + return true; } - VLOG(1) << "Stale packet received."; + if (packet_callback_.is_null()) { + VLOG(1) << "Stale packet received."; + return false; + } + packet_callback_.Run(packet.Pass()); + return true; } void CastTransportSenderImpl::OnReceivedLogMessage( @@ -422,5 +452,31 @@ void CastTransportSenderImpl::OnReceivedCastMessage( dedup_info); } +void CastTransportSenderImpl::AddValidSsrc(uint32 ssrc) { + valid_ssrcs_.insert(ssrc); +} + +void CastTransportSenderImpl::SendRtcpFromRtpReceiver( + uint32 ssrc, + uint32 sender_ssrc, + const RtcpTimeData& time_data, + const RtcpCastMessage* cast_message, + base::TimeDelta target_delay, + const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events, + const RtpReceiverStatistics* rtp_receiver_statistics) { + const Rtcp rtcp(RtcpCastMessageCallback(), + RtcpRttCallback(), + RtcpLogMessageCallback(), + clock_, + &pacer_, + ssrc, + sender_ssrc); + rtcp.SendRtcpFromRtpReceiver(time_data, + cast_message, + target_delay, + rtcp_events, + rtp_receiver_statistics); +} + } // namespace cast } // namespace media diff --git a/media/cast/net/cast_transport_sender_impl.h b/media/cast/net/cast_transport_sender_impl.h index b241ec8..c0d096a 100644 --- a/media/cast/net/cast_transport_sender_impl.h +++ b/media/cast/net/cast_transport_sender_impl.h @@ -24,6 +24,8 @@ #ifndef MEDIA_CAST_NET_CAST_TRANSPORT_SENDER_IMPL_H_ #define MEDIA_CAST_NET_CAST_TRANSPORT_SENDER_IMPL_H_ +#include <set> + #include "base/callback.h" #include "base/gtest_prod_util.h" #include "base/memory/ref_counted.h" @@ -38,6 +40,7 @@ #include "media/cast/net/cast_transport_sender.h" #include "media/cast/net/pacing/paced_sender.h" #include "media/cast/net/rtcp/rtcp.h" +#include "media/cast/net/rtp/rtp_parser.h" #include "media/cast/net/rtp/rtp_sender.h" namespace media { @@ -65,19 +68,23 @@ class CastTransportSenderImpl : public CastTransportSender { // "disable_wifi_scan" (value ignored) - disable wifi scans while streaming // "media_streaming_mode" (value ignored) - turn media streaming mode on // Note, these options may be ignored on some platforms. + // TODO(hubbe): Too many callbacks, replace with an interface. CastTransportSenderImpl( net::NetLog* net_log, base::TickClock* clock, + const net::IPEndPoint& local_end_point, const net::IPEndPoint& remote_end_point, scoped_ptr<base::DictionaryValue> options, const CastTransportStatusCallback& status_callback, const BulkRawEventsCallback& raw_events_callback, base::TimeDelta raw_events_callback_interval, const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner, + const PacketReceiverCallback& packet_callback, PacketSender* external_transport); ~CastTransportSenderImpl() override; + // CastTransportSender implementation. void InitializeAudio(const CastTransportRtpConfig& config, const RtcpCastMessageCallback& cast_message_cb, const RtcpRttCallback& rtt_cb) override; @@ -97,6 +104,18 @@ class CastTransportSenderImpl : public CastTransportSender { PacketReceiverCallback PacketReceiverForTesting() override; + // CastTransportReceiver implementation. + void AddValidSsrc(uint32 ssrc) override; + + void SendRtcpFromRtpReceiver( + uint32 ssrc, + uint32 sender_ssrc, + const RtcpTimeData& time_data, + const RtcpCastMessage* cast_message, + base::TimeDelta target_delay, + const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events, + const RtpReceiverStatistics* rtp_receiver_statistics) override; + private: FRIEND_TEST_ALL_PREFIXES(CastTransportSenderImplTest, NacksCancelRetransmits); FRIEND_TEST_ALL_PREFIXES(CastTransportSenderImplTest, CancelRetransmits); @@ -118,7 +137,7 @@ class CastTransportSenderImpl : public CastTransportSender { void SendRawEvents(); // Called when a packet is received. - void OnReceivedPacket(scoped_ptr<Packet> packet); + bool OnReceivedPacket(scoped_ptr<Packet> packet); // Called when a log message is received. void OnReceivedLogMessage(EventMediaType media_type, @@ -167,6 +186,13 @@ class CastTransportSenderImpl : public CastTransportSender { // audio packet. int64 last_byte_acked_for_audio_; + // Packets that don't match these ssrcs are ignored. + std::set<uint32> valid_ssrcs_; + + // Called with incoming packets. (Unless they match the + // channels created by Initialize{Audio,Video}. + PacketReceiverCallback packet_callback_; + scoped_ptr<net::ScopedWifiOptions> wifi_options_autoreset_; base::WeakPtrFactory<CastTransportSenderImpl> weak_factory_; diff --git a/media/cast/net/cast_transport_sender_impl_unittest.cc b/media/cast/net/cast_transport_sender_impl_unittest.cc index 7b95157..e841868 100644 --- a/media/cast/net/cast_transport_sender_impl_unittest.cc +++ b/media/cast/net/cast_transport_sender_impl_unittest.cc @@ -80,11 +80,13 @@ class CastTransportSenderImplTest : public ::testing::Test { new CastTransportSenderImpl(NULL, &testing_clock_, net::IPEndPoint(), + net::IPEndPoint(), make_scoped_ptr(new base::DictionaryValue), base::Bind(&UpdateCastTransportStatus), BulkRawEventsCallback(), base::TimeDelta(), task_runner_, + PacketReceiverCallback(), &transport_)); task_runner_->RunTasks(); } @@ -101,11 +103,13 @@ class CastTransportSenderImplTest : public ::testing::Test { new CastTransportSenderImpl(NULL, &testing_clock_, net::IPEndPoint(), + net::IPEndPoint(), options.Pass(), base::Bind(&UpdateCastTransportStatus), BulkRawEventsCallback(), base::TimeDelta(), task_runner_, + PacketReceiverCallback(), &transport_)); task_runner_->RunTasks(); } @@ -115,12 +119,14 @@ class CastTransportSenderImplTest : public ::testing::Test { NULL, &testing_clock_, net::IPEndPoint(), + net::IPEndPoint(), make_scoped_ptr(new base::DictionaryValue), base::Bind(&UpdateCastTransportStatus), base::Bind(&CastTransportSenderImplTest::LogRawEvents, base::Unretained(this)), base::TimeDelta::FromMilliseconds(10), task_runner_, + PacketReceiverCallback(), &transport_)); task_runner_->RunTasks(); } diff --git a/media/cast/net/mock_cast_transport_sender.cc b/media/cast/net/mock_cast_transport_sender.cc new file mode 100644 index 0000000..752698b --- /dev/null +++ b/media/cast/net/mock_cast_transport_sender.cc @@ -0,0 +1,15 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/cast/net/mock_cast_transport_sender.h" + +namespace media { +namespace cast { + +MockCastTransportSender::MockCastTransportSender() {} + +MockCastTransportSender::~MockCastTransportSender() {} + +} // namespace cast +} // namespace media diff --git a/media/cast/net/mock_cast_transport_sender.h b/media/cast/net/mock_cast_transport_sender.h new file mode 100644 index 0000000..0827e1f --- /dev/null +++ b/media/cast/net/mock_cast_transport_sender.h @@ -0,0 +1,51 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MEDIA_CAST_NET_MOCK_CAST_TRANSPORT_SENDER_H_ +#define MEDIA_CAST_NET_MOCK_CAST_TRANSPORT_SENDER_H_ + +#include "media/cast/net/cast_transport_sender.h" +#include "testing/gmock/include/gmock/gmock.h" + +namespace media { +namespace cast { + +class MockCastTransportSender : public CastTransportSender { + public: + MockCastTransportSender(); + virtual ~MockCastTransportSender(); + + MOCK_METHOD3(InitializeAudio, void( + const CastTransportRtpConfig& config, + const RtcpCastMessageCallback& cast_message_cb, + const RtcpRttCallback& rtt_cb)); + MOCK_METHOD3(InitializeVideo, void( + const CastTransportRtpConfig& config, + const RtcpCastMessageCallback& cast_message_cb, + const RtcpRttCallback& rtt_cb)); + MOCK_METHOD2(InsertFrame, void(uint32 ssrc, const EncodedFrame& frame)); + MOCK_METHOD3(SendSenderReport, void( + uint32 ssrc, + base::TimeTicks current_time, + uint32 current_time_as_rtp_timestamp)); + MOCK_METHOD2(CancelSendingFrames, void( + uint32 ssrc, + const std::vector<uint32>& frame_ids)); + MOCK_METHOD2(ResendFrameForKickstart, void(uint32 ssrc, uint32 frame_id)); + MOCK_METHOD0(PacketReceiverForTesting, PacketReceiverCallback()); + MOCK_METHOD1(AddValidSsrc, void(uint32 ssrc)); + MOCK_METHOD7(SendRtcpFromRtpReceiver, void( + uint32 ssrc, + uint32 sender_ssrc, + const RtcpTimeData& time_data, + const RtcpCastMessage* cast_message, + base::TimeDelta target_delay, + const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events, + const RtpReceiverStatistics* rtp_receiver_statistics)); +}; + +} // namespace cast +} // namespace media + +#endif // MEDIA_CAST_NET_MOCK_CAST_TRANSPORT_SENDER_H_ diff --git a/media/cast/net/rtcp/receiver_rtcp_event_subscriber.cc b/media/cast/net/rtcp/receiver_rtcp_event_subscriber.cc index a751ff9..4ae954f 100644 --- a/media/cast/net/rtcp/receiver_rtcp_event_subscriber.cc +++ b/media/cast/net/rtcp/receiver_rtcp_event_subscriber.cc @@ -11,9 +11,14 @@ namespace cast { ReceiverRtcpEventSubscriber::ReceiverRtcpEventSubscriber( const size_t max_size_to_retain, EventMediaType type) - : max_size_to_retain_(max_size_to_retain), type_(type) { + : max_size_to_retain_( + max_size_to_retain * (kResendDelay * kNumResends + 1)), + type_(type) { DCHECK(max_size_to_retain_ > 0u); DCHECK(type_ == AUDIO_EVENT || type_ == VIDEO_EVENT); + for (size_t i = 0; i < kNumResends; i++) { + send_ptrs_[i] = 0; + } } ReceiverRtcpEventSubscriber::~ReceiverRtcpEventSubscriber() { @@ -33,7 +38,7 @@ void ReceiverRtcpEventSubscriber::OnReceiveFrameEvent( case FRAME_DECODED: rtcp_event.type = frame_event.type; rtcp_event.timestamp = frame_event.timestamp; - rtcp_events_.insert( + rtcp_events_.push_back( std::make_pair(frame_event.rtp_timestamp, rtcp_event)); break; default: @@ -42,8 +47,6 @@ void ReceiverRtcpEventSubscriber::OnReceiveFrameEvent( } TruncateMapIfNeeded(); - - DCHECK(rtcp_events_.size() <= max_size_to_retain_); } void ReceiverRtcpEventSubscriber::OnReceivePacketEvent( @@ -56,22 +59,58 @@ void ReceiverRtcpEventSubscriber::OnReceivePacketEvent( rtcp_event.type = packet_event.type; rtcp_event.timestamp = packet_event.timestamp; rtcp_event.packet_id = packet_event.packet_id; - rtcp_events_.insert( + rtcp_events_.push_back( std::make_pair(packet_event.rtp_timestamp, rtcp_event)); } } TruncateMapIfNeeded(); - - DCHECK(rtcp_events_.size() <= max_size_to_retain_); } -void ReceiverRtcpEventSubscriber::GetRtcpEventsAndReset( - RtcpEventMultiMap* rtcp_events) { +struct CompareByFirst { + bool operator()(const std::pair<RtpTimestamp, RtcpEvent>& a, + const std::pair<RtpTimestamp, RtcpEvent>& b) { + return a.first < b.first; + } +}; + +void ReceiverRtcpEventSubscriber::GetRtcpEventsWithRedundancy( + RtcpEvents* rtcp_events) { DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(rtcp_events); - rtcp_events->swap(rtcp_events_); - rtcp_events_.clear(); + + uint64 event_level = rtcp_events_.size() + popped_events_; + event_levels_for_past_frames_.push_back(event_level); + + for (size_t i = 0; i < kNumResends; i++) { + size_t resend_delay = kResendDelay * i; + if (event_levels_for_past_frames_.size() < resend_delay + 1) + break; + + uint64 send_limit = event_levels_for_past_frames_[ + event_levels_for_past_frames_.size() - 1 - resend_delay]; + + if (send_ptrs_[i] < popped_events_) { + send_ptrs_[i] = popped_events_; + } + + while (send_ptrs_[i] < send_limit && + rtcp_events->size() < kMaxEventsPerRTCP) { + rtcp_events->push_back(rtcp_events_[send_ptrs_[i] - popped_events_]); + send_ptrs_[i]++; + } + send_limit = send_ptrs_[i]; + } + + if (event_levels_for_past_frames_.size() > kResendDelay * (kNumResends + 1)) { + while (popped_events_ < event_levels_for_past_frames_[0]) { + rtcp_events_.pop_front(); + popped_events_++; + } + event_levels_for_past_frames_.pop_front(); + } + + std::sort(rtcp_events->begin(), rtcp_events->end(), CompareByFirst()); } void ReceiverRtcpEventSubscriber::TruncateMapIfNeeded() { @@ -81,8 +120,11 @@ void ReceiverRtcpEventSubscriber::TruncateMapIfNeeded() { DVLOG(3) << "RTCP event map exceeded size limit; " << "removing oldest entry"; // This is fine since we only insert elements one at a time. - rtcp_events_.erase(rtcp_events_.begin()); + rtcp_events_.pop_front(); + popped_events_++; } + + DCHECK(rtcp_events_.size() <= max_size_to_retain_); } bool ReceiverRtcpEventSubscriber::ShouldProcessEvent( diff --git a/media/cast/net/rtcp/receiver_rtcp_event_subscriber.h b/media/cast/net/rtcp/receiver_rtcp_event_subscriber.h index c08733c..b5321f9 100644 --- a/media/cast/net/rtcp/receiver_rtcp_event_subscriber.h +++ b/media/cast/net/rtcp/receiver_rtcp_event_subscriber.h @@ -5,7 +5,8 @@ #ifndef MEDIA_CAST_RTCP_RECEIVER_RTCP_EVENT_SUBSCRIBER_H_ #define MEDIA_CAST_RTCP_RECEIVER_RTCP_EVENT_SUBSCRIBER_H_ -#include <map> +#include <deque> +#include <vector> #include "base/threading/thread_checker.h" #include "media/cast/logging/logging_defines.h" @@ -15,6 +16,10 @@ namespace media { namespace cast { +static const size_t kNumResends = 3; +static const size_t kResendDelay = 10; +static const size_t kMaxEventsPerRTCP = 20; + // A RawEventSubscriber implementation with the following properties: // - Only processes raw event types that are relevant for sending from cast // receiver to cast sender via RTCP. @@ -26,7 +31,8 @@ namespace cast { // timestamp) up to the size limit. class ReceiverRtcpEventSubscriber : public RawEventSubscriber { public: - typedef std::multimap<RtpTimestamp, RtcpEvent> RtcpEventMultiMap; + typedef std::pair<RtpTimestamp, RtcpEvent> RtcpEventPair; + typedef std::vector<std::pair<RtpTimestamp, RtcpEvent> > RtcpEvents; // |max_size_to_retain|: The object will keep up to |max_size_to_retain| // events @@ -43,9 +49,9 @@ class ReceiverRtcpEventSubscriber : public RawEventSubscriber { void OnReceiveFrameEvent(const FrameEvent& frame_event) override; void OnReceivePacketEvent(const PacketEvent& packet_event) override; - // Assigns events collected to |rtcp_events| and clears them from this - // object. - void GetRtcpEventsAndReset(RtcpEventMultiMap* rtcp_events); + // Assigns events collected to |rtcp_events|. If there is space, some + // older events will be added for redundancy as well. + void GetRtcpEventsWithRedundancy(RtcpEvents* rtcp_events); private: // If |rtcp_events_.size()| exceeds |max_size_to_retain_|, remove an oldest @@ -65,7 +71,24 @@ class ReceiverRtcpEventSubscriber : public RawEventSubscriber { // to differentiate between video and audio frames, but since the // implementation doesn't mix audio and video frame events, RTP timestamp // only as key is fine. - RtcpEventMultiMap rtcp_events_; + std::deque<RtcpEventPair> rtcp_events_; + + // Counts how many events have been removed from rtcp_events_. + uint64 popped_events_; + + // Events greater than send_ptrs_[0] have not been sent yet. + // Events greater than send_ptrs_[1] have been transmit once. + // Note that these counters use absolute numbers, so you need + // to subtract popped_events_ before looking up the events in + // rtcp_events_. + uint64 send_ptrs_[kNumResends]; + + // For each frame, we push how many events have been added to + // rtcp_events_ so far. We use this to make sure that + // send_ptrs_[N+1] is always at least kResendDelay frames behind + // send_ptrs_[N]. Old information is removed so that information + // for (kNumResends + 1) * kResendDelay frames remain. + std::deque<uint64> event_levels_for_past_frames_; // Ensures methods are only called on the main thread. base::ThreadChecker thread_checker_; diff --git a/media/cast/net/rtcp/receiver_rtcp_event_subscriber_unittest.cc b/media/cast/net/rtcp/receiver_rtcp_event_subscriber_unittest.cc index 35b72a7..993df70ea 100644 --- a/media/cast/net/rtcp/receiver_rtcp_event_subscriber_unittest.cc +++ b/media/cast/net/rtcp/receiver_rtcp_event_subscriber_unittest.cc @@ -99,8 +99,8 @@ TEST_F(ReceiverRtcpEventSubscriberTest, LogVideoEvents) { Init(VIDEO_EVENT); InsertEvents(); - ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; - event_subscriber_->GetRtcpEventsAndReset(&rtcp_events); + ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events; + event_subscriber_->GetRtcpEventsWithRedundancy(&rtcp_events); EXPECT_EQ(3u, rtcp_events.size()); } @@ -108,8 +108,8 @@ TEST_F(ReceiverRtcpEventSubscriberTest, LogAudioEvents) { Init(AUDIO_EVENT); InsertEvents(); - ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; - event_subscriber_->GetRtcpEventsAndReset(&rtcp_events); + ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events; + event_subscriber_->GetRtcpEventsWithRedundancy(&rtcp_events); EXPECT_EQ(3u, rtcp_events.size()); } @@ -122,8 +122,8 @@ TEST_F(ReceiverRtcpEventSubscriberTest, DropEventsWhenSizeExceeded) { /*rtp_timestamp*/ i * 10, /*frame_id*/ i); } - ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; - event_subscriber_->GetRtcpEventsAndReset(&rtcp_events); + ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events; + event_subscriber_->GetRtcpEventsWithRedundancy(&rtcp_events); EXPECT_EQ(10u, rtcp_events.size()); } diff --git a/media/cast/net/rtcp/rtcp.cc b/media/cast/net/rtcp/rtcp.cc index 4e42ed0..3835bdd 100644 --- a/media/cast/net/rtcp/rtcp.cc +++ b/media/cast/net/rtcp/rtcp.cc @@ -53,13 +53,6 @@ std::pair<uint64, uint64> GetReceiverEventKey( } // namespace -RtpReceiverStatistics::RtpReceiverStatistics() : - fraction_lost(0), - cumulative_lost(0), - extended_high_sequence_number(0), - jitter(0) { -} - Rtcp::Rtcp(const RtcpCastMessageCallback& cast_callback, const RtcpRttCallback& rtt_callback, const RtcpLogMessageCallback& log_callback, @@ -198,19 +191,27 @@ bool Rtcp::DedupeReceiverLog(RtcpReceiverLogMessage* receiver_log) { return !receiver_log->empty(); } +RtcpTimeData Rtcp::ConvertToNTPAndSave(base::TimeTicks now) { + RtcpTimeData ret; + ret.timestamp = now; + + // Attach our NTP to all RTCP packets; with this information a "smart" sender + // can make decisions based on how old the RTCP message is. + ConvertTimeTicksToNtp(now, &ret.ntp_seconds, &ret.ntp_fraction); + SaveLastSentNtpTime(now, ret.ntp_seconds, ret.ntp_fraction); + return ret; +} + void Rtcp::SendRtcpFromRtpReceiver( + RtcpTimeData time_data, const RtcpCastMessage* cast_message, base::TimeDelta target_delay, - const ReceiverRtcpEventSubscriber::RtcpEventMultiMap* rtcp_events, - RtpReceiverStatistics* rtp_receiver_statistics) { - base::TimeTicks now = clock_->NowTicks(); + const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events, + const RtpReceiverStatistics* rtp_receiver_statistics) const { RtcpReportBlock report_block; RtcpReceiverReferenceTimeReport rrtr; - - // Attach our NTP to all RTCP packets; with this information a "smart" sender - // can make decisions based on how old the RTCP message is. - ConvertTimeTicksToNtp(now, &rrtr.ntp_seconds, &rrtr.ntp_fraction); - SaveLastSentNtpTime(now, rrtr.ntp_seconds, rrtr.ntp_fraction); + rrtr.ntp_seconds = time_data.ntp_seconds; + rrtr.ntp_fraction = time_data.ntp_fraction; if (rtp_receiver_statistics) { report_block.remote_ssrc = 0; // Not needed to set send side. @@ -224,7 +225,7 @@ void Rtcp::SendRtcpFromRtpReceiver( if (!time_last_report_received_.is_null()) { uint32 delay_seconds = 0; uint32 delay_fraction = 0; - base::TimeDelta delta = now - time_last_report_received_; + base::TimeDelta delta = time_data.timestamp - time_last_report_received_; ConvertTimeToFractions(delta.InMicroseconds(), &delay_seconds, &delay_fraction); report_block.delay_since_last_sr = @@ -233,9 +234,10 @@ void Rtcp::SendRtcpFromRtpReceiver( report_block.delay_since_last_sr = 0; } } + RtcpBuilder rtcp_builder(local_ssrc_); packet_sender_->SendRtcpPacket( local_ssrc_, - rtcp_builder_.BuildRtcpFromReceiver( + rtcp_builder.BuildRtcpFromReceiver( rtp_receiver_statistics ? &report_block : NULL, &rrtr, cast_message, diff --git a/media/cast/net/rtcp/rtcp.h b/media/cast/net/rtcp/rtcp.h index d1cfa48..35608e0 100644 --- a/media/cast/net/rtcp/rtcp.h +++ b/media/cast/net/rtcp/rtcp.h @@ -38,14 +38,6 @@ typedef std::pair<uint32, base::TimeTicks> RtcpSendTimePair; typedef std::map<uint32, base::TimeTicks> RtcpSendTimeMap; typedef std::queue<RtcpSendTimePair> RtcpSendTimeQueue; -struct RtpReceiverStatistics { - RtpReceiverStatistics(); - uint8 fraction_lost; - uint32 cumulative_lost; // 24 bits valid. - uint32 extended_high_sequence_number; - uint32 jitter; -}; - // TODO(hclam): This should be renamed to RtcpSession. class Rtcp { public: @@ -70,16 +62,25 @@ class Rtcp { uint32 send_packet_count, size_t send_octet_count); - // |cast_message| and |rtcp_events| is optional; if |cast_message| is - // provided the RTCP receiver report will append a Cast message containing - // Acks and Nacks; |target_delay| is sent together with |cast_message|. - // If |rtcp_events| is provided the RTCP receiver report will append the - // log messages. + // This function is meant to be used in conjunction with + // SendRtcpFromRtpReceiver. + // |now| is converted to NTP and saved internally for + // future round-trip/lip-sync calculations. + // This is done in a separate method so that SendRtcpFromRtpReceiver can + // be done on a separate (temporary) RTCP object. + RtcpTimeData ConvertToNTPAndSave(base::TimeTicks now); + + // |cast_message|, |rtcp_events| and |rtp_receiver_statistics| are optional; + // if |cast_message| is provided the RTCP receiver report will append a Cast + // message containing Acks and Nacks; |target_delay| is sent together with + // |cast_message|. If |rtcp_events| is provided the RTCP receiver report will + // append the log messages. void SendRtcpFromRtpReceiver( + RtcpTimeData time_data, const RtcpCastMessage* cast_message, base::TimeDelta target_delay, - const ReceiverRtcpEventSubscriber::RtcpEventMultiMap* rtcp_events, - RtpReceiverStatistics* rtp_receiver_statistics); + const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events, + const RtpReceiverStatistics* rtp_receiver_statistics) const; // Submit a received packet to this object. The packet will be parsed // and used to maintain a RTCP session. @@ -105,6 +106,9 @@ class Rtcp { static bool IsRtcpPacket(const uint8* packet, size_t length); static uint32 GetSsrcOfSender(const uint8* rtcp_buffer, size_t length); + uint32 GetLocalSsrc() const { return local_ssrc_; } + uint32 GetRemoteSsrc() const { return remote_ssrc_; } + protected: void OnReceivedNtp(uint32 ntp_seconds, uint32 ntp_fraction); void OnReceivedLipSyncInfo(uint32 rtp_timestamp, diff --git a/media/cast/net/rtcp/rtcp_builder.cc b/media/cast/net/rtcp/rtcp_builder.cc index b4e58c3..6f25a99 100644 --- a/media/cast/net/rtcp/rtcp_builder.cc +++ b/media/cast/net/rtcp/rtcp_builder.cc @@ -44,33 +44,6 @@ bool EventTimestampLessThan(const RtcpReceiverEventLogMessage& lhs, return lhs.event_timestamp < rhs.event_timestamp; } -void AddReceiverLogEntries( - const RtcpReceiverLogMessage& redundancy_receiver_log_message, - RtcpReceiverLogMessage* receiver_log_message, - size_t* remaining_space, - size_t* number_of_frames, - size_t* total_number_of_messages_to_send) { - RtcpReceiverLogMessage::const_iterator it = - redundancy_receiver_log_message.begin(); - while (it != redundancy_receiver_log_message.end() && - *remaining_space >= - kRtcpReceiverFrameLogSize + kRtcpReceiverEventLogSize) { - receiver_log_message->push_front(*it); - size_t num_event_logs = (*remaining_space - kRtcpReceiverFrameLogSize) / - kRtcpReceiverEventLogSize; - RtcpReceiverEventLogMessages& event_log_messages = - receiver_log_message->front().event_log_messages_; - if (num_event_logs < event_log_messages.size()) - event_log_messages.resize(num_event_logs); - - *remaining_space -= kRtcpReceiverFrameLogSize + - event_log_messages.size() * kRtcpReceiverEventLogSize; - ++number_of_frames; - *total_number_of_messages_to_send += event_log_messages.size(); - ++it; - } -} - // A class to build a string representing the NACK list in Cast message. // // The string will look like "23:3-6 25:1,5-6", meaning packets 3 to 6 in frame @@ -197,7 +170,7 @@ PacketRef RtcpBuilder::BuildRtcpFromReceiver( const RtcpReportBlock* report_block, const RtcpReceiverReferenceTimeReport* rrtr, const RtcpCastMessage* cast_message, - const ReceiverRtcpEventSubscriber::RtcpEventMultiMap* rtcp_events, + const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events, base::TimeDelta target_delay) { Start(); @@ -366,7 +339,7 @@ void RtcpBuilder::AddDlrrRb(const RtcpDlrrReportBlock& dlrr) { } void RtcpBuilder::AddReceiverLog( - const ReceiverRtcpEventSubscriber::RtcpEventMultiMap& rtcp_events) { + const ReceiverRtcpEventSubscriber::RtcpEvents& rtcp_events) { size_t total_number_of_messages_to_send = 0; RtcpReceiverLogMessage receiver_log_message; @@ -441,12 +414,11 @@ void RtcpBuilder::AddReceiverLog( } bool RtcpBuilder::GetRtcpReceiverLogMessage( - const ReceiverRtcpEventSubscriber::RtcpEventMultiMap& rtcp_events, + const ReceiverRtcpEventSubscriber::RtcpEvents& rtcp_events, RtcpReceiverLogMessage* receiver_log_message, size_t* total_number_of_messages_to_send) { size_t number_of_frames = 0; - size_t remaining_space = - std::min<size_t>(kMaxReceiverLogBytes, writer_.remaining()); + size_t remaining_space = writer_.remaining(); if (remaining_space < kRtcpCastLogHeaderSize + kRtcpReceiverFrameLogSize + kRtcpReceiverEventLogSize) { return false; @@ -459,7 +431,7 @@ bool RtcpBuilder::GetRtcpReceiverLogMessage( // Account for the RTCP header for an application-defined packet. remaining_space -= kRtcpCastLogHeaderSize; - ReceiverRtcpEventSubscriber::RtcpEventMultiMap::const_reverse_iterator rit = + ReceiverRtcpEventSubscriber::RtcpEvents::const_reverse_iterator rit = rtcp_events.rbegin(); while (rit != rtcp_events.rend() && @@ -510,36 +482,6 @@ bool RtcpBuilder::GetRtcpReceiverLogMessage( receiver_log_message->push_front(frame_log); } - rtcp_events_history_.push_front(*receiver_log_message); - - // We don't try to match RTP timestamps of redundancy frame logs with those - // from the newest set (which would save the space of an extra RTP timestamp - // over the wire). Unless the redundancy frame logs are very recent, it's - // unlikely there will be a match anyway. - if (rtcp_events_history_.size() > kFirstRedundancyOffset) { - // Add first redundnacy messages, if enough space remaining - AddReceiverLogEntries(rtcp_events_history_[kFirstRedundancyOffset], - receiver_log_message, - &remaining_space, - &number_of_frames, - total_number_of_messages_to_send); - } - - if (rtcp_events_history_.size() > kSecondRedundancyOffset) { - // Add second redundancy messages, if enough space remaining - AddReceiverLogEntries(rtcp_events_history_[kSecondRedundancyOffset], - receiver_log_message, - &remaining_space, - &number_of_frames, - total_number_of_messages_to_send); - } - - if (rtcp_events_history_.size() > kReceiveLogMessageHistorySize) { - rtcp_events_history_.pop_back(); - } - - DCHECK_LE(rtcp_events_history_.size(), kReceiveLogMessageHistorySize); - VLOG(3) << "number of frames: " << number_of_frames; VLOG(3) << "total messages to send: " << *total_number_of_messages_to_send; return number_of_frames > 0; diff --git a/media/cast/net/rtcp/rtcp_builder.h b/media/cast/net/rtcp/rtcp_builder.h index b530648..2f22c39 100644 --- a/media/cast/net/rtcp/rtcp_builder.h +++ b/media/cast/net/rtcp/rtcp_builder.h @@ -19,27 +19,6 @@ namespace media { namespace cast { -// We limit the size of receiver logs to avoid queuing up packets. -const size_t kMaxReceiverLogBytes = 200; - -// The determines how long to hold receiver log events, based on how -// many "receiver log message reports" ago the events were sent. -const size_t kReceiveLogMessageHistorySize = 20; - -// This determines when to send events the second time. -const size_t kFirstRedundancyOffset = 10; -COMPILE_ASSERT(kFirstRedundancyOffset > 0 && - kFirstRedundancyOffset <= kReceiveLogMessageHistorySize, - redundancy_offset_out_of_range); - -// When to send events the third time. -const size_t kSecondRedundancyOffset = 20; -COMPILE_ASSERT(kSecondRedundancyOffset > - kFirstRedundancyOffset && kSecondRedundancyOffset <= - kReceiveLogMessageHistorySize, - redundancy_offset_out_of_range); - - class RtcpBuilder { public: explicit RtcpBuilder(uint32 sending_ssrc); @@ -49,7 +28,7 @@ class RtcpBuilder { const RtcpReportBlock* report_block, const RtcpReceiverReferenceTimeReport* rrtr, const RtcpCastMessage* cast_message, - const ReceiverRtcpEventSubscriber::RtcpEventMultiMap* rtcp_events, + const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events, base::TimeDelta target_delay); PacketRef BuildRtcpFromSender(const RtcpSenderInfo& sender_info); @@ -65,10 +44,10 @@ class RtcpBuilder { void AddSR(const RtcpSenderInfo& sender_info); void AddDlrrRb(const RtcpDlrrReportBlock& dlrr); void AddReceiverLog( - const ReceiverRtcpEventSubscriber::RtcpEventMultiMap& rtcp_events); + const ReceiverRtcpEventSubscriber::RtcpEvents& rtcp_events); bool GetRtcpReceiverLogMessage( - const ReceiverRtcpEventSubscriber::RtcpEventMultiMap& rtcp_events, + const ReceiverRtcpEventSubscriber::RtcpEvents& rtcp_events, RtcpReceiverLogMessage* receiver_log_message, size_t* total_number_of_messages_to_send); @@ -79,7 +58,6 @@ class RtcpBuilder { const uint32 ssrc_; char* ptr_of_length_; PacketRef packet_; - std::deque<RtcpReceiverLogMessage> rtcp_events_history_; DISALLOW_COPY_AND_ASSIGN(RtcpBuilder); }; diff --git a/media/cast/net/rtcp/rtcp_builder_unittest.cc b/media/cast/net/rtcp/rtcp_builder_unittest.cc index e68444a..6bd9d05 100644 --- a/media/cast/net/rtcp/rtcp_builder_unittest.cc +++ b/media/cast/net/rtcp/rtcp_builder_unittest.cc @@ -47,12 +47,16 @@ class RtcpBuilderTest : public ::testing::Test { void ExpectPacketEQ(scoped_ptr<Packet> golden_packet, PacketRef packet) { + int diffs = 0; EXPECT_EQ(golden_packet->size(), packet->data.size()); if (golden_packet->size() == packet->data.size()) { for (size_t x = 0; x < golden_packet->size(); x++) { - EXPECT_EQ((*golden_packet)[x], packet->data[x]); - if ((*golden_packet)[x] != packet->data[x]) - break; + EXPECT_EQ((*golden_packet)[x], packet->data[x]) << + "x = " << x << " / " << golden_packet->size(); + if ((*golden_packet)[x] != packet->data[x]) { + if (++diffs > 5) + break; + } } } } @@ -191,7 +195,7 @@ TEST_F(RtcpBuilderTest, RtcpReceiverReportWithRrtrCastMessageAndLog) { missing_packets; ReceiverRtcpEventSubscriber event_subscriber(500, VIDEO_EVENT); - ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; + ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events; ExpectPacketEQ(p.GetPacket().Pass(), rtcp_builder_->BuildRtcpFromReceiver( @@ -224,7 +228,7 @@ TEST_F(RtcpBuilderTest, RtcpReceiverReportWithRrtrCastMessageAndLog) { packet_event.timestamp = testing_clock.NowTicks(); packet_event.packet_id = kLostPacketId1; event_subscriber.OnReceivePacketEvent(packet_event); - event_subscriber.GetRtcpEventsAndReset(&rtcp_events); + event_subscriber.GetRtcpEventsWithRedundancy(&rtcp_events); EXPECT_EQ(2u, rtcp_events.size()); ExpectPacketEQ( @@ -252,31 +256,22 @@ TEST_F(RtcpBuilderTest, RtcpReceiverReportWithOversizedFrameLog) { p.AddReceiverLog(kSendingSsrc); - int remaining_bytes = kMaxReceiverLogBytes; - remaining_bytes -= kRtcpCastLogHeaderSize; + int num_events = kMaxEventsPerRTCP; - remaining_bytes -= kRtcpReceiverFrameLogSize; - int num_events = remaining_bytes / kRtcpReceiverEventLogSize; EXPECT_LE(num_events, static_cast<int>(kRtcpMaxReceiverLogMessages)); - // Only the last |num_events| events are sent due to receiver log size cap. p.AddReceiverFrameLog( kRtpTimestamp + 2345, num_events, - kTimeBaseMs + (kRtcpMaxReceiverLogMessages - num_events) * kTimeDelayMs); + kTimeBaseMs); for (int i = 0; i < num_events; i++) { p.AddReceiverEventLog( - kLostPacketId1, PACKET_RECEIVED, + kLostPacketId1, + PACKET_RECEIVED, static_cast<uint16>(kTimeDelayMs * i)); } ReceiverRtcpEventSubscriber event_subscriber(500, VIDEO_EVENT); - FrameEvent frame_event; - frame_event.rtp_timestamp = kRtpTimestamp; - frame_event.type = FRAME_ACK_SENT; - frame_event.media_type = VIDEO_EVENT; - frame_event.timestamp = testing_clock.NowTicks(); - event_subscriber.OnReceiveFrameEvent(frame_event); for (size_t i = 0; i < kRtcpMaxReceiverLogMessages; ++i) { PacketEvent packet_event; @@ -289,8 +284,8 @@ TEST_F(RtcpBuilderTest, RtcpReceiverReportWithOversizedFrameLog) { testing_clock.Advance(base::TimeDelta::FromMilliseconds(kTimeDelayMs)); } - ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; - event_subscriber.GetRtcpEventsAndReset(&rtcp_events); + ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events; + event_subscriber.GetRtcpEventsWithRedundancy(&rtcp_events); ExpectPacketEQ(p.GetPacket().Pass(), rtcp_builder_->BuildRtcpFromReceiver( @@ -316,16 +311,9 @@ TEST_F(RtcpBuilderTest, RtcpReceiverReportWithTooManyLogFrames) { p.AddReceiverLog(kSendingSsrc); - int remaining_bytes = kMaxReceiverLogBytes; - remaining_bytes -= kRtcpCastLogHeaderSize; - - int num_events = - remaining_bytes / (kRtcpReceiverFrameLogSize + kRtcpReceiverEventLogSize); + int num_events = kMaxEventsPerRTCP; - // The last |num_events| events are sent due to receiver log size cap. - for (size_t i = kRtcpMaxReceiverLogMessages - num_events; - i < kRtcpMaxReceiverLogMessages; - ++i) { + for (int i = 0; i < num_events; i++) { p.AddReceiverFrameLog(kRtpTimestamp + i, 1, kTimeBaseMs + i * kTimeDelayMs); p.AddReceiverEventLog(0, FRAME_ACK_SENT, 0); } @@ -342,8 +330,8 @@ TEST_F(RtcpBuilderTest, RtcpReceiverReportWithTooManyLogFrames) { testing_clock.Advance(base::TimeDelta::FromMilliseconds(kTimeDelayMs)); } - ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; - event_subscriber.GetRtcpEventsAndReset(&rtcp_events); + ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events; + event_subscriber.GetRtcpEventsWithRedundancy(&rtcp_events); ExpectPacketEQ(p.GetPacket().Pass(), rtcp_builder_->BuildRtcpFromReceiver( @@ -389,8 +377,8 @@ TEST_F(RtcpBuilderTest, RtcpReceiverReportWithOldLogFrames) { base::TimeDelta::FromMilliseconds(kTimeBetweenEventsMs)); } - ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; - event_subscriber.GetRtcpEventsAndReset(&rtcp_events); + ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events; + event_subscriber.GetRtcpEventsWithRedundancy(&rtcp_events); ExpectPacketEQ(p.GetPacket().Pass(), rtcp_builder_->BuildRtcpFromReceiver( @@ -411,7 +399,7 @@ TEST_F(RtcpBuilderTest, RtcpReceiverReportRedundancy) { testing_clock.Advance(base::TimeDelta::FromMilliseconds(time_base_ms)); ReceiverRtcpEventSubscriber event_subscriber(500, VIDEO_EVENT); - size_t packet_count = kReceiveLogMessageHistorySize + 10; + size_t packet_count = kNumResends * kResendDelay + 10; for (size_t i = 0; i < packet_count; i++) { TestRtcpPacketBuilder p; p.AddRr(kSendingSsrc, 1); @@ -419,22 +407,15 @@ TEST_F(RtcpBuilderTest, RtcpReceiverReportRedundancy) { p.AddReceiverLog(kSendingSsrc); - if (i >= kSecondRedundancyOffset) { - p.AddReceiverFrameLog( - kRtpTimestamp, - 1, - time_base_ms - kSecondRedundancyOffset * kTimeBetweenEventsMs); - p.AddReceiverEventLog(0, FRAME_ACK_SENT, 0); + int num_events = (i + kResendDelay) / kResendDelay; + num_events = std::min<int>(num_events, kNumResends); + p.AddReceiverFrameLog(kRtpTimestamp, num_events, + time_base_ms - (num_events - 1) * kResendDelay * + kTimeBetweenEventsMs); + for (int i = 0; i < num_events; i++) { + p.AddReceiverEventLog(0, FRAME_ACK_SENT, + i * kResendDelay * kTimeBetweenEventsMs); } - if (i >= kFirstRedundancyOffset) { - p.AddReceiverFrameLog( - kRtpTimestamp, - 1, - time_base_ms - kFirstRedundancyOffset * kTimeBetweenEventsMs); - p.AddReceiverEventLog(0, FRAME_ACK_SENT, 0); - } - p.AddReceiverFrameLog(kRtpTimestamp, 1, time_base_ms); - p.AddReceiverEventLog(0, FRAME_ACK_SENT, 0); FrameEvent frame_event; frame_event.rtp_timestamp = kRtpTimestamp; @@ -443,8 +424,8 @@ TEST_F(RtcpBuilderTest, RtcpReceiverReportRedundancy) { frame_event.timestamp = testing_clock.NowTicks(); event_subscriber.OnReceiveFrameEvent(frame_event); - ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; - event_subscriber.GetRtcpEventsAndReset(&rtcp_events); + ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events; + event_subscriber.GetRtcpEventsWithRedundancy(&rtcp_events); ExpectPacketEQ(p.GetPacket().Pass(), rtcp_builder_->BuildRtcpFromReceiver( diff --git a/media/cast/net/rtcp/rtcp_defines.cc b/media/cast/net/rtcp/rtcp_defines.cc index a296dc8..f0ceef3 100644 --- a/media/cast/net/rtcp/rtcp_defines.cc +++ b/media/cast/net/rtcp/rtcp_defines.cc @@ -33,5 +33,17 @@ RtcpReceiverReferenceTimeReport::~RtcpReceiverReferenceTimeReport() {} RtcpEvent::RtcpEvent() : type(UNKNOWN), packet_id(0u) {} RtcpEvent::~RtcpEvent() {} +RtpReceiverStatistics::RtpReceiverStatistics() : + fraction_lost(0), + cumulative_lost(0), + extended_high_sequence_number(0), + jitter(0) {} + +SendRtcpFromRtpReceiver_Params::SendRtcpFromRtpReceiver_Params() + : ssrc(0), + sender_ssrc(0) {} + +SendRtcpFromRtpReceiver_Params::~SendRtcpFromRtpReceiver_Params() {} + } // namespace cast } // namespace media diff --git a/media/cast/net/rtcp/rtcp_defines.h b/media/cast/net/rtcp/rtcp_defines.h index 3dd23ef..b35e14b 100644 --- a/media/cast/net/rtcp/rtcp_defines.h +++ b/media/cast/net/rtcp/rtcp_defines.h @@ -109,6 +109,37 @@ typedef base::Callback<void(base::TimeDelta)> RtcpRttCallback; typedef base::Callback<void(const RtcpReceiverLogMessage&)> RtcpLogMessageCallback; +// TODO(hubbe): Document members of this struct. +struct RtpReceiverStatistics { + RtpReceiverStatistics(); + uint8 fraction_lost; + uint32 cumulative_lost; // 24 bits valid. + uint32 extended_high_sequence_number; + uint32 jitter; +}; + +// These are intended to only be created using Rtcp::ConvertToNTPAndSave. +struct RtcpTimeData { + uint32 ntp_seconds; + uint32 ntp_fraction; + base::TimeTicks timestamp; +}; + +// This struct is used to encapsulate all the parameters of the +// SendRtcpFromRtpReceiver for IPC transportation. +struct SendRtcpFromRtpReceiver_Params { + SendRtcpFromRtpReceiver_Params(); + ~SendRtcpFromRtpReceiver_Params(); + uint32 ssrc; + uint32 sender_ssrc; + RtcpTimeData time_data; + scoped_ptr<RtcpCastMessage> cast_message; + base::TimeDelta target_delay; + scoped_ptr<std::vector<std::pair<RtpTimestamp, RtcpEvent> > > rtcp_events; + scoped_ptr<RtpReceiverStatistics> rtp_receiver_statistics; +}; + + } // namespace cast } // namespace media diff --git a/media/cast/net/rtcp/rtcp_unittest.cc b/media/cast/net/rtcp/rtcp_unittest.cc index e0c9f0a..b0fc1db 100644 --- a/media/cast/net/rtcp/rtcp_unittest.cc +++ b/media/cast/net/rtcp/rtcp_unittest.cc @@ -180,6 +180,7 @@ TEST_F(RtcpTest, RoundTripTimesDeterminedFromReportPingPong) { // Receiver --> Sender RtpReceiverStatistics stats; rtcp_for_receiver_.SendRtcpFromRtpReceiver( + rtcp_for_receiver_.ConvertToNTPAndSave(receiver_clock_->NowTicks()), NULL, base::TimeDelta(), NULL, &stats); expected_rtt_according_to_sender = one_way_trip_time * 2; EXPECT_EQ(expected_rtt_according_to_sender, diff --git a/media/cast/net/rtp/rtp_parser.cc b/media/cast/net/rtp/rtp_parser.cc index 1d68cd1..bb6c2c0 100644 --- a/media/cast/net/rtp/rtp_parser.cc +++ b/media/cast/net/rtp/rtp_parser.cc @@ -12,6 +12,15 @@ namespace media { namespace cast { +// static +bool RtpParser::ParseSsrc(const uint8* packet, + size_t length, + uint32* ssrc) { + base::BigEndianReader big_endian_reader( + reinterpret_cast<const char*>(packet), length); + return big_endian_reader.Skip(8) && big_endian_reader.ReadU32(ssrc); +} + RtpParser::RtpParser(uint32 expected_sender_ssrc, uint8 expected_payload_type) : expected_sender_ssrc_(expected_sender_ssrc), expected_payload_type_(expected_payload_type) {} diff --git a/media/cast/net/rtp/rtp_parser.h b/media/cast/net/rtp/rtp_parser.h index 64586d2..821507c 100644 --- a/media/cast/net/rtp/rtp_parser.h +++ b/media/cast/net/rtp/rtp_parser.h @@ -33,6 +33,8 @@ class RtpParser { const uint8** payload_data, size_t* payload_size); + static bool ParseSsrc(const uint8* packet, size_t length, uint32* ssrc); + private: const uint32 expected_sender_ssrc_; const uint8 expected_payload_type_; diff --git a/media/cast/net/udp_transport.cc b/media/cast/net/udp_transport.cc index 2b6495d..dce16ab 100644 --- a/media/cast/net/udp_transport.cc +++ b/media/cast/net/udp_transport.cc @@ -65,7 +65,7 @@ UdpTransport::UdpTransport( UdpTransport::~UdpTransport() {} void UdpTransport::StartReceiving( - const PacketReceiverCallback& packet_receiver) { + const PacketReceiverCallbackWithStatus& packet_receiver) { DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); packet_receiver_ = packet_receiver; @@ -99,6 +99,12 @@ void UdpTransport::StartReceiving( ScheduleReceiveNextPacket(); } +void UdpTransport::StopReceiving() { + DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); + packet_receiver_ = PacketReceiverCallbackWithStatus(); +} + + void UdpTransport::SetDscp(net::DiffServCodePoint dscp) { DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); next_dscp_value_ = dscp; @@ -118,6 +124,9 @@ void UdpTransport::ScheduleReceiveNextPacket() { void UdpTransport::ReceiveNextPacket(int length_or_status) { DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread()); + if (packet_receiver_.is_null()) + return; + // Loop while UdpSocket is delivering data synchronously. When it responds // with a "pending" status, break and expect this method to be called back in // the future when a packet is ready. @@ -155,15 +164,18 @@ void UdpTransport::ReceiveNextPacket(int length_or_status) { remote_addr_ = recv_addr_; VLOG(1) << "Setting remote address from first received packet: " << remote_addr_.ToString(); + next_packet_->resize(length_or_status); + if (!packet_receiver_.Run(next_packet_.Pass())) { + VLOG(1) << "Packet was not valid, resetting remote address."; + remote_addr_ = net::IPEndPoint(); + } } else if (!IsEqual(remote_addr_, recv_addr_)) { VLOG(1) << "Ignoring packet received from an unrecognized address: " << recv_addr_.ToString() << "."; - length_or_status = net::ERR_IO_PENDING; - continue; + } else { + next_packet_->resize(length_or_status); + packet_receiver_.Run(next_packet_.Pass()); } - - next_packet_->resize(length_or_status); - packet_receiver_.Run(next_packet_.Pass()); length_or_status = net::ERR_IO_PENDING; } } diff --git a/media/cast/net/udp_transport.h b/media/cast/net/udp_transport.h index 0ee6346..4184e17a 100644 --- a/media/cast/net/udp_transport.h +++ b/media/cast/net/udp_transport.h @@ -46,7 +46,8 @@ class UdpTransport : public PacketSender { ~UdpTransport() override; // Start receiving packets. Packets are submitted to |packet_receiver|. - void StartReceiving(const PacketReceiverCallback& packet_receiver); + void StartReceiving(const PacketReceiverCallbackWithStatus& packet_receiver); + void StopReceiving(); // Set a new DSCP value to the socket. The value will be set right before // the next send. @@ -82,7 +83,7 @@ class UdpTransport : public PacketSender { scoped_ptr<Packet> next_packet_; scoped_refptr<net::WrappedIOBuffer> recv_buf_; net::IPEndPoint recv_addr_; - PacketReceiverCallback packet_receiver_; + PacketReceiverCallbackWithStatus packet_receiver_; int32 send_buffer_size_; const CastTransportStatusCallback status_callback_; int bytes_sent_; diff --git a/media/cast/net/udp_transport_unittest.cc b/media/cast/net/udp_transport_unittest.cc index 2bc9bab..00a219f 100644 --- a/media/cast/net/udp_transport_unittest.cc +++ b/media/cast/net/udp_transport_unittest.cc @@ -25,14 +25,15 @@ class MockPacketReceiver { MockPacketReceiver(const base::Closure& callback) : packet_callback_(callback) {} - void ReceivedPacket(scoped_ptr<Packet> packet) { + bool ReceivedPacket(scoped_ptr<Packet> packet) { packet_ = std::string(packet->size(), '\0'); std::copy(packet->begin(), packet->end(), packet_.begin()); packet_callback_.Run(); + return true; } std::string packet() const { return packet_; } - PacketReceiverCallback packet_receiver() { + PacketReceiverCallbackWithStatus packet_receiver() { return base::Bind(&MockPacketReceiver::ReceivedPacket, base::Unretained(this)); } diff --git a/media/cast/receiver/cast_receiver_impl.cc b/media/cast/receiver/cast_receiver_impl.cc index 8265211..32b2c8c 100644 --- a/media/cast/receiver/cast_receiver_impl.cc +++ b/media/cast/receiver/cast_receiver_impl.cc @@ -20,25 +20,19 @@ scoped_ptr<CastReceiver> CastReceiver::Create( scoped_refptr<CastEnvironment> cast_environment, const FrameReceiverConfig& audio_config, const FrameReceiverConfig& video_config, - PacketSender* const packet_sender) { + CastTransportSender* const transport) { return scoped_ptr<CastReceiver>(new CastReceiverImpl( - cast_environment, audio_config, video_config, packet_sender)); + cast_environment, audio_config, video_config, transport)); } CastReceiverImpl::CastReceiverImpl( scoped_refptr<CastEnvironment> cast_environment, const FrameReceiverConfig& audio_config, const FrameReceiverConfig& video_config, - PacketSender* const packet_sender) + CastTransportSender* const transport) : cast_environment_(cast_environment), - pacer_(kTargetBurstSize, - kMaxBurstSize, - cast_environment->Clock(), - cast_environment->Logging(), - packet_sender, - cast_environment->GetTaskRunner(CastEnvironment::MAIN)), - audio_receiver_(cast_environment, audio_config, AUDIO_EVENT, &pacer_), - video_receiver_(cast_environment, video_config, VIDEO_EVENT, &pacer_), + audio_receiver_(cast_environment, audio_config, AUDIO_EVENT, transport), + video_receiver_(cast_environment, video_config, VIDEO_EVENT, transport), ssrc_of_audio_sender_(audio_config.incoming_ssrc), ssrc_of_video_sender_(video_config.incoming_ssrc), num_audio_channels_(audio_config.channels), @@ -48,14 +42,14 @@ CastReceiverImpl::CastReceiverImpl( CastReceiverImpl::~CastReceiverImpl() {} -void CastReceiverImpl::DispatchReceivedPacket(scoped_ptr<Packet> packet) { +void CastReceiverImpl::ReceivePacket(scoped_ptr<Packet> packet) { const uint8_t* const data = &packet->front(); const size_t length = packet->size(); uint32 ssrc_of_sender; if (Rtcp::IsRtcpPacket(data, length)) { ssrc_of_sender = Rtcp::GetSsrcOfSender(data, length); - } else if (!FrameReceiver::ParseSenderSsrc(data, length, &ssrc_of_sender)) { + } else if (!RtpParser::ParseSsrc(data, length, &ssrc_of_sender)) { VLOG(1) << "Invalid RTP packet."; return; } @@ -78,14 +72,6 @@ void CastReceiverImpl::DispatchReceivedPacket(scoped_ptr<Packet> packet) { base::Passed(&packet))); } -PacketReceiverCallback CastReceiverImpl::packet_receiver() { - return base::Bind(&CastReceiverImpl::DispatchReceivedPacket, - // TODO(miu): This code structure is dangerous, since the - // callback could be stored and then invoked after - // destruction of |this|. - base::Unretained(this)); -} - void CastReceiverImpl::RequestDecodedAudioFrame( const AudioFrameDecodedCallback& callback) { DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); diff --git a/media/cast/receiver/cast_receiver_impl.h b/media/cast/receiver/cast_receiver_impl.h index 5431393..bc1bf67 100644 --- a/media/cast/receiver/cast_receiver_impl.h +++ b/media/cast/receiver/cast_receiver_impl.h @@ -27,12 +27,12 @@ class CastReceiverImpl : public CastReceiver { CastReceiverImpl(scoped_refptr<CastEnvironment> cast_environment, const FrameReceiverConfig& audio_config, const FrameReceiverConfig& video_config, - PacketSender* const packet_sender); + CastTransportSender* const transport); ~CastReceiverImpl() override; // CastReceiver implementation. - PacketReceiverCallback packet_receiver() override; + void ReceivePacket(scoped_ptr<Packet> packet) override; void RequestDecodedAudioFrame( const AudioFrameDecodedCallback& callback) override; void RequestEncodedAudioFrame( @@ -43,10 +43,6 @@ class CastReceiverImpl : public CastReceiver { const ReceiveEncodedFrameCallback& callback) override; private: - // Forwards |packet| to a specific RTP frame receiver, or drops it if SSRC - // does not map to one of the receivers. - void DispatchReceivedPacket(scoped_ptr<Packet> packet); - // Feeds an EncodedFrame into |audio_decoder_|. RequestDecodedAudioFrame() // uses this as a callback for RequestEncodedAudioFrame(). void DecodeEncodedAudioFrame( @@ -88,7 +84,6 @@ class CastReceiverImpl : public CastReceiver { bool is_continuous); const scoped_refptr<CastEnvironment> cast_environment_; - PacedSender pacer_; FrameReceiver audio_receiver_; FrameReceiver video_receiver_; diff --git a/media/cast/receiver/frame_receiver.cc b/media/cast/receiver/frame_receiver.cc index 561e19f..c1155d0 100644 --- a/media/cast/receiver/frame_receiver.cc +++ b/media/cast/receiver/frame_receiver.cc @@ -23,8 +23,9 @@ FrameReceiver::FrameReceiver( const scoped_refptr<CastEnvironment>& cast_environment, const FrameReceiverConfig& config, EventMediaType event_media_type, - PacedPacketSender* const packet_sender) + CastTransportSender* const transport) : cast_environment_(cast_environment), + transport_(transport), packet_parser_(config.incoming_ssrc, config.rtp_payload_type), stats_(cast_environment->Clock()), event_media_type_(event_media_type), @@ -44,13 +45,14 @@ FrameReceiver::FrameReceiver( RtcpRttCallback(), RtcpLogMessageCallback(), cast_environment_->Clock(), - packet_sender, + NULL, config.feedback_ssrc, config.incoming_ssrc), is_waiting_for_consecutive_frame_(false), lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()), rtcp_interval_(base::TimeDelta::FromMilliseconds(config.rtcp_interval)), weak_factory_(this) { + transport_->AddValidSsrc(config.incoming_ssrc); DCHECK_GT(config.rtp_max_delay_ms, 0); DCHECK_GT(config.max_frame_rate, 0); decryptor_.Initialize(config.aes_key, config.aes_iv_mask); @@ -100,15 +102,6 @@ bool FrameReceiver::ProcessPacket(scoped_ptr<Packet> packet) { return true; } -// static -bool FrameReceiver::ParseSenderSsrc(const uint8* packet, - size_t length, - uint32* ssrc) { - base::BigEndianReader big_endian_reader( - reinterpret_cast<const char*>(packet), length); - return big_endian_reader.Skip(8) && big_endian_reader.ReadU32(ssrc); -} - void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header, const uint8* payload_data, size_t payload_size) { @@ -177,10 +170,15 @@ void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) { now, FRAME_ACK_SENT, event_media_type_, rtp_timestamp, cast_message.ack_frame_id); - ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events; - event_subscriber_.GetRtcpEventsAndReset(&rtcp_events); - rtcp_.SendRtcpFromRtpReceiver(&cast_message, target_playout_delay_, - &rtcp_events, NULL); + ReceiverRtcpEventSubscriber::RtcpEvents rtcp_events; + event_subscriber_.GetRtcpEventsWithRedundancy(&rtcp_events); + transport_->SendRtcpFromRtpReceiver(rtcp_.GetLocalSsrc(), + rtcp_.GetRemoteSsrc(), + rtcp_.ConvertToNTPAndSave(now), + &cast_message, + target_playout_delay_, + &rtcp_events, + NULL); } void FrameReceiver::EmitAvailableEncodedFrames() { @@ -336,8 +334,15 @@ void FrameReceiver::ScheduleNextRtcpReport() { void FrameReceiver::SendNextRtcpReport() { DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); + const base::TimeTicks now = cast_environment_->Clock()->NowTicks(); RtpReceiverStatistics stats = stats_.GetStatistics(); - rtcp_.SendRtcpFromRtpReceiver(NULL, base::TimeDelta(), NULL, &stats); + transport_->SendRtcpFromRtpReceiver(rtcp_.GetLocalSsrc(), + rtcp_.GetRemoteSsrc(), + rtcp_.ConvertToNTPAndSave(now), + NULL, + base::TimeDelta(), + NULL, + &stats); ScheduleNextRtcpReport(); } diff --git a/media/cast/receiver/frame_receiver.h b/media/cast/receiver/frame_receiver.h index 4d673f3..382b798 100644 --- a/media/cast/receiver/frame_receiver.h +++ b/media/cast/receiver/frame_receiver.h @@ -50,7 +50,7 @@ class FrameReceiver : public RtpPayloadFeedback, FrameReceiver(const scoped_refptr<CastEnvironment>& cast_environment, const FrameReceiverConfig& config, EventMediaType event_media_type, - PacedPacketSender* const packet_sender); + CastTransportSender* const transport); ~FrameReceiver() override; @@ -64,10 +64,6 @@ class FrameReceiver : public RtpPayloadFeedback, // out-of-order. Returns true if the parsing of the packet succeeded. bool ProcessPacket(scoped_ptr<Packet> packet); - // TODO(miu): This is the wrong place for this, but the (de)serialization - // implementation needs to be consolidated first. - static bool ParseSenderSsrc(const uint8* packet, size_t length, uint32* ssrc); - protected: friend class FrameReceiverTest; // Invokes ProcessParsedPacket(). @@ -115,6 +111,9 @@ class FrameReceiver : public RtpPayloadFeedback, const scoped_refptr<CastEnvironment> cast_environment_; + // Transport used to send data back. + CastTransportSender* const transport_; + // Deserializes a packet into a RtpHeader + payload bytes. RtpParser packet_parser_; diff --git a/media/cast/receiver/frame_receiver_unittest.cc b/media/cast/receiver/frame_receiver_unittest.cc index 24e1a0b..42bc4e3 100644 --- a/media/cast/receiver/frame_receiver_unittest.cc +++ b/media/cast/receiver/frame_receiver_unittest.cc @@ -12,7 +12,8 @@ #include "media/cast/cast_defines.h" #include "media/cast/cast_environment.h" #include "media/cast/logging/simple_event_subscriber.h" -#include "media/cast/net/pacing/mock_paced_packet_sender.h" +#include "media/cast/net/cast_transport_sender_impl.h" +#include "media/cast/net/mock_cast_transport_sender.h" #include "media/cast/net/rtcp/test_rtcp_packet_builder.h" #include "media/cast/receiver/frame_receiver.h" #include "media/cast/test/fake_single_thread_task_runner.h" @@ -136,7 +137,7 @@ class FrameReceiverTest : public ::testing::Test { RtpCastHeader rtp_header_; base::SimpleTestTickClock* testing_clock_; // Owned by CastEnvironment. base::TimeTicks start_time_; - MockPacedPacketSender mock_transport_; + MockCastTransportSender mock_transport_; scoped_refptr<test::FakeSingleThreadTaskRunner> task_runner_; scoped_refptr<CastEnvironment> cast_environment_; FakeFrameClient frame_client_; @@ -171,8 +172,8 @@ TEST_F(FrameReceiverTest, ReceivesOneFrame) { SimpleEventSubscriber event_subscriber; cast_environment_->Logging()->AddRawEventSubscriber(&event_subscriber); - EXPECT_CALL(mock_transport_, SendRtcpPacket(_, _)) - .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(mock_transport_, SendRtcpFromRtpReceiver(_, _, _, _, _, _, _)) + .WillRepeatedly(testing::Return()); FeedLipSyncInfoIntoReceiver(); task_runner_->RunTasks(); @@ -212,8 +213,8 @@ TEST_F(FrameReceiverTest, ReceivesFramesSkippingWhenAppropriate) { SimpleEventSubscriber event_subscriber; cast_environment_->Logging()->AddRawEventSubscriber(&event_subscriber); - EXPECT_CALL(mock_transport_, SendRtcpPacket(_, _)) - .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(mock_transport_, SendRtcpFromRtpReceiver(_, _, _, _, _, _, _)) + .WillRepeatedly(testing::Return()); const uint32 rtp_advance_per_frame = config_.frequency / config_.max_frame_rate; @@ -315,8 +316,8 @@ TEST_F(FrameReceiverTest, ReceivesFramesRefusingToSkipAny) { SimpleEventSubscriber event_subscriber; cast_environment_->Logging()->AddRawEventSubscriber(&event_subscriber); - EXPECT_CALL(mock_transport_, SendRtcpPacket(_, _)) - .WillRepeatedly(testing::Return(true)); + EXPECT_CALL(mock_transport_, SendRtcpFromRtpReceiver(_, _, _, _, _, _, _)) + .WillRepeatedly(testing::Return()); const uint32 rtp_advance_per_frame = config_.frequency / config_.max_frame_rate; diff --git a/media/cast/sender/audio_sender_unittest.cc b/media/cast/sender/audio_sender_unittest.cc index 0045923..7bbfd1a 100644 --- a/media/cast/sender/audio_sender_unittest.cc +++ b/media/cast/sender/audio_sender_unittest.cc @@ -77,12 +77,14 @@ class AudioSenderTest : public ::testing::Test { transport_sender_.reset(new CastTransportSenderImpl( NULL, testing_clock_, + net::IPEndPoint(), dummy_endpoint, make_scoped_ptr(new base::DictionaryValue), base::Bind(&UpdateCastTransportStatus), BulkRawEventsCallback(), base::TimeDelta(), task_runner_, + PacketReceiverCallback(), &transport_)); audio_sender_.reset(new AudioSender( cast_environment_, audio_config_, transport_sender_.get())); diff --git a/media/cast/sender/video_sender_unittest.cc b/media/cast/sender/video_sender_unittest.cc index 917e4a9..ab9ac4c 100644 --- a/media/cast/sender/video_sender_unittest.cc +++ b/media/cast/sender/video_sender_unittest.cc @@ -148,11 +148,13 @@ class VideoSenderTest : public ::testing::Test { NULL, testing_clock_, dummy_endpoint, + dummy_endpoint, make_scoped_ptr(new base::DictionaryValue), base::Bind(&UpdateCastTransportStatus), BulkRawEventsCallback(), base::TimeDelta(), task_runner_, + PacketReceiverCallback(), &transport_)); } diff --git a/media/cast/test/cast_benchmarks.cc b/media/cast/test/cast_benchmarks.cc index cb66871..c97bfc6 100644 --- a/media/cast/test/cast_benchmarks.cc +++ b/media/cast/test/cast_benchmarks.cc @@ -152,6 +152,27 @@ class CastTransportSenderWrapper : public CastTransportSender { return transport_->PacketReceiverForTesting(); } + void AddValidSsrc(uint32 ssrc) override { + return transport_->AddValidSsrc(ssrc); + } + + void SendRtcpFromRtpReceiver( + uint32 ssrc, + uint32 sender_ssrc, + const RtcpTimeData& time_data, + const RtcpCastMessage* cast_message, + base::TimeDelta target_delay, + const ReceiverRtcpEventSubscriber::RtcpEvents* rtcp_events, + const RtpReceiverStatistics* rtp_receiver_statistics) override { + return transport_->SendRtcpFromRtpReceiver(ssrc, + sender_ssrc, + time_data, + cast_message, + target_delay, + rtcp_events, + rtp_receiver_statistics); + } + private: scoped_ptr<CastTransportSender> transport_; uint32 audio_ssrc_, video_ssrc_; @@ -286,25 +307,42 @@ class RunOneBenchmark { } void Create(const MeasuringPoint& p) { - cast_receiver_ = CastReceiver::Create(cast_environment_receiver_, - audio_receiver_config_, - video_receiver_config_, - &receiver_to_sender_); net::IPEndPoint dummy_endpoint; transport_sender_.Init( new CastTransportSenderImpl( NULL, testing_clock_sender_, dummy_endpoint, + dummy_endpoint, make_scoped_ptr(new base::DictionaryValue), base::Bind(&UpdateCastTransportStatus), base::Bind(&IgnoreRawEvents), base::TimeDelta::FromSeconds(1), task_runner_sender_, + PacketReceiverCallback(), &sender_to_receiver_), &video_bytes_encoded_, &audio_bytes_encoded_); + transport_receiver_.reset( + new CastTransportSenderImpl( + NULL, + testing_clock_receiver_, + dummy_endpoint, + dummy_endpoint, + make_scoped_ptr(new base::DictionaryValue), + base::Bind(&UpdateCastTransportStatus), + base::Bind(&IgnoreRawEvents), + base::TimeDelta::FromSeconds(1), + task_runner_receiver_, + base::Bind(&RunOneBenchmark::ReceivePacket, base::Unretained(this)), + &receiver_to_sender_)); + + cast_receiver_ = CastReceiver::Create(cast_environment_receiver_, + audio_receiver_config_, + video_receiver_config_, + transport_receiver_.get()); + cast_sender_ = CastSender::Create(cast_environment_sender_, &transport_sender_); @@ -321,10 +359,15 @@ class RunOneBenchmark { transport_sender_.PacketReceiverForTesting(), task_runner_, &testing_clock_); sender_to_receiver_.Initialize( - CreateSimplePipe(p).Pass(), cast_receiver_->packet_receiver(), + CreateSimplePipe(p).Pass(), + transport_receiver_->PacketReceiverForTesting(), task_runner_, &testing_clock_); } + void ReceivePacket(scoped_ptr<Packet> packet) { + cast_receiver_->ReceivePacket(packet.Pass()); + } + virtual ~RunOneBenchmark() { cast_sender_.reset(); cast_receiver_.reset(); @@ -475,6 +518,7 @@ class RunOneBenchmark { LoopBackTransport receiver_to_sender_; LoopBackTransport sender_to_receiver_; CastTransportSenderWrapper transport_sender_; + scoped_ptr<CastTransportSender> transport_receiver_; uint64 video_bytes_encoded_; uint64 audio_bytes_encoded_; diff --git a/media/cast/test/end2end_unittest.cc b/media/cast/test/end2end_unittest.cc index 97dce08..ed7118d 100644 --- a/media/cast/test/end2end_unittest.cc +++ b/media/cast/test/end2end_unittest.cc @@ -582,24 +582,43 @@ class End2EndTest : public ::testing::Test { } } - void Create() { - cast_receiver_ = CastReceiver::Create(cast_environment_receiver_, - audio_receiver_config_, - video_receiver_config_, - &receiver_to_sender_); + void ReceivePacket(scoped_ptr<Packet> packet) { + cast_receiver_->ReceivePacket(packet.Pass()); + } + void Create() { net::IPEndPoint dummy_endpoint; transport_sender_.reset(new CastTransportSenderImpl( NULL, testing_clock_sender_, dummy_endpoint, + dummy_endpoint, make_scoped_ptr(new base::DictionaryValue), base::Bind(&UpdateCastTransportStatus), base::Bind(&End2EndTest::LogRawEvents, base::Unretained(this)), base::TimeDelta::FromMilliseconds(1), task_runner_sender_, + PacketReceiverCallback(), &sender_to_receiver_)); + transport_receiver_.reset(new CastTransportSenderImpl( + NULL, + testing_clock_sender_, + dummy_endpoint, + dummy_endpoint, + make_scoped_ptr(new base::DictionaryValue), + base::Bind(&UpdateCastTransportStatus), + base::Bind(&End2EndTest::LogRawEvents, base::Unretained(this)), + base::TimeDelta::FromMilliseconds(1), + task_runner_sender_, + base::Bind(&End2EndTest::ReceivePacket, base::Unretained(this)), + &receiver_to_sender_)); + + cast_receiver_ = CastReceiver::Create(cast_environment_receiver_, + audio_receiver_config_, + video_receiver_config_, + transport_receiver_.get()); + cast_sender_ = CastSender::Create(cast_environment_sender_, transport_sender_.get()); @@ -616,9 +635,10 @@ class End2EndTest : public ::testing::Test { transport_sender_->PacketReceiverForTesting(), task_runner_, &testing_clock_); - sender_to_receiver_.SetPacketReceiver(cast_receiver_->packet_receiver(), - task_runner_, - &testing_clock_); + sender_to_receiver_.SetPacketReceiver( + transport_receiver_->PacketReceiverForTesting(), + task_runner_, + &testing_clock_); audio_frame_input_ = cast_sender_->audio_frame_input(); video_frame_input_ = cast_sender_->video_frame_input(); @@ -792,6 +812,7 @@ class End2EndTest : public ::testing::Test { LoopBackTransport receiver_to_sender_; LoopBackTransport sender_to_receiver_; scoped_ptr<CastTransportSenderImpl> transport_sender_; + scoped_ptr<CastTransportSenderImpl> transport_receiver_; scoped_ptr<CastReceiver> cast_receiver_; scoped_ptr<CastSender> cast_sender_; diff --git a/media/cast/test/sender.cc b/media/cast/test/sender.cc index 07efb294..b61bd3a 100644 --- a/media/cast/test/sender.cc +++ b/media/cast/test/sender.cc @@ -321,11 +321,13 @@ int main(int argc, char** argv) { media::cast::CastTransportSender::Create( NULL, // net log. cast_environment->Clock(), + net::IPEndPoint(), remote_endpoint, make_scoped_ptr(new base::DictionaryValue), // options base::Bind(&UpdateCastTransportStatus), base::Bind(&LogRawEvents, cast_environment), base::TimeDelta::FromSeconds(1), + media::cast::PacketReceiverCallback(), io_message_loop.message_loop_proxy()); // CastSender initialization. diff --git a/media/cast/test/simulator.cc b/media/cast/test/simulator.cc index 7122c8f..10b56d1 100644 --- a/media/cast/test/simulator.cc +++ b/media/cast/test/simulator.cc @@ -387,12 +387,39 @@ void RunSimulation(const base::FilePath& source_path, LoopBackTransport receiver_to_sender(receiver_env); LoopBackTransport sender_to_receiver(sender_env); + struct PacketProxy { + PacketProxy() : receiver(NULL) {} + void ReceivePacket(scoped_ptr<Packet> packet) { + if (receiver) + receiver->ReceivePacket(packet.Pass()); + } + CastReceiver* receiver; + }; + + PacketProxy packet_proxy; + // Cast receiver. + scoped_ptr<CastTransportSender> transport_receiver( + new CastTransportSenderImpl( + NULL, + &testing_clock, + net::IPEndPoint(), + net::IPEndPoint(), + make_scoped_ptr(new base::DictionaryValue), + base::Bind(&UpdateCastTransportStatus), + base::Bind(&LogTransportEvents, receiver_env), + base::TimeDelta::FromSeconds(1), + task_runner, + base::Bind(&PacketProxy::ReceivePacket, + base::Unretained(&packet_proxy)), + &receiver_to_sender)); scoped_ptr<CastReceiver> cast_receiver( CastReceiver::Create(receiver_env, audio_receiver_config, video_receiver_config, - &receiver_to_sender)); + transport_receiver.get())); + + packet_proxy.receiver = cast_receiver.get(); // Cast sender and transport sender. scoped_ptr<CastTransportSender> transport_sender( @@ -400,11 +427,13 @@ void RunSimulation(const base::FilePath& source_path, NULL, &testing_clock, net::IPEndPoint(), + net::IPEndPoint(), make_scoped_ptr(new base::DictionaryValue), base::Bind(&UpdateCastTransportStatus), base::Bind(&LogTransportEvents, sender_env), base::TimeDelta::FromSeconds(1), task_runner, + PacketReceiverCallback(), &sender_to_receiver)); scoped_ptr<CastSender> cast_sender( CastSender::Create(sender_env, transport_sender.get())); @@ -429,7 +458,7 @@ void RunSimulation(const base::FilePath& source_path, task_runner, &testing_clock); sender_to_receiver.Initialize( ipp->NewBuffer(128 * 1024).Pass(), - cast_receiver->packet_receiver(), task_runner, + transport_receiver->PacketReceiverForTesting(), task_runner, &testing_clock); } else { LOG(INFO) << "No network simulation."; @@ -439,7 +468,7 @@ void RunSimulation(const base::FilePath& source_path, task_runner, &testing_clock); sender_to_receiver.Initialize( scoped_ptr<test::PacketPipe>(), - cast_receiver->packet_receiver(), task_runner, + transport_receiver->PacketReceiverForTesting(), task_runner, &testing_clock); } diff --git a/media/cast/test/utility/in_process_receiver.cc b/media/cast/test/utility/in_process_receiver.cc index fb41843..788325f 100644 --- a/media/cast/test/utility/in_process_receiver.cc +++ b/media/cast/test/utility/in_process_receiver.cc @@ -7,6 +7,7 @@ #include "base/bind_helpers.h" #include "base/synchronization/waitable_event.h" #include "base/time/time.h" +#include "base/values.h" #include "media/base/video_frame.h" #include "media/cast/cast_config.h" #include "media/cast/cast_environment.h" @@ -76,20 +77,24 @@ void InProcessReceiver::StartOnMainThread() { DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN)); DCHECK(!transport_ && !cast_receiver_); - transport_.reset( - new UdpTransport(NULL, - cast_environment_->GetTaskRunner(CastEnvironment::MAIN), - local_end_point_, - remote_end_point_, - 65536, - base::Bind(&InProcessReceiver::UpdateCastTransportStatus, - base::Unretained(this)))); + + transport_ = CastTransportSender::Create( + NULL, + cast_environment_->Clock(), + local_end_point_, + remote_end_point_, + scoped_ptr<base::DictionaryValue>(new base::DictionaryValue), + base::Bind(&InProcessReceiver::UpdateCastTransportStatus, + base::Unretained(this)), + BulkRawEventsCallback(), + base::TimeDelta(), + base::Bind(&InProcessReceiver::ReceivePacket, + base::Unretained(this)), + cast_environment_->GetTaskRunner(CastEnvironment::MAIN)); + cast_receiver_ = CastReceiver::Create( cast_environment_, audio_config_, video_config_, transport_.get()); - // TODO(hubbe): Make the cast receiver do this automatically. - transport_->StartReceiving(cast_receiver_->packet_receiver()); - PullNextAudioFrame(); PullNextVideoFrame(); } @@ -126,5 +131,10 @@ void InProcessReceiver::PullNextVideoFrame() { &InProcessReceiver::GotVideoFrame, weak_factory_.GetWeakPtr())); } +void InProcessReceiver::ReceivePacket(scoped_ptr<Packet> packet) { + // TODO(Hubbe): Make an InsertPacket method instead. + cast_receiver_->ReceivePacket(packet.Pass()); +} + } // namespace cast } // namespace media diff --git a/media/cast/test/utility/in_process_receiver.h b/media/cast/test/utility/in_process_receiver.h index 454dc93..c73115c 100644 --- a/media/cast/test/utility/in_process_receiver.h +++ b/media/cast/test/utility/in_process_receiver.h @@ -11,6 +11,7 @@ #include "media/base/audio_bus.h" #include "media/cast/cast_config.h" #include "media/cast/net/cast_transport_config.h" +#include "media/cast/net/cast_transport_sender.h" #include "net/base/ip_endpoint.h" namespace base { @@ -100,13 +101,15 @@ class InProcessReceiver { void PullNextAudioFrame(); void PullNextVideoFrame(); + void ReceivePacket(scoped_ptr<Packet> packet); + const scoped_refptr<CastEnvironment> cast_environment_; const net::IPEndPoint local_end_point_; const net::IPEndPoint remote_end_point_; const FrameReceiverConfig audio_config_; const FrameReceiverConfig video_config_; - scoped_ptr<UdpTransport> transport_; + scoped_ptr<CastTransportSender> transport_; scoped_ptr<CastReceiver> cast_receiver_; // NOTE: Weak pointers must be invalidated before all other member variables. |