summaryrefslogtreecommitdiffstats
path: root/blimp/net/blimp_message_checkpointer.cc
blob: abf77d668ab023ba7f77476d39b9b9c5b8244427 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "blimp/net/blimp_message_checkpointer.h"

#include "base/logging.h"
#include "blimp/common/create_blimp_message.h"
#include "blimp/common/proto/blimp_message.pb.h"
#include "blimp/common/proto/protocol_control.pb.h"
#include "blimp/net/blimp_message_checkpoint_observer.h"
#include "net/base/net_errors.h"

namespace blimp {

namespace {
const int kDeferCheckpointAckSeconds = 1;
}

BlimpMessageCheckpointer::BlimpMessageCheckpointer(
    BlimpMessageProcessor* incoming_processor,
    BlimpMessageProcessor* outgoing_processor,
    BlimpMessageCheckpointObserver* checkpoint_observer)
    : incoming_processor_(incoming_processor),
      outgoing_processor_(outgoing_processor),
      checkpoint_observer_(checkpoint_observer),
      weak_factory_(this) {
  DCHECK(incoming_processor_);
  DCHECK(outgoing_processor_);
  DCHECK(checkpoint_observer_);
}

BlimpMessageCheckpointer::~BlimpMessageCheckpointer() {}

void BlimpMessageCheckpointer::ProcessMessage(
    scoped_ptr<BlimpMessage> message,
    const net::CompletionCallback& callback) {
  if (message->type() == BlimpMessage::PROTOCOL_CONTROL &&
      message->protocol_control().type() ==
          ProtocolControlMessage::CHECKPOINT_ACK) {
    if (message->protocol_control().has_checkpoint_ack() &&
        message->protocol_control().checkpoint_ack().has_checkpoint_id()) {
      checkpoint_observer_->OnMessageCheckpoint(
          message->protocol_control().checkpoint_ack().checkpoint_id());
      callback.Run(net::OK);
    } else {
      DLOG(WARNING) << "Invalid checkpoint ACK. Dropping connection.";
      callback.Run(net::ERR_FAILED);
    }

    return;
  }

  // TODO(wez): Provide independent checkpoints for each message->type()?
  DCHECK(message->has_message_id());

  // Store the message-Id to include in the checkpoint ACK.
  checkpoint_id_ = message->message_id();

  // Kick the timer, if not running, to ACK after a short delay.
  if (!defer_timer_.IsRunning()) {
    defer_timer_.Start(FROM_HERE,
                       base::TimeDelta::FromSeconds(kDeferCheckpointAckSeconds),
                       this, &BlimpMessageCheckpointer::SendCheckpointAck);
  }

  // Pass the message along for actual processing.
  incoming_processor_->ProcessMessage(
      std::move(message),
      base::Bind(&BlimpMessageCheckpointer::InvokeCompletionCallback,
                 weak_factory_.GetWeakPtr(), callback));
}

void BlimpMessageCheckpointer::InvokeCompletionCallback(
    const net::CompletionCallback& callback,
    int result) {
  callback.Run(result);
}

void BlimpMessageCheckpointer::SendCheckpointAck() {
  outgoing_processor_->ProcessMessage(
      CreateCheckpointAckMessage(checkpoint_id_), net::CompletionCallback());
}

}  // namespace blimp