summaryrefslogtreecommitdiffstats
path: root/net/flip/flip_stream.cc
blob: 2b8ced32060fa40d0765883962d87e9c66f950bf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/flip/flip_stream.h"

#include "net/flip/flip_session.h"
#include "net/http/http_response_info.h"

namespace net {

// Binds |delegate| to this stream and replays everything that was stored
// before a delegate was available, in this order:
//   1. the response headers, if a reply has already been stored;
//   2. every queued body buffer, oldest first;
//   3. the close notification, if the download has already finished.
//
// Returns true when the close notification was delivered, which tells the
// caller to shut this stream down; returns false otherwise.
bool FlipStream::AttachDelegate(FlipDelegate* delegate) {
  DCHECK(delegate_ == NULL);  // A delegate must not already be attached.
  DCHECK(path_.length() > 0);  // Push streams need their path set first.
  delegate_ = delegate;

  // Replay the stored reply, if there is one.
  if (response_.get() != NULL)
    delegate_->OnResponseReceived(response_.get());

  // Drain the queued body data.  Each buffer is detached from the queue
  // before it is handed to the delegate.
  while (!response_body_.empty()) {
    scoped_refptr<IOBufferWithSize> pending = response_body_.front();
    response_body_.pop_front();
    delegate_->OnDataReceived(pending->data(), pending->size());
  }

  // Nothing more to do unless the end-of-stream was already seen.
  if (!download_finished_)
    return false;

  delegate_->OnClose(net::OK);
  return true;  // Ask the caller to shut us down.
}

// Handles the reply headers for this stream.
//
// |headers| must be non-NULL.  It is expected to contain a "version" and a
// "status" entry; an optional "path" entry is copied into |path_|.  The header
// block is flattened into a '\0'-separated raw header string, stored in
// |response_|, and passed to the delegate if one is attached.
//
// The header block comes from the peer, so a missing "version" or "status" is
// handled at runtime rather than asserted: dereferencing the end() iterator
// returned by find() would be undefined behavior in builds where DCHECK is
// compiled out.  In that case |response_| is left unset, and the check at the
// top of OnData() reports ERR_SYN_REPLY_NOT_RECEIVED when data arrives.
void FlipStream::OnReply(const flip::FlipHeaderBlock* headers) {
  DCHECK(headers);

  const flip::FlipHeaderBlock::const_iterator end = headers->end();
  const flip::FlipHeaderBlock::const_iterator version =
      headers->find("version");
  const flip::FlipHeaderBlock::const_iterator status = headers->find("status");
  if (version == end || status == end) {
    // TODO(mbelshe): actively error out the stream here instead of relying on
    // OnData() to notice that no response was recorded.
    LOG(WARNING) << "FlipStream: SynReply for " << stream_id_
                 << " is missing version or status";
    return;
  }

  metrics_.StartStream();

  // Server initiated streams must send a URL to us in the headers.
  const flip::FlipHeaderBlock::const_iterator path = headers->find("path");
  if (path != end)
    path_ = path->second;

  // TODO(mbelshe): For now we convert from our nice hash map back
  // to a string of headers; this is because the HttpResponseInfo
  // is a bit rigid for its http (non-flip) design.
  //
  // Layout: "<version> <status>\0" followed by "<name>:<value>\0" for every
  // entry in the block.
  std::string raw_headers(version->second);
  raw_headers.append(" ", 1);
  raw_headers.append(status->second);
  raw_headers.append("\0", 1);
  for (flip::FlipHeaderBlock::const_iterator it = headers->begin();
       it != end; ++it) {
    raw_headers.append(it->first);
    raw_headers.append(":", 1);
    raw_headers.append(it->second);
    raw_headers.append("\0", 1);
  }

  LOG(INFO) << "FlipStream: SynReply received for " << stream_id_;

  DCHECK(response_ == NULL);
  response_.reset(new HttpResponseInfo());
  response_->headers = new HttpResponseHeaders(raw_headers);
  // When pushing content from the server, we may not yet have a delegate_
  // to notify.  When the delegate is attached, it will notify then.
  if (delegate_)
    delegate_->OnResponseReceived(response_.get());
}

bool FlipStream::OnData(const char* data, int length) {
  DCHECK(length >= 0);
  LOG(INFO) << "FlipStream: Data (" << length << " bytes) received for "
            << stream_id_;

  // If we don't have a response, then the SYN_REPLY did not come through.
  // We cannot pass data up to the caller unless the reply headers have been
  // received.
  if (!response_.get()) {
    if (delegate_)
      delegate_->OnClose(ERR_SYN_REPLY_NOT_RECEIVED);
    return true;
  }

  // A zero-length read means that the stream is being closed.
  if (!length) {
    metrics_.StopStream();
    download_finished_ = true;
    if (delegate_) {
      delegate_->OnClose(net::OK);
      return true;  // Tell the caller to clean us up.
    }
  }

  // Track our bandwidth.
  metrics_.RecordBytes(length);

  // We've received data.  We try to pass it up to the caller.
  // In the case of server-push streams, we may not have a delegate yet assigned
  // to this stream.  In that case we just queue the data for later.
  if (delegate_) {
    delegate_->OnDataReceived(data, length);
  } else {
    // Save the data for use later.
    // TODO(mbelshe): We need to have some throttling on this.  We shouldn't
    //                buffer an infinite amount of data.
    IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
    memcpy(io_buffer->data(), data, length);
    response_body_.push_back(io_buffer);
  }
  return false;
}

// Forwards the error code |err| to the attached delegate as a close
// notification.  Does nothing when no delegate is attached.
void FlipStream::OnError(int err) {
  if (!delegate_)
    return;
  delegate_->OnClose(err);
}

// Relays the outcome of a write, |result|, to the delegate.
// Always returns OK, whatever |result| is.
int FlipStream::OnWriteComplete(int result) {
  // Writes to the socket are only issued once a delegate exists, so one
  // must be attached by the time a write completes.
  DCHECK(delegate_);
  delegate_->OnWriteComplete(result);

  // TODO(mbelshe): we might want to remove the status code
  //                since we're not doing anything useful with it.
  return OK;
}

}  // namespace net