summaryrefslogtreecommitdiffstats
path: root/remoting
diff options
context:
space:
mode:
Diffstat (limited to 'remoting')
-rw-r--r--remoting/base/codec_test.cc5
-rw-r--r--remoting/base/decoder_row_based.cc34
-rw-r--r--remoting/base/decoder_row_based.h3
-rw-r--r--remoting/base/encoder_row_based.cc10
-rw-r--r--remoting/base/encoder_row_based.h2
-rw-r--r--remoting/base/encoder_vp8.cc4
-rw-r--r--remoting/host/screen_recorder.cc237
-rw-r--r--remoting/host/screen_recorder.h51
-rw-r--r--remoting/host/screen_recorder_unittest.cc10
-rw-r--r--remoting/proto/video.proto14
-rw-r--r--remoting/protocol/buffered_socket_writer.cc64
-rw-r--r--remoting/protocol/buffered_socket_writer.h12
-rw-r--r--remoting/protocol/client_control_sender.cc4
-rw-r--r--remoting/protocol/input_sender.cc8
-rw-r--r--remoting/protocol/protobuf_video_writer.cc4
-rw-r--r--remoting/protocol/rtcp_writer.cc2
-rw-r--r--remoting/protocol/rtp_writer.cc2
17 files changed, 217 insertions, 249 deletions
diff --git a/remoting/base/codec_test.cc b/remoting/base/codec_test.cc
index 0521b52d..0c59659 100644
--- a/remoting/base/codec_test.cc
+++ b/remoting/base/codec_test.cc
@@ -81,6 +81,11 @@ class EncoderMessageTester {
state_ = kWaitingForBeginRect;
++end_rect_;
}
+
+ if ((packet->flags() & VideoPacket::LAST_PARTITION) != 0) {
+ // LAST_PARTITION must always be marked with LAST_PACKET.
+ EXPECT_TRUE((packet->flags() & VideoPacket::LAST_PACKET) != 0);
+ }
}
}
diff --git a/remoting/base/decoder_row_based.cc b/remoting/base/decoder_row_based.cc
index a657331..25e738f 100644
--- a/remoting/base/decoder_row_based.cc
+++ b/remoting/base/decoder_row_based.cc
@@ -46,10 +46,22 @@ void DecoderRowBased::Reset() {
frame_ = NULL;
decompressor_->Reset();
state_ = kUninitialized;
+ updated_rects_.clear();
}
bool DecoderRowBased::IsReadyForData() {
- return state_ == kReady || state_ == kProcessing || state_ == kDone;
+ switch (state_) {
+ case kUninitialized:
+ case kError:
+ return false;
+ case kReady:
+ case kProcessing:
+ case kPartitionDone:
+ case kDone:
+ return true;
+ }
+ NOTREACHED();
+ return false;
}
void DecoderRowBased::Initialize(scoped_refptr<media::VideoFrame> frame) {
@@ -119,14 +131,18 @@ Decoder::DecodeResult DecoderRowBased::DecodePacket(const VideoPacket* packet) {
}
}
- if (state_ == kDone) {
+ if (state_ == kPartitionDone || state_ == kDone) {
if (row_y_ < clip_.height()) {
state_ = kError;
LOG(WARNING) << "Received LAST_PACKET, but didn't get enough data.";
return DECODE_ERROR;
}
+ updated_rects_.push_back(clip_);
decompressor_->Reset();
+ }
+
+ if (state_ == kDone) {
return DECODE_DONE;
} else {
return DECODE_IN_PROGRESS;
@@ -139,7 +155,7 @@ void DecoderRowBased::UpdateStateForPacket(const VideoPacket* packet) {
}
if (packet->flags() & VideoPacket::FIRST_PACKET) {
- if (state_ != kReady && state_ != kDone) {
+ if (state_ != kReady && state_ != kDone && state_ != kPartitionDone) {
state_ = kError;
LOG(WARNING) << "Received unexpected FIRST_PACKET.";
return;
@@ -165,6 +181,15 @@ void DecoderRowBased::UpdateStateForPacket(const VideoPacket* packet) {
LOG(WARNING) << "Received unexpected LAST_PACKET.";
return;
}
+ state_ = kPartitionDone;
+ }
+
+ if (packet->flags() & VideoPacket::LAST_PARTITION) {
+ if (state_ != kPartitionDone) {
+ state_ = kError;
+ LOG(WARNING) << "Received unexpected LAST_PARTITION.";
+ return;
+ }
state_ = kDone;
}
@@ -172,7 +197,8 @@ void DecoderRowBased::UpdateStateForPacket(const VideoPacket* packet) {
}
void DecoderRowBased::GetUpdatedRects(UpdatedRects* rects) {
- rects->push_back(clip_);
+ rects->swap(updated_rects_);
+ updated_rects_.clear();
}
VideoPacketFormat::Encoding DecoderRowBased::Encoding() {
diff --git a/remoting/base/decoder_row_based.h b/remoting/base/decoder_row_based.h
index da05c05..05c2e3c 100644
--- a/remoting/base/decoder_row_based.h
+++ b/remoting/base/decoder_row_based.h
@@ -36,6 +36,7 @@ class DecoderRowBased : public Decoder {
kUninitialized,
kReady,
kProcessing,
+ kPartitionDone,
kDone,
kError,
};
@@ -70,6 +71,8 @@ class DecoderRowBased : public Decoder {
// True if we should decode the image upside down.
bool reverse_rows_;
+ UpdatedRects updated_rects_;
+
DISALLOW_COPY_AND_ASSIGN(DecoderRowBased);
};
diff --git a/remoting/base/encoder_row_based.cc b/remoting/base/encoder_row_based.cc
index 6a797b2..d9dcb10 100644
--- a/remoting/base/encoder_row_based.cc
+++ b/remoting/base/encoder_row_based.cc
@@ -65,17 +65,15 @@ void EncoderRowBased::Encode(scoped_refptr<CaptureData> capture_data,
callback_.reset(data_available_callback);
const InvalidRects& rects = capture_data->dirty_rects();
- int index = 0;
- for (InvalidRects::const_iterator r = rects.begin();
- r != rects.end(); ++r, ++index) {
- EncodeRect(*r, index);
+ for (InvalidRects::const_iterator r = rects.begin(); r != rects.end(); ++r) {
+ EncodeRect(*r, r == --rects.end());
}
capture_data_ = NULL;
callback_.reset();
}
-void EncoderRowBased::EncodeRect(const gfx::Rect& rect, size_t rect_index) {
+void EncoderRowBased::EncodeRect(const gfx::Rect& rect, bool last) {
CHECK(capture_data_->data_planes().data[0]);
const int strides = capture_data_->data_planes().strides[0];
const int bytes_per_pixel = GetBytesPerPixel(capture_data_->pixel_format());
@@ -117,6 +115,8 @@ void EncoderRowBased::EncodeRect(const gfx::Rect& rect, size_t rect_index) {
// We have reached the end of stream.
if (!compress_again) {
packet->set_flags(packet->flags() | VideoPacket::LAST_PACKET);
+ if (last)
+ packet->set_flags(packet->flags() | VideoPacket::LAST_PARTITION);
DCHECK(row_pos == row_size);
DCHECK(row_y == rect.height() - 1);
}
diff --git a/remoting/base/encoder_row_based.h b/remoting/base/encoder_row_based.h
index 1dfd916..11ebd9a 100644
--- a/remoting/base/encoder_row_based.h
+++ b/remoting/base/encoder_row_based.h
@@ -41,7 +41,7 @@ class EncoderRowBased : public Encoder {
int packet_size);
// Encode a single dirty rect using compressor.
- void EncodeRect(const gfx::Rect& rect, size_t rect_index);
+ void EncodeRect(const gfx::Rect& rect, bool last);
// Marks a packet as the first in a series of rectangle updates.
void PrepareUpdateStart(const gfx::Rect& rect, VideoPacket* packet);
diff --git a/remoting/base/encoder_vp8.cc b/remoting/base/encoder_vp8.cc
index ea02add..6d07e54 100644
--- a/remoting/base/encoder_vp8.cc
+++ b/remoting/base/encoder_vp8.cc
@@ -189,6 +189,7 @@ void EncoderVp8::Encode(scoped_refptr<CaptureData> capture_data,
switch (packet->kind) {
case VPX_CODEC_CX_FRAME_PKT:
got_data = true;
+ // TODO(sergeyu): Split each frame into multiple partitions.
message->set_data(packet->data.frame.buf, packet->data.frame.sz);
break;
default:
@@ -197,7 +198,8 @@ void EncoderVp8::Encode(scoped_refptr<CaptureData> capture_data,
}
message->mutable_format()->set_encoding(VideoPacketFormat::ENCODING_VP8);
- message->set_flags(VideoPacket::FIRST_PACKET | VideoPacket::LAST_PACKET);
+ message->set_flags(VideoPacket::FIRST_PACKET | VideoPacket::LAST_PACKET |
+ VideoPacket::LAST_PARTITION);
data_available_callback->Run(message);
delete data_available_callback;
diff --git a/remoting/host/screen_recorder.cc b/remoting/host/screen_recorder.cc
index a972c1a..cf3417c 100644
--- a/remoting/host/screen_recorder.cc
+++ b/remoting/host/screen_recorder.cc
@@ -10,6 +10,7 @@
#include "base/scoped_ptr.h"
#include "base/stl_util-inl.h"
#include "base/task.h"
+#include "base/time.h"
#include "remoting/base/capture_data.h"
#include "remoting/base/tracer.h"
#include "remoting/proto/control.pb.h"
@@ -27,17 +28,11 @@ namespace remoting {
// experiment to provide good latency.
static const double kDefaultCaptureRate = 20.0;
-// Interval that we perform rate regulation.
-static const base::TimeDelta kRateControlInterval =
- base::TimeDelta::FromSeconds(1);
-
-// We divide the pending update stream number by this value to determine the
-// rate divider.
-static const int kSlowDownFactor = 10;
-
-// A list of dividers used to divide the max rate to determine the current
-// capture rate.
-static const int kRateDividers[] = {1, 2, 4, 8, 16};
+// Maximum number of frames that can be processed similtaneously.
+// TODO(sergeyu): Should this be set to 1? Or should we change
+// dynamically depending on how fast network and CPU are? Experement
+// with it.
+static const int kMaxRecordings = 2;
ScreenRecorder::ScreenRecorder(
MessageLoop* capture_loop,
@@ -50,11 +45,10 @@ ScreenRecorder::ScreenRecorder(
network_loop_(network_loop),
capturer_(capturer),
encoder_(encoder),
- rate_(kDefaultCaptureRate),
started_(false),
recordings_(0),
- max_rate_(kDefaultCaptureRate),
- rate_control_started_(false) {
+ frame_skipped_(false),
+ max_rate_(kDefaultCaptureRate) {
DCHECK(capture_loop_);
DCHECK(encode_loop_);
DCHECK(network_loop_);
@@ -83,10 +77,12 @@ void ScreenRecorder::SetMaxRate(double rate) {
void ScreenRecorder::AddConnection(
scoped_refptr<ConnectionToClient> connection) {
- // Gets the init information for the connection.
- capture_loop_->PostTask(
+ ScopedTracer tracer("AddConnection");
+
+ // Add the client to the list so it can receive update stream.
+ network_loop_->PostTask(
FROM_HERE,
- NewTracedMethod(this, &ScreenRecorder::DoGetInitInfo, connection));
+ NewTracedMethod(this, &ScreenRecorder::DoAddConnection, connection));
}
void ScreenRecorder::RemoveConnection(
@@ -106,11 +102,13 @@ void ScreenRecorder::RemoveAllConnections() {
Capturer* ScreenRecorder::capturer() {
DCHECK_EQ(capture_loop_, MessageLoop::current());
+ DCHECK(capturer_.get());
return capturer_.get();
}
Encoder* ScreenRecorder::encoder() {
DCHECK_EQ(encode_loop_, MessageLoop::current());
+ DCHECK(encoder_.get());
return encoder_.get();
}
@@ -118,6 +116,7 @@ Encoder* ScreenRecorder::encoder() {
void ScreenRecorder::DoStart() {
DCHECK_EQ(capture_loop_, MessageLoop::current());
+ DCHECK(!started_);
if (started_) {
NOTREACHED() << "Record session already started.";
@@ -125,12 +124,10 @@ void ScreenRecorder::DoStart() {
}
started_ = true;
- DoCapture();
+ StartCaptureTimer();
- // Starts the rate regulation.
- network_loop_->PostTask(
- FROM_HERE,
- NewTracedMethod(this, &ScreenRecorder::DoStartRateControl));
+ // Capture first frame immedately.
+ DoCapture();
}
void ScreenRecorder::DoPause() {
@@ -141,56 +138,31 @@ void ScreenRecorder::DoPause() {
return;
}
+ capture_timer_.Stop();
started_ = false;
-
- // Pause the rate regulation.
- network_loop_->PostTask(
- FROM_HERE,
- NewTracedMethod(this, &ScreenRecorder::DoPauseRateControl));
-}
-
-void ScreenRecorder::DoSetRate(double rate) {
- DCHECK_EQ(capture_loop_, MessageLoop::current());
- if (rate == rate_)
- return;
-
- // Change the current capture rate.
- rate_ = rate;
-
- // If we have already started then schedule the next capture with the new
- // rate.
- if (started_)
- ScheduleNextCapture();
}
void ScreenRecorder::DoSetMaxRate(double max_rate) {
DCHECK_EQ(capture_loop_, MessageLoop::current());
// TODO(hclam): Should also check for small epsilon.
- if (max_rate != 0) {
- max_rate_ = max_rate;
- DoSetRate(max_rate);
- } else {
- NOTREACHED() << "Rate is too small.";
+ DCHECK_GT(max_rate, 0.0) << "Rate is too small.";
+
+ max_rate_ = max_rate;
+
+ // Restart the timer with the new rate.
+ if (started_) {
+ capture_timer_.Stop();
+ StartCaptureTimer();
}
}
-void ScreenRecorder::ScheduleNextCapture() {
+void ScreenRecorder::StartCaptureTimer() {
DCHECK_EQ(capture_loop_, MessageLoop::current());
- ScopedTracer tracer("capture");
-
- TraceContext::tracer()->PrintString("Capture Scheduled");
-
- if (rate_ == 0)
- return;
-
base::TimeDelta interval = base::TimeDelta::FromMilliseconds(
- static_cast<int>(base::Time::kMillisecondsPerSecond / rate_));
- capture_loop_->PostDelayedTask(
- FROM_HERE,
- NewTracedMethod(this, &ScreenRecorder::DoCapture),
- interval.InMilliseconds());
+ static_cast<int>(base::Time::kMillisecondsPerSecond / max_rate_));
+ capture_timer_.Start(interval, this, &ScreenRecorder::DoCapture);
}
void ScreenRecorder::DoCapture() {
@@ -198,32 +170,22 @@ void ScreenRecorder::DoCapture() {
// Make sure we have at most two oustanding recordings. We can simply return
// if we can't make a capture now, the next capture will be started by the
// end of an encode operation.
- if (recordings_ >= 2 || !started_) {
+ if (recordings_ >= kMaxRecordings || !started_) {
+ frame_skipped_ = true;
return;
}
- TraceContext::tracer()->PrintString("Capture Started");
-
- base::Time now = base::Time::Now();
- base::TimeDelta interval = base::TimeDelta::FromMilliseconds(
- static_cast<int>(base::Time::kMillisecondsPerSecond / rate_));
- base::TimeDelta elapsed = now - last_capture_time_;
- // If this method is called sooner than the required interval we return
- // immediately
- if (elapsed < interval) {
- return;
+ if (frame_skipped_) {
+ frame_skipped_ = false;
+ capture_timer_.Reset();
}
+ TraceContext::tracer()->PrintString("Capture Started");
+
// At this point we are going to perform one capture so save the current time.
- last_capture_time_ = now;
++recordings_;
- // Before we actually do a capture, schedule the next one.
- ScheduleNextCapture();
-
// And finally perform one capture.
- DCHECK(capturer());
-
capturer()->CaptureInvalidRects(
NewCallback(this, &ScreenRecorder::CaptureDoneCallback));
}
@@ -239,7 +201,7 @@ void ScreenRecorder::CaptureDoneCallback(
NewTracedMethod(this, &ScreenRecorder::DoEncode, capture_data));
}
-void ScreenRecorder::DoFinishEncode() {
+void ScreenRecorder::DoFinishSend() {
DCHECK_EQ(capture_loop_, MessageLoop::current());
// Decrement the number of recording in process since we have completed
@@ -248,103 +210,43 @@ void ScreenRecorder::DoFinishEncode() {
// Try to do a capture again. Note that the following method may do nothing
// if it is too early to perform a capture.
- if (rate_ > 0)
- DoCapture();
-}
-
-void ScreenRecorder::DoGetInitInfo(
- scoped_refptr<ConnectionToClient> connection) {
- DCHECK_EQ(capture_loop_, MessageLoop::current());
-
- ScopedTracer tracer("init");
-
- // Add the client to the list so it can receive update stream.
- network_loop_->PostTask(
- FROM_HERE,
- NewTracedMethod(this, &ScreenRecorder::DoAddClient, connection));
+ DoCapture();
}
// Network thread --------------------------------------------------------------
-void ScreenRecorder::DoStartRateControl() {
- DCHECK_EQ(network_loop_, MessageLoop::current());
-
- if (rate_control_started_) {
- NOTREACHED() << "Rate regulation already started";
- return;
- }
- rate_control_started_ = true;
- ScheduleNextRateControl();
-}
-
-void ScreenRecorder::DoPauseRateControl() {
+void ScreenRecorder::DoSendVideoPacket(VideoPacket* packet) {
DCHECK_EQ(network_loop_, MessageLoop::current());
- if (!rate_control_started_) {
- NOTREACHED() << "Rate regulation not started";
- return;
- }
- rate_control_started_ = false;
-}
-
-void ScreenRecorder::ScheduleNextRateControl() {
- ScopedTracer tracer("Rate Control");
- network_loop_->PostDelayedTask(
- FROM_HERE,
- NewTracedMethod(this, &ScreenRecorder::DoRateControl),
- kRateControlInterval.InMilliseconds());
-}
+ TraceContext::tracer()->PrintString("DoSendVideoPacket");
-void ScreenRecorder::DoRateControl() {
- DCHECK_EQ(network_loop_, MessageLoop::current());
+ bool last = (packet->flags() & VideoPacket::LAST_PARTITION) != 0;
- // If we have been paused then shutdown the rate regulation loop.
- if (!rate_control_started_)
- return;
+ for (ConnectionToClientList::const_iterator i = connections_.begin();
+ i < connections_.end(); ++i) {
+ Task* done_task = NULL;
- int max_pending_update_streams = 0;
- for (size_t i = 0; i < connections_.size(); ++i) {
- max_pending_update_streams =
- std::max(max_pending_update_streams,
- connections_[i]->video_stub()->GetPendingPackets());
- }
+ // Call OnFrameSent() only for the last packet in the first connection.
+ if (last && i == connections_.begin()) {
+ done_task = NewTracedMethod(this, &ScreenRecorder::OnFrameSent, packet);
+ } else {
+ done_task = new DeleteTask<VideoPacket>(packet);
+ }
- // If |slow_down| equals zero, we have no slow down.
- size_t slow_down = max_pending_update_streams / kSlowDownFactor;
- // Set new_rate to -1 for checking later.
- double new_rate = -1;
- // If the slow down is too large.
- if (slow_down >= arraysize(kRateDividers)) {
- // Then we stop the capture completely.
- new_rate = 0;
- } else {
- // Slow down the capture rate using the divider.
- new_rate = max_rate_ / kRateDividers[slow_down];
+ (*i)->video_stub()->ProcessVideoPacket(packet, done_task);
}
- DCHECK_NE(new_rate, -1.0);
- // Then set the rate.
- capture_loop_->PostTask(
- FROM_HERE,
- NewTracedMethod(this, &ScreenRecorder::DoSetRate, new_rate));
- ScheduleNextRateControl();
+ TraceContext::tracer()->PrintString("DoSendVideoPacket done");
}
-void ScreenRecorder::DoSendVideoPacket(VideoPacket* packet) {
- DCHECK_EQ(network_loop_, MessageLoop::current());
-
- TraceContext::tracer()->PrintString("DoSendUpdate");
-
- for (ConnectionToClientList::const_iterator i = connections_.begin();
- i < connections_.end(); ++i) {
- (*i)->video_stub()->ProcessVideoPacket(
- packet, new DeleteTask<VideoPacket>(packet));
- }
-
- TraceContext::tracer()->PrintString("DoSendUpdate done");
+void ScreenRecorder::OnFrameSent(VideoPacket* packet) {
+ delete packet;
+ capture_loop_->PostTask(
+ FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishSend));
}
-void ScreenRecorder::DoAddClient(scoped_refptr<ConnectionToClient> connection) {
+void ScreenRecorder::DoAddConnection(
+ scoped_refptr<ConnectionToClient> connection) {
DCHECK_EQ(network_loop_, MessageLoop::current());
// TODO(hclam): Force a full frame for next encode.
@@ -356,8 +258,8 @@ void ScreenRecorder::DoRemoveClient(
DCHECK_EQ(network_loop_, MessageLoop::current());
// TODO(hclam): Is it correct to do to a scoped_refptr?
- ConnectionToClientList::iterator it
- = std::find(connections_.begin(), connections_.end(), connection);
+ ConnectionToClientList::iterator it =
+ std::find(connections_.begin(), connections_.end(), connection);
if (it != connections_.end()) {
connections_.erase(it);
}
@@ -380,14 +282,14 @@ void ScreenRecorder::DoEncode(
// Early out if there's nothing to encode.
if (!capture_data->dirty_rects().size()) {
capture_loop_->PostTask(
- FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishEncode));
+ FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishSend));
return;
}
// TODO(hclam): Enable |force_refresh| if a new connection was
// added.
TraceContext::tracer()->PrintString("Encode start");
- encoder_->Encode(capture_data, false,
+ encoder()->Encode(capture_data, false,
NewCallback(this, &ScreenRecorder::EncodeDataAvailableTask));
TraceContext::tracer()->PrintString("Encode Done");
}
@@ -395,20 +297,9 @@ void ScreenRecorder::DoEncode(
void ScreenRecorder::EncodeDataAvailableTask(VideoPacket* packet) {
DCHECK_EQ(encode_loop_, MessageLoop::current());
- bool last = (packet->flags() & VideoPacket::LAST_PACKET) != 0;
-
- // Before a new encode task starts, notify connected clients a new update
- // stream is coming.
- // Notify this will keep a reference to the DataBuffer in the
- // task. The ownership will eventually pass to the ConnectionToClients.
network_loop_->PostTask(
FROM_HERE,
NewTracedMethod(this, &ScreenRecorder::DoSendVideoPacket, packet));
-
- if (last) {
- capture_loop_->PostTask(
- FROM_HERE, NewTracedMethod(this, &ScreenRecorder::DoFinishEncode));
- }
}
} // namespace remoting
diff --git a/remoting/host/screen_recorder.h b/remoting/host/screen_recorder.h
index 2cd02b1..3a844eb 100644
--- a/remoting/host/screen_recorder.h
+++ b/remoting/host/screen_recorder.h
@@ -11,7 +11,7 @@
#include "base/message_loop.h"
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
-#include "base/time.h"
+#include "base/timer.h"
#include "remoting/base/encoder.h"
#include "remoting/host/capturer.h"
#include "remoting/proto/video.pb.h"
@@ -105,44 +105,39 @@ class ScreenRecorder : public base::RefCountedThreadSafe<ScreenRecorder> {
void DoStart();
void DoPause();
- void DoSetRate(double rate);
void DoSetMaxRate(double max_rate);
// Hepler method to schedule next capture using the current rate.
- void ScheduleNextCapture();
+ void StartCaptureTimer();
void DoCapture();
void CaptureDoneCallback(scoped_refptr<CaptureData> capture_data);
- void DoFinishEncode();
-
- void DoGetInitInfo(scoped_refptr<protocol::ConnectionToClient> client);
+ void DoFinishSend();
// Network thread -----------------------------------------------------------
- void DoStartRateControl();
- void DoPauseRateControl();
-
- // Helper method to schedule next rate regulation task.
- void ScheduleNextRateControl();
-
- void DoRateControl();
-
- // DoSendUpdate takes ownership of header and is responsible for deleting it.
+ // DoSendVideoPacket takes ownership of the |packet| and is responsible
+ // for deleting it.
void DoSendVideoPacket(VideoPacket* packet);
+
void DoSendInit(scoped_refptr<protocol::ConnectionToClient> connection,
int width, int height);
- void DoAddClient(scoped_refptr<protocol::ConnectionToClient> connection);
+ void DoAddConnection(scoped_refptr<protocol::ConnectionToClient> connection);
void DoRemoveClient(scoped_refptr<protocol::ConnectionToClient> connection);
void DoRemoveAllClients();
+ // Callback for the last packet in one update. Deletes |packet| and
+ // schedules next screen capture.
+ void OnFrameSent(VideoPacket* packet);
+
// Encoder thread -----------------------------------------------------------
void DoEncode(scoped_refptr<CaptureData> capture_data);
- // EncodeDataAvailableTask takes ownership of header and is responsible for
- // deleting it.
+ // EncodeDataAvailableTask takes ownership of |packet|.
void EncodeDataAvailableTask(VideoPacket* packet);
+ void SendVideoPacket(VideoPacket* packet);
// Message loops used by this class.
MessageLoop* capture_loop_;
@@ -166,18 +161,20 @@ class ScreenRecorder : public base::RefCountedThreadSafe<ScreenRecorder> {
ConnectionToClientList connections_;
// The following members are accessed on the capture thread.
- double rate_; // Number of captures to perform every second.
bool started_;
- base::Time last_capture_time_; // Saves the time last capture started.
- int recordings_; // Count the number of recordings
- // (i.e. capture or encode) happening.
- // The maximum rate is written on the capture thread and read on the network
- // thread.
- double max_rate_; // Number of captures to perform every second.
+ // Timer that calls DoCapture.
+ base::RepeatingTimer<ScreenRecorder> capture_timer_;
+
+ // Count the number of recordings (i.e. capture or encode) happening.
+ int recordings_;
+
+ // Set to true if we've skipped last capture because there are too
+ // many pending frames.
+ int frame_skipped_;
- // The following member is accessed on the network thread.
- bool rate_control_started_;
+ // Number of captures to perform every second. Written on the capture thread.
+ double max_rate_;
DISALLOW_COPY_AND_ASSIGN(ScreenRecorder);
};
diff --git a/remoting/host/screen_recorder_unittest.cc b/remoting/host/screen_recorder_unittest.cc
index 848226f..4dcfe13 100644
--- a/remoting/host/screen_recorder_unittest.cc
+++ b/remoting/host/screen_recorder_unittest.cc
@@ -19,6 +19,7 @@ using ::testing::_;
using ::testing::AtLeast;
using ::testing::NotNull;
using ::testing::Return;
+using ::testing::SaveArg;
namespace remoting {
@@ -102,8 +103,12 @@ TEST_F(ScreenRecorderTest, OneRecordCycle) {
EXPECT_CALL(*connection_, video_stub())
.WillRepeatedly(Return(&video_stub));
+ Task* done_task = NULL;
+
// Expect the client be notified.
- EXPECT_CALL(video_stub, ProcessVideoPacket(_, _));
+ EXPECT_CALL(video_stub, ProcessVideoPacket(_, _))
+ .Times(1)
+ .WillOnce(SaveArg<1>(&done_task));
EXPECT_CALL(video_stub, GetPendingPackets())
.Times(AtLeast(0))
.WillRepeatedly(Return(0));
@@ -113,6 +118,9 @@ TEST_F(ScreenRecorderTest, OneRecordCycle) {
// Make sure all tasks are completed.
message_loop_.RunAllPending();
+
+ done_task->Run();
+ delete done_task;
}
// TODO(hclam): Add test for double buffering.
diff --git a/remoting/proto/video.proto b/remoting/proto/video.proto
index e1cf5cf..394f891 100644
--- a/remoting/proto/video.proto
+++ b/remoting/proto/video.proto
@@ -35,14 +35,17 @@ message VideoPacketFormat {
message VideoPacket {
// Bitmasks for use in the flags field below.
//
- // The encoder may fragment one update into multiple packets depending on
- // how the encoder outputs data. Thus, one update can logically consist of
- // multiple packets. The FIRST_PACKET and LAST_PACKET flags are used to
- // indicate the start and end of a logical update. Here are notable
- // consequences:
+ // The encoder may fragment one update into multiple partitions.
+ // Each partition may be divided into multiple packets depending on
+ // how the encoder outputs data. Thus, one update can logically
+ // consist of multiple packets. The FIRST_PACKET and LAST_PACKET
+ // flags are used to indicate the start and end of a partition. The
+ // LAST_PARTITION flag is set for the last packet in the last
+ // partition. Here are notable consequences:
// * Both FIRST_PACKET and LAST_PACKET may be set if an update is only
// one packet long.
// * The VideoPacketFormat is only supplied in a FIRST_PACKET.
+ // * LAST_PARTITION can be set only in packet that has LAST_PACKET set.
// * An local update cannot change format between a FIRST_PACKET and
// a LAST_PACKET.
// * All packets in one logical update must be processed in order, and
@@ -50,6 +53,7 @@ message VideoPacket {
enum Flags {
FIRST_PACKET = 1;
LAST_PACKET = 2;
+ LAST_PARTITION = 4;
}
optional int32 flags = 1 [default = 0];
diff --git a/remoting/protocol/buffered_socket_writer.cc b/remoting/protocol/buffered_socket_writer.cc
index 0cf39bc..11b8395 100644
--- a/remoting/protocol/buffered_socket_writer.cc
+++ b/remoting/protocol/buffered_socket_writer.cc
@@ -5,11 +5,34 @@
#include "remoting/protocol/buffered_socket_writer.h"
#include "base/message_loop.h"
+#include "base/stl_util-inl.h"
#include "net/base/net_errors.h"
namespace remoting {
namespace protocol {
+class BufferedSocketWriterBase::PendingPacket {
+ public:
+ PendingPacket(scoped_refptr<net::IOBufferWithSize> data, Task* done_task)
+ : data_(data),
+ done_task_(done_task) {
+ }
+ ~PendingPacket() {
+ if (done_task_.get())
+ done_task_->Run();
+ }
+
+ net::IOBufferWithSize* data() {
+ return data_;
+ }
+
+ private:
+ scoped_refptr<net::IOBufferWithSize> data_;
+ scoped_ptr<Task> done_task_;
+
+ DISALLOW_COPY_AND_ASSIGN(PendingPacket);
+};
+
BufferedSocketWriterBase::BufferedSocketWriterBase()
: buffer_size_(0),
socket_(NULL),
@@ -32,11 +55,11 @@ void BufferedSocketWriterBase::Init(net::Socket* socket,
}
bool BufferedSocketWriterBase::Write(
- scoped_refptr<net::IOBufferWithSize> data) {
+ scoped_refptr<net::IOBufferWithSize> data, Task* done_task) {
AutoLock auto_lock(lock_);
if (!socket_)
return false;
- queue_.push(data);
+ queue_.push_back(new PendingPacket(data, done_task));
buffer_size_ += data->size();
message_loop_->PostTask(
FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite));
@@ -112,9 +135,7 @@ void BufferedSocketWriterBase::OnWritten(int result) {
void BufferedSocketWriterBase::HandleError(int result) {
AutoLock auto_lock(lock_);
closed_ = true;
- while (!queue_.empty()) {
- queue_.pop();
- }
+ STLDeleteElements(&queue_);
// Notify subclass that an error is received.
OnError_Locked(result);
@@ -135,19 +156,27 @@ void BufferedSocketWriterBase::Close() {
closed_ = true;
}
+void BufferedSocketWriterBase::PopQueue() {
+ // This also calls |done_task|.
+ delete queue_.front();
+ queue_.pop_front();
+}
+
BufferedSocketWriter::BufferedSocketWriter() { }
-BufferedSocketWriter::~BufferedSocketWriter() { }
+
+BufferedSocketWriter::~BufferedSocketWriter() {
+ STLDeleteElements(&queue_);
+}
void BufferedSocketWriter::GetNextPacket_Locked(
net::IOBuffer** buffer, int* size) {
- while (!current_buf_ || current_buf_->BytesRemaining() == 0) {
+ if (!current_buf_) {
if (queue_.empty()) {
*buffer = NULL;
return; // Nothing to write.
}
- current_buf_ =
- new net::DrainableIOBuffer(queue_.front(), queue_.front()->size());
- queue_.pop();
+ current_buf_ = new net::DrainableIOBuffer(
+ queue_.front()->data(), queue_.front()->data()->size());
}
*buffer = current_buf_;
@@ -157,6 +186,11 @@ void BufferedSocketWriter::GetNextPacket_Locked(
void BufferedSocketWriter::AdvanceBufferPosition_Locked(int written) {
buffer_size_ -= written;
current_buf_->DidConsume(written);
+
+ if (current_buf_->BytesRemaining() == 0) {
+ PopQueue();
+ current_buf_ = NULL;
+ }
}
void BufferedSocketWriter::OnError_Locked(int result) {
@@ -172,14 +206,14 @@ void BufferedDatagramWriter::GetNextPacket_Locked(
*buffer = NULL;
return; // Nothing to write.
}
- *buffer = queue_.front();
- *size = queue_.front()->size();
+ *buffer = queue_.front()->data();
+ *size = queue_.front()->data()->size();
}
void BufferedDatagramWriter::AdvanceBufferPosition_Locked(int written) {
- DCHECK_EQ(written, queue_.front()->size());
- buffer_size_ -= queue_.front()->size();
- queue_.pop();
+ DCHECK_EQ(written, queue_.front()->data()->size());
+ buffer_size_ -= queue_.front()->data()->size();
+ PopQueue();
}
void BufferedDatagramWriter::OnError_Locked(int result) {
diff --git a/remoting/protocol/buffered_socket_writer.h b/remoting/protocol/buffered_socket_writer.h
index c786c44..3ea127e 100644
--- a/remoting/protocol/buffered_socket_writer.h
+++ b/remoting/protocol/buffered_socket_writer.h
@@ -5,7 +5,7 @@
#ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
#define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
-#include <queue>
+#include <list>
#include "base/lock.h"
#include "base/ref_counted.h"
@@ -13,6 +13,7 @@
#include "net/socket/socket.h"
class MessageLoop;
+class Task;
namespace net {
class Socket;
@@ -45,7 +46,7 @@ class BufferedSocketWriterBase
// Puts a new data chunk in the buffer. Returns false and doesn't enqueue
// the data if called before Init(). Can be called on any thread.
- bool Write(scoped_refptr<net::IOBufferWithSize> buffer);
+ bool Write(scoped_refptr<net::IOBufferWithSize> buffer, Task* done_task);
// Returns current size of the buffer. Can be called on any thread.
int GetBufferSize();
@@ -58,11 +59,16 @@ class BufferedSocketWriterBase
void Close();
protected:
- typedef std::queue<scoped_refptr<net::IOBufferWithSize> > DataQueue;
+ class PendingPacket;
+ typedef std::list<PendingPacket*> DataQueue;
DataQueue queue_;
int buffer_size_;
+ // Removes element from the front of the queue and calls |done_task|
+ // for that element.
+ void PopQueue();
+
// Following three methods must be implemented in child classes.
// GetNextPacket() returns next packet that needs to be written to the
// socket. |buffer| must be set to NULL if there is nothing left in the queue.
diff --git a/remoting/protocol/client_control_sender.cc b/remoting/protocol/client_control_sender.cc
index d1fd2f5..c16d235 100644
--- a/remoting/protocol/client_control_sender.cc
+++ b/remoting/protocol/client_control_sender.cc
@@ -28,9 +28,7 @@ void ClientControlSender::NotifyResolution(
const NotifyResolutionRequest* msg, Task* done) {
protocol::ControlMessage message;
message.mutable_notify_resolution()->CopyFrom(*msg);
- buffered_writer_->Write(SerializeAndFrameMessage(message));
- done->Run();
- delete done;
+ buffered_writer_->Write(SerializeAndFrameMessage(message), done);
}
} // namespace protocol
diff --git a/remoting/protocol/input_sender.cc b/remoting/protocol/input_sender.cc
index e45149e..8831649 100644
--- a/remoting/protocol/input_sender.cc
+++ b/remoting/protocol/input_sender.cc
@@ -31,9 +31,7 @@ void InputSender::InjectKeyEvent(const KeyEvent* event, Task* done) {
// TODO(hclam): Provide timestamp.
evt->set_timestamp(0);
evt->mutable_key()->CopyFrom(*event);
- buffered_writer_->Write(SerializeAndFrameMessage(message));
- done->Run();
- delete done;
+ buffered_writer_->Write(SerializeAndFrameMessage(message), done);
}
void InputSender::InjectMouseEvent(const MouseEvent* event, Task* done) {
@@ -42,9 +40,7 @@ void InputSender::InjectMouseEvent(const MouseEvent* event, Task* done) {
// TODO(hclam): Provide timestamp.
evt->set_timestamp(0);
evt->mutable_mouse()->CopyFrom(*event);
- buffered_writer_->Write(SerializeAndFrameMessage(message));
- done->Run();
- delete done;
+ buffered_writer_->Write(SerializeAndFrameMessage(message), done);
}
} // namespace protocol
diff --git a/remoting/protocol/protobuf_video_writer.cc b/remoting/protocol/protobuf_video_writer.cc
index 73a9b0d..025445d 100644
--- a/remoting/protocol/protobuf_video_writer.cc
+++ b/remoting/protocol/protobuf_video_writer.cc
@@ -25,9 +25,7 @@ void ProtobufVideoWriter::Init(protocol::Session* session) {
void ProtobufVideoWriter::ProcessVideoPacket(const VideoPacket* packet,
Task* done) {
- buffered_writer_->Write(SerializeAndFrameMessage(*packet));
- done->Run();
- delete done;
+ buffered_writer_->Write(SerializeAndFrameMessage(*packet), done);
}
int ProtobufVideoWriter::GetPendingPackets() {
diff --git a/remoting/protocol/rtcp_writer.cc b/remoting/protocol/rtcp_writer.cc
index c7d53f0..1996665 100644
--- a/remoting/protocol/rtcp_writer.cc
+++ b/remoting/protocol/rtcp_writer.cc
@@ -32,7 +32,7 @@ void RtcpWriter::SendReport(const RtcpReceiverReport& report) {
PackRtcpReceiverReport(report, reinterpret_cast<uint8*>(buffer->data()),
size);
- buffered_rtcp_writer_->Write(buffer);
+ buffered_rtcp_writer_->Write(buffer, NULL);
}
} // namespace protocol
diff --git a/remoting/protocol/rtp_writer.cc b/remoting/protocol/rtp_writer.cc
index 7341741..e26bdb7 100644
--- a/remoting/protocol/rtp_writer.cc
+++ b/remoting/protocol/rtp_writer.cc
@@ -69,7 +69,7 @@ void RtpWriter::SendPacket(uint32 timestamp, bool marker,
payload_size);
// And write the packet.
- buffered_rtp_writer_->Write(buffer);
+ buffered_rtp_writer_->Write(buffer, NULL);
}
int RtpWriter::GetPendingPackets() {