Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -5683,7 +5683,9 @@ public static enum ConfVars {
HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
"If the query results cache is enabled. This will keep results of previously executed queries " +
"to be reused if the same query is executed again."),

HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED("hive.query.results.cache.safe.write.enabled", false,
"Write results to the query output path, then copy them into the results cache if size limits allow. " +
"Adds overhead from that copy versus writing result files once under the cache directory."),
HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED("hive.query.results.cache.nontransactional.tables.enabled", false,
"If the query results cache is enabled for queries involving non-transactional tables." +
"Users who enable this setting should be willing to tolerate some amount of stale results in the cache."),
Expand Down
15 changes: 15 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public class Context {
// keeps track of result cache dir for the query, later cleaned up by context cleanup
private Path fsResultCacheDirs = null;

/**
* When HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED is true, the absolute path prefix under which
* execution wrote fetchable result files.
*/
private String cacheSafeWriteSourceDir;

private Configuration conf;
protected int pathid = 10000;
private int moveTaskId = 0;
Expand Down Expand Up @@ -452,6 +458,7 @@ protected Context(Context ctx) {
this.scratchDirPermission = ctx.scratchDirPermission;
this.fsScratchDirs.putAll(ctx.fsScratchDirs);
this.fsResultCacheDirs = ctx.fsResultCacheDirs;
this.cacheSafeWriteSourceDir = ctx.cacheSafeWriteSourceDir;
this.conf = ctx.conf;
this.pathid = ctx.pathid;
this.explainConfig = ctx.explainConfig;
Expand Down Expand Up @@ -492,6 +499,14 @@ public Path getFsResultCacheDirs() {
return this.fsResultCacheDirs;
}

public void setCacheSafeWriteSourceDir(String cacheSafeWriteSourceDir) {
this.cacheSafeWriteSourceDir = cacheSafeWriteSourceDir;
}

public String getCacheSafeWriteSourceDir() {
return cacheSafeWriteSourceDir;
}

public Map<LoadTableDesc, WriteEntity> getLoadTableOutputMap() {
return loadTableOutputMap;
}
Expand Down
3 changes: 2 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ private void preExecutionCacheActions() throws Exception {
CacheEntry pendingCacheEntry =
QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList);
if (pendingCacheEntry != null) {
pendingCacheEntry.setSafeSourceDir(context.getCacheSafeWriteSourceDir());
// Update cacheUsage to reference the pending entry.
this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry);
}
Expand Down Expand Up @@ -436,7 +437,7 @@ private void postExecutionCacheActions() throws Exception {

CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry();
boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(cacheEntry,
driverContext.getPlan().getFetchTask().getWork());
driverContext.getPlan().getFetchTask().getWork(), driverContext.getConf());
LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry);
if (savedToCache) {
useFetchFromCache(driverContext.getCacheUsage().getCacheEntry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
Expand Down Expand Up @@ -194,6 +195,12 @@ public static class CacheEntry {
private Path cachedResultsPath;
private Set<FileStatus> cachedResultPaths;

/**
* Absolute path prefix for result files when using safe cache write; see
* {@link org.apache.hadoop.hive.ql.Context#getCacheSafeWriteSourceDir()}.
*/
private String safeSourceDir;

// Cache administration
private long size;
private AtomicInteger readers = new AtomicInteger(0);
Expand Down Expand Up @@ -287,6 +294,14 @@ public QueryInfo getQueryInfo() {
return queryInfo;
}

public void setSafeSourceDir(String safeSourceDir) {
this.safeSourceDir = safeSourceDir;
}

public String getSafeSourceDir() {
return safeSourceDir;
}

public Path getCachedResultsPath() {
return cachedResultsPath;
}
Expand Down Expand Up @@ -415,6 +430,19 @@ public Path getCacheDirPath() {
return cacheDirPath;
}

/**
* Runs {@code action} while holding {@link #rwLock} in exclusive (write) mode.
*/
private void withWriteLock(Runnable action) {
Lock writeLock = rwLock.writeLock();
try {
writeLock.lock();
action.run();
} finally {
writeLock.unlock();
}
}

/**
* Check if the cache contains an entry for the requested LookupInfo.
* @param request
Expand Down Expand Up @@ -494,10 +522,7 @@ public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteId
addedEntry.queryInfo = queryInfo;
addedEntry.txnWriteIdList = txnWriteIdList;

Lock writeLock = rwLock.writeLock();
try {
writeLock.lock();

withWriteLock(() -> {
LOG.info("Adding placeholder cache entry for query '{}'", queryText);

// Add the entry to the cache structures while under write lock.
Expand All @@ -506,23 +531,36 @@ public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteId
// Index of entries by table usage.
addedEntry.getTableNames()
.forEach(tableName -> addToEntryMap(tableToEntryMap, tableName, addedEntry));
} finally {
writeLock.unlock();
}
});

return addedEntry;
}

public void removeInvalidStaleFiles(FileSystem fs, Set<FileStatus> files) {
withWriteLock(() -> {
for (FileStatus f : files) {
try {
fs.delete(f.getPath(), true);
} catch (IOException e) {
LOG.warn("Failed to clean up stale invalid file: {}",
f.getPath(), e);
}
}
});
}

/**
* Updates a pending cache entry with a FetchWork result from a finished query.
* If successful the cache entry will be set to valid status and be usable for cached queries.
* Important: Adding the entry to the cache will increment the reader count for the cache entry.
* CacheEntry.releaseReader() should be called when the caller is done with the cache entry.
* @param cacheEntry
* @param fetchWork
* @param queryConf session (or query) Hive configuration; used for safe-cache-write and filesystem access
* so per-session {@code SET hive.query.results.cache.safe.write.enabled} is honored
* @return
*/
public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork, HiveConf queryConf) {
Path queryResultsPath = null;
Path cachedResultsPath = null;

Expand All @@ -532,7 +570,7 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {

boolean requiresCaching = true;
queryResultsPath = fetchWork.getTblDir();
FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
FileSystem resultsFs = queryResultsPath.getFileSystem(queryConf);

long resultSize = 0;
for(FileStatus fs:fetchWork.getFilesToFetch()) {
Expand All @@ -549,6 +587,10 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
return false;
}

if (!rewriteFetchWorkForSafeCacheWrite(cacheEntry, fetchWork, queryConf)) {
return false;
}

// Synchronize on the cache entry so that no one else can invalidate this entry
// while we are in the process of setting it to valid.
synchronized (cacheEntry) {
Expand Down Expand Up @@ -601,10 +643,61 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
return true;
}

private boolean rewriteFetchWorkForSafeCacheWrite(CacheEntry cacheEntry, FetchWork fetchWork, HiveConf queryConf)
throws IOException {
if (!queryConf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED)) {
return true;
}
String safeDir = cacheEntry.getSafeSourceDir();
if (safeDir == null) {
LOG.error("Safe cache write enabled but cache entry has no safe source dir; query: {}",
cacheEntry.getQueryInfo().getLookupInfo().getQueryText());
return false;
}
final int safeDirAndSepLen = safeDir.length() + Path.SEPARATOR.length();
Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString());
FileSystem cacheFs = resultDir.getFileSystem(queryConf);
cacheFs.mkdirs(resultDir);

Set<FileStatus> cacheFilesToFetch = new HashSet<>();
boolean succeeded =
copyFetchWorkFilesIntoCacheDirUnderWriteLock(fetchWork, resultDir, cacheFs, safeDirAndSepLen,
queryConf, cacheFilesToFetch);
if (!succeeded) {
removeInvalidStaleFiles(cacheFs, cacheFilesToFetch);
return false;
}
fetchWork.setFilesToFetch(cacheFilesToFetch);
fetchWork.setTblDir(new Path(resultDir, fetchWork.getTblDir().toString().substring(safeDirAndSepLen)));
return true;
}

private boolean copyFetchWorkFilesIntoCacheDirUnderWriteLock(FetchWork fetchWork, Path resultDir,
FileSystem cacheFs, int safeDirAndSepLen, HiveConf queryConf, Set<FileStatus> destFileStatuses) {
final boolean[] succeeded = {true};
withWriteLock(() -> {
try {
for (FileStatus fs : fetchWork.getFilesToFetch()) {
FileSystem srcFs = fs.getPath().getFileSystem(queryConf);
Path srcFile = fs.getPath();
Path destFile = new Path(resultDir,
new Path(fs.getPath().toString().substring(safeDirAndSepLen)));
succeeded[0] = FileUtil.copy(srcFs, srcFile, cacheFs, destFile, false, queryConf);
if (!succeeded[0]) {
throw new IOException("File copy failed for " + srcFile + " -> " + destFile);
}
destFileStatuses.add(cacheFs.getFileStatus(destFile));
}
} catch (IOException e) {
LOG.warn("Failed to write cache entry to {}", resultDir, e);
succeeded[0] = false;
}
});
return succeeded[0];
}

public void clear() {
Lock writeLock = rwLock.writeLock();
try {
writeLock.lock();
withWriteLock(() -> {
LOG.info("Clearing the results cache");
CacheEntry[] allEntries = null;
synchronized (lru) {
Expand All @@ -617,9 +710,7 @@ public void clear() {
LOG.error("Error removing cache entry " + entry, err);
}
}
} finally {
writeLock.unlock();
}
});
}

public long getSize() {
Expand All @@ -635,27 +726,21 @@ public long getSize() {
public void notifyTableChanged(String dbName, String tableName, long updateTime) {
LOG.debug("Table changed: {}.{}, at {}", dbName, tableName, updateTime);
// Invalidate all cache entries using this table.
List<CacheEntry> entriesToInvalidate = null;
rwLock.writeLock().lock();
try {
withWriteLock(() -> {
String key = (dbName.toLowerCase() + "." + tableName.toLowerCase());
Set<CacheEntry> entriesForTable = tableToEntryMap.get(key);
if (entriesForTable != null) {
// Possible concurrent modification issues if we try to remove cache entries while
// traversing the cache structures. Save the entries to remove in a separate list.
entriesToInvalidate = new ArrayList<>(entriesForTable);
}
if (entriesToInvalidate != null) {
List<CacheEntry> entriesToInvalidate = new ArrayList<>(entriesForTable);
for (CacheEntry entry : entriesToInvalidate) {
// Ignore updates that occured before this cached query was created.
if (entry.getQueryInfo().getQueryTime() <= updateTime) {
removeEntry(entry);
}
}
}
} finally {
rwLock.writeLock().unlock();
}
});
}

private static final int INITIAL_LRU_SIZE = 16;
Expand Down Expand Up @@ -737,15 +822,12 @@ private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry, Set<CacheE

public void removeEntry(CacheEntry entry) {
entry.invalidate();
rwLock.writeLock().lock();
try {
withWriteLock(() -> {
removeFromLookup(entry);
lru.remove(entry);
// Should the cache size be updated here, or after the result data has actually been deleted?
cacheSize -= entry.size;
} finally {
rwLock.writeLock().unlock();
}
});
}

private void removeFromLookup(CacheEntry entry) {
Expand Down
25 changes: 15 additions & 10 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -1812,7 +1812,7 @@ boolean doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1, PlannerContext plannerCtx,
setSqlKind(SqlKind.INSERT);

case HiveParser.TOK_DESTINATION:
ctx_1.dest = this.ctx.getDestNamePrefix(ast, qb).toString() + ctx_1.nextNum;
ctx_1.dest = ctx.getDestNamePrefix(ast, qb).toString() + ctx_1.nextNum;
ctx_1.nextNum++;
boolean isTmpFileDest = false;
if (ast.getChildCount() > 0 && ast.getChild(0) instanceof ASTNode) {
Expand Down Expand Up @@ -7460,7 +7460,7 @@ private Operator genConstraintsPlan(String dest, QB qb, Operator input) throws S
return input;
}

if (updating(dest) && isCBOExecuted() && this.ctx.getOperation() != Context.Operation.MERGE) {
if (updating(dest) && isCBOExecuted() && ctx.getOperation() != Context.Operation.MERGE) {
// for UPDATE statements CBO already added and pushed down the constraints
return input;
}
Expand Down Expand Up @@ -7500,19 +7500,24 @@ protected Table getTargetTable(QB qb, String dest) throws SemanticException {
}

private Path getDestinationFilePath(QB qb, final String destinationFile, boolean isMmTable) {
Path defaultPath = new Path(destinationFile);
if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache(qb)) {
assert (!isMmTable);
QueryResultsCache instance = QueryResultsCache.getInstance();
// QueryResultsCache should have been initialized by now
if (instance != null) {
Path resultCacheTopDir = instance.getCacheDirPath();
String dirName = UUID.randomUUID().toString();
Path resultDir = new Path(resultCacheTopDir, dirName);
this.ctx.setFsResultCacheDirs(resultDir);
return resultDir;
if (!conf.getBoolVar(ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED)) {
Path resultCacheTopDir = instance.getCacheDirPath();
String dirName = UUID.randomUUID().toString();
Path resultDir = new Path(resultCacheTopDir, dirName);
ctx.setFsResultCacheDirs(resultDir);
return resultDir;
} else {
ctx.setCacheSafeWriteSourceDir(defaultPath.toString());
}
}
}
return new Path(destinationFile);
return defaultPath;
}

@SuppressWarnings("nls")
Expand Down Expand Up @@ -12849,7 +12854,7 @@ private void walkASTMarkTABREF(TableMask tableMask, ASTNode ast, Set<String> cte
} else {
List<String> colNames;
List<String> colTypes;
if (this.ctx.isCboSucceeded() && this.columnAccessInfo != null &&
if (ctx.isCboSucceeded() && this.columnAccessInfo != null &&
(colNames = this.columnAccessInfo.getTableToColumnAllAccessMap().get(table.getCompleteName())) != null) {
Map<String, String> colNameToType = table.getAllCols().stream()
.collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
Expand Down Expand Up @@ -13271,7 +13276,7 @@ void analyzeInternal(ASTNode ast, Supplier<PlannerContext> pcf) throws SemanticE

// 3. Deduce Resultset Schema
perfLogger.perfLogBegin(this.getClass().getName(), PerfLogger.DEDUCE_RESULTSET_SCHEMA);
if ((forViewCreation || createVwDesc != null) && !this.ctx.isCboSucceeded()) {
if ((forViewCreation || createVwDesc != null) && !ctx.isCboSucceeded()) {
resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
} else {
// resultSchema will be null if
Expand Down
Loading
Loading