diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 31b5e32c2ddb..f28339f92830 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -5683,7 +5683,9 @@ public static enum ConfVars { HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true, "If the query results cache is enabled. This will keep results of previously executed queries " + "to be reused if the same query is executed again."), - + HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED("hive.query.results.cache.safe.write.enabled", false, + "Write results to the query output path, then copy them into the results cache if size limits allow. " + + "Adds overhead from that copy versus writing result files once under the cache directory."), HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED("hive.query.results.cache.nontransactional.tables.enabled", false, "If the query results cache is enabled for queries involving non-transactional tables." + "Users who enable this setting should be willing to tolerate some amount of stale results in the cache."), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 138787a733e3..34eb1d388c02 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -107,6 +107,12 @@ public class Context { // keeps track of result cache dir for the query, later cleaned up by context cleanup private Path fsResultCacheDirs = null; + /** + * When HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED is true, the absolute path prefix under which + * execution wrote fetchable result files. 
+ */ + private String cacheSafeWriteSourceDir; + private Configuration conf; protected int pathid = 10000; private int moveTaskId = 0; @@ -452,6 +458,7 @@ protected Context(Context ctx) { this.scratchDirPermission = ctx.scratchDirPermission; this.fsScratchDirs.putAll(ctx.fsScratchDirs); this.fsResultCacheDirs = ctx.fsResultCacheDirs; + this.cacheSafeWriteSourceDir = ctx.cacheSafeWriteSourceDir; this.conf = ctx.conf; this.pathid = ctx.pathid; this.explainConfig = ctx.explainConfig; @@ -492,6 +499,14 @@ public Path getFsResultCacheDirs() { return this.fsResultCacheDirs; } + public void setCacheSafeWriteSourceDir(String cacheSafeWriteSourceDir) { + this.cacheSafeWriteSourceDir = cacheSafeWriteSourceDir; + } + + public String getCacheSafeWriteSourceDir() { + return cacheSafeWriteSourceDir; + } + public Map getLoadTableOutputMap() { return loadTableOutputMap; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Executor.java b/ql/src/java/org/apache/hadoop/hive/ql/Executor.java index 708e3870efa4..f2ee1c393ee0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Executor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Executor.java @@ -227,6 +227,7 @@ private void preExecutionCacheActions() throws Exception { CacheEntry pendingCacheEntry = QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList); if (pendingCacheEntry != null) { + pendingCacheEntry.setSafeSourceDir(context.getCacheSafeWriteSourceDir()); // Update cacheUsage to reference the pending entry. 
this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry); } @@ -436,7 +437,7 @@ private void postExecutionCacheActions() throws Exception { CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry(); boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(cacheEntry, - driverContext.getPlan().getFetchTask().getWork()); + driverContext.getPlan().getFetchTask().getWork(), driverContext.getConf()); LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry); if (savedToCache) { useFetchFromCache(driverContext.getCacheUsage().getCacheEntry()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java index 6713219d2c34..ff3328eccf2f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -50,6 +50,7 @@ import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.metrics.common.Metrics; @@ -194,6 +195,12 @@ public static class CacheEntry { private Path cachedResultsPath; private Set cachedResultPaths; + /** + * Absolute path prefix for result files when using safe cache write; see + * {@link org.apache.hadoop.hive.ql.Context#getCacheSafeWriteSourceDir()}. 
+     */
+    private String safeSourceDir;
+
     // Cache administration
     private long size;
     private AtomicInteger readers = new AtomicInteger(0);
@@ -287,6 +294,16 @@ public QueryInfo getQueryInfo() {
       return queryInfo;
     }
 
+    /** Records the path prefix under which the query wrote its result files (safe cache write). */
+    public void setSafeSourceDir(String safeSourceDir) {
+      this.safeSourceDir = safeSourceDir;
+    }
+
+    /** Returns the value last passed to {@link #setSafeSourceDir(String)}, or null if it was never set. */
+    public String getSafeSourceDir() {
+      return safeSourceDir;
+    }
+
     public Path getCachedResultsPath() {
       return cachedResultsPath;
     }
@@ -415,6 +430,22 @@ public Path getCacheDirPath() {
     return cacheDirPath;
   }
 
+  /**
+   * Runs {@code action} while holding {@link #rwLock} in exclusive (write) mode.
+   * The lock is taken before the {@code try} block, so the {@code finally} clause only
+   * unlocks a lock that was actually acquired.
+   * @param action work to execute under the write lock
+   */
+  private void withWriteLock(Runnable action) {
+    Lock writeLock = rwLock.writeLock();
+    writeLock.lock();
+    try {
+      action.run();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   /**
    * Check if the cache contains an entry for the requested LookupInfo.
    * @param request
@@ -494,10 +522,7 @@ public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteId
     addedEntry.queryInfo = queryInfo;
     addedEntry.txnWriteIdList = txnWriteIdList;
 
-    Lock writeLock = rwLock.writeLock();
-    try {
-      writeLock.lock();
-
+    withWriteLock(() -> {
       LOG.info("Adding placeholder cache entry for query '{}'", queryText);
 
       // Add the entry to the cache structures while under write lock.
@@ -506,13 +531,30 @@ public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteId
       // Index of entries by table usage.
       addedEntry.getTableNames()
           .forEach(tableName -> addToEntryMap(tableToEntryMap, tableName, addedEntry));
-    } finally {
-      writeLock.unlock();
-    }
+    });
 
     return addedEntry;
   }
 
+  /**
+   * Best-effort recursive delete of {@code files} while holding the cache write lock.
+   * A failed delete is logged and does not stop the remaining deletes.
+   * @param fs file system used to delete the paths
+   * @param files statuses of the files or directories to delete
+   */
+  public void removeInvalidStaleFiles(FileSystem fs, Set<FileStatus> files) {
+    withWriteLock(() -> {
+      for (FileStatus f : files) {
+        try {
+          fs.delete(f.getPath(), true);
+        } catch (IOException e) {
+          LOG.warn("Failed to clean up stale invalid file: {}",
+              f.getPath(), e);
+        }
+      }
+    });
+  }
+
   /**
    * Updates a pending cache entry with a FetchWork result from a finished query.
    * If successful the cache entry will be set to valid status and be usable for cached queries.
@@ -520,9 +556,11 @@ public CacheEntry addToCache(QueryInfo queryInfo, ValidTxnWriteIdList txnWriteId
    * CacheEntry.releaseReader() should be called when the caller is done with the cache entry.
    * @param cacheEntry
    * @param fetchWork
+   * @param queryConf session (or query) Hive configuration; used for safe-cache-write and filesystem access
+   *                  so per-session {@code SET hive.query.results.cache.safe.write.enabled} is honored
    * @return
    */
-  public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
+  public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork, HiveConf queryConf) {
     Path queryResultsPath = null;
     Path cachedResultsPath = null;
 
@@ -532,7 +570,7 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
 
       boolean requiresCaching = true;
       queryResultsPath = fetchWork.getTblDir();
-      FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
+      FileSystem resultsFs = queryResultsPath.getFileSystem(queryConf);
 
       long resultSize = 0;
       for(FileStatus fs:fetchWork.getFilesToFetch()) {
@@ -549,6 +587,10 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
         return false;
       }
 
+      if (!rewriteFetchWorkForSafeCacheWrite(cacheEntry, fetchWork, queryConf)) {
+        return false;
+      }
+
       // Synchronize on the cache entry so that no one else can invalidate this entry
       // while we are in the process of setting it to valid.
       synchronized (cacheEntry) {
@@ -601,10 +643,82 @@ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
     return true;
   }
 
+  /**
+   * When safe cache write is enabled in {@code queryConf}, copies the result files of
+   * {@code fetchWork} into a new UUID-named directory under {@link #cacheDirPath} and repoints
+   * {@code fetchWork} (files to fetch and table dir) at the copies.
+   * NOTE(review): destination paths are built by cutting the safe source dir prefix off the string
+   * form of each path; this assumes every result path starts with that prefix - confirm with callers.
+   * @return true if safe cache write is disabled or the copy succeeded; false if caching must be skipped
+   * @throws IOException propagated from the Hadoop FileSystem calls made before the copy starts
+   */
+  private boolean rewriteFetchWorkForSafeCacheWrite(CacheEntry cacheEntry, FetchWork fetchWork, HiveConf queryConf)
+      throws IOException {
+    if (!queryConf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED)) {
+      return true;
+    }
+    String safeDir = cacheEntry.getSafeSourceDir();
+    if (safeDir == null) {
+      LOG.error("Safe cache write enabled but cache entry has no safe source dir; query: {}",
+          cacheEntry.getQueryInfo().getLookupInfo().getQueryText());
+      return false;
+    }
+    final int safeDirAndSepLen = safeDir.length() + Path.SEPARATOR.length();
+    Path resultDir = new Path(cacheDirPath, UUID.randomUUID().toString());
+    FileSystem cacheFs = resultDir.getFileSystem(queryConf);
+    cacheFs.mkdirs(resultDir);
+
+    Set<FileStatus> cacheFilesToFetch = new HashSet<>();
+    boolean succeeded =
+        copyFetchWorkFilesIntoCacheDirUnderWriteLock(fetchWork, resultDir, cacheFs, safeDirAndSepLen,
+            queryConf, cacheFilesToFetch);
+    if (!succeeded) {
+      removeInvalidStaleFiles(cacheFs, cacheFilesToFetch);
+      // The per-file cleanup above misses a partially written destination file and the UUID
+      // directory itself, so remove the whole directory to avoid leaking it in the cache dir.
+      try {
+        cacheFs.delete(resultDir, true);
+      } catch (IOException e) {
+        LOG.warn("Failed to clean up cache directory: {}", resultDir, e);
+      }
+      return false;
+    }
+    fetchWork.setFilesToFetch(cacheFilesToFetch);
+    fetchWork.setTblDir(new Path(resultDir, fetchWork.getTblDir().toString().substring(safeDirAndSepLen)));
+    return true;
+  }
+
+  /**
+   * Copies each file of {@code fetchWork} into {@code resultDir} while holding the cache write lock
+   * and records the status of every copied file in {@code destFileStatuses}.
+   * @return true if all files were copied; false after the first failure, which is logged
+   */
+  private boolean copyFetchWorkFilesIntoCacheDirUnderWriteLock(FetchWork fetchWork, Path resultDir,
+      FileSystem cacheFs, int safeDirAndSepLen, HiveConf queryConf, Set<FileStatus> destFileStatuses) {
+    final boolean[] succeeded = {true};
+    withWriteLock(() -> {
+      try {
+        for (FileStatus fs : fetchWork.getFilesToFetch()) {
+          FileSystem srcFs = fs.getPath().getFileSystem(queryConf);
+          Path srcFile = fs.getPath();
+          Path destFile = new Path(resultDir,
+              new Path(fs.getPath().toString().substring(safeDirAndSepLen)));
+          succeeded[0] = FileUtil.copy(srcFs, srcFile, cacheFs, destFile, false, queryConf);
+          if (!succeeded[0]) {
+            throw new IOException("File copy failed for " + srcFile + " -> " + destFile);
+          }
+          destFileStatuses.add(cacheFs.getFileStatus(destFile));
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to write cache entry to {}", resultDir, e);
+        succeeded[0] = false;
+      }
+    });
+    return succeeded[0];
+  }
+
   public void clear() {
-    Lock writeLock = rwLock.writeLock();
-    try {
-      writeLock.lock();
+    withWriteLock(() -> {
       LOG.info("Clearing the results cache");
       CacheEntry[] allEntries = null;
       synchronized (lru) {
@@ -617,9 +710,7 @@ public void clear() {
           LOG.error("Error removing cache entry " + entry, err);
         }
       }
-    } finally {
-      writeLock.unlock();
-    }
+    });
   }
 
   public long getSize() {
@@ -635,17 +726,13 @@ public long getSize() {
   public void notifyTableChanged(String dbName, String tableName, long updateTime) {
     LOG.debug("Table changed: {}.{}, at {}", dbName, tableName, updateTime);
     // Invalidate all cache entries using this table.
-    List<CacheEntry> entriesToInvalidate = null;
-    rwLock.writeLock().lock();
-    try {
+    withWriteLock(() -> {
       String key = (dbName.toLowerCase() + "." + tableName.toLowerCase());
       Set<CacheEntry> entriesForTable = tableToEntryMap.get(key);
       if (entriesForTable != null) {
         // Possible concurrent modification issues if we try to remove cache entries while
         // traversing the cache structures. Save the entries to remove in a separate list.
-        entriesToInvalidate = new ArrayList<>(entriesForTable);
-      }
-      if (entriesToInvalidate != null) {
+        List<CacheEntry> entriesToInvalidate = new ArrayList<>(entriesForTable);
         for (CacheEntry entry : entriesToInvalidate) {
           // Ignore updates that occured before this cached query was created.
if (entry.getQueryInfo().getQueryTime() <= updateTime) { @@ -653,9 +740,7 @@ public void notifyTableChanged(String dbName, String tableName, long updateTime) } } } - } finally { - rwLock.writeLock().unlock(); - } + }); } private static final int INITIAL_LRU_SIZE = 16; @@ -737,15 +822,12 @@ private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry, Set { removeFromLookup(entry); lru.remove(entry); // Should the cache size be updated here, or after the result data has actually been deleted? cacheSize -= entry.size; - } finally { - rwLock.writeLock().unlock(); - } + }); } private void removeFromLookup(CacheEntry entry) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 1aec2ac86091..15b20e50bcf8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -1812,7 +1812,7 @@ boolean doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1, PlannerContext plannerCtx, setSqlKind(SqlKind.INSERT); case HiveParser.TOK_DESTINATION: - ctx_1.dest = this.ctx.getDestNamePrefix(ast, qb).toString() + ctx_1.nextNum; + ctx_1.dest = ctx.getDestNamePrefix(ast, qb).toString() + ctx_1.nextNum; ctx_1.nextNum++; boolean isTmpFileDest = false; if (ast.getChildCount() > 0 && ast.getChild(0) instanceof ASTNode) { @@ -7460,7 +7460,7 @@ private Operator genConstraintsPlan(String dest, QB qb, Operator input) throws S return input; } - if (updating(dest) && isCBOExecuted() && this.ctx.getOperation() != Context.Operation.MERGE) { + if (updating(dest) && isCBOExecuted() && ctx.getOperation() != Context.Operation.MERGE) { // for UPDATE statements CBO already added and pushed down the constraints return input; } @@ -7500,19 +7500,24 @@ protected Table getTargetTable(QB qb, String dest) throws SemanticException { } private Path getDestinationFilePath(QB qb, final String destinationFile, boolean isMmTable) { 
+ Path defaultPath = new Path(destinationFile); if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache(qb)) { assert (!isMmTable); QueryResultsCache instance = QueryResultsCache.getInstance(); // QueryResultsCache should have been initialized by now if (instance != null) { - Path resultCacheTopDir = instance.getCacheDirPath(); - String dirName = UUID.randomUUID().toString(); - Path resultDir = new Path(resultCacheTopDir, dirName); - this.ctx.setFsResultCacheDirs(resultDir); - return resultDir; + if (!conf.getBoolVar(ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED)) { + Path resultCacheTopDir = instance.getCacheDirPath(); + String dirName = UUID.randomUUID().toString(); + Path resultDir = new Path(resultCacheTopDir, dirName); + ctx.setFsResultCacheDirs(resultDir); + return resultDir; + } else { + ctx.setCacheSafeWriteSourceDir(defaultPath.toString()); + } } } - return new Path(destinationFile); + return defaultPath; } @SuppressWarnings("nls") @@ -12849,7 +12854,7 @@ private void walkASTMarkTABREF(TableMask tableMask, ASTNode ast, Set cte } else { List colNames; List colTypes; - if (this.ctx.isCboSucceeded() && this.columnAccessInfo != null && + if (ctx.isCboSucceeded() && this.columnAccessInfo != null && (colNames = this.columnAccessInfo.getTableToColumnAllAccessMap().get(table.getCompleteName())) != null) { Map colNameToType = table.getAllCols().stream() .collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType)); @@ -13271,7 +13276,7 @@ void analyzeInternal(ASTNode ast, Supplier pcf) throws SemanticE // 3. 
Deduce Resultset Schema perfLogger.perfLogBegin(this.getClass().getName(), PerfLogger.DEDUCE_RESULTSET_SCHEMA); - if ((forViewCreation || createVwDesc != null) && !this.ctx.isCboSucceeded()) { + if ((forViewCreation || createVwDesc != null) && !ctx.isCboSucceeded()) { resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver()); } else { // resultSchema will be null if diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestCachedResults.java b/ql/src/test/org/apache/hadoop/hive/ql/TestCachedResults.java new file mode 100644 index 000000000000..65f97e8cf05f --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestCachedResults.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.testutils.HiveTestEnvSetup;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Runs three queries with safe cache write on and off while sampling the results cache directory size. */
+public class TestCachedResults {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestCachedResults.class);
+  private static final long MAX_ALLOWED_CACHE_SIZE = 1_000_000L;
+
+  private static final String Q_WINDOW = "SELECT t1.id, " +
+      "SUM(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_sum, " +
+      "AVG(t2.id) OVER (PARTITION BY t1.id % 10) AS window_avg, " +
+      "COUNT(t2.id) OVER (PARTITION BY t1.id % 10 ORDER BY t1.id) AS running_cnt " +
+      "FROM tab t1 " +
+      "JOIN tab t2 ON t1.id % 10 = t2.id % 10 " +
+      "WHERE t1.id <= 300";
+
+  private static final String Q_JOIN = "SELECT base.id, base.bucket, agg.bucket_avg " +
+      "FROM ( SELECT id, id % 10 AS bucket FROM tab WHERE id <= 500 ) base " +
+      "JOIN ( " +
+      " SELECT id % 10 AS bucket, AVG(id) AS bucket_avg, COUNT(*) AS cnt " +
+      " FROM tab GROUP BY id % 10 " +
+      ") agg ON base.bucket = agg.bucket " +
+      "ORDER BY base.id";
+
+  private static final String Q_CTE = "WITH base AS ( " +
+      " SELECT id, id % 2 AS is_even, id % 5 AS mod5, id % 10 AS mod10 FROM tab " +
+      "), joined AS ( " +
+      " SELECT a.id AS a_id, b.id AS b_id, a.mod5, a.mod10, (a.id * b.id) AS product " +
+      " FROM base a JOIN base b ON a.mod5 = b.mod5 AND a.is_even = b.is_even " +
+      " WHERE a.id <= 200 AND b.id <= 200 " +
+      ") " +
+      "SELECT mod5, mod10, COUNT(*) AS cnt, SUM(product) AS total_product, " +
+      "MAX(product) AS max_product, MIN(a_id) AS min_a " +
+      "FROM joined GROUP BY mod5, mod10 ORDER BY mod5, mod10";
+
+  @ClassRule
+  public static HiveTestEnvSetup envSetup = new HiveTestEnvSetup();
+  private static HiveConf conf;
+
+  private ScheduledExecutorService scheduler;
+  private volatile long maxCacheSize;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = envSetup.getTestCtx().hiveConf;
+    HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE, MAX_ALLOWED_CACHE_SIZE);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY, "/tmp/hive/cache");
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, true);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_NONTRANSACTIONAL_TABLES_ENABLED, true);
+    LOG.info("max allowed cache size: {}", conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE));
+    SessionState.start(conf);
+    createAndPopulateTables();
+  }
+
+  private static void createAndPopulateTables() throws Exception {
+    IDriver tableDriver = DriverFactory.newDriver(conf);
+    tableDriver.run("DROP TABLE IF EXISTS tab");
+    tableDriver.run("CREATE TABLE tab (id INT)");
+    tableDriver.run("INSERT INTO TABLE tab SELECT pos + 1 AS id FROM ( " +
+        "SELECT posexplode(split(space(999), ' ')) AS (pos, val) ) t");
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    DriverFactory.newDriver(conf).run("DROP TABLE IF EXISTS tab");
+  }
+
+  @Before
+  public void beforeEach() {
+    maxCacheSize = 0;
+    scheduler = Executors.newSingleThreadScheduledExecutor();
+  }
+
+  @After
+  public void afterEach() throws Exception {
+    // Clean up the cache instance, delete the configured cache directory, then stop the sampler.
+    QueryResultsCache.cleanupInstance();
+    Path cacheDir = new Path(conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY));
+    try {
+      FileSystem cacheFs = cacheDir.getFileSystem(conf);
+      cacheFs.delete(cacheDir, true);
+    } catch (IOException e) {
+      LOG.warn("Failed to clean up cache directory: {}", cacheDir, e);
+    }
+    scheduler.shutdownNow();
+    scheduler.awaitTermination(1, TimeUnit.SECONDS);
+  }
+
+  private void executeQueries(IDriver driver) throws Exception {
+    for (String query : new String[] {Q_WINDOW, Q_JOIN, Q_CTE}) {
+      driver.run(query);
+    }
+  }
+
+  @Test
+  public void testSafeCacheWrite() throws Exception {
+    runCacheScenario(true);
+    LOG.info("Maximum cache size in safe mode: {}", maxCacheSize);
+    Assert.assertTrue("max cache size should stay within limit", maxCacheSize <= MAX_ALLOWED_CACHE_SIZE);
+  }
+
+  @Test
+  public void testUnsafeCacheWrite() throws Exception {
+    runCacheScenario(false);
+    LOG.info("Maximum cache size in non-safe mode: {}", maxCacheSize);
+    // NOTE(review): passes only if a 1 ms sample catches the directory at or above the limit - confirm stability.
+    Assert.assertTrue("max cache size should exceed limit when unsafe", maxCacheSize >= MAX_ALLOWED_CACHE_SIZE);
+  }
+
+  private void runCacheScenario(boolean safeCacheWrite) throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_QUERY_RESULTS_SAFE_CACHE_WRITE_ENABLED, safeCacheWrite);
+    startCacheMonitor(1);
+    executeQueries(DriverFactory.newDriver(conf));
+    stopCacheMonitor();
+    Assert.assertNotEquals("cache size should have grown", 0, maxCacheSize);
+  }
+
+  /** Samples the cache directory size every {@code intervalMs} milliseconds and keeps the largest value seen. */
+  private void startCacheMonitor(long intervalMs) {
+    Path cacheDir = new Path(conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY));
+    Runnable sampler = () -> {
+      try {
+        long observed = cacheDir.getFileSystem(conf).getContentSummary(cacheDir).getLength();
+        maxCacheSize = Math.max(maxCacheSize, observed);
+      } catch (IOException e) {
+        LOG.debug("cache path not readable yet: {}", cacheDir, e);
+      }
+    };
+    scheduler.scheduleAtFixedRate(sampler, 0, intervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  private void stopCacheMonitor() throws InterruptedException {
+    scheduler.shutdown();
+    scheduler.awaitTermination(2, TimeUnit.SECONDS);
+  }
+}