Skip to content

Commit

Permalink
Allow custom FileSystem logic in HadoopTableOperations. (Netflix#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
mccheah authored and rdblue committed Nov 27, 2018
1 parent 37cf362 commit 1cafa4b
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* <p>
* This maintains metadata in a "metadata" folder under the table location.
*/
class HadoopTableOperations implements TableOperations {
public class HadoopTableOperations implements TableOperations {
private static final Logger LOG = LoggerFactory.getLogger(HadoopTableOperations.class);

private final Configuration conf;
Expand All @@ -54,7 +54,7 @@ class HadoopTableOperations implements TableOperations {
private Integer version = null;
private boolean shouldRefresh = true;

HadoopTableOperations(Path location, Configuration conf) {
protected HadoopTableOperations(Path location, Configuration conf) {
this.conf = conf;
this.location = location;
}
Expand All @@ -70,7 +70,7 @@ public TableMetadata current() {
public TableMetadata refresh() {
int ver = version != null ? version : readVersionHint();
Path metadataFile = metadataFile(ver);
FileSystem fs = Util.getFS(metadataFile, conf);
FileSystem fs = getFS(metadataFile, conf);
try {
// don't check if the file exists if version is non-null because it was already checked
if (version == null && !fs.exists(metadataFile)) {
Expand Down Expand Up @@ -112,7 +112,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {

int nextVersion = (version != null ? version : 0) + 1;
Path finalMetadataFile = metadataFile(nextVersion);
FileSystem fs = Util.getFS(tempMetadataFile, conf);
FileSystem fs = getFS(tempMetadataFile, conf);

try {
if (fs.exists(finalMetadataFile)) {
Expand Down Expand Up @@ -154,7 +154,7 @@ public OutputFile newMetadataFile(String filename) {
@Override
public void deleteFile(String path) {
Path toDelete = new Path(path);
FileSystem fs = Util.getFS(toDelete, conf);
FileSystem fs = getFS(toDelete, conf);
try {
fs.delete(toDelete, false /* not recursive */ );
} catch (IOException e) {
Expand All @@ -181,7 +181,7 @@ private Path versionHintFile() {

private void writeVersionHint(int version) {
Path versionHintFile = versionHintFile();
FileSystem fs = Util.getFS(versionHintFile, conf);
FileSystem fs = getFS(versionHintFile, conf);

try (FSDataOutputStream out = fs.create(versionHintFile, true /* overwrite */ )) {
out.write(String.valueOf(version).getBytes("UTF-8"));
Expand All @@ -207,4 +207,8 @@ private int readVersionHint() {
throw new RuntimeIOException(e, "Failed to get file system for path: %s", versionHintFile);
}
}

protected FileSystem getFS(Path path, Configuration conf) {
return Util.getFS(path, conf);
}
}

0 comments on commit 1cafa4b

Please sign in to comment.