Guice Bindings and API wirings to trigger usage of Incremental Archiver (#411)

* add package backup, add classes to deal with incremental backups and tests

* add FileCompressAndUploader, BackupDiffManager as an impl of Archiver

* add tests for end to end backup and download, incremental and parallel

* added backup-restore tool, added NoTar implementation

* refactor backup tests to reuse base class for S3Mock, add IndexArchiver to handle index file path and also backup index state files

* fix backup-restore tool to use updated archiver

* remove unused code, cleanup tests

* address PR comments

* address PR comments for better tests

* address PR comments

* hook up IncrementalArchiver in Backup and StartIndex APIs

* add upload index support to commit

* IndexArchiver now supports backing up and restoring global state

* optionally restore metadata using incarchiver at startup

* add grpc-gateway files

* fix s3_incremental_backups rebase conflicts

* resolve merge conflicts

* remove redundant code

* address PR comments

* spotlessApply

* address PR comments: remove flag for using IncArchiver from requests and use from configuration file instead

* address PR comment: extract variable
umeshdangat authored Nov 30, 2021
1 parent 5f3dbe4 commit df07404
Showing 23 changed files with 709 additions and 189 deletions.
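The core of the change is a new Guice module, ArchiverModule (shown below), which binds the existing tar-based archiver and the new incremental archiver under the names "legacyArchiver" and "incArchiver". The following is a rough sketch of how a consumer could request the incremental variant through constructor injection; the ConsumerExample class is illustrative only and is not part of this commit.

import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.yelp.nrtsearch.server.backup.Archiver;

// Illustrative consumer (not in this commit): any object constructed by the injector
// can ask for the incremental archiver by its binding name.
public class ConsumerExample {
  private final Archiver incArchiver;

  @Inject
  public ConsumerExample(@Named("incArchiver") Archiver incArchiver) {
    this.incArchiver = incArchiver;
  }

  public Archiver archiver() {
    return incArchiver;
  }
}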
2 changes: 1 addition & 1 deletion clientlib/src/main/proto/yelp/nrtsearch/luceneserver.proto
@@ -583,7 +583,7 @@ message RefreshResponse {
}

message CommitRequest {
  string indexName = 1; //index to commit
  string indexName = 1; //index to commit
}

message CommitResponse {
250 changes: 250 additions & 0 deletions src/main/java/com/yelp/nrtsearch/ArchiverModule.java
@@ -0,0 +1,250 @@
/*
 * Copyright 2021 Yelp Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.yelp.nrtsearch;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.auth.profile.ProfilesConfigFile;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.google.inject.*;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import com.yelp.nrtsearch.server.backup.*;
import com.yelp.nrtsearch.server.config.LuceneServerConfiguration;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ArchiverModule extends AbstractModule {
  private static final Logger logger = LoggerFactory.getLogger(ArchiverModule.class);

  @Override
  protected void configure() {
    bind(ContentDownloader.class)
        .annotatedWith(Names.named("contentDownloaderNoTar"))
        .toProvider(ContentDownloaderNoTar.class)
        .asEagerSingleton();
    bind(ContentDownloader.class)
        .annotatedWith(Names.named("contentDownloaderWithTar"))
        .toProvider(ContentDownloaderWithTar.class)
        .asEagerSingleton();
    bind(FileCompressAndUploader.class)
        .annotatedWith(Names.named("fileCompressAndUploaderNoTar"))
        .toProvider(FileCompressAndUploaderNoTar.class)
        .asEagerSingleton();
    bind(FileCompressAndUploader.class)
        .annotatedWith(Names.named("fileCompressAndUploaderWithTar"))
        .toProvider(FileCompressAndUploaderWithTar.class)
        .asEagerSingleton();
    bind(Archiver.class)
        .annotatedWith(Names.named("legacyArchiver"))
        .toProvider(LegacyArchiverProvider.class)
        .asEagerSingleton();
    bind(BackupDiffManager.class).toProvider(BackupDiffManagerProvider.class).asEagerSingleton();
    bind(Archiver.class)
        .annotatedWith(Names.named("incArchiver"))
        .toProvider(IncArchiverProvider.class)
        .asEagerSingleton();
  }

  @Inject
  @Singleton
  @Provides
  public Tar providesTar() {
    return new TarImpl(Tar.CompressionMode.LZ4);
  }

  @Inject
  @Singleton
  @Provides
  protected AmazonS3 providesAmazonS3(LuceneServerConfiguration luceneServerConfiguration) {
    if (luceneServerConfiguration
        .getBotoCfgPath()
        .equals(LuceneServerConfiguration.DEFAULT_BOTO_CFG_PATH.toString())) {
      return AmazonS3ClientBuilder.standard()
          .withCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()))
          .withEndpointConfiguration(
              new AwsClientBuilder.EndpointConfiguration("dummyService", "dummyRegion"))
          .build();
    } else {
      Path botoCfgPath = Paths.get(luceneServerConfiguration.getBotoCfgPath());
      final ProfilesConfigFile profilesConfigFile = new ProfilesConfigFile(botoCfgPath.toFile());
      final AWSCredentialsProvider awsCredentialsProvider =
          new ProfileCredentialsProvider(profilesConfigFile, "default");
      AmazonS3 s3ClientInterim =
          AmazonS3ClientBuilder.standard().withCredentials(awsCredentialsProvider).build();
      String region = s3ClientInterim.getBucketLocation(luceneServerConfiguration.getBucketName());
      // In useast-1, the region is returned as "US" which is an equivalent to "us-east-1"
      // https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/model/Region.html#US_Standard
      // However, this causes an UnknownHostException so we override it to the full region name
      if (region.equals("US")) {
        region = "us-east-1";
      }
      String serviceEndpoint = String.format("s3.%s.amazonaws.com", region);
      logger.info(String.format("S3 ServiceEndpoint: %s", serviceEndpoint));
      return AmazonS3ClientBuilder.standard()
          .withCredentials(awsCredentialsProvider)
          .withEndpointConfiguration(
              new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region))
          .build();
    }
  }

  private static class ContentDownloaderNoTar implements Provider<ContentDownloader> {
    private static final int NUM_S3_THREADS = 20;

    @Inject AmazonS3 s3;
    @Inject LuceneServerConfiguration luceneServerConfiguration;

    public ContentDownloader get() {
      return new ContentDownloaderImpl(
          new NoTarImpl(),
          TransferManagerBuilder.standard()
              .withS3Client(s3)
              .withExecutorFactory(() -> Executors.newFixedThreadPool(NUM_S3_THREADS))
              .withShutDownThreadPools(false)
              .build(),
          luceneServerConfiguration.getBucketName(),
          true);
    }
  }

  private static class ContentDownloaderWithTar implements Provider<ContentDownloader> {
    private static final int NUM_S3_THREADS = 20;

    @Inject AmazonS3 s3;
    @Inject LuceneServerConfiguration luceneServerConfiguration;

    public ContentDownloader get() {
      return new ContentDownloaderImpl(
          new TarImpl(Tar.CompressionMode.LZ4),
          TransferManagerBuilder.standard()
              .withS3Client(s3)
              .withExecutorFactory(() -> Executors.newFixedThreadPool(NUM_S3_THREADS))
              .withShutDownThreadPools(false)
              .build(),
          luceneServerConfiguration.getBucketName(),
          true);
    }
  }

  private static class FileCompressAndUploaderNoTar implements Provider<FileCompressAndUploader> {
    private static final int NUM_S3_THREADS = 20;

    @Inject AmazonS3 s3;
    @Inject LuceneServerConfiguration luceneServerConfiguration;

    public FileCompressAndUploader get() {
      return new FileCompressAndUploader(
          new NoTarImpl(),
          TransferManagerBuilder.standard()
              .withS3Client(s3)
              .withExecutorFactory(() -> Executors.newFixedThreadPool(NUM_S3_THREADS))
              .withShutDownThreadPools(false)
              .build(),
          luceneServerConfiguration.getBucketName());
    }
  }

  private static class FileCompressAndUploaderWithTar implements Provider<FileCompressAndUploader> {
    private static final int NUM_S3_THREADS = 20;

    @Inject AmazonS3 s3;
    @Inject LuceneServerConfiguration luceneServerConfiguration;

    public FileCompressAndUploader get() {
      return new FileCompressAndUploader(
          new TarImpl(Tar.CompressionMode.LZ4),
          TransferManagerBuilder.standard()
              .withS3Client(s3)
              .withExecutorFactory(() -> Executors.newFixedThreadPool(NUM_S3_THREADS))
              .withShutDownThreadPools(false)
              .build(),
          luceneServerConfiguration.getBucketName());
    }
  }

  private static class BackupDiffManagerProvider implements Provider<BackupDiffManager> {
    @Inject
    @Named("fileCompressAndUploaderNoTar")
    FileCompressAndUploader fileCompressAndUploader;

    @Inject
    @Named("contentDownloaderNoTar")
    ContentDownloader contentDownloader;

    @Inject AmazonS3 s3;
    @Inject LuceneServerConfiguration luceneServerConfiguration;

    @Override
    public BackupDiffManager get() {
      return new BackupDiffManager(
          contentDownloader,
          fileCompressAndUploader,
          new VersionManager(s3, luceneServerConfiguration.getBucketName()),
          Paths.get(luceneServerConfiguration.getArchiveDirectory()));
    }
  }

  private static class LegacyArchiverProvider implements Provider<Archiver> {
    @Inject AmazonS3 s3;
    @Inject LuceneServerConfiguration luceneServerConfiguration;
    @Inject Tar tar;

    @Override
    public Archiver get() {
      Path archiveDir = Paths.get(luceneServerConfiguration.getArchiveDirectory());
      return new ArchiverImpl(
          s3,
          luceneServerConfiguration.getBucketName(),
          archiveDir,
          tar,
          luceneServerConfiguration.getDownloadAsStream());
    }
  }

  private static class IncArchiverProvider implements Provider<Archiver> {
    @Inject BackupDiffManager backupDiffManager;

    @Inject
    @Named("fileCompressAndUploaderWithTar")
    FileCompressAndUploader fileCompressAndUploader;

    @Inject
    @Named("contentDownloaderWithTar")
    ContentDownloader contentDownloader;

    @Inject LuceneServerConfiguration luceneServerConfiguration;
    @Inject AmazonS3 s3;

    @Override
    public Archiver get() {
      return new IndexArchiver(
          backupDiffManager,
          fileCompressAndUploader,
          contentDownloader,
          new VersionManager(s3, luceneServerConfiguration.getBucketName()),
          Paths.get(luceneServerConfiguration.getArchiveDirectory()));
    }
  }
}
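For reference, here is a minimal sketch of looking these bindings up programmatically, assuming an Injector that already has ArchiverModule installed (LuceneServerModule below does exactly that). The helper class ArchiverLookupSketch is hypothetical and simply mirrors the binding names declared in configure() above.

import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
import com.yelp.nrtsearch.server.backup.Archiver;

// Hypothetical helper, not part of this commit: resolves the two Archiver bindings
// declared in ArchiverModule.configure() from an already-built injector.
public final class ArchiverLookupSketch {
  private ArchiverLookupSketch() {}

  public static Archiver legacyArchiver(Injector injector) {
    return injector.getInstance(Key.get(Archiver.class, Names.named("legacyArchiver")));
  }

  public static Archiver incrementalArchiver(Injector injector) {
    return injector.getInstance(Key.get(Archiver.class, Names.named("incArchiver")));
  }
}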
72 changes: 3 additions & 69 deletions src/main/java/com/yelp/nrtsearch/LuceneServerModule.java
@@ -15,46 +15,30 @@
 */
package com.yelp.nrtsearch;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.auth.profile.ProfilesConfigFile;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.yelp.nrtsearch.server.backup.*;
import com.yelp.nrtsearch.server.config.LuceneServerConfiguration;
import com.yelp.nrtsearch.server.grpc.LuceneServer;
import io.prometheus.client.CollectorRegistry;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LuceneServerModule extends AbstractModule {
  private static final String DEFAULT_CONFIG_FILE_RESOURCE =
      "/lucene_server_default_configuration.yaml";
  private final LuceneServer.LuceneServerCommand args;
  private static final Logger logger = LoggerFactory.getLogger(LuceneServerModule.class);

  public LuceneServerModule(LuceneServer.LuceneServerCommand args) {
    this.args = args;
  }

  @Inject
  @Singleton
  @Provides
  public Tar providesTar() {
    return new TarImpl(Tar.CompressionMode.LZ4);
  @Override
  protected void configure() {
    install(new ArchiverModule());
  }

  @Inject
@@ -64,56 +48,6 @@ public CollectorRegistry providesCollectorRegistry() {
    return new CollectorRegistry();
  }

  @Inject
  @Singleton
  @Provides
  protected AmazonS3 providesAmazonS3(LuceneServerConfiguration luceneServerConfiguration) {
    if (luceneServerConfiguration
        .getBotoCfgPath()
        .equals(LuceneServerConfiguration.DEFAULT_BOTO_CFG_PATH.toString())) {
      return AmazonS3ClientBuilder.standard()
          .withCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()))
          .withEndpointConfiguration(
              new AwsClientBuilder.EndpointConfiguration("dummyService", "dummyRegion"))
          .build();
    } else {
      Path botoCfgPath = Paths.get(luceneServerConfiguration.getBotoCfgPath());
      final ProfilesConfigFile profilesConfigFile = new ProfilesConfigFile(botoCfgPath.toFile());
      final AWSCredentialsProvider awsCredentialsProvider =
          new ProfileCredentialsProvider(profilesConfigFile, "default");
      AmazonS3 s3ClientInterim =
          AmazonS3ClientBuilder.standard().withCredentials(awsCredentialsProvider).build();
      String region = s3ClientInterim.getBucketLocation(luceneServerConfiguration.getBucketName());
      // In useast-1, the region is returned as "US" which is an equivalent to "us-east-1"
      // https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/model/Region.html#US_Standard
      // However, this causes an UnknownHostException so we override it to the full region name
      if (region.equals("US")) {
        region = "us-east-1";
      }
      String serviceEndpoint = String.format("s3.%s.amazonaws.com", region);
      logger.info(String.format("S3 ServiceEndpoint: %s", serviceEndpoint));
      return AmazonS3ClientBuilder.standard()
          .withCredentials(awsCredentialsProvider)
          .withEndpointConfiguration(
              new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region))
          .build();
    }
  }

  @Inject
  @Singleton
  @Provides
  protected Archiver providesArchiver(
      LuceneServerConfiguration luceneServerConfiguration, AmazonS3 amazonS3, Tar tar) {
    Path archiveDir = Paths.get(luceneServerConfiguration.getArchiveDirectory());
    return new ArchiverImpl(
        amazonS3,
        luceneServerConfiguration.getBucketName(),
        archiveDir,
        tar,
        luceneServerConfiguration.getDownloadAsStream());
  }

  @Inject
  @Singleton
  @Provides
src/main/java/com/yelp/nrtsearch/server/backup/BackupDiffManager.java
@@ -22,6 +22,7 @@
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
@@ -182,6 +183,7 @@ public static void serializeFileNames(List<String> indexFileNames, Path destBack
    }
  }

  @Inject
  public BackupDiffManager(
      final ContentDownloader contentDownloader,
      final FileCompressAndUploader fileCompressAndUploader,
src/main/java/com/yelp/nrtsearch/server/backup/ContentDownloaderImpl.java
@@ -25,6 +25,7 @@
import com.amazonaws.services.s3.transfer.PersistableTransfer;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.internal.S3ProgressListener;
import com.google.inject.Inject;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -59,6 +60,7 @@ public class ContentDownloaderImpl implements ContentDownloader {
  private final ThreadPoolExecutor executor =
      (ThreadPoolExecutor) Executors.newFixedThreadPool(NUM_S3_THREADS);

  @Inject
  public ContentDownloaderImpl(
      Tar tar, TransferManager transferManager, String bucketName, boolean downloadAsStream) {
    this.tar = tar;