From 5bc7183a8a86f77d79187b545f4442c02b4b5da4 Mon Sep 17 00:00:00 2001 From: "sergeyu@chromium.org" Date: Thu, 9 Dec 2010 01:34:08 +0000 Subject: Simplified frame rate control in the chromoting host. Insted of keeping semi-fixed frame rate, now capturing rate is controlled by how fast we can send data to the client. Capturing of frame n is started only after frame n-2 is sent (while n-1 is being encoded). This guarantees that we don't clog the video channel buffers, and that we start capturing only if we know that the frame will not need to wait for too long in the buffer. TEST=None BUG=None Review URL: http://codereview.chromium.org/5634002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@68688 0039d316-1c4b-4281-b951-d872f2087c98 --- remoting/base/codec_test.cc | 5 + remoting/base/decoder_row_based.cc | 34 +++- remoting/base/decoder_row_based.h | 3 + remoting/base/encoder_row_based.cc | 10 +- remoting/base/encoder_row_based.h | 2 +- remoting/base/encoder_vp8.cc | 4 +- remoting/host/screen_recorder.cc | 237 ++++++++-------------------- remoting/host/screen_recorder.h | 51 +++--- remoting/host/screen_recorder_unittest.cc | 10 +- remoting/proto/video.proto | 14 +- remoting/protocol/buffered_socket_writer.cc | 64 ++++++-- remoting/protocol/buffered_socket_writer.h | 12 +- remoting/protocol/client_control_sender.cc | 4 +- remoting/protocol/input_sender.cc | 8 +- remoting/protocol/protobuf_video_writer.cc | 4 +- remoting/protocol/rtcp_writer.cc | 2 +- remoting/protocol/rtp_writer.cc | 2 +- 17 files changed, 217 insertions(+), 249 deletions(-) (limited to 'remoting') diff --git a/remoting/base/codec_test.cc b/remoting/base/codec_test.cc index 0521b52d..0c59659 100644 --- a/remoting/base/codec_test.cc +++ b/remoting/base/codec_test.cc @@ -81,6 +81,11 @@ class EncoderMessageTester { state_ = kWaitingForBeginRect; ++end_rect_; } + + if ((packet->flags() & VideoPacket::LAST_PARTITION) != 0) { + // LAST_PARTITION must always be marked with LAST_PACKET. + EXPECT_TRUE((packet->flags() & VideoPacket::LAST_PACKET) != 0); + } } } diff --git a/remoting/base/decoder_row_based.cc b/remoting/base/decoder_row_based.cc index a657331..25e738f 100644 --- a/remoting/base/decoder_row_based.cc +++ b/remoting/base/decoder_row_based.cc @@ -46,10 +46,22 @@ void DecoderRowBased::Reset() { frame_ = NULL; decompressor_->Reset(); state_ = kUninitialized; + updated_rects_.clear(); } bool DecoderRowBased::IsReadyForData() { - return state_ == kReady || state_ == kProcessing || state_ == kDone; + switch (state_) { + case kUninitialized: + case kError: + return false; + case kReady: + case kProcessing: + case kPartitionDone: + case kDone: + return true; + } + NOTREACHED(); + return false; } void DecoderRowBased::Initialize(scoped_refptr frame) { @@ -119,14 +131,18 @@ Decoder::DecodeResult DecoderRowBased::DecodePacket(const VideoPacket* packet) { } } - if (state_ == kDone) { + if (state_ == kPartitionDone || state_ == kDone) { if (row_y_ < clip_.height()) { state_ = kError; LOG(WARNING) << "Received LAST_PACKET, but didn't get enough data."; return DECODE_ERROR; } + updated_rects_.push_back(clip_); decompressor_->Reset(); + } + + if (state_ == kDone) { return DECODE_DONE; } else { return DECODE_IN_PROGRESS; @@ -139,7 +155,7 @@ void DecoderRowBased::UpdateStateForPacket(const VideoPacket* packet) { } if (packet->flags() & VideoPacket::FIRST_PACKET) { - if (state_ != kReady && state_ != kDone) { + if (state_ != kReady && state_ != kDone && state_ != kPartitionDone) { state_ = kError; LOG(WARNING) << "Received unexpected FIRST_PACKET."; return; @@ -165,6 +181,15 @@ void DecoderRowBased::UpdateStateForPacket(const VideoPacket* packet) { LOG(WARNING) << "Received unexpected LAST_PACKET."; return; } + state_ = kPartitionDone; + } + + if (packet->flags() & VideoPacket::LAST_PARTITION) { + if (state_ != kPartitionDone) { + state_ = kError; + LOG(WARNING) << "Received unexpected LAST_PARTITION."; + return; + } state_ = kDone; } @@ -172,7 +197,8 @@ void DecoderRowBased::UpdateStateForPacket(const VideoPacket* packet) { } void DecoderRowBased::GetUpdatedRects(UpdatedRects* rects) { - rects->push_back(clip_); + rects->swap(updated_rects_); + updated_rects_.clear(); } VideoPacketFormat::Encoding DecoderRowBased::Encoding() { diff --git a/remoting/base/decoder_row_based.h b/remoting/base/decoder_row_based.h index da05c05..05c2e3c 100644 --- a/remoting/base/decoder_row_based.h +++ b/remoting/base/decoder_row_based.h @@ -36,6 +36,7 @@ class DecoderRowBased : public Decoder { kUninitialized, kReady, kProcessing, + kPartitionDone, kDone, kError, }; @@ -70,6 +71,8 @@ class DecoderRowBased : public Decoder { // True if we should decode the image upside down. bool reverse_rows_; + UpdatedRects updated_rects_; + DISALLOW_COPY_AND_ASSIGN(DecoderRowBased); }; diff --git a/remoting/base/encoder_row_based.cc b/remoting/base/encoder_row_based.cc index 6a797b2..d9dcb10 100644 --- a/remoting/base/encoder_row_based.cc +++ b/remoting/base/encoder_row_based.cc @@ -65,17 +65,15 @@ void EncoderRowBased::Encode(scoped_refptr capture_data, callback_.reset(data_available_callback); const InvalidRects& rects = capture_data->dirty_rects(); - int index = 0; - for (InvalidRects::const_iterator r = rects.begin(); - r != rects.end(); ++r, ++index) { - EncodeRect(*r, index); + for (InvalidRects::const_iterator r = rects.begin(); r != rects.end(); ++r) { + EncodeRect(*r, r == --rects.end()); } capture_data_ = NULL; callback_.reset(); } -void EncoderRowBased::EncodeRect(const gfx::Rect& rect, size_t rect_index) { +void EncoderRowBased::EncodeRect(const gfx::Rect& rect, bool last) { CHECK(capture_data_->data_planes().data[0]); const int strides = capture_data_->data_planes().strides[0]; const int bytes_per_pixel = GetBytesPerPixel(capture_data_->pixel_format()); @@ -117,6 +115,8 @@ void EncoderRowBased::EncodeRect(const gfx::Rect& rect, size_t rect_index) { // We have reached the end of stream. if (!compress_again) { packet->set_flags(packet->flags() | VideoPacket::LAST_PACKET); + if (last) + packet->set_flags(packet->flags() | VideoPacket::LAST_PARTITION); DCHECK(row_pos == row_size); DCHECK(row_y == rect.height() - 1); } diff --git a/remoting/base/encoder_row_based.h b/remoting/base/encoder_row_based.h index 1dfd916..11ebd9a 100644 --- a/remoting/base/encoder_row_based.h +++ b/remoting/base/encoder_row_based.h @@ -41,7 +41,7 @@ class EncoderRowBased : public Encoder { int packet_size); // Encode a single dirty rect using compressor. - void EncodeRect(const gfx::Rect& rect, size_t rect_index); + void EncodeRect(const gfx::Rect& rect, bool last); // Marks a packet as the first in a series of rectangle updates. void PrepareUpdateStart(const gfx::Rect& rect, VideoPacket* packet); diff --git a/remoting/base/encoder_vp8.cc b/remoting/base/encoder_vp8.cc index ea02add..6d07e54 100644 --- a/remoting/base/encoder_vp8.cc +++ b/remoting/base/encoder_vp8.cc @@ -189,6 +189,7 @@ void EncoderVp8::Encode(scoped_refptr capture_data, switch (packet->kind) { case VPX_CODEC_CX_FRAME_PKT: got_data = true; + // TODO(sergeyu): Split each frame into multiple partitions. message->set_data(packet->data.frame.buf, packet->data.frame.sz); break; default: @@ -197,7 +198,8 @@ void EncoderVp8::Encode(scoped_refptr capture_data, } message->mutable_format()->set_encoding(VideoPacketFormat::ENCODING_VP8); - message->set_flags(VideoPacket::FIRST_PACKET | VideoPacket::LAST_PACKET); + message->set_flags(VideoPacket::FIRST_PACKET | VideoPacket::LAST_PACKET | + VideoPacket::LAST_PARTITION); data_available_callback->Run(message); delete data_available_callback; diff --git a/remoting/host/screen_recorder.cc b/remoting/host/screen_recorder.cc index a972c1a..cf3417c 100644 --- a/remoting/host/screen_recorder.cc +++ b/remoting/host/screen_recorder.cc @@ -10,6 +10,7 @@ #include "base/scoped_ptr.h" #include "base/stl_util-inl.h" #include "base/task.h" +#include "base/time.h" #include "remoting/base/capture_data.h" #include "remoting/base/tracer.h" #include "remoting/proto/control.pb.h" @@ -27,17 +28,11 @@ namespace remoting { // experiment to provide good latency. static const double kDefaultCaptureRate = 20.0; -// Interval that we perform rate regulation. -static const base::TimeDelta kRateControlInterval = - base::TimeDelta::FromSeconds(1); - -// We divide the pending update stream number by this value to determine the -// rate divider. -static const int kSlowDownFactor = 10; - -// A list of dividers used to divide the max rate to determine the current -// capture rate. -static const int kRateDividers[] = {1, 2, 4, 8, 16}; +// Maximum number of frames that can be processed similtaneously. +// TODO(sergeyu): Should this be set to 1? Or should we change +// dynamically depending on how fast network and CPU are? Experement +// with it. +static const int kMaxRecordings = 2; ScreenRecorder::ScreenRecorder( MessageLoop* capture_loop, @@ -50,11 +45,10 @@ ScreenRecorder::ScreenRecorder( network_loop_(network_loop), capturer_(capturer), encoder_(encoder), - rate_(kDefaultCaptureRate), started_(false), recordings_(0), - max_rate_(kDefaultCaptureRate), - rate_control_started_(false) { + frame_skipped_(false), + max_rate_(kDefaultCaptureRate) { DCHECK(capture_loop_); DCHECK(encode_loop_); DCHECK(network_loop_); @@ -83,10 +77,12 @@ void ScreenRecorder::SetMaxRate(double rate) { void ScreenRecorder::AddConnection( scoped_refptr connection) { - // Gets the init information for the connection. - capture_loop_->PostTask( + ScopedTracer tracer("AddConnection"); + + // Add the client to the list so it can receive update stream. + network_loop_->PostTask( FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoGetInitInfo, connection)); + NewTracedMethod(this, &ScreenRecorder::DoAddConnection, connection)); } void ScreenRecorder::RemoveConnection( @@ -106,11 +102,13 @@ void ScreenRecorder::RemoveAllConnections() { Capturer* ScreenRecorder::capturer() { DCHECK_EQ(capture_loop_, MessageLoop::current()); + DCHECK(capturer_.get()); return capturer_.get(); } Encoder* ScreenRecorder::encoder() { DCHECK_EQ(encode_loop_, MessageLoop::current()); + DCHECK(encoder_.get()); return encoder_.get(); } @@ -118,6 +116,7 @@ Encoder* ScreenRecorder::encoder() { void ScreenRecorder::DoStart() { DCHECK_EQ(capture_loop_, MessageLoop::current()); + DCHECK(!started_); if (started_) { NOTREACHED() << "Record session already started."; @@ -125,12 +124,10 @@ void ScreenRecorder::DoStart() { } started_ = true; - DoCapture(); + StartCaptureTimer(); - // Starts the rate regulation. - network_loop_->PostTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoStartRateControl)); + // Capture first frame immedately. + DoCapture(); } void ScreenRecorder::DoPause() { @@ -141,56 +138,31 @@ void ScreenRecorder::DoPause() { return; } + capture_timer_.Stop(); started_ = false; - - // Pause the rate regulation. - network_loop_->PostTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoPauseRateControl)); -} - -void ScreenRecorder::DoSetRate(double rate) { - DCHECK_EQ(capture_loop_, MessageLoop::current()); - if (rate == rate_) - return; - - // Change the current capture rate. - rate_ = rate; - - // If we have already started then schedule the next capture with the new - // rate. - if (started_) - ScheduleNextCapture(); } void ScreenRecorder::DoSetMaxRate(double max_rate) { DCHECK_EQ(capture_loop_, MessageLoop::current()); // TODO(hclam): Should also check for small epsilon. - if (max_rate != 0) { - max_rate_ = max_rate; - DoSetRate(max_rate); - } else { - NOTREACHED() << "Rate is too small."; + DCHECK_GT(max_rate, 0.0) << "Rate is too small."; + + max_rate_ = max_rate; + + // Restart the timer with the new rate. + if (started_) { + capture_timer_.Stop(); + StartCaptureTimer(); } } -void ScreenRecorder::ScheduleNextCapture() { +void ScreenRecorder::StartCaptureTimer() { DCHECK_EQ(capture_loop_, MessageLoop::current()); - ScopedTracer tracer("capture"); - - TraceContext::tracer()->PrintString("Capture Scheduled"); - - if (rate_ == 0) - return; - base::TimeDelta interval = base::TimeDelta::FromMilliseconds( - static_cast(base::Time::kMillisecondsPerSecond / rate_)); - capture_loop_->PostDelayedTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoCapture), - interval.InMilliseconds()); + static_cast(base::Time::kMillisecondsPerSecond / max_rate_)); + capture_timer_.Start(interval, this, &ScreenRecorder::DoCapture); } void ScreenRecorder::DoCapture() { @@ -198,32 +170,22 @@ void ScreenRecorder::DoCapture() { // Make sure we have at most two oustanding recordings. We can simply return // if we can't make a capture now, the next capture will be started by the // end of an encode operation. - if (recordings_ >= 2 || !started_) { + if (recordings_ >= kMaxRecordings || !started_) { + frame_skipped_ = true; return; } - TraceContext::tracer()->PrintString("Capture Started"); - - base::Time now = base::Time::Now(); - base::TimeDelta interval = base::TimeDelta::FromMilliseconds( - static_cast(base::Time::kMillisecondsPerSecond / rate_)); - base::TimeDelta elapsed = now - last_capture_time_; - // If this method is called sooner than the required interval we return - // immediately - if (elapsed < interval) { - return; + if (frame_skipped_) { + frame_skipped_ = false; + capture_timer_.Reset(); } + TraceContext::tracer()->PrintString("Capture Started"); + // At this point we are going to perform one capture so save the current time. - last_capture_time_ = now; ++recordings_; - // Before we actually do a capture, schedule the next one. - ScheduleNextCapture(); - // And finally perform one capture. - DCHECK(capturer()); - capturer()->CaptureInvalidRects( NewCallback(this, &ScreenRecorder::CaptureDoneCallback)); } @@ -239,7 +201,7 @@ void ScreenRecorder::CaptureDoneCallback( NewTracedMethod(this, &ScreenRecorder::DoEncode, capture_data)); } -void ScreenRecorder::DoFinishEncode() { +void ScreenRecorder::DoFinishSend() { DCHECK_EQ(capture_loop_, MessageLoop::current()); // Decrement the number of recording in process since we have completed @@ -248,103 +210,43 @@ void ScreenRecorder::DoFinishEncode() { // Try to do a capture again. Note that the following method may do nothing // if it is too early to perform a capture. - if (rate_ > 0) - DoCapture(); -} - -void ScreenRecorder::DoGetInitInfo( - scoped_refptr connection) { - DCHECK_EQ(capture_loop_, MessageLoop::current()); - - ScopedTracer tracer("init"); - - // Add the client to the list so it can receive update stream. - network_loop_->PostTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoAddClient, connection)); + DoCapture(); } // Network thread -------------------------------------------------------------- -void ScreenRecorder::DoStartRateControl() { - DCHECK_EQ(network_loop_, MessageLoop::current()); - - if (rate_control_started_) { - NOTREACHED() << "Rate regulation already started"; - return; - } - rate_control_started_ = true; - ScheduleNextRateControl(); -} - -void ScreenRecorder::DoPauseRateControl() { +void ScreenRecorder::DoSendVideoPacket(VideoPacket* packet) { DCHECK_EQ(network_loop_, MessageLoop::current()); - if (!rate_control_started_) { - NOTREACHED() << "Rate regulation not started"; - return; - } - rate_control_started_ = false; -} - -void ScreenRecorder::ScheduleNextRateControl() { - ScopedTracer tracer("Rate Control"); - network_loop_->PostDelayedTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoRateControl), - kRateControlInterval.InMilliseconds()); -} + TraceContext::tracer()->PrintString("DoSendVideoPacket"); -void ScreenRecorder::DoRateControl() { - DCHECK_EQ(network_loop_, MessageLoop::current()); + bool last = (packet->flags() & VideoPacket::LAST_PARTITION) != 0; - // If we have been paused then shutdown the rate regulation loop. - if (!rate_control_started_) - return; + for (ConnectionToClientList::const_iterator i = connections_.begin(); + i < connections_.end(); ++i) { + Task* done_task = NULL; - int max_pending_update_streams = 0; - for (size_t i = 0; i < connections_.size(); ++i) { - max_pending_update_streams = - std::max(max_pending_update_streams, - connections_[i]->video_stub()->GetPendingPackets()); - } + // Call OnFrameSent() only for the last packet in the first connection. + if (last && i == connections_.begin()) { + done_task = NewTracedMethod(this, &ScreenRecorder::OnFrameSent, packet); + } else { + done_task = new DeleteTask(packet); + } - // If |slow_down| equals zero, we have no slow down. - size_t slow_down = max_pending_update_streams / kSlowDownFactor; - // Set new_rate to -1 for checking later. - double new_rate = -1; - // If the slow down is too large. - if (slow_down >= arraysize(kRateDividers)) { - // Then we stop the capture completely. - new_rate = 0; - } else { - // Slow down the capture rate using the divider. - new_rate = max_rate_ / kRateDividers[slow_down]; + (*i)->video_stub()->ProcessVideoPacket(packet, done_task); } - DCHECK_NE(new_rate, -1.0); - // Then set the rate. - capture_loop_->PostTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoSetRate, new_rate)); - ScheduleNextRateControl(); + TraceContext::tracer()->PrintString("DoSendVideoPacket done"); } -void ScreenRecorder::DoSendVideoPacket(VideoPacket* packet) { - DCHECK_EQ(network_loop_, MessageLoop::current()); - - TraceContext::tracer()->PrintString("DoSendUpdate"); - - for (ConnectionToClientList::const_iterator i = connections_.begin(); - i < connections_.end(); ++i) { - (*i)->video_stub()->ProcessVideoPacket( - packet, new DeleteTask(packet)); - } - - TraceContext::tracer()->PrintString("DoSendUpdate done"); +void ScreenRecorder::OnFrameSent(VideoPacket* packet) { + delete packet; + capture_loop_->PostTask( + FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishSend)); } -void ScreenRecorder::DoAddClient(scoped_refptr connection) { +void ScreenRecorder::DoAddConnection( + scoped_refptr connection) { DCHECK_EQ(network_loop_, MessageLoop::current()); // TODO(hclam): Force a full frame for next encode. @@ -356,8 +258,8 @@ void ScreenRecorder::DoRemoveClient( DCHECK_EQ(network_loop_, MessageLoop::current()); // TODO(hclam): Is it correct to do to a scoped_refptr? - ConnectionToClientList::iterator it - = std::find(connections_.begin(), connections_.end(), connection); + ConnectionToClientList::iterator it = + std::find(connections_.begin(), connections_.end(), connection); if (it != connections_.end()) { connections_.erase(it); } @@ -380,14 +282,14 @@ void ScreenRecorder::DoEncode( // Early out if there's nothing to encode. if (!capture_data->dirty_rects().size()) { capture_loop_->PostTask( - FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishEncode)); + FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishSend)); return; } // TODO(hclam): Enable |force_refresh| if a new connection was // added. TraceContext::tracer()->PrintString("Encode start"); - encoder_->Encode(capture_data, false, + encoder()->Encode(capture_data, false, NewCallback(this, &ScreenRecorder::EncodeDataAvailableTask)); TraceContext::tracer()->PrintString("Encode Done"); } @@ -395,20 +297,9 @@ void ScreenRecorder::DoEncode( void ScreenRecorder::EncodeDataAvailableTask(VideoPacket* packet) { DCHECK_EQ(encode_loop_, MessageLoop::current()); - bool last = (packet->flags() & VideoPacket::LAST_PACKET) != 0; - - // Before a new encode task starts, notify connected clients a new update - // stream is coming. - // Notify this will keep a reference to the DataBuffer in the - // task. The ownership will eventually pass to the ConnectionToClients. network_loop_->PostTask( FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoSendVideoPacket, packet)); - - if (last) { - capture_loop_->PostTask( - FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishEncode)); - } } } // namespace remoting diff --git a/remoting/host/screen_recorder.h b/remoting/host/screen_recorder.h index 2cd02b1..3a844eb 100644 --- a/remoting/host/screen_recorder.h +++ b/remoting/host/screen_recorder.h @@ -11,7 +11,7 @@ #include "base/message_loop.h" #include "base/ref_counted.h" #include "base/scoped_ptr.h" -#include "base/time.h" +#include "base/timer.h" #include "remoting/base/encoder.h" #include "remoting/host/capturer.h" #include "remoting/proto/video.pb.h" @@ -105,44 +105,39 @@ class ScreenRecorder : public base::RefCountedThreadSafe { void DoStart(); void DoPause(); - void DoSetRate(double rate); void DoSetMaxRate(double max_rate); // Hepler method to schedule next capture using the current rate. - void ScheduleNextCapture(); + void StartCaptureTimer(); void DoCapture(); void CaptureDoneCallback(scoped_refptr capture_data); - void DoFinishEncode(); - - void DoGetInitInfo(scoped_refptr client); + void DoFinishSend(); // Network thread ----------------------------------------------------------- - void DoStartRateControl(); - void DoPauseRateControl(); - - // Helper method to schedule next rate regulation task. - void ScheduleNextRateControl(); - - void DoRateControl(); - - // DoSendUpdate takes ownership of header and is responsible for deleting it. + // DoSendVideoPacket takes ownership of the |packet| and is responsible + // for deleting it. void DoSendVideoPacket(VideoPacket* packet); + void DoSendInit(scoped_refptr connection, int width, int height); - void DoAddClient(scoped_refptr connection); + void DoAddConnection(scoped_refptr connection); void DoRemoveClient(scoped_refptr connection); void DoRemoveAllClients(); + // Callback for the last packet in one update. Deletes |packet| and + // schedules next screen capture. + void OnFrameSent(VideoPacket* packet); + // Encoder thread ----------------------------------------------------------- void DoEncode(scoped_refptr capture_data); - // EncodeDataAvailableTask takes ownership of header and is responsible for - // deleting it. + // EncodeDataAvailableTask takes ownership of |packet|. void EncodeDataAvailableTask(VideoPacket* packet); + void SendVideoPacket(VideoPacket* packet); // Message loops used by this class. MessageLoop* capture_loop_; @@ -166,18 +161,20 @@ class ScreenRecorder : public base::RefCountedThreadSafe { ConnectionToClientList connections_; // The following members are accessed on the capture thread. - double rate_; // Number of captures to perform every second. bool started_; - base::Time last_capture_time_; // Saves the time last capture started. - int recordings_; // Count the number of recordings - // (i.e. capture or encode) happening. - // The maximum rate is written on the capture thread and read on the network - // thread. - double max_rate_; // Number of captures to perform every second. + // Timer that calls DoCapture. + base::RepeatingTimer capture_timer_; + + // Count the number of recordings (i.e. capture or encode) happening. + int recordings_; + + // Set to true if we've skipped last capture because there are too + // many pending frames. + int frame_skipped_; - // The following member is accessed on the network thread. - bool rate_control_started_; + // Number of captures to perform every second. Written on the capture thread. + double max_rate_; DISALLOW_COPY_AND_ASSIGN(ScreenRecorder); }; diff --git a/remoting/host/screen_recorder_unittest.cc b/remoting/host/screen_recorder_unittest.cc index 848226f..4dcfe13 100644 --- a/remoting/host/screen_recorder_unittest.cc +++ b/remoting/host/screen_recorder_unittest.cc @@ -19,6 +19,7 @@ using ::testing::_; using ::testing::AtLeast; using ::testing::NotNull; using ::testing::Return; +using ::testing::SaveArg; namespace remoting { @@ -102,8 +103,12 @@ TEST_F(ScreenRecorderTest, OneRecordCycle) { EXPECT_CALL(*connection_, video_stub()) .WillRepeatedly(Return(&video_stub)); + Task* done_task = NULL; + // Expect the client be notified. - EXPECT_CALL(video_stub, ProcessVideoPacket(_, _)); + EXPECT_CALL(video_stub, ProcessVideoPacket(_, _)) + .Times(1) + .WillOnce(SaveArg<1>(&done_task)); EXPECT_CALL(video_stub, GetPendingPackets()) .Times(AtLeast(0)) .WillRepeatedly(Return(0)); @@ -113,6 +118,9 @@ TEST_F(ScreenRecorderTest, OneRecordCycle) { // Make sure all tasks are completed. message_loop_.RunAllPending(); + + done_task->Run(); + delete done_task; } // TODO(hclam): Add test for double buffering. diff --git a/remoting/proto/video.proto b/remoting/proto/video.proto index e1cf5cf..394f891 100644 --- a/remoting/proto/video.proto +++ b/remoting/proto/video.proto @@ -35,14 +35,17 @@ message VideoPacketFormat { message VideoPacket { // Bitmasks for use in the flags field below. // - // The encoder may fragment one update into multiple packets depending on - // how the encoder outputs data. Thus, one update can logically consist of - // multiple packets. The FIRST_PACKET and LAST_PACKET flags are used to - // indicate the start and end of a logical update. Here are notable - // consequences: + // The encoder may fragment one update into multiple partitions. + // Each partition may be divided into multiple packets depending on + // how the encoder outputs data. Thus, one update can logically + // consist of multiple packets. The FIRST_PACKET and LAST_PACKET + // flags are used to indicate the start and end of a partition. The + // LAST_PARTITION flag is set for the last packet in the last + // partition. Here are notable consequences: // * Both FIRST_PACKET and LAST_PACKET may be set if an update is only // one packet long. // * The VideoPacketFormat is only supplied in a FIRST_PACKET. + // * LAST_PARTITION can be set only in packet that has LAST_PACKET set. // * An local update cannot change format between a FIRST_PACKET and // a LAST_PACKET. // * All packets in one logical update must be processed in order, and @@ -50,6 +53,7 @@ message VideoPacket { enum Flags { FIRST_PACKET = 1; LAST_PACKET = 2; + LAST_PARTITION = 4; } optional int32 flags = 1 [default = 0]; diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc index 0cf39bc..11b8395 100644 --- a/remoting/protocol/buffered_socket_writer.cc +++ b/remoting/protocol/buffered_socket_writer.cc @@ -5,11 +5,34 @@ #include "remoting/protocol/buffered_socket_writer.h" #include "base/message_loop.h" +#include "base/stl_util-inl.h" #include "net/base/net_errors.h" namespace remoting { namespace protocol { +class BufferedSocketWriterBase::PendingPacket { + public: + PendingPacket(scoped_refptr data, Task* done_task) + : data_(data), + done_task_(done_task) { + } + ~PendingPacket() { + if (done_task_.get()) + done_task_->Run(); + } + + net::IOBufferWithSize* data() { + return data_; + } + + private: + scoped_refptr data_; + scoped_ptr done_task_; + + DISALLOW_COPY_AND_ASSIGN(PendingPacket); +}; + BufferedSocketWriterBase::BufferedSocketWriterBase() : buffer_size_(0), socket_(NULL), @@ -32,11 +55,11 @@ void BufferedSocketWriterBase::Init(net::Socket* socket, } bool BufferedSocketWriterBase::Write( - scoped_refptr data) { + scoped_refptr data, Task* done_task) { AutoLock auto_lock(lock_); if (!socket_) return false; - queue_.push(data); + queue_.push_back(new PendingPacket(data, done_task)); buffer_size_ += data->size(); message_loop_->PostTask( FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite)); @@ -112,9 +135,7 @@ void BufferedSocketWriterBase::OnWritten(int result) { void BufferedSocketWriterBase::HandleError(int result) { AutoLock auto_lock(lock_); closed_ = true; - while (!queue_.empty()) { - queue_.pop(); - } + STLDeleteElements(&queue_); // Notify subclass that an error is received. OnError_Locked(result); @@ -135,19 +156,27 @@ void BufferedSocketWriterBase::Close() { closed_ = true; } +void BufferedSocketWriterBase::PopQueue() { + // This also calls |done_task|. + delete queue_.front(); + queue_.pop_front(); +} + BufferedSocketWriter::BufferedSocketWriter() { } -BufferedSocketWriter::~BufferedSocketWriter() { } + +BufferedSocketWriter::~BufferedSocketWriter() { + STLDeleteElements(&queue_); +} void BufferedSocketWriter::GetNextPacket_Locked( net::IOBuffer** buffer, int* size) { - while (!current_buf_ || current_buf_->BytesRemaining() == 0) { + if (!current_buf_) { if (queue_.empty()) { *buffer = NULL; return; // Nothing to write. } - current_buf_ = - new net::DrainableIOBuffer(queue_.front(), queue_.front()->size()); - queue_.pop(); + current_buf_ = new net::DrainableIOBuffer( + queue_.front()->data(), queue_.front()->data()->size()); } *buffer = current_buf_; @@ -157,6 +186,11 @@ void BufferedSocketWriter::GetNextPacket_Locked( void BufferedSocketWriter::AdvanceBufferPosition_Locked(int written) { buffer_size_ -= written; current_buf_->DidConsume(written); + + if (current_buf_->BytesRemaining() == 0) { + PopQueue(); + current_buf_ = NULL; + } } void BufferedSocketWriter::OnError_Locked(int result) { @@ -172,14 +206,14 @@ void BufferedDatagramWriter::GetNextPacket_Locked( *buffer = NULL; return; // Nothing to write. } - *buffer = queue_.front(); - *size = queue_.front()->size(); + *buffer = queue_.front()->data(); + *size = queue_.front()->data()->size(); } void BufferedDatagramWriter::AdvanceBufferPosition_Locked(int written) { - DCHECK_EQ(written, queue_.front()->size()); - buffer_size_ -= queue_.front()->size(); - queue_.pop(); + DCHECK_EQ(written, queue_.front()->data()->size()); + buffer_size_ -= queue_.front()->data()->size(); + PopQueue(); } void BufferedDatagramWriter::OnError_Locked(int result) { diff --git a/remoting/protocol/buffered_socket_writer.h b/remoting/protocol/buffered_socket_writer.h index c786c44..3ea127e 100644 --- a/remoting/protocol/buffered_socket_writer.h +++ b/remoting/protocol/buffered_socket_writer.h @@ -5,7 +5,7 @@ #ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ #define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ -#include +#include #include "base/lock.h" #include "base/ref_counted.h" @@ -13,6 +13,7 @@ #include "net/socket/socket.h" class MessageLoop; +class Task; namespace net { class Socket; @@ -45,7 +46,7 @@ class BufferedSocketWriterBase // Puts a new data chunk in the buffer. Returns false and doesn't enqueue // the data if called before Init(). Can be called on any thread. - bool Write(scoped_refptr buffer); + bool Write(scoped_refptr buffer, Task* done_task); // Returns current size of the buffer. Can be called on any thread. int GetBufferSize(); @@ -58,11 +59,16 @@ class BufferedSocketWriterBase void Close(); protected: - typedef std::queue > DataQueue; + class PendingPacket; + typedef std::list DataQueue; DataQueue queue_; int buffer_size_; + // Removes element from the front of the queue and calls |done_task| + // for that element. + void PopQueue(); + // Following three methods must be implemented in child classes. // GetNextPacket() returns next packet that needs to be written to the // socket. |buffer| must be set to NULL if there is nothing left in the queue. diff --git a/remoting/protocol/client_control_sender.cc b/remoting/protocol/client_control_sender.cc index d1fd2f5..c16d235 100644 --- a/remoting/protocol/client_control_sender.cc +++ b/remoting/protocol/client_control_sender.cc @@ -28,9 +28,7 @@ void ClientControlSender::NotifyResolution( const NotifyResolutionRequest* msg, Task* done) { protocol::ControlMessage message; message.mutable_notify_resolution()->CopyFrom(*msg); - buffered_writer_->Write(SerializeAndFrameMessage(message)); - done->Run(); - delete done; + buffered_writer_->Write(SerializeAndFrameMessage(message), done); } } // namespace protocol diff --git a/remoting/protocol/input_sender.cc b/remoting/protocol/input_sender.cc index e45149e..8831649 100644 --- a/remoting/protocol/input_sender.cc +++ b/remoting/protocol/input_sender.cc @@ -31,9 +31,7 @@ void InputSender::InjectKeyEvent(const KeyEvent* event, Task* done) { // TODO(hclam): Provide timestamp. evt->set_timestamp(0); evt->mutable_key()->CopyFrom(*event); - buffered_writer_->Write(SerializeAndFrameMessage(message)); - done->Run(); - delete done; + buffered_writer_->Write(SerializeAndFrameMessage(message), done); } void InputSender::InjectMouseEvent(const MouseEvent* event, Task* done) { @@ -42,9 +40,7 @@ void InputSender::InjectMouseEvent(const MouseEvent* event, Task* done) { // TODO(hclam): Provide timestamp. evt->set_timestamp(0); evt->mutable_mouse()->CopyFrom(*event); - buffered_writer_->Write(SerializeAndFrameMessage(message)); - done->Run(); - delete done; + buffered_writer_->Write(SerializeAndFrameMessage(message), done); } } // namespace protocol diff --git a/remoting/protocol/protobuf_video_writer.cc b/remoting/protocol/protobuf_video_writer.cc index 73a9b0d..025445d 100644 --- a/remoting/protocol/protobuf_video_writer.cc +++ b/remoting/protocol/protobuf_video_writer.cc @@ -25,9 +25,7 @@ void ProtobufVideoWriter::Init(protocol::Session* session) { void ProtobufVideoWriter::ProcessVideoPacket(const VideoPacket* packet, Task* done) { - buffered_writer_->Write(SerializeAndFrameMessage(*packet)); - done->Run(); - delete done; + buffered_writer_->Write(SerializeAndFrameMessage(*packet), done); } int ProtobufVideoWriter::GetPendingPackets() { diff --git a/remoting/protocol/rtcp_writer.cc b/remoting/protocol/rtcp_writer.cc index c7d53f0..1996665 100644 --- a/remoting/protocol/rtcp_writer.cc +++ b/remoting/protocol/rtcp_writer.cc @@ -32,7 +32,7 @@ void RtcpWriter::SendReport(const RtcpReceiverReport& report) { PackRtcpReceiverReport(report, reinterpret_cast(buffer->data()), size); - buffered_rtcp_writer_->Write(buffer); + buffered_rtcp_writer_->Write(buffer, NULL); } } // namespace protocol diff --git a/remoting/protocol/rtp_writer.cc b/remoting/protocol/rtp_writer.cc index 7341741..e26bdb7 100644 --- a/remoting/protocol/rtp_writer.cc +++ b/remoting/protocol/rtp_writer.cc @@ -69,7 +69,7 @@ void RtpWriter::SendPacket(uint32 timestamp, bool marker, payload_size); // And write the packet. - buffered_rtp_writer_->Write(buffer); + buffered_rtp_writer_->Write(buffer, NULL); } int RtpWriter::GetPendingPackets() { -- cgit v1.1