Skip to content

Commit 2079eff

Browse files
authored
Implement storage::ObjectWriteStream (#911)
This is a class derived from std::basic_ostream<char> that uses libcurl to upload data using chunked encoding. It will be used in Client::WriteObject(), part of the #557 changes. Use gunicorn + flask in the integration tests: Werkzeug does not seem to support chunked transfer encoding, and we need to test that. Remove a lot of hard-coded stuff. Thanks to @houglum for pointing it out.
1 parent a14282f commit 2079eff

24 files changed

Lines changed: 551 additions & 31 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,6 @@ cmake-build-*/
1515

1616
# This is a staging directory used to upload the documents to github.io
1717
github-io-staging/
18+
19+
google/cloud/storage/tests/__pycache__/
20+
google/cloud/storage/tests/testbench.pyc

ci/travis/Dockerfile.centos

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,13 @@ RUN yum makecache && yum install -y \
3737
pkgconfig \
3838
python \
3939
python-pip \
40-
python-gunicorn \
4140
shtool \
4241
unzip \
4342
wget \
4443
which \
4544
zlib-devel
4645

47-
RUN pip install httpbin
46+
RUN pip install httpbin flask gunicorn
4847

4948
# Install cmake3 + ctest3 as cmake + ctest.
5049
RUN ln -sf /usr/bin/cmake3 /usr/bin/cmake && \

ci/travis/Dockerfile.fedora

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,14 @@ RUN dnf makecache && dnf install -y \
3939
openssl-devel \
4040
pkgconfig \
4141
python \
42-
python-gunicorn \
43-
python-httpbin \
4442
shtool \
4543
unzip \
4644
wget \
4745
which \
4846
zlib-devel
4947

48+
RUN pip install httpbin flask gunicorn
49+
5050
# Install the Cloud Bigtable emulator and the Cloud Bigtable command-line
5151
# client. They are used in the integration tests.
5252
WORKDIR /var/tmp/install/cbt-components

ci/travis/Dockerfile.ubuntu

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ RUN apt update && \
3939
libtool \
4040
lsb-release \
4141
make \
42-
python-gunicorn \
43-
python-httpbin \
4442
python-pip \
4543
tar \
4644
wget \
@@ -64,6 +62,10 @@ RUN if grep -q 18.04 /etc/lsb-release; then \
6462
# because some third party changed something.
6563
RUN pip install numpy cmake_format==0.4.0
6664

65+
# Install Python packages used in the integration tests.
66+
RUN apt update && apt install -y apache2-dev
67+
RUN pip install flask httpbin gunicorn
68+
6769
# Install the Cloud Bigtable emulator and the Cloud Bigtable command-line
6870
# client. They are used in the integration tests.
6971
WORKDIR /var/tmp/install/cbt-components

ci/travis/Dockerfile.ubuntu-trusty

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ RUN apt update && apt install -y software-properties-common && \
3939
libtool \
4040
lsb-release \
4141
make \
42-
python-dev \
43-
python-pip \
4442
tar \
4543
wget \
4644
zlib1g-dev \
@@ -50,7 +48,9 @@ RUN apt update && apt install -y software-properties-common && \
5048
update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-4.9 100 && \
5149
update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-4.9 100
5250

53-
RUN pip install gunicorn httpbin
51+
# Install Python packages used in the integration tests.
52+
RUN apt update && apt install -y apache2-dev python-dev python-pip
53+
RUN pip install flask httpbin gunicorn
5454

5555
# Install the Cloud Bigtable emulator and the Cloud Bigtable command-line
5656
# client. They are used in the integration tests.

google/cloud/storage/CMakeLists.txt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ add_library(storage_client
126126
internal/curl_wrappers.cc
127127
internal/curl_client.h
128128
internal/curl_client.cc
129-
internal/object_acl_requests.h
130-
internal/object_acl_requests.cc
129+
internal/curl_streambuf.h
130+
internal/curl_streambuf.cc
131131
internal/delete_object_request.h
132132
internal/delete_object_request.cc
133133
internal/empty_response.h
@@ -154,6 +154,10 @@ add_library(storage_client
154154
internal/metadata_parser.cc
155155
internal/nljson.h
156156
internal/openssl_util.h
157+
internal/object_acl_requests.h
158+
internal/object_acl_requests.cc
159+
internal/object_streambuf.h
160+
internal/object_streambuf.cc
157161
internal/parse_rfc3339.h
158162
internal/parse_rfc3339.cc
159163
internal/raw_client.h

google/cloud/storage/internal/curl_request_builder.cc

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,16 @@ namespace storage {
2020
inline namespace STORAGE_CLIENT_NS {
2121
namespace internal {
2222

23+
#ifndef GOOGLE_CLOUD_CPP_STORAGE_INITIAL_BUFFER_SIZE
24+
#define GOOGLE_CLOUD_CPP_STORAGE_INITIAL_BUFFER_SIZE (128 * 1024)
25+
#endif // GOOGLE_CLOUD_CPP_STORAGE_INITIAL_BUFFER_SIZE
26+
2327
CurlRequestBuilder::CurlRequestBuilder(std::string base_url)
2428
: headers_(nullptr, &curl_slist_free_all),
2529
url_(std::move(base_url)),
2630
query_parameter_separator_("?"),
27-
logging_enabled_(false) {}
31+
logging_enabled_(false),
32+
initial_buffer_size_(GOOGLE_CLOUD_CPP_STORAGE_INITIAL_BUFFER_SIZE) {}
2833

2934
CurlRequest CurlRequestBuilder::BuildRequest(std::string payload) {
3035
ValidateBuilderState(__func__);
@@ -41,13 +46,13 @@ CurlRequest CurlRequestBuilder::BuildRequest(std::string payload) {
4146

4247
CurlUploadRequest CurlRequestBuilder::BuildUpload() {
4348
ValidateBuilderState(__func__);
44-
CurlUploadRequest request;
49+
CurlUploadRequest request(initial_buffer_size_);
4550
request.url_ = std::move(url_);
4651
request.headers_ = std::move(headers_);
4752
request.user_agent_ = user_agent_prefix_ + UserAgentSuffix();
4853
request.handle_ = std::move(handle_);
4954
request.multi_.reset(curl_multi_init());
50-
request.ResetOptions();
55+
request.SetOptions();
5156
return request;
5257
}
5358

@@ -90,6 +95,12 @@ CurlRequestBuilder& CurlRequestBuilder::SetDebugLogging(bool enabled) {
9095
return *this;
9196
}
9297

98+
CurlRequestBuilder& CurlRequestBuilder::SetInitialBufferSize(std::size_t size) {
99+
ValidateBuilderState(__func__);
100+
initial_buffer_size_ = size;
101+
return *this;
102+
}
103+
93104
std::string CurlRequestBuilder::UserAgentSuffix() const {
94105
ValidateBuilderState(__func__);
95106
// Pre-compute and cache the user agent string:

google/cloud/storage/internal/curl_request_builder.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ class CurlRequestBuilder {
8484

8585
CurlRequestBuilder& SetDebugLogging(bool enabled);
8686

87+
CurlRequestBuilder& SetInitialBufferSize(std::size_t size);
88+
8789
/// Get the user-agent suffix.
8890
std::string UserAgentSuffix() const;
8991

@@ -104,6 +106,8 @@ class CurlRequestBuilder {
104106
std::string user_agent_prefix_;
105107

106108
bool logging_enabled_;
109+
110+
std::size_t initial_buffer_size_;
107111
};
108112

109113
} // namespace internal
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2018 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/storage/internal/curl_streambuf.h"
16+
17+
namespace google {
18+
namespace cloud {
19+
namespace storage {
20+
inline namespace STORAGE_CLIENT_NS {
21+
namespace internal {
22+
23+
CurlStreambuf::CurlStreambuf(CurlUploadRequest&& upload,
24+
std::size_t max_buffer_size)
25+
: upload_(std::move(upload)), max_buffer_size_(max_buffer_size) {
26+
current_ios_buffer_.reserve(max_buffer_size);
27+
}
28+
29+
bool CurlStreambuf::IsOpen() const { return upload_.IsOpen(); }
30+
31+
CurlStreambuf::int_type CurlStreambuf::overflow(int_type ch) {
32+
Validate(__func__);
33+
SwapBuffers();
34+
if (not traits_type::eq_int_type(ch, traits_type::eof())) {
35+
current_ios_buffer_.push_back(traits_type::to_char_type(ch));
36+
pbump(1);
37+
}
38+
return 0;
39+
}
40+
41+
int CurlStreambuf::sync() {
42+
// The default destructor calls sync(), for already closed streams this should
43+
// be a no-op.
44+
if (not IsOpen()) {
45+
return 0;
46+
}
47+
SwapBuffers();
48+
upload_.Flush();
49+
return 0;
50+
}
51+
52+
std::streamsize CurlStreambuf::xsputn(char const* s, std::streamsize count) {
53+
Validate(__func__);
54+
current_ios_buffer_.append(s, static_cast<std::size_t>(count));
55+
pbump(static_cast<int>(count));
56+
if (current_ios_buffer_.size() > max_buffer_size_) {
57+
SwapBuffers();
58+
}
59+
return count;
60+
}
61+
62+
HttpResponse CurlStreambuf::DoClose() {
63+
GCP_LOG(INFO) << __func__ << "()";
64+
Validate(__func__);
65+
SwapBuffers();
66+
return upload_.Close();
67+
}
68+
69+
void CurlStreambuf::Validate(char const* where) const {
70+
if (upload_.IsOpen()) {
71+
return;
72+
}
73+
std::string msg = "Attempting to use closed CurlStream in ";
74+
msg += where;
75+
google::cloud::internal::RaiseRuntimeError(msg);
76+
}
77+
78+
void CurlStreambuf::SwapBuffers() {
79+
// Shorten the buffer to the actual used size.
80+
current_ios_buffer_.resize(pptr() - pbase());
81+
// Push the buffer to the libcurl wrapper to be written as needed
82+
upload_.NextBuffer(current_ios_buffer_);
83+
// Make the buffer big enough to receive more data before needing a flush.
84+
current_ios_buffer_.clear();
85+
current_ios_buffer_.reserve(max_buffer_size_);
86+
setp(&current_ios_buffer_[0], &current_ios_buffer_[0] + max_buffer_size_);
87+
}
88+
89+
} // namespace internal
90+
} // namespace STORAGE_CLIENT_NS
91+
} // namespace storage
92+
} // namespace cloud
93+
} // namespace google
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright 2018 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_CURL_STREAMBUF_H_
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_CURL_STREAMBUF_H_
17+
18+
#include "google/cloud/storage/internal/curl_upload_request.h"
19+
#include "google/cloud/storage/internal/object_streambuf.h"
20+
21+
namespace google {
22+
namespace cloud {
23+
namespace storage {
24+
inline namespace STORAGE_CLIENT_NS {
25+
namespace internal {
26+
/**
27+
* Implement a wrapper for libcurl-based streaming uploads.
28+
*/
29+
class CurlStreambuf : public ObjectWriteStreambuf {
30+
public:
31+
explicit CurlStreambuf(CurlUploadRequest&& upload,
32+
std::size_t max_buffer_size);
33+
34+
~CurlStreambuf() override = default;
35+
36+
bool IsOpen() const override;
37+
38+
protected:
39+
int sync() override;
40+
std::streamsize xsputn(char const* s, std::streamsize count) override;
41+
int_type overflow(int_type ch) override;
42+
HttpResponse DoClose() override;
43+
44+
private:
45+
/// Raise an exception if the stream is closed.
46+
void Validate(char const* where) const;
47+
48+
/// Flush the libcurl buffer and swap it with the iostream buffer.
49+
void SwapBuffers();
50+
51+
CurlUploadRequest upload_;
52+
std::string current_ios_buffer_;
53+
std::size_t max_buffer_size_;
54+
};
55+
56+
} // namespace internal
57+
} // namespace STORAGE_CLIENT_NS
58+
} // namespace storage
59+
} // namespace cloud
60+
} // namespace google
61+
62+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_CURL_STREAMBUF_H_

0 commit comments

Comments
 (0)