// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "blimp/net/blimp_connection.h"

#include "base/callback_helpers.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "blimp/common/logging.h"
#include "blimp/common/proto/blimp_message.pb.h"
#include "blimp/net/blimp_message_processor.h"
#include "blimp/net/blimp_message_pump.h"
#include "blimp/net/common.h"
#include "blimp/net/connection_error_observer.h"
#include "blimp/net/packet_reader.h"
#include "blimp/net/packet_writer.h"
#include "net/base/completion_callback.h"

namespace blimp {
namespace {

// Forwards incoming blimp messages to PacketWriter.
class BlimpMessageSender : public BlimpMessageProcessor {
 public:
  explicit BlimpMessageSender(PacketWriter* writer);
  ~BlimpMessageSender() override;

  void set_error_observer(ConnectionErrorObserver* observer) {
    error_observer_ = observer;
  }

  // BlimpMessageProcessor implementation.
  void ProcessMessage(scoped_ptr<BlimpMessage> message,
                      const net::CompletionCallback& callback) override;

 private:
  void OnWritePacketComplete(int result);

  PacketWriter* writer_;
  ConnectionErrorObserver* error_observer_ = nullptr;
  scoped_refptr<net::IOBuffer> buffer_;
  net::CompletionCallback pending_process_msg_callback_;
  base::WeakPtrFactory<BlimpMessageSender> weak_factory_;

  DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender);
};

BlimpMessageSender::BlimpMessageSender(PacketWriter* writer)
    : writer_(writer),
      buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)),
      weak_factory_(this) {
  DCHECK(writer_);
}

BlimpMessageSender::~BlimpMessageSender() {
  DVLOG(1) << "BlimpMessageSender destroyed.";
}

void BlimpMessageSender::ProcessMessage(
    scoped_ptr<BlimpMessage> message,
    const net::CompletionCallback& callback) {
  DCHECK(error_observer_);
  DVLOG(2) << "Sender::ProcessMessage " << *message;

  if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) {
    DLOG(ERROR) << "Message rejected (too large): " << *message;
    callback.Run(net::ERR_MSG_TOO_BIG);
    return;
  }

  if (!message->SerializeToArray(buffer_->data(), message->GetCachedSize())) {
    DLOG(ERROR) << "Failed to serialize message.";
    callback.Run(net::ERR_INVALID_ARGUMENT);
    return;
  }

  // Check that no other message writes are in-flight at this time.
  DCHECK(pending_process_msg_callback_.is_null());
  pending_process_msg_callback_ = callback;

  writer_->WritePacket(
      scoped_refptr<net::DrainableIOBuffer>(
          new net::DrainableIOBuffer(buffer_.get(), message->ByteSize())),
      base::Bind(&BlimpMessageSender::OnWritePacketComplete,
                 weak_factory_.GetWeakPtr()));
}

void BlimpMessageSender::OnWritePacketComplete(int result) {
  DVLOG(2) << "OnWritePacketComplete, result=" << result;
  DCHECK_NE(net::ERR_IO_PENDING, result);
  base::ResetAndReturn(&pending_process_msg_callback_).Run(result);
  if (result != net::OK) {
    error_observer_->OnConnectionError(result);
  }
}

}  // namespace

BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader,
                                 scoped_ptr<PacketWriter> writer)
    : reader_(std::move(reader)),
      message_pump_(new BlimpMessagePump(reader_.get())),
      writer_(std::move(writer)),
      outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) {
  DCHECK(writer_);

  // Observe the connection errors received by any of this connection's network
  // objects.
  message_pump_->set_error_observer(this);
  BlimpMessageSender* sender =
      static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get());
  sender->set_error_observer(this);
}

BlimpConnection::BlimpConnection() {}

BlimpConnection::~BlimpConnection() {
  DVLOG(1) << "BlimpConnection destroyed.";
}

void BlimpConnection::AddConnectionErrorObserver(
    ConnectionErrorObserver* observer) {
  error_observers_.AddObserver(observer);
}

void BlimpConnection::RemoveConnectionErrorObserver(
    ConnectionErrorObserver* observer) {
  error_observers_.RemoveObserver(observer);
}

void BlimpConnection::SetIncomingMessageProcessor(
    BlimpMessageProcessor* processor) {
  message_pump_->SetMessageProcessor(processor);
}

BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() {
  return outgoing_msg_processor_.get();
}

void BlimpConnection::OnConnectionError(int error) {
  VLOG(1) << "OnConnectionError, error=" << error;

  // Propagate the error to all observers.
  FOR_EACH_OBSERVER(ConnectionErrorObserver, error_observers_,
                    OnConnectionError(error));
}

}  // namespace blimp