summaryrefslogtreecommitdiffstats
path: root/net/curvecp/messenger.cc
blob: a99abda614d931246bb244582082badd035530a6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/curvecp/messenger.h"

#include "base/logging.h"
#include "base/message_loop.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/curvecp/protocol.h"

// Basic protocol design:
//
// OnTimeout:    Called when the timeout timer pops.
//   - call SendMessage()
//   - call RecvMessage()
//
// OnSendTimer:  Called when the send-timer pops.
//   - call SendMessage()
//   - call RecvMessage()
//
// OnMessage:    Called when a message arrived from the packet layer
//   - add the message to the receive queue
//   - call RecvMessage()
//
// Write:        Called by application to write data to remote
//   - add the data to the send_buffer
//   - call SendMessage()
//
// SendMessage:  Called to Send a message to the remote
//   - msg = first message to retransmit where retransmit timer popped
//   - if msg == NULL
//   -   msg = create a new message from the send buffer
//   - if msg != NULL
//   -   send message to the packet layer
//   -   setTimer(OnSendTimer, send_rate);
//
// RecvMessage:  Called to process a Received message from the remote
//   - calculate RTT
//   - calculate Send Rate
//   - acknowledge data from received message
//   - resetTimeout
//      - timeout = now + rtt_timeout
//      - if current_timeout > timeout
//         setTimer(OnTimeout, timeout)

namespace net {

// Maximum number of write blocks.
// NOTE(review): not referenced anywhere else in this file - confirm whether it
// is still needed.
static const size_t kMaxWriteQueueMessages = 128;

// Size of the send buffer. Passed to the send_buffer_ constructor in the
// Messenger constructor (presumably a byte count - confirm against the
// buffer class).
static const size_t kSendBufferSize = (128 * 1024);
// Size of the receive buffer.
// NOTE(review): not referenced anywhere else in this file - confirm whether it
// is still needed.
static const size_t kReceiveBufferSize = (128 * 1024);

// Constructs a Messenger that sends its messages through |packetizer|.
// The pointer is stored as-is; NOTE(review): presumably the packetizer must
// outlive this object - confirm against the header.
// No callbacks are registered and no read or write is pending initially.
Messenger::Messenger(Packetizer* packetizer)
    : packetizer_(packetizer),
      send_buffer_(kSendBufferSize),
      send_complete_callback_(NULL),
      old_receive_complete_callback_(NULL),
      pending_receive_length_(0),
      send_message_in_progress_(false),
      ALLOW_THIS_IN_INITIALIZER_LIST(
          send_message_callback_(this, &Messenger::OnSendMessageComplete)),
      ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
  // Give the pending-write length a defined value, mirroring
  // pending_receive_length_. It is assigned here rather than in the
  // initializer list because the member declaration order is not visible from
  // this file and a misplaced initializer would trigger reorder warnings.
  pending_send_length_ = 0;
}

// The destructor body is intentionally empty; members are torn down by their
// own destructors.
Messenger::~Messenger() {
}

// Reads received data into |buf|, requesting at most |buf_len| bytes.
//
// If the received list already has bytes available, the read is satisfied
// immediately and the InternalRead() result is returned. Otherwise |buf|,
// |buf_len| and |callback| are remembered, ERR_IO_PENDING is returned, and
// RecvMessage() completes the read once data shows up.
//
// Only one read (of either callback flavor) may be outstanding at a time.
int Messenger::Read(IOBuffer* buf, int buf_len, OldCompletionCallback* callback) {
  DCHECK(CalledOnValidThread());
  DCHECK(!old_receive_complete_callback_ &&
         receive_complete_callback_.is_null());

  // Fast path: data is already waiting for the caller.
  if (received_list_.bytes_available()) {
    int bytes_read = InternalRead(buf, buf_len);
    DCHECK_LT(0, bytes_read);
    return bytes_read;
  }

  // Slow path: park the request until a message is received.
  pending_receive_ = buf;
  pending_receive_length_ = buf_len;
  old_receive_complete_callback_ = callback;
  return ERR_IO_PENDING;
}
// CompletionCallback flavor of Read().
//
// Satisfies the read immediately through InternalRead() when the received
// list has bytes available. Otherwise stores |buf|, |buf_len| and |callback|,
// returns ERR_IO_PENDING, and leaves it to RecvMessage() to finish the read.
//
// Only one read (of either callback flavor) may be outstanding at a time.
int Messenger::Read(IOBuffer* buf, int buf_len,
                    const CompletionCallback& callback) {
  DCHECK(CalledOnValidThread());
  DCHECK(!old_receive_complete_callback_ &&
         receive_complete_callback_.is_null());

  // Fast path: data is already waiting for the caller.
  if (received_list_.bytes_available()) {
    int bytes_read = InternalRead(buf, buf_len);
    DCHECK_LT(0, bytes_read);
    return bytes_read;
  }

  // Slow path: park the request until a message is received.
  pending_receive_ = buf;
  pending_receive_length_ = buf_len;
  receive_complete_callback_ = callback;
  return ERR_IO_PENDING;
}


// Queues up to |buf_len| bytes from |buf| into the send buffer.
//
// Returns the number of bytes the send buffer accepted when that is non-zero.
// When nothing could be queued, the buffer, its length and |callback| are
// remembered and ERR_IO_PENDING is returned; CreateBufferFromSendQueue()
// retries the write and runs the callback once space has been freed.
//
// Only one write may be pending at a time.
int Messenger::Write(IOBuffer* buf, int buf_len, OldCompletionCallback* callback) {
  DCHECK(CalledOnValidThread());
  DCHECK(!pending_send_.get());   // Already a write pending!
  DCHECK(!send_complete_callback_);
  DCHECK_LT(0, buf_len);

  const int queued = send_buffer_.write(buf->data(), buf_len);

  // Make sure the send timer is ticking; a zero delay is used to start it.
  if (!send_timer_.IsRunning()) {
    send_timer_.Start(FROM_HERE, base::TimeDelta(),
                      this, &Messenger::OnSendTimer);
  }

  if (!queued) {
    // We couldn't add data to the send buffer, so block the application.
    pending_send_ = buf;
    pending_send_length_ = buf_len;
    send_complete_callback_ = callback;
    return ERR_IO_PENDING;
  }

  return queued;
}

// Records |key| as the connection key used for this messenger's traffic and
// logs it.
void Messenger::OnConnection(ConnectionKey key) {
  key_ = key;
  LOG(ERROR) << "Client Connect: " << key.ToString();
}

// Close notification handler. Currently it only logs; neither |packetizer|
// nor |key| is used and no state is torn down.
void Messenger::OnClose(Packetizer* packetizer, ConnectionKey key) {
  LOG(ERROR) << "Got Close!";
}

// Called when a message arrived from the packet layer.
//
// |msg| points at |length| bytes of message data for connection |key|, which
// is expected to match the key recorded by OnConnection(). Messages that are
// shorter than a Message header or longer than kMaxMessageLength are dropped
// silently. Accepted messages are copied, appended to the read queue, and
// then one queued message is processed via RecvMessage().
void Messenger::OnMessage(Packetizer* packetizer,
                          ConnectionKey key,
                          unsigned char* msg,
                          size_t length) {
  DCHECK(key == key_);

  // Do basic message sanity checking.
  const bool too_short = length < sizeof(Message);
  const bool too_long = length > static_cast<size_t>(kMaxMessageLength);
  if (too_short || too_long)
    return;

  // Keep a private copy of the bytes; |msg| is not retained.
  scoped_refptr<IOBufferWithSize> queued_copy(new IOBufferWithSize(length));
  memcpy(queued_copy->data(), msg, length);
  read_queue_.push_back(queued_copy);

  // Process a single received message
  RecvMessage();
}

// Reads up to |buffer_length| bytes from the received block list into
// |buffer| and returns whatever ReadBytes() reports.
int Messenger::InternalRead(IOBuffer* buffer, int buffer_length) {
  int bytes_copied = received_list_.ReadBytes(buffer, buffer_length);
  return bytes_copied;
}

// Pulls the next chunk of outgoing data out of the send buffer.
//
// The chunk is the smaller of the packetizer's maximum message payload and
// the amount currently buffered. The send buffer must not be empty. Returns a
// newly allocated IOBufferWithSize holding the chunk (the caller in this file
// places it in a scoped_refptr).
//
// Because reading frees space in the send buffer, a write that was blocked in
// Write() is retried here; if any of it is accepted, the pending write is
// cleared and its callback is run with the number of bytes accepted.
IOBufferWithSize* Messenger::CreateBufferFromSendQueue() {
  DCHECK_LT(0, send_buffer_.length());

  int length = std::min(packetizer_->max_message_payload(),
                        send_buffer_.length());
  IOBufferWithSize* chunk = new IOBufferWithSize(length);
  int bytes = send_buffer_.read(chunk->data(), length);
  DCHECK_EQ(bytes, length);

  // No blocked writer: nothing more to do.
  if (!send_complete_callback_)
    return chunk;

  // We consumed data, so someone waiting to write may now be able to proceed.
  DCHECK(pending_send_.get());
  int accepted =
      send_buffer_.write(pending_send_->data(), pending_send_length_);
  if (!accepted)
    return chunk;

  // Clear the pending state before running the callback.
  pending_send_ = NULL;
  OldCompletionCallback* blocked_writer = send_complete_callback_;
  send_complete_callback_ = NULL;
  blocked_writer->Run(accepted);

  return chunk;
}

void Messenger::OnSendMessageComplete(int result) {
  DCHECK_NE(ERR_IO_PENDING, result);

  send_message_in_progress_ = false;

  if (result <= 0) {
    // TODO(mbelshe):  Handle error.
    NOTIMPLEMENTED();
  }

  // If the send timer fired while we were waiting for this send to complete,
  // we need to manually run the timer now.
  if (!send_timer_.IsRunning())
    OnSendTimer();

  if (!send_timeout_timer_.IsRunning()) {
    LOG(ERROR) << "RttTimeout is " << rtt_.rtt_timeout();
    base::TimeDelta delay =
        base::TimeDelta::FromMicroseconds(rtt_.rtt_timeout());
    send_timeout_timer_.Start(FROM_HERE, delay, this, &Messenger::OnTimeout);
  }
}

void Messenger::OnTimeout() {
  LOG(ERROR) << "OnTimeout fired";
  int64 position = sent_list_.FindPositionOfOldestSentBlock();
  // XXXMB - need to verify that we really need to retransmit...
  if (position >= 0) {
    rtt_.OnTimeout();  // adjust our send rate.
    LOG(ERROR) << "OnTimeout retransmitting: " << position;
    SendMessage(position);
  } else {
    DCHECK_EQ(0u, sent_list_.size());
  }
  RecvMessage();
  received_list_.LogBlockList();
}

void Messenger::OnSendTimer() {
  LOG(ERROR) << "OnSendTimer!";
  DCHECK(!send_timer_.IsRunning());

  // If the send buffer is empty, then we don't need to keep firing.
  if (!send_buffer_.length()) {
    LOG(ERROR) << "OnSendTimer: send_buffer empty";
    return;
  }

  // Set the next send timer.
  LOG(ERROR) << "SendRate is: " << rtt_.send_rate() << "us";
  send_timer_.Start(FROM_HERE,
                    base::TimeDelta::FromMicroseconds(rtt_.send_rate()),
                    this, &Messenger::OnSendTimer);

  // Create a block from the send_buffer.
  if (!sent_list_.is_full()) {
    scoped_refptr<IOBufferWithSize> buffer = CreateBufferFromSendQueue();
    int64 position = sent_list_.CreateBlock(buffer.get());
    DCHECK_LE(0, position);
    SendMessage(position);
  }

  RecvMessage();  // Try to process an incoming message
}

// Frames the block stored at |position| in the sent list and hands it to the
// packetizer.
//
// Does nothing while a previous send is still in progress. Otherwise builds a
// frame consisting of a Message header (message id, body size, stream
// position) followed by the block's bytes, padded up to a multiple of 16
// bytes, marks the block as sent under the new message id, and sends it.
// If the packetizer reports ERR_IO_PENDING the in-progress flag is set and
// OnSendMessageComplete() runs later via the send callback; otherwise it is
// invoked directly with the result.
void Messenger::SendMessage(int64 position) {
  LOG(ERROR) << "SendMessage (position=" << position << ")";
  if (send_message_in_progress_)
    return;  // We're still waiting for the last write to complete.

  IOBufferWithSize* data = sent_list_.FindBlockByPosition(position);
  DCHECK(data != NULL);
  size_t message_size = sizeof(Message) + data->size();
  // Round up to the next multiple of 16. The mask is built from size_t so it
  // is not truncated to 32 bits on platforms where size_t is wider.
  size_t padded_size = (message_size + 15) & ~static_cast<size_t>(15);

  scoped_refptr<IOBufferWithSize> message(new IOBufferWithSize(padded_size));
  // Zero the entire frame, not just the header: the padding bytes after the
  // body are transmitted too and must not carry uninitialized heap contents.
  memset(message->data(), 0, padded_size);
  Message* header = reinterpret_cast<Message*>(message->data());
  uint64 id = sent_list_.GetNewMessageId();
  uint32_pack(header->message_id, id);
  // TODO(mbelshe): Needs to carry EOF flags
  uint16_pack(header->size.val, data->size());
  uint64_pack(header->position, position);
  // TODO(mbelshe): Fill in rest of the header fields.
  //     needs to have the block-position.  He tags each chunk with an
  //     absolute offset into the data stream.
  // Copy the contents of the message into the Message frame.
  memcpy(message->data() + sizeof(Message), data->data(), data->size());

  sent_list_.MarkBlockSent(position, id);

  int rv = packetizer_->SendMessage(key_,
                                    message->data(),
                                    padded_size,
                                    &send_message_callback_);
  if (rv == ERR_IO_PENDING) {
    send_message_in_progress_ = true;
    return;
  }

  // UDP must write all or none.
  DCHECK_EQ(padded_size, static_cast<size_t>(rv));
  OnSendMessageComplete(rv);
}

void Messenger::RecvMessage() {
  if (!read_queue_.size())
    return;

  scoped_refptr<IOBufferWithSize> message(read_queue_.front());
  read_queue_.pop_front();

  Message* header = reinterpret_cast<Message*>(message->data());
  uint16 body_length = uint16_unpack(header->size.val);
  if (body_length > kMaxMessageLength)
    return;
  if (body_length > message->size())
    return;

  uint32 message_id = uint32_unpack(header->message_id);
  if (message_id) {
    LOG(ERROR) << "RecvMessage Message id: " << message_id
               << ", " << body_length << " bytes";
  } else {
    LOG(ERROR) << "RecvMessage ACK ";
  }

  // See if this message has information for recomputing RTT.
  uint32 response_to_msg = uint32_unpack(header->last_message_received);
  base::TimeTicks last_sent_time = sent_list_.FindLastSendTime(response_to_msg);
  if (!last_sent_time.is_null()) {
    int rtt = (base::TimeTicks::Now() - last_sent_time).InMicroseconds();
    DCHECK_LE(0, rtt);
    LOG(ERROR) << "RTT was: " << rtt << "us";
    rtt_.OnMessage(rtt);
  }

  // Handle acknowledgements
  uint64 start_byte = 0;
  uint64 stop_byte = uint64_unpack(header->acknowledge_1);
  sent_list_.AcknowledgeBlocks(start_byte, stop_byte);

  start_byte = stop_byte + uint16_unpack(header->gap_1);
  stop_byte = start_byte + uint16_unpack(header->acknowledge_2);
  sent_list_.AcknowledgeBlocks(start_byte, stop_byte);

  start_byte = stop_byte + uint16_unpack(header->gap_2);
  stop_byte = start_byte + uint16_unpack(header->acknowledge_3);
  sent_list_.AcknowledgeBlocks(start_byte, stop_byte);

  start_byte = stop_byte + uint16_unpack(header->gap_3);
  stop_byte = start_byte + uint16_unpack(header->acknowledge_4);
  sent_list_.AcknowledgeBlocks(start_byte, stop_byte);

  start_byte = stop_byte + uint16_unpack(header->gap_4);
  stop_byte = start_byte + uint16_unpack(header->acknowledge_5);
  sent_list_.AcknowledgeBlocks(start_byte, stop_byte);

  start_byte = stop_byte + uint16_unpack(header->gap_5);
  stop_byte = start_byte + uint16_unpack(header->acknowledge_6);
  sent_list_.AcknowledgeBlocks(start_byte, stop_byte);

  if (!header->is_ack()) {
    // Add to our received block list
    uint64 position = uint64_unpack(header->position);
    scoped_refptr<IOBuffer> buffer(new IOBuffer(body_length));
    memcpy(buffer->data(), message->data() + sizeof(Message), body_length);
    received_list_.AddBlock(position, buffer, body_length);

    SendAck(message_id);
  }

  // If we have data available, and a read is pending, notify the callback.
  if (received_list_.bytes_available() &&
      (old_receive_complete_callback_ ||
       !receive_complete_callback_.is_null())) {
    // Pass the data up to the caller.
    int bytes_read = InternalRead(pending_receive_, pending_receive_length_);
    if (old_receive_complete_callback_) {
      OldCompletionCallback* callback = old_receive_complete_callback_;
      old_receive_complete_callback_ = NULL;
      callback->Run(bytes_read);
    } else {
      CompletionCallback callback = receive_complete_callback_;
      receive_complete_callback_.Reset();
      callback.Run(bytes_read);
    }
  }
}

// Sends a header-only message acknowledging received data.
//
// The frame is a zeroed Message header in which |last_message_received| and
// the first acknowledgement field (total bytes received so far) are filled
// in. It is handed straight to the packetizer; the result is only checked in
// debug builds.
void Messenger::SendAck(uint32 last_message_received) {
  scoped_refptr<IOBuffer> ack_buffer(new IOBuffer(sizeof(Message)));
  memset(ack_buffer->data(), 0, sizeof(Message));

  Message* ack = reinterpret_cast<Message*>(ack_buffer->data());
  uint32_pack(ack->last_message_received, last_message_received);
  uint64_pack(ack->acknowledge_1, received_list_.bytes_received());
  LOG(ERROR) << "SendAck " << received_list_.bytes_received();
  // TODO(mbelshe): fill in remainder of selective acknowledgements

  // TODO(mbelshe): Fix this - it is totally possible to have a send message
  //                in progress here...
  DCHECK(!send_message_in_progress_);

  int rv = packetizer_->SendMessage(key_,
                                    ack_buffer->data(),
                                    sizeof(Message),
                                    &send_message_callback_);
  // TODO(mbelshe): Fix me!  Deal with the error cases
  DCHECK(rv == sizeof(Message));
}

}  // namespace net