summaryrefslogtreecommitdiffstats
path: root/remoting/protocol/message_reader.h
blob: d14cb90b8a651f812454513faac1455f0a3e0319 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef REMOTING_PROTOCOL_MESSAGE_READER_H_
#define REMOTING_PROTOCOL_MESSAGE_READER_H_

#include "base/callback.h"
#include "base/ref_counted.h"
#include "base/scoped_ptr.h"
#include "base/task.h"
#include "net/base/completion_callback.h"
#include "remoting/base/compound_buffer.h"
#include "remoting/protocol/message_decoder.h"

class MessageLoop;

namespace net {
class IOBuffer;
class Socket;
}  // namespace net

namespace remoting {
namespace protocol {

// MessageReader reads data from the socket asynchronously and calls
// callback for each message it receives. It stops calling the
// callback as soon as the socket is closed, so the socket should
// always be closed before the callback handler is destroyed.
//
// In order to throttle the stream, MessageReader doesn't try to read
// new data from the socket until all previously received messages are
// processed by the receiver (|done_task| is called for each message).
// It is still possible that the MessageReceivedCallback is called
// twice (so that there is more than one outstanding message),
// e.g. when we the sender sends multiple messages in one TCP packet.
class MessageReader : public base::RefCountedThreadSafe<MessageReader> {
 public:
  // The callback is given ownership of the second argument
  // (|done_task|).  The buffer (first argument) is owned by
  // MessageReader and is freed when the task specified by the second
  // argument is called.
  typedef Callback2<CompoundBuffer*, Task*>::Type MessageReceivedCallback;

  MessageReader();
  virtual ~MessageReader();

  // Initialize the MessageReader with a socket. If a message is received
  // |callback| is called.
  void Init(net::Socket* socket, MessageReceivedCallback* callback);

 private:
  void DoRead();
  void OnRead(int result);
  void HandleReadResult(int result);
  void OnDataReceived(net::IOBuffer* data, int data_size);
  void OnMessageDone(CompoundBuffer* message);
  void ProcessDoneEvent();

  net::Socket* socket_;

  // The network message loop this object runs on.
  MessageLoop* message_loop_;

  // Set to true, when we have a socket read pending, and expecting
  // OnRead() to be called when new data is received.
  bool read_pending_;

  // Number of messages that we received, but haven't finished
  // processing yet, i.e. |done_task| hasn't been called for these
  // messages.
  int pending_messages_;

  bool closed_;
  scoped_refptr<net::IOBuffer> read_buffer_;
  net::CompletionCallbackImpl<MessageReader> read_callback_;

  MessageDecoder message_decoder_;

  // Callback is called when a message is received.
  scoped_ptr<MessageReceivedCallback> message_received_callback_;
};

// Version of MessageReader for protocol buffer messages, that parses
// each incoming message.
template <class T>
class ProtobufMessageReader {
 public:
  typedef typename Callback2<T*, Task*>::Type MessageReceivedCallback;

  ProtobufMessageReader() { };
  ~ProtobufMessageReader() { };

  void Init(net::Socket* socket, MessageReceivedCallback* callback) {
    message_received_callback_.reset(callback);
    message_reader_ = new MessageReader();
    message_reader_->Init(
        socket, NewCallback(this, &ProtobufMessageReader<T>::OnNewData));
  }

 private:
  void OnNewData(CompoundBuffer* buffer, Task* done_task) {
    T* message = new T();
    CompoundBufferInputStream stream(buffer);
    bool ret = message->ParseFromZeroCopyStream(&stream);
    if (!ret) {
      LOG(WARNING) << "Received message that is not a valid protocol buffer.";
      delete message;
    } else {
      DCHECK_EQ(stream.position(), buffer->total_bytes());
      message_received_callback_->Run(
          message, NewRunnableFunction(
              &ProtobufMessageReader<T>::OnDone, message, done_task));
    }
  }

  static void OnDone(T* message, Task* done_task) {
    delete message;
    done_task->Run();
    delete done_task;
  }

  scoped_refptr<MessageReader> message_reader_;
  scoped_ptr<MessageReceivedCallback> message_received_callback_;
};

}  // namespace protocol
}  // namespace remoting

#endif  // REMOTING_PROTOCOL_MESSAGE_READER_H_