summaryrefslogtreecommitdiffstats
path: root/content/browser/streams
diff options
context:
space:
mode:
authorzork@chromium.org <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-03-14 15:06:15 +0000
committerzork@chromium.org <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-03-14 15:06:15 +0000
commit2f67b439d86916fad3abd6dc6d4a55a32d2596f6 (patch)
tree33a0f7b10c8113a6f04eadddffb08a118bb58e55 /content/browser/streams
parent61fdcbf4572ca0c6a80109f4c9437a58caded5ac (diff)
downloadchromium_src-2f67b439d86916fad3abd6dc6d4a55a32d2596f6.zip
chromium_src-2f67b439d86916fad3abd6dc6d4a55a32d2596f6.tar.gz
chromium_src-2f67b439d86916fad3abd6dc6d4a55a32d2596f6.tar.bz2
Add URL request handling for Streams.
BUG=181495 Review URL: https://chromiumcodereview.appspot.com/12734003 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@188075 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'content/browser/streams')
-rw-r--r--content/browser/streams/stream.cc3
-rw-r--r--content/browser/streams/stream_read_observer.h4
-rw-r--r--content/browser/streams/stream_url_request_job.cc243
-rw-r--r--content/browser/streams/stream_url_request_job.h65
-rw-r--r--content/browser/streams/stream_url_request_job_unittest.cc175
5 files changed, 488 insertions, 2 deletions
diff --git a/content/browser/streams/stream.cc b/content/browser/streams/stream.cc
index e72a887..590452b 100644
--- a/content/browser/streams/stream.cc
+++ b/content/browser/streams/stream.cc
@@ -109,7 +109,8 @@ void Stream::OnSpaceAvailable() {
}
void Stream::OnDataAvailable() {
- read_observer_->OnDataAvailable(this);
+ if (read_observer_)
+ read_observer_->OnDataAvailable(this);
}
} // namespace content
diff --git a/content/browser/streams/stream_read_observer.h b/content/browser/streams/stream_read_observer.h
index a2b1bdc..08fd657 100644
--- a/content/browser/streams/stream_read_observer.h
+++ b/content/browser/streams/stream_read_observer.h
@@ -5,11 +5,13 @@
#ifndef CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_
#define CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_
+#include "content/common/content_export.h"
+
namespace content {
class Stream;
-class StreamReadObserver {
+class CONTENT_EXPORT StreamReadObserver {
public:
// Sent when there is data available to be read from the stream.
virtual void OnDataAvailable(Stream* stream) = 0;
diff --git a/content/browser/streams/stream_url_request_job.cc b/content/browser/streams/stream_url_request_job.cc
new file mode 100644
index 0000000..da2e9ce
--- /dev/null
+++ b/content/browser/streams/stream_url_request_job.cc
@@ -0,0 +1,243 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/streams/stream_url_request_job.h"
+
+#include "base/string_number_conversions.h"
+#include "content/browser/streams/stream.h"
+#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
+#include "net/http/http_byte_range.h"
+#include "net/http/http_response_headers.h"
+#include "net/http/http_response_info.h"
+#include "net/http/http_util.h"
+#include "net/url_request/url_request.h"
+
+namespace content {
+
+namespace {
+
+const int kHTTPOk = 200;
+const int kHTTPNotAllowed = 403;
+const int kHTTPNotFound = 404;
+const int kHTTPMethodNotAllow = 405;
+const int kHTTPInternalError = 500;
+
+const char kHTTPOKText[] = "OK";
+const char kHTTPNotAllowedText[] = "Not Allowed";
+const char kHTTPNotFoundText[] = "Not Found";
+const char kHTTPMethodNotAllowText[] = "Method Not Allowed";
+const char kHTTPInternalErrorText[] = "Internal Server Error";
+
+} // namespace
+
+StreamURLRequestJob::StreamURLRequestJob(
+ net::URLRequest* request,
+ net::NetworkDelegate* network_delegate,
+ scoped_refptr<Stream> stream)
+ : net::URLRequestJob(request, network_delegate),
+ ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)),
+ stream_(stream),
+ headers_set_(false),
+ pending_buffer_size_(0),
+ total_bytes_read_(0),
+ max_range_(0),
+ request_failed_(false) {
+ DCHECK(stream_);
+ stream_->SetReadObserver(this);
+}
+
+StreamURLRequestJob::~StreamURLRequestJob() {
+ ClearStream();
+}
+
+void StreamURLRequestJob::OnDataAvailable(Stream* stream) {
+ // Clear the IO_PENDING status.
+ SetStatus(net::URLRequestStatus());
+ if (pending_buffer_) {
+ int bytes_read;
+ stream_->ReadRawData(pending_buffer_, pending_buffer_size_, &bytes_read);
+
+ // Clear the buffers before notifying the read is complete, so that it is
+ // safe for the observer to read.
+ pending_buffer_ = NULL;
+ pending_buffer_size_ = 0;
+
+ total_bytes_read_ += bytes_read;
+ NotifyReadComplete(bytes_read);
+ }
+}
+
+// net::URLRequestJob methods.
+void StreamURLRequestJob::Start() {
+ // Continue asynchronously.
+ MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&StreamURLRequestJob::DidStart, weak_factory_.GetWeakPtr()));
+}
+
+void StreamURLRequestJob::Kill() {
+ net::URLRequestJob::Kill();
+ weak_factory_.InvalidateWeakPtrs();
+ ClearStream();
+}
+
+bool StreamURLRequestJob::ReadRawData(net::IOBuffer* buf,
+ int buf_size,
+ int* bytes_read) {
+ if (request_failed_)
+ return true;
+
+ DCHECK(bytes_read);
+ int to_read = buf_size;
+ if (max_range_ && to_read) {
+ if (to_read + total_bytes_read_ > max_range_)
+ to_read = max_range_ - total_bytes_read_;
+
+ if (to_read <= 0) {
+ *bytes_read = 0;
+ return true;
+ }
+ }
+
+ switch (stream_->ReadRawData(buf, to_read, bytes_read)) {
+ case Stream::STREAM_HAS_DATA:
+ case Stream::STREAM_COMPLETE:
+ total_bytes_read_ += *bytes_read;
+ return true;
+ case Stream::STREAM_EMPTY:
+ pending_buffer_ = buf;
+ pending_buffer_size_ = to_read;
+ SetStatus(net::URLRequestStatus(net::URLRequestStatus::IO_PENDING, 0));
+ return false;
+ }
+ NOTREACHED();
+ return false;
+}
+
+bool StreamURLRequestJob::GetMimeType(std::string* mime_type) const {
+ if (!response_info_.get())
+ return false;
+
+ // TODO(zork): Support registered MIME types if needed.
+ return response_info_->headers->GetMimeType(mime_type);
+}
+
+void StreamURLRequestJob::GetResponseInfo(net::HttpResponseInfo* info) {
+ if (response_info_.get())
+ *info = *response_info_;
+}
+
+int StreamURLRequestJob::GetResponseCode() const {
+ if (!response_info_.get())
+ return -1;
+
+ return response_info_->headers->response_code();
+}
+
+void StreamURLRequestJob::SetExtraRequestHeaders(
+ const net::HttpRequestHeaders& headers) {
+ std::string range_header;
+ if (headers.GetHeader(net::HttpRequestHeaders::kRange, &range_header)) {
+ std::vector<net::HttpByteRange> ranges;
+ if (net::HttpUtil::ParseRangeHeader(range_header, &ranges)) {
+ if (ranges.size() == 1) {
+ // Streams don't support seeking, so a non-zero starting position
+ // doesn't make sense.
+ if (ranges[0].first_byte_position() == 0) {
+ max_range_ = ranges[0].last_byte_position() + 1;
+ } else {
+ NotifyFailure(net::ERR_METHOD_NOT_SUPPORTED);
+ return;
+ }
+ } else {
+ NotifyFailure(net::ERR_METHOD_NOT_SUPPORTED);
+ return;
+ }
+ }
+ }
+}
+
+void StreamURLRequestJob::DidStart() {
+ // We only support GET request.
+ if (request()->method() != "GET") {
+ NotifyFailure(net::ERR_METHOD_NOT_SUPPORTED);
+ return;
+ }
+
+ HeadersCompleted(kHTTPOk, kHTTPOKText);
+}
+
+void StreamURLRequestJob::NotifyFailure(int error_code) {
+ request_failed_ = true;
+
+ // If we already return the headers on success, we can't change the headers
+ // now. Instead, we just error out.
+ if (headers_set_) {
+ NotifyDone(net::URLRequestStatus(net::URLRequestStatus::FAILED,
+ error_code));
+ return;
+ }
+
+ // TODO(zork): Share these with BlobURLRequestJob.
+ int status_code = 0;
+ std::string status_txt;
+ switch (error_code) {
+ case net::ERR_ACCESS_DENIED:
+ status_code = kHTTPNotAllowed;
+ status_txt = kHTTPNotAllowedText;
+ break;
+ case net::ERR_FILE_NOT_FOUND:
+ status_code = kHTTPNotFound;
+ status_txt = kHTTPNotFoundText;
+ break;
+ case net::ERR_METHOD_NOT_SUPPORTED:
+ status_code = kHTTPMethodNotAllow;
+ status_txt = kHTTPMethodNotAllowText;
+ break;
+ case net::ERR_FAILED:
+ status_code = kHTTPInternalError;
+ status_txt = kHTTPInternalErrorText;
+ break;
+ default:
+ DCHECK(false);
+ status_code = kHTTPInternalError;
+ status_txt = kHTTPInternalErrorText;
+ break;
+ }
+ HeadersCompleted(status_code, status_txt);
+}
+
+void StreamURLRequestJob::HeadersCompleted(int status_code,
+ const std::string& status_text) {
+ std::string status("HTTP/1.1 ");
+ status.append(base::IntToString(status_code));
+ status.append(" ");
+ status.append(status_text);
+ status.append("\0\0", 2);
+ net::HttpResponseHeaders* headers = new net::HttpResponseHeaders(status);
+
+ if (status_code == kHTTPOk) {
+ std::string content_type_header(net::HttpRequestHeaders::kContentType);
+ content_type_header.append(": ");
+ content_type_header.append("plain/text");
+ headers->AddHeader(content_type_header);
+ }
+
+ response_info_.reset(new net::HttpResponseInfo());
+ response_info_->headers = headers;
+
+ headers_set_ = true;
+
+ NotifyHeadersComplete();
+}
+
+void StreamURLRequestJob::ClearStream() {
+ if (stream_) {
+ stream_->RemoveReadObserver(this);
+ stream_ = NULL;
+ }
+}
+
+} // namespace content
diff --git a/content/browser/streams/stream_url_request_job.h b/content/browser/streams/stream_url_request_job.h
new file mode 100644
index 0000000..d087198
--- /dev/null
+++ b/content/browser/streams/stream_url_request_job.h
@@ -0,0 +1,65 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_URL_REQUEST_JOB_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_URL_REQUEST_JOB_H_
+
+#include "net/url_request/url_request_job.h"
+#include "content/browser/streams/stream_read_observer.h"
+#include "content/common/content_export.h"
+
+namespace content {
+
+class Stream;
+
+// A request job that handles reading stream URLs.
+class CONTENT_EXPORT StreamURLRequestJob
+ : public net::URLRequestJob,
+ public StreamReadObserver {
+ public:
+ StreamURLRequestJob(net::URLRequest* request,
+ net::NetworkDelegate* network_delegate,
+ scoped_refptr<Stream> stream);
+
+ // StreamObserver methods.
+ virtual void OnDataAvailable(Stream* stream) OVERRIDE;
+
+ // net::URLRequestJob methods.
+ virtual void Start() OVERRIDE;
+ virtual void Kill() OVERRIDE;
+ virtual bool ReadRawData(net::IOBuffer* buf,
+ int buf_size,
+ int* bytes_read) OVERRIDE;
+ virtual bool GetMimeType(std::string* mime_type) const OVERRIDE;
+ virtual void GetResponseInfo(net::HttpResponseInfo* info) OVERRIDE;
+ virtual int GetResponseCode() const OVERRIDE;
+ virtual void SetExtraRequestHeaders(
+ const net::HttpRequestHeaders& headers) OVERRIDE;
+
+ protected:
+ virtual ~StreamURLRequestJob();
+
+ private:
+ void DidStart();
+ void NotifyFailure(int);
+ void HeadersCompleted(int status_code, const std::string& status_txt);
+ void ClearStream();
+
+ base::WeakPtrFactory<StreamURLRequestJob> weak_factory_;
+ scoped_refptr<content::Stream> stream_;
+ bool headers_set_;
+ scoped_refptr<net::IOBuffer> pending_buffer_;
+ int pending_buffer_size_;
+ scoped_ptr<net::HttpResponseInfo> response_info_;
+
+ int total_bytes_read_;
+ int max_range_;
+ bool request_failed_;
+
+ DISALLOW_COPY_AND_ASSIGN(StreamURLRequestJob);
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_URL_REQUEST_JOB_H_
diff --git a/content/browser/streams/stream_url_request_job_unittest.cc b/content/browser/streams/stream_url_request_job_unittest.cc
new file mode 100644
index 0000000..10a02c9
--- /dev/null
+++ b/content/browser/streams/stream_url_request_job_unittest.cc
@@ -0,0 +1,175 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/message_loop.h"
+#include "base/test/test_simple_task_runner.h"
+#include "content/browser/streams/stream.h"
+#include "content/browser/streams/stream_registry.h"
+#include "content/browser/streams/stream_url_request_job.h"
+#include "content/browser/streams/stream_write_observer.h"
+#include "net/http/http_response_headers.h"
+#include "net/url_request/url_request.h"
+#include "net/url_request/url_request_context.h"
+#include "net/url_request/url_request_job_factory_impl.h"
+#include "net/url_request/url_request_test_util.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace content {
+
+namespace {
+
+const int kBufferSize = 1024;
+const char kTestData1[] = "Hello";
+const char kTestData2[] = "Here it is data.";
+
+const GURL kStreamURL("blob://stream");
+
+} // namespace
+
+class StreamURLRequestJobTest : public testing::Test {
+ public:
+ // A simple ProtocolHandler implementation to create StreamURLRequestJob.
+ class MockProtocolHandler :
+ public net::URLRequestJobFactory::ProtocolHandler {
+ public:
+ MockProtocolHandler(StreamRegistry* registry) : registry_(registry) {}
+
+ // net::URLRequestJobFactory::ProtocolHandler override.
+ virtual net::URLRequestJob* MaybeCreateJob(
+ net::URLRequest* request,
+ net::NetworkDelegate* network_delegate) const OVERRIDE {
+ scoped_refptr<Stream> stream = registry_->GetStream(request->url());
+ if (stream)
+ return new StreamURLRequestJob(request, network_delegate, stream);
+ return NULL;
+ }
+
+ private:
+ StreamRegistry* registry_;
+ };
+
+ StreamURLRequestJobTest()
+ : message_loop_(MessageLoop::TYPE_IO) {
+ }
+
+ virtual void SetUp() {
+ registry_.reset(new StreamRegistry());
+
+ url_request_job_factory_.SetProtocolHandler(
+ "blob", new MockProtocolHandler(registry_.get()));
+ url_request_context_.set_job_factory(&url_request_job_factory_);
+ }
+
+ virtual void TearDown() {
+ }
+
+ void TestSuccessRequest(const GURL& url,
+ const std::string& expected_response) {
+ TestRequest("GET", url, net::HttpRequestHeaders(), 200, expected_response);
+ }
+
+ void TestRequest(const std::string& method,
+ const GURL& url,
+ const net::HttpRequestHeaders& extra_headers,
+ int expected_status_code,
+ const std::string& expected_response) {
+ net::TestDelegate delegate;
+ request_.reset(url_request_context_.CreateRequest(url, &delegate));
+ request_->set_method(method);
+ if (!extra_headers.IsEmpty())
+ request_->SetExtraRequestHeaders(extra_headers);
+ request_->Start();
+
+ MessageLoop::current()->RunUntilIdle();
+
+ // Verify response.
+ EXPECT_TRUE(request_->status().is_success());
+ EXPECT_EQ(expected_status_code,
+ request_->response_headers()->response_code());
+ EXPECT_EQ(expected_response, delegate.data_received());
+ }
+
+ protected:
+ MessageLoop message_loop_;
+ scoped_ptr<StreamRegistry> registry_;
+
+ net::URLRequestContext url_request_context_;
+ net::URLRequestJobFactoryImpl url_request_job_factory_;
+ scoped_ptr<net::URLRequest> request_;
+};
+
+TEST_F(StreamURLRequestJobTest, TestGetSimpleDataRequest) {
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), NULL, GURL(), kStreamURL));
+
+ scoped_refptr<net::StringIOBuffer> buffer(
+ new net::StringIOBuffer(kTestData1));
+
+ stream->AddData(buffer, buffer->size());
+ stream->Finalize();
+
+ TestSuccessRequest(kStreamURL, kTestData1);
+}
+
+TEST_F(StreamURLRequestJobTest, TestGetLargeStreamRequest) {
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), NULL, GURL(), kStreamURL));
+
+ std::string large_data;
+ large_data.reserve(kBufferSize * 5);
+ for (int i = 0; i < kBufferSize * 5; ++i)
+ large_data.append(1, static_cast<char>(i % 256));
+
+ scoped_refptr<net::StringIOBuffer> buffer(
+ new net::StringIOBuffer(large_data));
+
+ stream->AddData(buffer, buffer->size());
+ stream->Finalize();
+ TestSuccessRequest(kStreamURL, large_data);
+}
+
+TEST_F(StreamURLRequestJobTest, TestGetNonExistentStreamRequest) {
+ net::TestDelegate delegate;
+ request_.reset(url_request_context_.CreateRequest(kStreamURL, &delegate));
+ request_->set_method("GET");
+ request_->Start();
+
+ MessageLoop::current()->RunUntilIdle();
+
+ // Verify response.
+ EXPECT_FALSE(request_->status().is_success());
+}
+
+TEST_F(StreamURLRequestJobTest, TestRangeDataRequest) {
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), NULL, GURL(), kStreamURL));
+
+ scoped_refptr<net::StringIOBuffer> buffer(
+ new net::StringIOBuffer(kTestData2));
+
+ stream->AddData(buffer, buffer->size());
+ stream->Finalize();
+
+ net::HttpRequestHeaders extra_headers;
+ extra_headers.SetHeader(net::HttpRequestHeaders::kRange, "bytes=0-3");
+ TestRequest("GET", kStreamURL, extra_headers,
+ 200, std::string(kTestData2, 4));
+}
+
+TEST_F(StreamURLRequestJobTest, TestInvalidRangeDataRequest) {
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), NULL, GURL(), kStreamURL));
+
+ scoped_refptr<net::StringIOBuffer> buffer(
+ new net::StringIOBuffer(kTestData2));
+
+ stream->AddData(buffer, buffer->size());
+ stream->Finalize();
+
+ net::HttpRequestHeaders extra_headers;
+ extra_headers.SetHeader(net::HttpRequestHeaders::kRange, "bytes=1-3");
+ TestRequest("GET", kStreamURL, extra_headers, 405, "");
+}
+
+} // namespace content