diff options
Diffstat (limited to 'remoting')
-rw-r--r-- | remoting/base/codec_test.cc | 5 | ||||
-rw-r--r-- | remoting/base/decoder_row_based.cc | 34 | ||||
-rw-r--r-- | remoting/base/decoder_row_based.h | 3 | ||||
-rw-r--r-- | remoting/base/encoder_row_based.cc | 10 | ||||
-rw-r--r-- | remoting/base/encoder_row_based.h | 2 | ||||
-rw-r--r-- | remoting/base/encoder_vp8.cc | 4 | ||||
-rw-r--r-- | remoting/host/screen_recorder.cc | 237 | ||||
-rw-r--r-- | remoting/host/screen_recorder.h | 51 | ||||
-rw-r--r-- | remoting/host/screen_recorder_unittest.cc | 10 | ||||
-rw-r--r-- | remoting/proto/video.proto | 14 | ||||
-rw-r--r-- | remoting/protocol/buffered_socket_writer.cc | 64 | ||||
-rw-r--r-- | remoting/protocol/buffered_socket_writer.h | 12 | ||||
-rw-r--r-- | remoting/protocol/client_control_sender.cc | 4 | ||||
-rw-r--r-- | remoting/protocol/input_sender.cc | 8 | ||||
-rw-r--r-- | remoting/protocol/protobuf_video_writer.cc | 4 | ||||
-rw-r--r-- | remoting/protocol/rtcp_writer.cc | 2 | ||||
-rw-r--r-- | remoting/protocol/rtp_writer.cc | 2 |
17 files changed, 217 insertions, 249 deletions
diff --git a/remoting/base/codec_test.cc b/remoting/base/codec_test.cc index 0521b52d..0c59659 100644 --- a/remoting/base/codec_test.cc +++ b/remoting/base/codec_test.cc @@ -81,6 +81,11 @@ class EncoderMessageTester { state_ = kWaitingForBeginRect; ++end_rect_; } + + if ((packet->flags() & VideoPacket::LAST_PARTITION) != 0) { + // LAST_PARTITION must always be marked with LAST_PACKET. + EXPECT_TRUE((packet->flags() & VideoPacket::LAST_PACKET) != 0); + } } } diff --git a/remoting/base/decoder_row_based.cc b/remoting/base/decoder_row_based.cc index a657331..25e738f 100644 --- a/remoting/base/decoder_row_based.cc +++ b/remoting/base/decoder_row_based.cc @@ -46,10 +46,22 @@ void DecoderRowBased::Reset() { frame_ = NULL; decompressor_->Reset(); state_ = kUninitialized; + updated_rects_.clear(); } bool DecoderRowBased::IsReadyForData() { - return state_ == kReady || state_ == kProcessing || state_ == kDone; + switch (state_) { + case kUninitialized: + case kError: + return false; + case kReady: + case kProcessing: + case kPartitionDone: + case kDone: + return true; + } + NOTREACHED(); + return false; } void DecoderRowBased::Initialize(scoped_refptr<media::VideoFrame> frame) { @@ -119,14 +131,18 @@ Decoder::DecodeResult DecoderRowBased::DecodePacket(const VideoPacket* packet) { } } - if (state_ == kDone) { + if (state_ == kPartitionDone || state_ == kDone) { if (row_y_ < clip_.height()) { state_ = kError; LOG(WARNING) << "Received LAST_PACKET, but didn't get enough data."; return DECODE_ERROR; } + updated_rects_.push_back(clip_); decompressor_->Reset(); + } + + if (state_ == kDone) { return DECODE_DONE; } else { return DECODE_IN_PROGRESS; @@ -139,7 +155,7 @@ void DecoderRowBased::UpdateStateForPacket(const VideoPacket* packet) { } if (packet->flags() & VideoPacket::FIRST_PACKET) { - if (state_ != kReady && state_ != kDone) { + if (state_ != kReady && state_ != kDone && state_ != kPartitionDone) { state_ = kError; LOG(WARNING) << "Received unexpected FIRST_PACKET."; return; @@ -165,6 +181,15 @@ void DecoderRowBased::UpdateStateForPacket(const VideoPacket* packet) { LOG(WARNING) << "Received unexpected LAST_PACKET."; return; } + state_ = kPartitionDone; + } + + if (packet->flags() & VideoPacket::LAST_PARTITION) { + if (state_ != kPartitionDone) { + state_ = kError; + LOG(WARNING) << "Received unexpected LAST_PARTITION."; + return; + } state_ = kDone; } @@ -172,7 +197,8 @@ void DecoderRowBased::UpdateStateForPacket(const VideoPacket* packet) { } void DecoderRowBased::GetUpdatedRects(UpdatedRects* rects) { - rects->push_back(clip_); + rects->swap(updated_rects_); + updated_rects_.clear(); } VideoPacketFormat::Encoding DecoderRowBased::Encoding() { diff --git a/remoting/base/decoder_row_based.h b/remoting/base/decoder_row_based.h index da05c05..05c2e3c 100644 --- a/remoting/base/decoder_row_based.h +++ b/remoting/base/decoder_row_based.h @@ -36,6 +36,7 @@ class DecoderRowBased : public Decoder { kUninitialized, kReady, kProcessing, + kPartitionDone, kDone, kError, }; @@ -70,6 +71,8 @@ class DecoderRowBased : public Decoder { // True if we should decode the image upside down. bool reverse_rows_; + UpdatedRects updated_rects_; + DISALLOW_COPY_AND_ASSIGN(DecoderRowBased); }; diff --git a/remoting/base/encoder_row_based.cc b/remoting/base/encoder_row_based.cc index 6a797b2..d9dcb10 100644 --- a/remoting/base/encoder_row_based.cc +++ b/remoting/base/encoder_row_based.cc @@ -65,17 +65,15 @@ void EncoderRowBased::Encode(scoped_refptr<CaptureData> capture_data, callback_.reset(data_available_callback); const InvalidRects& rects = capture_data->dirty_rects(); - int index = 0; - for (InvalidRects::const_iterator r = rects.begin(); - r != rects.end(); ++r, ++index) { - EncodeRect(*r, index); + for (InvalidRects::const_iterator r = rects.begin(); r != rects.end(); ++r) { + EncodeRect(*r, r == --rects.end()); } capture_data_ = NULL; callback_.reset(); } -void EncoderRowBased::EncodeRect(const gfx::Rect& rect, size_t rect_index) { +void EncoderRowBased::EncodeRect(const gfx::Rect& rect, bool last) { CHECK(capture_data_->data_planes().data[0]); const int strides = capture_data_->data_planes().strides[0]; const int bytes_per_pixel = GetBytesPerPixel(capture_data_->pixel_format()); @@ -117,6 +115,8 @@ void EncoderRowBased::EncodeRect(const gfx::Rect& rect, size_t rect_index) { // We have reached the end of stream. if (!compress_again) { packet->set_flags(packet->flags() | VideoPacket::LAST_PACKET); + if (last) + packet->set_flags(packet->flags() | VideoPacket::LAST_PARTITION); DCHECK(row_pos == row_size); DCHECK(row_y == rect.height() - 1); } diff --git a/remoting/base/encoder_row_based.h b/remoting/base/encoder_row_based.h index 1dfd916..11ebd9a 100644 --- a/remoting/base/encoder_row_based.h +++ b/remoting/base/encoder_row_based.h @@ -41,7 +41,7 @@ class EncoderRowBased : public Encoder { int packet_size); // Encode a single dirty rect using compressor. - void EncodeRect(const gfx::Rect& rect, size_t rect_index); + void EncodeRect(const gfx::Rect& rect, bool last); // Marks a packet as the first in a series of rectangle updates. void PrepareUpdateStart(const gfx::Rect& rect, VideoPacket* packet); diff --git a/remoting/base/encoder_vp8.cc b/remoting/base/encoder_vp8.cc index ea02add..6d07e54 100644 --- a/remoting/base/encoder_vp8.cc +++ b/remoting/base/encoder_vp8.cc @@ -189,6 +189,7 @@ void EncoderVp8::Encode(scoped_refptr<CaptureData> capture_data, switch (packet->kind) { case VPX_CODEC_CX_FRAME_PKT: got_data = true; + // TODO(sergeyu): Split each frame into multiple partitions. message->set_data(packet->data.frame.buf, packet->data.frame.sz); break; default: @@ -197,7 +198,8 @@ void EncoderVp8::Encode(scoped_refptr<CaptureData> capture_data, } message->mutable_format()->set_encoding(VideoPacketFormat::ENCODING_VP8); - message->set_flags(VideoPacket::FIRST_PACKET | VideoPacket::LAST_PACKET); + message->set_flags(VideoPacket::FIRST_PACKET | VideoPacket::LAST_PACKET | + VideoPacket::LAST_PARTITION); data_available_callback->Run(message); delete data_available_callback; diff --git a/remoting/host/screen_recorder.cc b/remoting/host/screen_recorder.cc index a972c1a..cf3417c 100644 --- a/remoting/host/screen_recorder.cc +++ b/remoting/host/screen_recorder.cc @@ -10,6 +10,7 @@ #include "base/scoped_ptr.h" #include "base/stl_util-inl.h" #include "base/task.h" +#include "base/time.h" #include "remoting/base/capture_data.h" #include "remoting/base/tracer.h" #include "remoting/proto/control.pb.h" @@ -27,17 +28,11 @@ namespace remoting { // experiment to provide good latency. static const double kDefaultCaptureRate = 20.0; -// Interval that we perform rate regulation. -static const base::TimeDelta kRateControlInterval = - base::TimeDelta::FromSeconds(1); - -// We divide the pending update stream number by this value to determine the -// rate divider. -static const int kSlowDownFactor = 10; - -// A list of dividers used to divide the max rate to determine the current -// capture rate. -static const int kRateDividers[] = {1, 2, 4, 8, 16}; +// Maximum number of frames that can be processed similtaneously. +// TODO(sergeyu): Should this be set to 1? Or should we change +// dynamically depending on how fast network and CPU are? Experement +// with it. +static const int kMaxRecordings = 2; ScreenRecorder::ScreenRecorder( MessageLoop* capture_loop, @@ -50,11 +45,10 @@ ScreenRecorder::ScreenRecorder( network_loop_(network_loop), capturer_(capturer), encoder_(encoder), - rate_(kDefaultCaptureRate), started_(false), recordings_(0), - max_rate_(kDefaultCaptureRate), - rate_control_started_(false) { + frame_skipped_(false), + max_rate_(kDefaultCaptureRate) { DCHECK(capture_loop_); DCHECK(encode_loop_); DCHECK(network_loop_); @@ -83,10 +77,12 @@ void ScreenRecorder::SetMaxRate(double rate) { void ScreenRecorder::AddConnection( scoped_refptr<ConnectionToClient> connection) { - // Gets the init information for the connection. - capture_loop_->PostTask( + ScopedTracer tracer("AddConnection"); + + // Add the client to the list so it can receive update stream. + network_loop_->PostTask( FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoGetInitInfo, connection)); + NewTracedMethod(this, &ScreenRecorder::DoAddConnection, connection)); } void ScreenRecorder::RemoveConnection( @@ -106,11 +102,13 @@ void ScreenRecorder::RemoveAllConnections() { Capturer* ScreenRecorder::capturer() { DCHECK_EQ(capture_loop_, MessageLoop::current()); + DCHECK(capturer_.get()); return capturer_.get(); } Encoder* ScreenRecorder::encoder() { DCHECK_EQ(encode_loop_, MessageLoop::current()); + DCHECK(encoder_.get()); return encoder_.get(); } @@ -118,6 +116,7 @@ Encoder* ScreenRecorder::encoder() { void ScreenRecorder::DoStart() { DCHECK_EQ(capture_loop_, MessageLoop::current()); + DCHECK(!started_); if (started_) { NOTREACHED() << "Record session already started."; @@ -125,12 +124,10 @@ void ScreenRecorder::DoStart() { } started_ = true; - DoCapture(); + StartCaptureTimer(); - // Starts the rate regulation. - network_loop_->PostTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoStartRateControl)); + // Capture first frame immedately. + DoCapture(); } void ScreenRecorder::DoPause() { @@ -141,56 +138,31 @@ void ScreenRecorder::DoPause() { return; } + capture_timer_.Stop(); started_ = false; - - // Pause the rate regulation. - network_loop_->PostTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoPauseRateControl)); -} - -void ScreenRecorder::DoSetRate(double rate) { - DCHECK_EQ(capture_loop_, MessageLoop::current()); - if (rate == rate_) - return; - - // Change the current capture rate. - rate_ = rate; - - // If we have already started then schedule the next capture with the new - // rate. - if (started_) - ScheduleNextCapture(); } void ScreenRecorder::DoSetMaxRate(double max_rate) { DCHECK_EQ(capture_loop_, MessageLoop::current()); // TODO(hclam): Should also check for small epsilon. - if (max_rate != 0) { - max_rate_ = max_rate; - DoSetRate(max_rate); - } else { - NOTREACHED() << "Rate is too small."; + DCHECK_GT(max_rate, 0.0) << "Rate is too small."; + + max_rate_ = max_rate; + + // Restart the timer with the new rate. + if (started_) { + capture_timer_.Stop(); + StartCaptureTimer(); } } -void ScreenRecorder::ScheduleNextCapture() { +void ScreenRecorder::StartCaptureTimer() { DCHECK_EQ(capture_loop_, MessageLoop::current()); - ScopedTracer tracer("capture"); - - TraceContext::tracer()->PrintString("Capture Scheduled"); - - if (rate_ == 0) - return; - base::TimeDelta interval = base::TimeDelta::FromMilliseconds( - static_cast<int>(base::Time::kMillisecondsPerSecond / rate_)); - capture_loop_->PostDelayedTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoCapture), - interval.InMilliseconds()); + static_cast<int>(base::Time::kMillisecondsPerSecond / max_rate_)); + capture_timer_.Start(interval, this, &ScreenRecorder::DoCapture); } void ScreenRecorder::DoCapture() { @@ -198,32 +170,22 @@ void ScreenRecorder::DoCapture() { // Make sure we have at most two oustanding recordings. We can simply return // if we can't make a capture now, the next capture will be started by the // end of an encode operation. - if (recordings_ >= 2 || !started_) { + if (recordings_ >= kMaxRecordings || !started_) { + frame_skipped_ = true; return; } - TraceContext::tracer()->PrintString("Capture Started"); - - base::Time now = base::Time::Now(); - base::TimeDelta interval = base::TimeDelta::FromMilliseconds( - static_cast<int>(base::Time::kMillisecondsPerSecond / rate_)); - base::TimeDelta elapsed = now - last_capture_time_; - // If this method is called sooner than the required interval we return - // immediately - if (elapsed < interval) { - return; + if (frame_skipped_) { + frame_skipped_ = false; + capture_timer_.Reset(); } + TraceContext::tracer()->PrintString("Capture Started"); + // At this point we are going to perform one capture so save the current time. - last_capture_time_ = now; ++recordings_; - // Before we actually do a capture, schedule the next one. - ScheduleNextCapture(); - // And finally perform one capture. - DCHECK(capturer()); - capturer()->CaptureInvalidRects( NewCallback(this, &ScreenRecorder::CaptureDoneCallback)); } @@ -239,7 +201,7 @@ void ScreenRecorder::CaptureDoneCallback( NewTracedMethod(this, &ScreenRecorder::DoEncode, capture_data)); } -void ScreenRecorder::DoFinishEncode() { +void ScreenRecorder::DoFinishSend() { DCHECK_EQ(capture_loop_, MessageLoop::current()); // Decrement the number of recording in process since we have completed @@ -248,103 +210,43 @@ void ScreenRecorder::DoFinishEncode() { // Try to do a capture again. Note that the following method may do nothing // if it is too early to perform a capture. - if (rate_ > 0) - DoCapture(); -} - -void ScreenRecorder::DoGetInitInfo( - scoped_refptr<ConnectionToClient> connection) { - DCHECK_EQ(capture_loop_, MessageLoop::current()); - - ScopedTracer tracer("init"); - - // Add the client to the list so it can receive update stream. - network_loop_->PostTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoAddClient, connection)); + DoCapture(); } // Network thread -------------------------------------------------------------- -void ScreenRecorder::DoStartRateControl() { - DCHECK_EQ(network_loop_, MessageLoop::current()); - - if (rate_control_started_) { - NOTREACHED() << "Rate regulation already started"; - return; - } - rate_control_started_ = true; - ScheduleNextRateControl(); -} - -void ScreenRecorder::DoPauseRateControl() { +void ScreenRecorder::DoSendVideoPacket(VideoPacket* packet) { DCHECK_EQ(network_loop_, MessageLoop::current()); - if (!rate_control_started_) { - NOTREACHED() << "Rate regulation not started"; - return; - } - rate_control_started_ = false; -} - -void ScreenRecorder::ScheduleNextRateControl() { - ScopedTracer tracer("Rate Control"); - network_loop_->PostDelayedTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoRateControl), - kRateControlInterval.InMilliseconds()); -} + TraceContext::tracer()->PrintString("DoSendVideoPacket"); -void ScreenRecorder::DoRateControl() { - DCHECK_EQ(network_loop_, MessageLoop::current()); + bool last = (packet->flags() & VideoPacket::LAST_PARTITION) != 0; - // If we have been paused then shutdown the rate regulation loop. - if (!rate_control_started_) - return; + for (ConnectionToClientList::const_iterator i = connections_.begin(); + i < connections_.end(); ++i) { + Task* done_task = NULL; - int max_pending_update_streams = 0; - for (size_t i = 0; i < connections_.size(); ++i) { - max_pending_update_streams = - std::max(max_pending_update_streams, - connections_[i]->video_stub()->GetPendingPackets()); - } + // Call OnFrameSent() only for the last packet in the first connection. + if (last && i == connections_.begin()) { + done_task = NewTracedMethod(this, &ScreenRecorder::OnFrameSent, packet); + } else { + done_task = new DeleteTask<VideoPacket>(packet); + } - // If |slow_down| equals zero, we have no slow down. - size_t slow_down = max_pending_update_streams / kSlowDownFactor; - // Set new_rate to -1 for checking later. - double new_rate = -1; - // If the slow down is too large. - if (slow_down >= arraysize(kRateDividers)) { - // Then we stop the capture completely. - new_rate = 0; - } else { - // Slow down the capture rate using the divider. - new_rate = max_rate_ / kRateDividers[slow_down]; + (*i)->video_stub()->ProcessVideoPacket(packet, done_task); } - DCHECK_NE(new_rate, -1.0); - // Then set the rate. - capture_loop_->PostTask( - FROM_HERE, - NewTracedMethod(this, &ScreenRecorder::DoSetRate, new_rate)); - ScheduleNextRateControl(); + TraceContext::tracer()->PrintString("DoSendVideoPacket done"); } -void ScreenRecorder::DoSendVideoPacket(VideoPacket* packet) { - DCHECK_EQ(network_loop_, MessageLoop::current()); - - TraceContext::tracer()->PrintString("DoSendUpdate"); - - for (ConnectionToClientList::const_iterator i = connections_.begin(); - i < connections_.end(); ++i) { - (*i)->video_stub()->ProcessVideoPacket( - packet, new DeleteTask<VideoPacket>(packet)); - } - - TraceContext::tracer()->PrintString("DoSendUpdate done"); +void ScreenRecorder::OnFrameSent(VideoPacket* packet) { + delete packet; + capture_loop_->PostTask( + FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishSend)); } -void ScreenRecorder::DoAddClient(scoped_refptr<ConnectionToClient> connection) { +void ScreenRecorder::DoAddConnection( + scoped_refptr<ConnectionToClient> connection) { DCHECK_EQ(network_loop_, MessageLoop::current()); // TODO(hclam): Force a full frame for next encode. @@ -356,8 +258,8 @@ void ScreenRecorder::DoRemoveClient( DCHECK_EQ(network_loop_, MessageLoop::current()); // TODO(hclam): Is it correct to do to a scoped_refptr? - ConnectionToClientList::iterator it - = std::find(connections_.begin(), connections_.end(), connection); + ConnectionToClientList::iterator it = + std::find(connections_.begin(), connections_.end(), connection); if (it != connections_.end()) { connections_.erase(it); } @@ -380,14 +282,14 @@ void ScreenRecorder::DoEncode( // Early out if there's nothing to encode. if (!capture_data->dirty_rects().size()) { capture_loop_->PostTask( - FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishEncode)); + FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishSend)); return; } // TODO(hclam): Enable |force_refresh| if a new connection was // added. TraceContext::tracer()->PrintString("Encode start"); - encoder_->Encode(capture_data, false, + encoder()->Encode(capture_data, false, NewCallback(this, &ScreenRecorder::EncodeDataAvailableTask)); TraceContext::tracer()->PrintString("Encode Done"); } @@ -395,20 +297,9 @@ void ScreenRecorder::DoEncode( void ScreenRecorder::EncodeDataAvailableTask(VideoPacket* packet) { DCHECK_EQ(encode_loop_, MessageLoop::current()); - bool last = (packet->flags() & VideoPacket::LAST_PACKET) != 0; - - // Before a new encode task starts, notify connected clients a new update - // stream is coming. - // Notify this will keep a reference to the DataBuffer in the - // task. The ownership will eventually pass to the ConnectionToClients. network_loop_->PostTask( FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoSendVideoPacket, packet)); - - if (last) { - capture_loop_->PostTask( - FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishEncode)); - } } } // namespace remoting diff --git a/remoting/host/screen_recorder.h b/remoting/host/screen_recorder.h index 2cd02b1..3a844eb 100644 --- a/remoting/host/screen_recorder.h +++ b/remoting/host/screen_recorder.h @@ -11,7 +11,7 @@ #include "base/message_loop.h" #include "base/ref_counted.h" #include "base/scoped_ptr.h" -#include "base/time.h" +#include "base/timer.h" #include "remoting/base/encoder.h" #include "remoting/host/capturer.h" #include "remoting/proto/video.pb.h" @@ -105,44 +105,39 @@ class ScreenRecorder : public base::RefCountedThreadSafe<ScreenRecorder> { void DoStart(); void DoPause(); - void DoSetRate(double rate); void DoSetMaxRate(double max_rate); // Hepler method to schedule next capture using the current rate. - void ScheduleNextCapture(); + void StartCaptureTimer(); void DoCapture(); void CaptureDoneCallback(scoped_refptr<CaptureData> capture_data); - void DoFinishEncode(); - - void DoGetInitInfo(scoped_refptr<protocol::ConnectionToClient> client); + void DoFinishSend(); // Network thread ----------------------------------------------------------- - void DoStartRateControl(); - void DoPauseRateControl(); - - // Helper method to schedule next rate regulation task. - void ScheduleNextRateControl(); - - void DoRateControl(); - - // DoSendUpdate takes ownership of header and is responsible for deleting it. + // DoSendVideoPacket takes ownership of the |packet| and is responsible + // for deleting it. void DoSendVideoPacket(VideoPacket* packet); + void DoSendInit(scoped_refptr<protocol::ConnectionToClient> connection, int width, int height); - void DoAddClient(scoped_refptr<protocol::ConnectionToClient> connection); + void DoAddConnection(scoped_refptr<protocol::ConnectionToClient> connection); void DoRemoveClient(scoped_refptr<protocol::ConnectionToClient> connection); void DoRemoveAllClients(); + // Callback for the last packet in one update. Deletes |packet| and + // schedules next screen capture. + void OnFrameSent(VideoPacket* packet); + // Encoder thread ----------------------------------------------------------- void DoEncode(scoped_refptr<CaptureData> capture_data); - // EncodeDataAvailableTask takes ownership of header and is responsible for - // deleting it. + // EncodeDataAvailableTask takes ownership of |packet|. void EncodeDataAvailableTask(VideoPacket* packet); + void SendVideoPacket(VideoPacket* packet); // Message loops used by this class. MessageLoop* capture_loop_; @@ -166,18 +161,20 @@ class ScreenRecorder : public base::RefCountedThreadSafe<ScreenRecorder> { ConnectionToClientList connections_; // The following members are accessed on the capture thread. - double rate_; // Number of captures to perform every second. bool started_; - base::Time last_capture_time_; // Saves the time last capture started. - int recordings_; // Count the number of recordings - // (i.e. capture or encode) happening. - // The maximum rate is written on the capture thread and read on the network - // thread. - double max_rate_; // Number of captures to perform every second. + // Timer that calls DoCapture. + base::RepeatingTimer<ScreenRecorder> capture_timer_; + + // Count the number of recordings (i.e. capture or encode) happening. + int recordings_; + + // Set to true if we've skipped last capture because there are too + // many pending frames. + int frame_skipped_; - // The following member is accessed on the network thread. - bool rate_control_started_; + // Number of captures to perform every second. Written on the capture thread. + double max_rate_; DISALLOW_COPY_AND_ASSIGN(ScreenRecorder); }; diff --git a/remoting/host/screen_recorder_unittest.cc b/remoting/host/screen_recorder_unittest.cc index 848226f..4dcfe13 100644 --- a/remoting/host/screen_recorder_unittest.cc +++ b/remoting/host/screen_recorder_unittest.cc @@ -19,6 +19,7 @@ using ::testing::_; using ::testing::AtLeast; using ::testing::NotNull; using ::testing::Return; +using ::testing::SaveArg; namespace remoting { @@ -102,8 +103,12 @@ TEST_F(ScreenRecorderTest, OneRecordCycle) { EXPECT_CALL(*connection_, video_stub()) .WillRepeatedly(Return(&video_stub)); + Task* done_task = NULL; + // Expect the client be notified. - EXPECT_CALL(video_stub, ProcessVideoPacket(_, _)); + EXPECT_CALL(video_stub, ProcessVideoPacket(_, _)) + .Times(1) + .WillOnce(SaveArg<1>(&done_task)); EXPECT_CALL(video_stub, GetPendingPackets()) .Times(AtLeast(0)) .WillRepeatedly(Return(0)); @@ -113,6 +118,9 @@ TEST_F(ScreenRecorderTest, OneRecordCycle) { // Make sure all tasks are completed. message_loop_.RunAllPending(); + + done_task->Run(); + delete done_task; } // TODO(hclam): Add test for double buffering. diff --git a/remoting/proto/video.proto b/remoting/proto/video.proto index e1cf5cf..394f891 100644 --- a/remoting/proto/video.proto +++ b/remoting/proto/video.proto @@ -35,14 +35,17 @@ message VideoPacketFormat { message VideoPacket { // Bitmasks for use in the flags field below. // - // The encoder may fragment one update into multiple packets depending on - // how the encoder outputs data. Thus, one update can logically consist of - // multiple packets. The FIRST_PACKET and LAST_PACKET flags are used to - // indicate the start and end of a logical update. Here are notable - // consequences: + // The encoder may fragment one update into multiple partitions. + // Each partition may be divided into multiple packets depending on + // how the encoder outputs data. Thus, one update can logically + // consist of multiple packets. The FIRST_PACKET and LAST_PACKET + // flags are used to indicate the start and end of a partition. The + // LAST_PARTITION flag is set for the last packet in the last + // partition. Here are notable consequences: // * Both FIRST_PACKET and LAST_PACKET may be set if an update is only // one packet long. // * The VideoPacketFormat is only supplied in a FIRST_PACKET. + // * LAST_PARTITION can be set only in packet that has LAST_PACKET set. // * An local update cannot change format between a FIRST_PACKET and // a LAST_PACKET. // * All packets in one logical update must be processed in order, and @@ -50,6 +53,7 @@ message VideoPacket { enum Flags { FIRST_PACKET = 1; LAST_PACKET = 2; + LAST_PARTITION = 4; } optional int32 flags = 1 [default = 0]; diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc index 0cf39bc..11b8395 100644 --- a/remoting/protocol/buffered_socket_writer.cc +++ b/remoting/protocol/buffered_socket_writer.cc @@ -5,11 +5,34 @@ #include "remoting/protocol/buffered_socket_writer.h" #include "base/message_loop.h" +#include "base/stl_util-inl.h" #include "net/base/net_errors.h" namespace remoting { namespace protocol { +class BufferedSocketWriterBase::PendingPacket { + public: + PendingPacket(scoped_refptr<net::IOBufferWithSize> data, Task* done_task) + : data_(data), + done_task_(done_task) { + } + ~PendingPacket() { + if (done_task_.get()) + done_task_->Run(); + } + + net::IOBufferWithSize* data() { + return data_; + } + + private: + scoped_refptr<net::IOBufferWithSize> data_; + scoped_ptr<Task> done_task_; + + DISALLOW_COPY_AND_ASSIGN(PendingPacket); +}; + BufferedSocketWriterBase::BufferedSocketWriterBase() : buffer_size_(0), socket_(NULL), @@ -32,11 +55,11 @@ void BufferedSocketWriterBase::Init(net::Socket* socket, } bool BufferedSocketWriterBase::Write( - scoped_refptr<net::IOBufferWithSize> data) { + scoped_refptr<net::IOBufferWithSize> data, Task* done_task) { AutoLock auto_lock(lock_); if (!socket_) return false; - queue_.push(data); + queue_.push_back(new PendingPacket(data, done_task)); buffer_size_ += data->size(); message_loop_->PostTask( FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite)); @@ -112,9 +135,7 @@ void BufferedSocketWriterBase::OnWritten(int result) { void BufferedSocketWriterBase::HandleError(int result) { AutoLock auto_lock(lock_); closed_ = true; - while (!queue_.empty()) { - queue_.pop(); - } + STLDeleteElements(&queue_); // Notify subclass that an error is received. OnError_Locked(result); @@ -135,19 +156,27 @@ void BufferedSocketWriterBase::Close() { closed_ = true; } +void BufferedSocketWriterBase::PopQueue() { + // This also calls |done_task|. + delete queue_.front(); + queue_.pop_front(); +} + BufferedSocketWriter::BufferedSocketWriter() { } -BufferedSocketWriter::~BufferedSocketWriter() { } + +BufferedSocketWriter::~BufferedSocketWriter() { + STLDeleteElements(&queue_); +} void BufferedSocketWriter::GetNextPacket_Locked( net::IOBuffer** buffer, int* size) { - while (!current_buf_ || current_buf_->BytesRemaining() == 0) { + if (!current_buf_) { if (queue_.empty()) { *buffer = NULL; return; // Nothing to write. } - current_buf_ = - new net::DrainableIOBuffer(queue_.front(), queue_.front()->size()); - queue_.pop(); + current_buf_ = new net::DrainableIOBuffer( + queue_.front()->data(), queue_.front()->data()->size()); } *buffer = current_buf_; @@ -157,6 +186,11 @@ void BufferedSocketWriter::GetNextPacket_Locked( void BufferedSocketWriter::AdvanceBufferPosition_Locked(int written) { buffer_size_ -= written; current_buf_->DidConsume(written); + + if (current_buf_->BytesRemaining() == 0) { + PopQueue(); + current_buf_ = NULL; + } } void BufferedSocketWriter::OnError_Locked(int result) { @@ -172,14 +206,14 @@ void BufferedDatagramWriter::GetNextPacket_Locked( *buffer = NULL; return; // Nothing to write. } - *buffer = queue_.front(); - *size = queue_.front()->size(); + *buffer = queue_.front()->data(); + *size = queue_.front()->data()->size(); } void BufferedDatagramWriter::AdvanceBufferPosition_Locked(int written) { - DCHECK_EQ(written, queue_.front()->size()); - buffer_size_ -= queue_.front()->size(); - queue_.pop(); + DCHECK_EQ(written, queue_.front()->data()->size()); + buffer_size_ -= queue_.front()->data()->size(); + PopQueue(); } void BufferedDatagramWriter::OnError_Locked(int result) { diff --git a/remoting/protocol/buffered_socket_writer.h b/remoting/protocol/buffered_socket_writer.h index c786c44..3ea127e 100644 --- a/remoting/protocol/buffered_socket_writer.h +++ b/remoting/protocol/buffered_socket_writer.h @@ -5,7 +5,7 @@ #ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ #define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_ -#include <queue> +#include <list> #include "base/lock.h" #include "base/ref_counted.h" @@ -13,6 +13,7 @@ #include "net/socket/socket.h" class MessageLoop; +class Task; namespace net { class Socket; @@ -45,7 +46,7 @@ class BufferedSocketWriterBase // Puts a new data chunk in the buffer. Returns false and doesn't enqueue // the data if called before Init(). Can be called on any thread. - bool Write(scoped_refptr<net::IOBufferWithSize> buffer); + bool Write(scoped_refptr<net::IOBufferWithSize> buffer, Task* done_task); // Returns current size of the buffer. Can be called on any thread. int GetBufferSize(); @@ -58,11 +59,16 @@ class BufferedSocketWriterBase void Close(); protected: - typedef std::queue<scoped_refptr<net::IOBufferWithSize> > DataQueue; + class PendingPacket; + typedef std::list<PendingPacket*> DataQueue; DataQueue queue_; int buffer_size_; + // Removes element from the front of the queue and calls |done_task| + // for that element. + void PopQueue(); + // Following three methods must be implemented in child classes. // GetNextPacket() returns next packet that needs to be written to the // socket. |buffer| must be set to NULL if there is nothing left in the queue. diff --git a/remoting/protocol/client_control_sender.cc b/remoting/protocol/client_control_sender.cc index d1fd2f5..c16d235 100644 --- a/remoting/protocol/client_control_sender.cc +++ b/remoting/protocol/client_control_sender.cc @@ -28,9 +28,7 @@ void ClientControlSender::NotifyResolution( const NotifyResolutionRequest* msg, Task* done) { protocol::ControlMessage message; message.mutable_notify_resolution()->CopyFrom(*msg); - buffered_writer_->Write(SerializeAndFrameMessage(message)); - done->Run(); - delete done; + buffered_writer_->Write(SerializeAndFrameMessage(message), done); } } // namespace protocol diff --git a/remoting/protocol/input_sender.cc b/remoting/protocol/input_sender.cc index e45149e..8831649 100644 --- a/remoting/protocol/input_sender.cc +++ b/remoting/protocol/input_sender.cc @@ -31,9 +31,7 @@ void InputSender::InjectKeyEvent(const KeyEvent* event, Task* done) { // TODO(hclam): Provide timestamp. evt->set_timestamp(0); evt->mutable_key()->CopyFrom(*event); - buffered_writer_->Write(SerializeAndFrameMessage(message)); - done->Run(); - delete done; + buffered_writer_->Write(SerializeAndFrameMessage(message), done); } void InputSender::InjectMouseEvent(const MouseEvent* event, Task* done) { @@ -42,9 +40,7 @@ void InputSender::InjectMouseEvent(const MouseEvent* event, Task* done) { // TODO(hclam): Provide timestamp. evt->set_timestamp(0); evt->mutable_mouse()->CopyFrom(*event); - buffered_writer_->Write(SerializeAndFrameMessage(message)); - done->Run(); - delete done; + buffered_writer_->Write(SerializeAndFrameMessage(message), done); } } // namespace protocol diff --git a/remoting/protocol/protobuf_video_writer.cc b/remoting/protocol/protobuf_video_writer.cc index 73a9b0d..025445d 100644 --- a/remoting/protocol/protobuf_video_writer.cc +++ b/remoting/protocol/protobuf_video_writer.cc @@ -25,9 +25,7 @@ void ProtobufVideoWriter::Init(protocol::Session* session) { void ProtobufVideoWriter::ProcessVideoPacket(const VideoPacket* packet, Task* done) { - buffered_writer_->Write(SerializeAndFrameMessage(*packet)); - done->Run(); - delete done; + buffered_writer_->Write(SerializeAndFrameMessage(*packet), done); } int ProtobufVideoWriter::GetPendingPackets() { diff --git a/remoting/protocol/rtcp_writer.cc b/remoting/protocol/rtcp_writer.cc index c7d53f0..1996665 100644 --- a/remoting/protocol/rtcp_writer.cc +++ b/remoting/protocol/rtcp_writer.cc @@ -32,7 +32,7 @@ void RtcpWriter::SendReport(const RtcpReceiverReport& report) { PackRtcpReceiverReport(report, reinterpret_cast<uint8*>(buffer->data()), size); - buffered_rtcp_writer_->Write(buffer); + buffered_rtcp_writer_->Write(buffer, NULL); } } // namespace protocol diff --git a/remoting/protocol/rtp_writer.cc b/remoting/protocol/rtp_writer.cc index 7341741..e26bdb7 100644 --- a/remoting/protocol/rtp_writer.cc +++ b/remoting/protocol/rtp_writer.cc @@ -69,7 +69,7 @@ void RtpWriter::SendPacket(uint32 timestamp, bool marker, payload_size); // And write the packet. - buffered_rtp_writer_->Write(buffer); + buffered_rtp_writer_->Write(buffer, NULL); } int RtpWriter::GetPendingPackets() { |