summaryrefslogtreecommitdiffstats
path: root/net/curvecp/messenger.h
blob: 83773580304964f2756bdb43430fce21bdbd8b71 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef NET_CURVECP_MESSENGER_H_
#define NET_CURVECP_MESSENGER_H_
#pragma once

#include <deque>
#include <list>

#include "base/basictypes.h"
#include "base/task.h"
#include "base/threading/non_thread_safe.h"
#include "base/timer.h"
#include "net/base/completion_callback.h"
#include "net/curvecp/circular_buffer.h"
#include "net/curvecp/packetizer.h"
#include "net/curvecp/received_block_list.h"
#include "net/curvecp/rtt_and_send_rate_calculator.h"
#include "net/curvecp/sent_block_list.h"
#include "net/socket/socket.h"

namespace net {

// Forward declarations of types referred to by pointer or as template
// arguments, so their full definitions are not required here.
// NOTE(review): DrainableIOBuffer is not referenced anywhere else in this
// header - confirm whether messenger.cc or other includers rely on it.
class DrainableIOBuffer;
// Used by CreateBufferFromSendQueue() and as the element type of read_queue_.
class IOBufferWithSize;
// NOTE(review): Messenger derives from Packetizer::Listener, which needs the
// complete Packetizer type (presumably from net/curvecp/packetizer.h), so this
// forward declaration looks redundant - confirm before removing.
class Packetizer;

// The messenger provides the reliable CurveCP transport.
//
// It offers Read()/Write() entry points to the application and receives
// connection, close and message notifications through the
// Packetizer::Listener interface it implements.
// NOTE(review): deriving from base::NonThreadSafe suggests every call must be
// made on a single thread - confirm against messenger.cc.
class Messenger : public base::NonThreadSafe,
                  public Packetizer::Listener {
 public:
  // Creates a messenger that works with |packetizer|.
  // NOTE(review): |packetizer| is presumably kept in packetizer_ as a
  // non-owning pointer that must outlive this object - confirm in the .cc.
  explicit Messenger(Packetizer* packetizer);
  virtual ~Messenger();

  // Application-facing I/O. Each call takes a buffer, its length in bytes and
  // a completion callback, and returns an int status.
  // NOTE(review): presumably follows the usual net:: socket convention (byte
  // count or a net error code, with |callback| run when an operation that
  // could not finish immediately completes) - the exact contract is defined
  // in messenger.cc; verify before relying on it.
  int Read(IOBuffer* buf, int buf_len, OldCompletionCallback* callback);
  int Write(IOBuffer* buf, int buf_len, OldCompletionCallback* callback);

  // Packetizer::Listener methods:
  // NOTE(review): presumably |msg| points at |length| bytes of received
  // message data and |key| identifies the connection - confirm against the
  // Packetizer::Listener declaration.
  virtual void OnConnection(ConnectionKey key);
  virtual void OnClose(Packetizer* packetizer, ConnectionKey key);
  virtual void OnMessage(Packetizer* packetizer,
                         ConnectionKey key,
                         unsigned char* msg,
                         size_t length);

 protected:
  // Connection key, accessible to subclasses.
  // NOTE(review): presumably the key of the connection this messenger
  // serves - confirm where it is assigned.
  ConnectionKey key_;

 private:
  // Handle reading data from the read queue.
  int InternalRead(IOBuffer* buffer, int buffer_length);

  // Extracts data from send queue to create a new buffer of data to send.
  // NOTE(review): returns a raw pointer; ownership of the returned buffer is
  // not visible from this header - check the callers in messenger.cc.
  IOBufferWithSize* CreateBufferFromSendQueue();

  // Continuation function after a SendMessage() call was blocked.
  void OnSendMessageComplete(int result);

  // Protocol handling routines
  // NOTE(review): OnTimeout() and OnSendTimer() are presumably the callbacks
  // of send_timeout_timer_ and send_timer_ respectively, and |position| is
  // presumably an offset into the outgoing data stream - confirm in the .cc.
  void OnTimeout();
  void OnSendTimer();
  void SendMessage(int64 position);
  void RecvMessage();
  void SendAck(uint32 last_message_received);

  RttAndSendRateCalculator rtt_;
  Packetizer* packetizer_;

  // The send_buffer is a list of pending data to pack into messages and send
  // to the remote.
  CircularBuffer send_buffer_;
  // State for a send operation.
  // NOTE(review): presumably the callback, buffer and length of a Write()
  // that has not completed yet - confirm in the .cc.
  OldCompletionCallback* send_complete_callback_;
  scoped_refptr<IOBuffer> pending_send_;
  int pending_send_length_;

  // The read_buffer is a list of pending data which has been unpacked from
  // messages and is awaiting delivery to the application.
  // NOTE(review): no member named read_buffer is declared in this class; the
  // three members below look like the state of an outstanding Read() (its
  // callback, destination buffer and length), while the list itself appears
  // to be read_queue_ - confirm and reword.
  OldCompletionCallback* receive_complete_callback_;
  scoped_refptr<IOBuffer> pending_receive_;
  int pending_receive_length_;

  // The list of received but unprocessed messages.
  std::list<scoped_refptr<IOBufferWithSize> > read_queue_;

  // Bookkeeping for blocks received from and sent to the remote.
  // NOTE(review): exact semantics live in received_block_list.h and
  // sent_block_list.h.
  ReceivedBlockList received_list_;
  SentBlockList sent_list_;
  // NOTE(review): presumably true while a SendMessage() is outstanding and
  // cleared in OnSendMessageComplete() - confirm in the .cc.
  bool send_message_in_progress_;

  // A timer to fire when a timeout has occurred.
  base::OneShotTimer<Messenger> send_timeout_timer_;
  // A timer to fire when we can send data.
  base::OneShotTimer<Messenger> send_timer_;

  // Completion callback object bound to this Messenger.
  // NOTE(review): presumably wraps OnSendMessageComplete() - confirm in the
  // constructor's initializer list.
  OldCompletionCallbackImpl<Messenger> send_message_callback_;

  // Factory for runnable-method tasks bound to this Messenger. It is the last
  // data member declared, so it is destroyed before the other members.
  // NOTE(review): presumably this ordering is intentional so that pending
  // tasks are revoked first - keep it last unless the .cc shows otherwise.
  ScopedRunnableMethodFactory<Messenger> factory_;
  // Macro whose name indicates that copying and assignment are disallowed.
  DISALLOW_COPY_AND_ASSIGN(Messenger);
};

}  // namespace net

#endif  // NET_CURVECP_MESSENGER_H_