blob: ebdc200a1c16019af88d57b81de1de477dc46779 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
#define REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
#include <list>
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "net/base/io_buffer.h"
#include "net/socket/socket.h"
class Task;
namespace base {
class MessageLoopProxy;
} // namespace base
namespace net {
class Socket;
} // namespace net
namespace remoting {
namespace protocol {
// BufferedSocketWriter and BufferedDatagramWriter implement write data queue
// for stream and datagram sockets. BufferedSocketWriterBase is a base class
// that implements base functionality common for streams and datagrams.
// These classes are particularly useful when data comes from a thread
// that doesn't own the socket, as Write() can be called from any thread.
// Whenever new data is written it is just put in the queue, and then written
// on the thread that owns the socket. GetBufferChunks() and GetBufferSize()
// can be used to throttle writes.
class BufferedSocketWriterBase
: public base::RefCountedThreadSafe<BufferedSocketWriterBase> {
public:
typedef Callback1<int>::Type WriteFailedCallback;
explicit BufferedSocketWriterBase(base::MessageLoopProxy* message_loop);
virtual ~BufferedSocketWriterBase();
// Initializes the writer. Must be called on the thread that will be used
// to access the socket in the future. |callback| will be called after each
// failed write. Caller retains ownership of |socket|.
// TODO(sergeyu): Change it so that it take ownership of |socket|.
void Init(net::Socket* socket, WriteFailedCallback* callback);
// Puts a new data chunk in the buffer. Returns false and doesn't enqueue
// the data if called before Init(). Can be called on any thread.
bool Write(scoped_refptr<net::IOBufferWithSize> buffer, Task* done_task);
// Returns current size of the buffer. Can be called on any thread.
int GetBufferSize();
// Returns number of chunks that are currently in the buffer waiting
// to be written. Can be called on any thread.
int GetBufferChunks();
// Stops writing and drops current buffers. Must be called on the
// network thread.
void Close();
protected:
class PendingPacket;
typedef std::list<PendingPacket*> DataQueue;
DataQueue queue_;
int buffer_size_;
// Removes element from the front of the queue and calls |done_task|
// for that element.
void PopQueue();
// Following three methods must be implemented in child classes.
// GetNextPacket() returns next packet that needs to be written to the
// socket. |buffer| must be set to NULL if there is nothing left in the queue.
virtual void GetNextPacket_Locked(net::IOBuffer** buffer, int* size) = 0;
virtual void AdvanceBufferPosition_Locked(int written) = 0;
// This method is called whenever there is an error writing to the socket.
virtual void OnError_Locked(int result) = 0;
private:
void DoWrite();
void OnWritten(int result);
// This method is called when an error is encountered.
void HandleError(int result);
// Must be locked when accessing |socket_|, |queue_| and |buffer_size_|;
base::Lock lock_;
net::Socket* socket_;
scoped_refptr<base::MessageLoopProxy> message_loop_;
scoped_ptr<WriteFailedCallback> write_failed_callback_;
bool write_pending_;
net::OldCompletionCallbackImpl<BufferedSocketWriterBase> written_callback_;
bool closed_;
};
class BufferedSocketWriter : public BufferedSocketWriterBase {
public:
BufferedSocketWriter(base::MessageLoopProxy* message_loop);
virtual ~BufferedSocketWriter();
protected:
virtual void GetNextPacket_Locked(net::IOBuffer** buffer, int* size);
virtual void AdvanceBufferPosition_Locked(int written);
virtual void OnError_Locked(int result);
private:
scoped_refptr<net::DrainableIOBuffer> current_buf_;
};
class BufferedDatagramWriter : public BufferedSocketWriterBase {
public:
BufferedDatagramWriter(base::MessageLoopProxy* message_loop);
virtual ~BufferedDatagramWriter();
protected:
virtual void GetNextPacket_Locked(net::IOBuffer** buffer, int* size);
virtual void AdvanceBufferPosition_Locked(int written);
virtual void OnError_Locked(int result);
};
} // namespace protocol
} // namespace remoting
#endif // REMOTING_PROTOCOL_BUFFERED_SOCKET_WRITER_H_
|