@@ -18,7 +18,9 @@
package org.apache.fluss.fs.gs;

import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.gs.token.GSImpersonatedTokenProvider;
import org.apache.fluss.fs.hdfs.HadoopFileSystem;
import org.apache.fluss.fs.token.ObtainedSecurityToken;

import org.apache.hadoop.conf.Configuration;

@@ -34,6 +36,8 @@ public class GSFileSystem extends HadoopFileSystem {
private final String scheme;
private final Configuration conf;

private volatile GSImpersonatedTokenProvider tokenProvider;

/**
* Creates a GSFileSystem based on the given Hadoop Google Cloud Storage file system. The given
* Hadoop file system object is expected to be initialized already.
@@ -48,4 +52,16 @@ public GSFileSystem(
this.scheme = scheme;
this.conf = conf;
}

@Override
public ObtainedSecurityToken obtainSecurityToken() {
// Lazily create the provider; the volatile field plus double-checked
// locking keeps a single instance under concurrent calls.
if (tokenProvider == null) {
synchronized (this) {
if (tokenProvider == null) {
tokenProvider = new GSImpersonatedTokenProvider(scheme, conf);
}
}
}
return tokenProvider.obtainSecurityToken();
}
}
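
A quick concurrency sketch (the call site below is assumed, not part of this diff): the volatile double-checked locking above guarantees at most one GSImpersonatedTokenProvider instance even when several threads request tokens at once.

    // Assumed call site: concurrent requests share one lazily created provider.
    static void mintConcurrently(GSFileSystem fs) throws InterruptedException {
        Runnable mint = fs::obtainSecurityToken;
        Thread t1 = new Thread(mint);
        Thread t2 = new Thread(mint);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }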
@@ -35,7 +35,7 @@ public class GSFileSystemPlugin implements FileSystemPlugin {

private static final String[] FLUSS_CONFIG_PREFIXES = {"gs.", "fs.gs."};

private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
public static final String HADOOP_CONFIG_PREFIX = "fs.gs.";

@Override
public String getScheme() {
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.fs.gs.token;

import org.apache.fluss.exception.FlussRuntimeException;

import com.google.cloud.hadoop.util.AccessTokenProvider;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/** Access token provider that serves Fluss session credentials to the GCS Hadoop connector. */
public class GSImpersonatedAccessTokenProvider implements AccessTokenProvider {

public static final String NAME = GSImpersonatedAccessTokenProvider.class.getName();

public static final String COMPONENT = "Dynamic session credentials for Fluss";

private Configuration configuration;

private static final Logger LOG =
LoggerFactory.getLogger(GSImpersonatedAccessTokenProvider.class);

@Override
public AccessToken getAccessToken() {
AccessTokenProvider.AccessToken accessToken = GSImpersonatedTokenReceiver.getAccessToken();

if (accessToken == null) {
throw new FlussRuntimeException(
GSImpersonatedAccessTokenProvider.COMPONENT + " not set");
}

LOG.debug("Providing session credentials");

return accessToken;
}

@Override
public void refresh() throws IOException {
// Intentionally blank. Credentials are updated by GSImpersonatedTokenReceiver
}

@Override
public void setConf(Configuration configuration) {
this.configuration = configuration;
}

@Override
public Configuration getConf() {
return configuration;
}
}
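
For orientation (a sketch, not part of this diff): the GCS connector instantiates whatever class is configured under fs.gs.auth.access.token.provider.impl, so registering this provider is a single configuration entry; GSImpersonatedTokenReceiver.updateHadoopConfig below automates exactly this.

    // Assumed manual wiring, equivalent to what updateHadoopConfig(...) performs.
    Configuration hadoopConf = new Configuration();
    hadoopConf.set(
            "fs.gs.auth.access.token.provider.impl",
            GSImpersonatedAccessTokenProvider.NAME);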
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.fs.gs.token;

import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.fs.token.CredentialsJsonSerde;
import org.apache.fluss.fs.token.ObtainedSecurityToken;

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.ComputeEngineCredentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ImpersonatedCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import static com.google.cloud.hadoop.util.HadoopCredentialConfiguration.SERVICE_ACCOUNT_JSON_KEYFILE_SUFFIX;
import static org.apache.fluss.fs.gs.GSFileSystemPlugin.HADOOP_CONFIG_PREFIX;

/** Impersonation token provider for GCS Hadoop filesystems. */
public class GSImpersonatedTokenProvider {

private static final Logger LOG = LoggerFactory.getLogger(GSImpersonatedTokenProvider.class);

private final String scheme;

private final GoogleCredentials googleCredentials;
private final String targetPrincipal;

// HADOOP_CONFIG_PREFIX already ends with a dot, so this suffix must not start
// with one; otherwise the lookup key would become "fs.gs..auth.type".
private static final String AUTH_TYPE_SUFFIX = "auth.type";

public GSImpersonatedTokenProvider(String scheme, Configuration conf) {
this.scheme = scheme;
Pair<String, GoogleCredentials> pair = extractProvider(conf);
googleCredentials = pair.getValue();
targetPrincipal = pair.getKey();
}

public ObtainedSecurityToken obtainSecurityToken() {
LOG.info("Obtaining session credentials token");

String scope = "https://www.googleapis.com/auth/cloud-platform";

List<String> scopes = new ArrayList<>();
scopes.add(scope);

ImpersonatedCredentials impersonatedCredentials =
ImpersonatedCredentials.newBuilder()
.setSourceCredentials(googleCredentials)
.setTargetPrincipal(targetPrincipal)
.setScopes(scopes)
.setLifetime(3600)
.setDelegates(null)
.build();

AccessToken accessToken;
try {
// getAccessToken() returns null until the credentials have been refreshed,
// so mint the token explicitly.
accessToken = impersonatedCredentials.refreshAccessToken();
} catch (IOException e) {
throw new FlussRuntimeException("Failed to refresh impersonated credentials", e);
}
LOG.info(
"Session credentials obtained successfully with expiration: {}",
accessToken.getExpirationTime());

return new ObtainedSecurityToken(
scheme,
toJson(accessToken),
accessToken.getExpirationTime().getTime(),
new HashMap<>());
}

private byte[] toJson(AccessToken accessToken) {
org.apache.fluss.fs.token.Credentials flussCredentials =
new org.apache.fluss.fs.token.Credentials(null, null, accessToken.getTokenValue());
return CredentialsJsonSerde.toJson(flussCredentials);
}

private static Pair<String, GoogleCredentials> extractProvider(Configuration conf) {
final String authType = conf.get(HADOOP_CONFIG_PREFIX + AUTH_TYPE_SUFFIX);
if ("COMPUTE_ENGINE".equals(authType)) {
ComputeEngineCredentials credentials = getComputeEngineCredentials();
return Pair.of(credentials.getAccount(), credentials);
} else if ("SERVICE_ACCOUNT_JSON_KEYFILE".equals(authType)) {
ServiceAccountCredentials credentials = getServiceAccountCredentials(conf);
return Pair.of(credentials.getAccount(), credentials);
} else if ("UNAUTHENTICATED".equals(authType)) {
// Returning null here would only fail later with an obscure
// NullPointerException; impersonation needs authenticated source credentials.
throw new IllegalArgumentException(
"Impersonation tokens cannot be obtained with UNAUTHENTICATED auth type");
} else {
throw new IllegalArgumentException("Unsupported authentication type: " + authType);
}
}

private static ComputeEngineCredentials getComputeEngineCredentials() {
ComputeEngineCredentials credentials = ComputeEngineCredentials.newBuilder().build();
// Eagerly resolve the account from the metadata server so that
// misconfiguration fails here rather than on first token use.
credentials.getAccount();
return credentials;
}

private static ServiceAccountCredentials getServiceAccountCredentials(Configuration conf) {
List<String> prefixes = new ArrayList<>();
// The connector's suffix constants carry a leading dot, so strip the trailing
// dot from "fs.gs." to avoid a doubled dot in the resulting key.
prefixes.add(HADOOP_CONFIG_PREFIX.substring(0, HADOOP_CONFIG_PREFIX.length() - 1));

String keyFile =
SERVICE_ACCOUNT_JSON_KEYFILE_SUFFIX.withPrefixes(prefixes).get(conf, conf::get);
try (FileInputStream fis = new FileInputStream(keyFile)) {
ServiceAccountCredentials accountCredentials =
ServiceAccountCredentials.fromStream(fis);
// Eagerly resolve the account so that a bad key file fails fast.
accountCredentials.getAccount();
return accountCredentials;
} catch (IOException e) {
throw new FlussRuntimeException(
"Failed to read service account JSON key file: " + keyFile, e);
}
}
}
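
A minimal usage sketch under assumed configuration: the key-file path is a placeholder, the auth-type key follows from HADOOP_CONFIG_PREFIX plus AUTH_TYPE_SUFFIX above, and the key-file key is the GCS connector's standard name.

    // Assumed configuration; "/path/to/key.json" is a placeholder.
    Configuration conf = new Configuration();
    conf.set("fs.gs.auth.type", "SERVICE_ACCOUNT_JSON_KEYFILE");
    conf.set("fs.gs.auth.service.account.json.keyfile", "/path/to/key.json");
    GSImpersonatedTokenProvider provider = new GSImpersonatedTokenProvider("gs", conf);
    ObtainedSecurityToken token = provider.obtainSecurityToken();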
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.fs.gs.token;

import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.fs.token.Credentials;
import org.apache.fluss.fs.token.CredentialsJsonSerde;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
import org.apache.fluss.fs.token.SecurityTokenReceiver;

import com.google.cloud.hadoop.util.AccessTokenProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Security token receiver for GCS filesystem. */
public class GSImpersonatedTokenReceiver implements SecurityTokenReceiver {

public static final String PROVIDER_CONFIG_NAME = "fs.gs.auth.access.token.provider.impl";

private static final Logger LOG = LoggerFactory.getLogger(GSImpersonatedTokenReceiver.class);

static volatile AccessTokenProvider.AccessToken accessToken;

public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) {
LOG.info("Updating Hadoop configuration");

String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME, "");

if (!providers.contains(GSImpersonatedAccessTokenProvider.NAME)) {
if (providers.isEmpty()) {
LOG.debug("Setting provider");
providers = GSImpersonatedAccessTokenProvider.NAME;
} else {
providers = GSImpersonatedAccessTokenProvider.NAME + "," + providers;
LOG.debug("Prepending provider, new providers value: {}", providers);
}

hadoopConfig.set(PROVIDER_CONFIG_NAME, providers);
} else {
LOG.debug("Provider already exists");
}

if (accessToken == null) {
throw new FlussRuntimeException(
GSImpersonatedAccessTokenProvider.COMPONENT + " not set");
}

LOG.info("Updated Hadoop configuration successfully");
}

@Override
public String scheme() {
return "gs";
}

@Override
public void onNewTokensObtained(ObtainedSecurityToken token) throws Exception {
LOG.info("Updating session credentials");

byte[] tokenBytes = token.getToken();

Credentials credentials = CredentialsJsonSerde.fromJson(tokenBytes);

accessToken =
new AccessTokenProvider.AccessToken(
credentials.getSecurityToken(), token.getValidUntil().get());

LOG.info(
"Session credentials updated successfully with access key: {}.",
credentials.getAccessKeyId());
}

public static AccessTokenProvider.AccessToken getAccessToken() {
return accessToken;
}
}
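
Putting the pieces together, a sketch of the intended flow (the driver method below is assumed; only the three calls it makes come from this diff). Note the order: updateHadoopConfig deliberately fails if no token has been received yet.

    // Assumed driver code illustrating the intended order of calls.
    static void propagateToken(GSFileSystem coordinatorFs, Configuration workerConf)
            throws Exception {
        // 1. Coordinator side: mint an impersonated access token.
        ObtainedSecurityToken token = coordinatorFs.obtainSecurityToken();
        // 2. Worker side: cache the token in the receiver's static holder.
        new GSImpersonatedTokenReceiver().onNewTokensObtained(token);
        // 3. Worker side: register the provider; throws if no token is cached yet.
        GSImpersonatedTokenReceiver.updateHadoopConfig(workerConf);
    }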
@@ -69,12 +69,12 @@ static void createHDFS(@TempDir File tmp) throws Exception {

@AfterAll
static void destroyHDFS() throws Exception {
if (hdfsCluster != null) {
hdfsCluster
.getFileSystem()
.delete(new org.apache.hadoop.fs.Path(basePath.toUri()), true);
hdfsCluster.shutdown();
}
}

@Test