Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a timeout error on Windows when writing to a S3 bucket #141

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cloud/aws/aws_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "cloud/cloud_env_impl.h"

#ifdef USE_AWS

#include <string.h>

#include <chrono>
Expand Down Expand Up @@ -76,5 +75,11 @@ class AwsEnv : public CloudEnvImpl {
};

} // namespace ROCKSDB_NAMESPACE

#else
namespace ROCKSDB_NAMESPACE {
class AwsEnv : public CloudEnvImpl {
public:
static Status NewAwsEnv(Env* env, std::unique_ptr<CloudEnv>* cenv);
};
} // namespace ROCKSDB_NAMESPACE
#endif // USE_AWS
17 changes: 8 additions & 9 deletions cloud/aws/aws_s3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -900,11 +900,10 @@ Status S3StorageProvider::DoGetCloudObject(const std::string& bucket_name,
// The stream is not flushed when WaitUntilFinished() returns.
// TODO(igor) Fix this once the AWS SDK's bug is fixed.
auto ioStreamFactory = [=]() -> Aws::IOStream* {
// fallback to FStream
return Aws::New<Aws::FStream>(
Aws::Utils::ARRAY_ALLOCATION_TAG, destination,
std::ios_base::out | std::ios_base::trunc);

// fallback to FStream
return Aws::New<Aws::FStream>(
Aws::Utils::ARRAY_ALLOCATION_TAG, destination,
std::ios_base::out | std::ios_base::trunc | std::ios_base::binary);
};

auto handle = s3client_->DownloadFile(ToAwsString(bucket_name),
Expand Down Expand Up @@ -934,7 +933,7 @@ Status S3StorageProvider::DoGetCloudObject(const std::string& bucket_name,
// fallback to FStream
return Aws::New<Aws::FStream>(
Aws::Utils::ARRAY_ALLOCATION_TAG, destination,
std::ios_base::out | std::ios_base::trunc);
std::ios_base::out | std::ios_base::trunc | std::ios_base::binary);
}
return new IOStreamWithOwnedBuf<WritableFileStreamBuf>(
std::unique_ptr<WritableFileStreamBuf>(new WritableFileStreamBuf(
Expand Down Expand Up @@ -982,9 +981,9 @@ Status S3StorageProvider::DoPutCloudObject(const std::string& local_file,
return Status::IOError(local_file, errmsg);
}
} else {
auto inputData =
Aws::MakeShared<Aws::FStream>(object_path.c_str(), local_file.c_str(),
std::ios_base::in | std::ios_base::out);
auto inputData = Aws::MakeShared<Aws::FStream>(
object_path.c_str(), local_file.c_str(),
std::ios_base::in | std::ios_base::binary);

Aws::S3::Model::PutObjectRequest putRequest;
putRequest.SetBucket(ToAwsString(bucket_name));
Expand Down
86 changes: 45 additions & 41 deletions cloud/cloud_env.cc
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright (c) 2017 Rockset.
#ifndef ROCKSDB_LITE

#ifndef _WIN32_WINNT
#include <unistd.h>
#else
#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
#include <unordered_map>

Expand Down Expand Up @@ -200,7 +200,8 @@ static std::unordered_map<std::string, OptionTypeInfo>
const char* addr1, const char* addr2, std::string* /*mismatch*/) {
auto bucket1 = reinterpret_cast<const BucketOptions*>(addr1);
auto bucket2 = reinterpret_cast<const BucketOptions*>(addr2);
return bucket1->GetBucketName(false) == bucket2->GetBucketName(false);
return bucket1->GetBucketName(false) ==
bucket2->GetBucketName(false);
}}},
{"TEST",
{0, OptionType::kUnknown, OptionVerificationType::kAlias,
Expand Down Expand Up @@ -338,22 +339,23 @@ Status CloudEnvOptions::Configure(const ConfigOptions& config_options,
}
}
if (s.ok()) {
s = OptionTypeInfo::ParseStruct(config_options, CloudEnvOptions::kName(),
&cloud_env_option_type_info,
CloudEnvOptions::kName(), opts_str, reinterpret_cast<char*>(this));
if (!s.ok()) { // Something went wrong. Attempt to reset
OptionTypeInfo::ParseStruct(config_options, CloudEnvOptions::kName(),
&cloud_env_option_type_info,
CloudEnvOptions::kName(), current, reinterpret_cast<char*>(this));
s = OptionTypeInfo::ParseStruct(
config_options, CloudEnvOptions::kName(), &cloud_env_option_type_info,
CloudEnvOptions::kName(), opts_str, reinterpret_cast<char*>(this));
if (!s.ok()) { // Something went wrong. Attempt to reset
OptionTypeInfo::ParseStruct(
config_options, CloudEnvOptions::kName(), &cloud_env_option_type_info,
CloudEnvOptions::kName(), current, reinterpret_cast<char*>(this));
}
}
return s;
}

Status CloudEnvOptions::Serialize(const ConfigOptions& config_options, std::string* value) const {
return OptionTypeInfo::SerializeStruct(config_options, CloudEnvOptions::kName(),
&cloud_env_option_type_info,
CloudEnvOptions::kName(), reinterpret_cast<const char*>(this), value);

Status CloudEnvOptions::Serialize(const ConfigOptions& config_options,
std::string* value) const {
return OptionTypeInfo::SerializeStruct(
config_options, CloudEnvOptions::kName(), &cloud_env_option_type_info,
CloudEnvOptions::kName(), reinterpret_cast<const char*>(this), value);
}

CloudEnv::CloudEnv(const CloudEnvOptions& options, Env* base,
Expand Down Expand Up @@ -393,12 +395,13 @@ int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) {
int count = 0;
// Register the Env types
library.Register<Env>(
CloudEnvImpl::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /*errmsg*/) {
guard->reset(new CloudEnvImpl(CloudEnvOptions(), Env::Default(), nullptr));
return guard->get();
});
CloudEnvImpl::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /*errmsg*/) {
guard->reset(
new CloudEnvImpl(CloudEnvOptions(), Env::Default(), nullptr));
return guard->get();
});
count++;

count += CloudEnvImpl::RegisterAwsObjects(library, arg);
Expand All @@ -416,24 +419,24 @@ int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) {
return guard->get();
});
count++;

return count;
}

void CloudEnv::RegisterCloudObjects(const std::string& arg) {
static std::once_flag do_once;
std::call_once(do_once,
[&]() {
auto library = ObjectLibrary::Default();
DoRegisterCloudObjects(*library, arg);
});
}

Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std::string& value,
std::call_once(do_once, [&]() {
auto library = ObjectLibrary::Default();
DoRegisterCloudObjects(*library, arg);
});
}

Status CloudEnv::CreateFromString(const ConfigOptions& config_options,
const std::string& value,
std::unique_ptr<CloudEnv>* result) {
RegisterCloudObjects();
std::string id;
std::unordered_map<std::string, std::string> options;
std::unordered_map<std::string, std::string> options;
Status s;
if (value.find("=") == std::string::npos) {
id = value;
Expand Down Expand Up @@ -471,19 +474,20 @@ Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std
}
}
}

if (s.ok()) {
result->reset(static_cast<CloudEnv*>(env.release()));
}
return s;

return s;
}
Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std::string& value,
Status CloudEnv::CreateFromString(const ConfigOptions& config_options,
const std::string& value,
const CloudEnvOptions& cloud_options,
std::unique_ptr<CloudEnv>* result) {
RegisterCloudObjects();
std::string id;
std::unordered_map<std::string, std::string> options;
std::unordered_map<std::string, std::string> options;
Status s;
if (value.find("=") == std::string::npos) {
id = value;
Expand Down Expand Up @@ -523,14 +527,14 @@ Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std
}
}
}

if (s.ok()) {
result->reset(static_cast<CloudEnv*>(env.release()));
}
return s;

return s;
}

#ifndef USE_AWS
Status CloudEnv::NewAwsEnv(Env* /*base_env*/,
const CloudEnvOptions& /*options*/,
Expand Down
6 changes: 3 additions & 3 deletions cloud/cloud_env_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ std::string CloudEnvImpl::RemapFilename(const std::string& logical_path) const {
return logical_path;
}
auto file_name = basename(logical_path);
uint64_t fileNumber;
uint64_t fileNumber = 0;
FileType type;
WalFileType walType;
if (file_name == "MANIFEST") {
Expand Down Expand Up @@ -1497,14 +1497,14 @@ Status CloudEnvImpl::LoadCloudManifest(const std::string& local_dbname,
if (!st.ok()) {
return st;
}

// Do the cleanup, but don't fail if the cleanup fails.
if (!read_only) {
st = DeleteInvisibleFiles(local_dbname);
if (!st.ok()) {
Log(InfoLogLevel::INFO_LEVEL, info_log_,
"Failed to delete invisible files: %s", st.ToString().c_str());
// Ignore the fail
// Ignore the fail
st = Status::OK();
}
}
Expand Down
12 changes: 8 additions & 4 deletions cloud/cloud_env_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ class CloudEnvImpl : public CloudEnv {
// Constructor
CloudEnvImpl(const CloudEnvOptions& options, Env* base_env,
const std::shared_ptr<Logger>& logger);

virtual ~CloudEnvImpl();
static const char *kClassName() { return kCloud(); }
static const char* kClassName() { return kCloud(); }
virtual const char* Name() const override { return kClassName(); }

Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options) override;
Expand Down Expand Up @@ -226,6 +226,11 @@ class CloudEnvImpl : public CloudEnv {
const ImmutableDBOptions& db_options) const override {
return base_env_->OptimizeForCompactionTableRead(env_options, db_options);
}

#ifdef _WIN32_WINNT
#undef GetFreeSpace
#endif

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other places seem to undef this based on it being defined, not based on Windows. Can we follow the same pattern?

Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override {
return base_env_->GetFreeSpace(path, diskfree);
}
Expand All @@ -249,7 +254,6 @@ class CloudEnvImpl : public CloudEnv {
file_deletion_delay_ = delay;
}


Status PrepareOptions(const ConfigOptions& config_options) override;
Status ValidateOptions(const DBOptions& /*db_opts*/,
const ColumnFamilyOptions& /*cf_opts*/) const override;
Expand Down
2 changes: 1 addition & 1 deletion cloud/cloud_file_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ void CloudEnvImpl::log(InfoLogLevel level, const std::string& fname,
const std::string& msg) {
uint64_t usage = cloud_env_options.sst_file_cache->GetUsage();
uint64_t capacity = cloud_env_options.sst_file_cache->GetCapacity();
long percent = (capacity > 0 ? (100L * usage / capacity) : 0);
auto percent = (capacity > 0 ? (100L * usage / capacity) : 0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the change to auto?

Log(level, info_log_,
"[%s] FileCache %s %s cache-used %" PRIu64 "/%" PRIu64 "(%ld%%) bytes",
Name(), fname.c_str(), msg.c_str(), usage, capacity, percent);
Expand Down