From a095898cdde94a564f0407e93136341c2421f00d Mon Sep 17 00:00:00 2001 From: Neeraj Khatri Date: Sat, 13 Jun 2026 16:40:05 +0530 Subject: [PATCH] HIVE-29281: Make proactive cache eviction work with catalog --- .../iceberg/orc/VectorizedReadUtils.java | 2 +- .../src/protobuf/LlapDaemonProtocol.proto | 3 +- .../api/impl/LlapCacheMetadataSerializer.java | 16 +- .../hive/llap/io/api/impl/LlapIoImpl.java | 8 +- .../llap/io/encoded/OrcEncodedDataReader.java | 2 +- .../io/encoded/SerDeEncodedDataReader.java | 2 +- .../io/metadata/OrcFileEstimateErrors.java | 4 +- .../llap/cache/TestCacheContentsTracker.java | 59 ++-- .../hadoop/hive/llap/cache/TestFileCache.java | 3 +- .../llap/cache/TestLowLevelCacheImpl.java | 8 +- .../hive/llap/cache/TestOrcMetadataCache.java | 19 +- .../llap/cache/TestProactiveEviction.java | 327 ++++++++++++++++++ .../impl/TestLlapCacheMetadataSerializer.java | 2 +- .../hadoop/hive/llap/LlapHiveUtils.java | 20 +- .../hadoop/hive/llap/ProactiveEviction.java | 256 ++++++++------ .../database/drop/DropDatabaseAnalyzer.java | 3 + .../database/drop/DropDatabaseOperation.java | 2 +- .../ql/ddl/table/drop/DropTableOperation.java | 2 +- .../AlterTableDropPartitionOperation.java | 3 +- .../apache/hadoop/hive/ql/exec/Utilities.java | 3 +- .../orc/VectorizedOrcAcidRowBatchReader.java | 2 +- .../vector/VectorizedParquetRecordReader.java | 2 +- .../hadoop/hive/ql/plan/PartitionDesc.java | 7 + .../apache/hadoop/hive/ql/plan/TableDesc.java | 31 +- .../clientpositive/llap/db_ddl_explain.q.out | 1 + .../hadoop/hive/common/io/CacheTag.java | 48 ++- 26 files changed, 640 insertions(+), 195 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java index c05f8bc62ab7..34b99d6b00b5 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java @@ -79,7 +79,7 @@ public static ByteBuffer getSerializedOrcTail(Path path, SyntheticFileId fileId, // Note: Since Hive doesn't know about partition information of Iceberg tables, partitionDesc is only used to // deduct the table (and DB) name here. CacheTag cacheTag = HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ? - LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc) : null; + LlapHiveUtils.getCacheTag(path, true, partitionDesc) : null; try { // Schema has to be serialized and deserialized as it is passed between different packages of TypeDescription: diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto index 641958aef8a1..8b15f4392eb5 100644 --- a/llap-common/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto @@ -233,10 +233,11 @@ message SetCapacityRequestProto { message SetCapacityResponseProto { } -// Used for proactive eviction request. Must contain one DB name, and optionally table information. +// Used for proactive eviction request. Must contain a DB name, and optionally table information and catalog name. message EvictEntityRequestProto { required string db_name = 1; repeated TableProto table = 2; + optional string catalog_name = 3 [default = "hive"]; } // Used in EvictEntityRequestProto, can be used for non-partitioned and partitioned tables too. diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java index dcb90ec197dd..aa3b6fdd0662 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.llap.io.encoded.LlapOrcCacheLoader; import org.apache.hadoop.hive.ql.io.SyntheticFileId; import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hive.common.util.FixedSizedObjectPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,7 +150,7 @@ public void loadData(LlapDaemonProtocolProtos.CacheEntryList data) { } private void loadData(LlapDaemonProtocolProtos.CacheEntry ce) throws IOException { - CacheTag cacheTag = decodeCacheTag(ce.getCacheTag()); + CacheTag cacheTag = decodeCacheTag(ce.getCacheTag(), conf); DiskRangeList ranges = decodeRanges(ce.getRangesList()); Object fileKey = decodeFileKey(ce.getFileKey()); try (LlapOrcCacheLoader llr = new LlapOrcCacheLoader(new Path(ce.getFilePath()), fileKey, conf, cache, @@ -167,9 +168,16 @@ private static DiskRangeList decodeRanges(List + tables.forEach((table, partitions) -> + sb.append(catalogdb.catalog()).append(".").append(catalogdb.database()) + .append(".").append(table).append(" ")) + ); sb.append(" Duration: ").append(time).append(" ms"); LOG.debug(sb.toString()); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 75a71560b81c..76f95a3ec33c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -235,7 +235,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff // LlapInputFormat needs to know the file schema to decide if schema evolution is supported. PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts); cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE) - ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null; + ? LlapHiveUtils.getCacheTag(split.getPath(), true, partitionDesc) : null; // 1. Get file metadata from cache, or create the reader and read it. // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that fsSupplier = getFsSupplier(split.getPath(), jobConf); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 3322136366d0..e32b0584c888 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -225,7 +225,7 @@ public MemoryBuffer create() { PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts); fileKey = determineCacheKey(fs, split, partitionDesc, daemonConf); cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE) - ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null; + ? LlapHiveUtils.getCacheTag(split.getPath(), true, partitionDesc) : null; this.sourceInputFormat = sourceInputFormat; this.sourceSerDe = sourceSerDe; this.reporter = reporter; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java index 02ee55f250e8..1c975b623d0e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper; import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer; import org.apache.hadoop.hive.ql.io.SyntheticFileId; import org.apache.hadoop.hive.ql.io.orc.encoded.IncompleteCb; @@ -140,7 +141,6 @@ public boolean isMarkedForEviction() { @Override public CacheTag getTag() { - // We don't care about these. - return CacheTag.build("OrcEstimates"); + return CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "OrcEstimates"); } } \ No newline at end of file diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java index 15d3f8fd1579..08eb1f45d936 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java @@ -17,11 +17,10 @@ */ package org.apache.hadoop.hive.llap.cache; -import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.Map; import org.apache.hadoop.hive.common.io.CacheTag; +import org.apache.hadoop.hive.metastore.Warehouse; import org.junit.BeforeClass; import org.junit.Test; @@ -127,7 +126,7 @@ public void testCacheTagComparison() { public void testEncodingDecoding() throws Exception { LinkedHashMap partDescs = new LinkedHashMap<>(); partDescs.put("pytha=goras", "a2+b2=c2"); - CacheTag tag = CacheTag.build("math.rules", partDescs); + CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "math.rules", partDescs); CacheTag.SinglePartitionCacheTag stag = ((CacheTag.SinglePartitionCacheTag)tag); assertEquals("pytha=goras=a2+b2=c2", stag.partitionDescToString()); assertEquals(1, stag.getPartitionDescMap().size()); @@ -136,7 +135,7 @@ public void testEncodingDecoding() throws Exception { partDescs.clear(); partDescs.put("mutli=one", "one=/1"); partDescs.put("mutli=two/", "two=2"); - tag = CacheTag.build("math.rules", partDescs); + tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "math.rules", partDescs); CacheTag.MultiPartitionCacheTag mtag = ((CacheTag.MultiPartitionCacheTag)tag); assertEquals("mutli=one=one=/1/mutli=two/=two=2", mtag.partitionDescToString()); assertEquals(2, mtag.getPartitionDescMap().size()); @@ -168,6 +167,10 @@ private static LlapCacheableBuffer createMockBuffer(long size, CacheTag cacheTag } public static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) { + String[] parts = dbAndTable.split("\\."); + if(parts.length < 3) { + dbAndTable = Warehouse.DEFAULT_CATALOG_NAME + "." + dbAndTable; + } if (partitions != null && partitions.length > 0) { LinkedHashMap partDescs = new LinkedHashMap<>(); for (String partition : partitions) { @@ -215,33 +218,33 @@ private static void evictSomeTestBuffers() { private static final String EXPECTED_CACHE_STATE_WHEN_FULL = "\n" + "Cache state: \n" + - "default : 2/2, 2101248/2101248\n" + - "default.testtable : 2/2, 2101248/2101248\n" + - "otherdb : 7/7, 1611106304/1611106304\n" + - "otherdb.testtable : 4/4, 231424/231424\n" + - "otherdb.testtable/p=v1 : 3/3, 100352/100352\n" + - "otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" + - "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" + - "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" + - "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" + - "otherdb.testtable2 : 2/2, 537133056/537133056\n" + - "otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" + - "otherdb.testtable3 : 1/1, 1073741824/1073741824"; + "hive.default : 2/2, 2101248/2101248\n" + + "hive.default.testtable : 2/2, 2101248/2101248\n" + + "hive.otherdb : 7/7, 1611106304/1611106304\n" + + "hive.otherdb.testtable : 4/4, 231424/231424\n" + + "hive.otherdb.testtable/p=v1 : 3/3, 100352/100352\n" + + "hive.otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" + + "hive.otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" + + "hive.otherdb.testtable/p=v2 : 1/1, 131072/131072\n" + + "hive.otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" + + "hive.otherdb.testtable2 : 2/2, 537133056/537133056\n" + + "hive.otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" + + "hive.otherdb.testtable3 : 1/1, 1073741824/1073741824"; private static final String EXPECTED_CACHE_STATE_AFTER_EVICTION = "\n" + "Cache state: \n" + - "default : 0/2, 0/2101248\n" + - "default.testtable : 0/2, 0/2101248\n" + - "otherdb : 5/7, 1074202624/1611106304\n" + - "otherdb.testtable : 3/4, 198656/231424\n" + - "otherdb.testtable/p=v1 : 2/3, 67584/100352\n" + - "otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" + - "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" + - "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" + - "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" + - "otherdb.testtable2 : 1/2, 262144/537133056\n" + - "otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" + - "otherdb.testtable3 : 1/1, 1073741824/1073741824"; + "hive.default : 0/2, 0/2101248\n" + + "hive.default.testtable : 0/2, 0/2101248\n" + + "hive.otherdb : 5/7, 1074202624/1611106304\n" + + "hive.otherdb.testtable : 3/4, 198656/231424\n" + + "hive.otherdb.testtable/p=v1 : 2/3, 67584/100352\n" + + "hive.otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" + + "hive.otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" + + "hive.otherdb.testtable/p=v2 : 1/1, 131072/131072\n" + + "hive.otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" + + "hive.otherdb.testtable2 : 1/2, 262144/537133056\n" + + "hive.otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" + + "hive.otherdb.testtable3 : 1/1, 1073741824/1073741824"; } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java index 34203ddf5d6f..f5bb1e0d254d 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestFileCache.java @@ -19,6 +19,7 @@ import com.google.common.base.Function; import org.apache.hadoop.hive.common.io.CacheTag; +import org.apache.hadoop.hive.metastore.Warehouse; import org.junit.Test; import java.util.concurrent.ConcurrentHashMap; @@ -32,7 +33,7 @@ public void testFileCacheMetadata() { ConcurrentHashMap> cache = new ConcurrentHashMap<>(); Object fileKey = 1234L; Function f = a -> new Object(); - CacheTag tag = CacheTag.build("test_table"); + CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "test_db.test_table"); FileCache result = FileCache.getOrAddFileSubCache(cache, fileKey, f, tag); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java index 4e3c10ed6b2d..764dd9ec319b 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.io.CacheTag; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory; @@ -309,13 +310,14 @@ private void _testProactiveEvictionMark(boolean isInstantDeallocation) { LlapDataBuffer[] buffs1 = IntStream.range(0, 4).mapToObj(i -> fb()).toArray(LlapDataBuffer[]::new); DiskRange[] drs1 = drs(IntStream.range(1, 5).toArray()); - CacheTag tag1 = CacheTag.build("default.table1"); + CacheTag tag1 = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1"); LlapDataBuffer[] buffs2 = IntStream.range(0, 41).mapToObj(i -> fb()).toArray(LlapDataBuffer[]::new); DiskRange[] drs2 = drs(IntStream.range(1, 42).toArray()); - CacheTag tag2 = CacheTag.build("default.table2"); + CacheTag tag2 = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table2"); - Predicate predicate = tag -> "default.table1".equals(tag.getTableName()); + Predicate predicate = tag -> + (Warehouse.DEFAULT_CATALOG_NAME + "." + "default.table1").equals(tag.getTableName()); cache.putFileData(fn1, drs1, buffs1, 0, Priority.NORMAL, null, tag1); cache.putFileData(fn2, drs2, buffs2, 0, Priority.NORMAL, null, tag2); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java index 62a8c7474399..5ec15beccfb9 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.IllegalCacheConfigurationException; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader; import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; @@ -249,7 +250,7 @@ public void testGetOrcTailForPath() throws Exception { Path path = new Path("../data/files/alltypesorc"); Configuration jobConf = new Configuration(); Configuration daemonConf = new Configuration(); - CacheTag tag = CacheTag.build("test-table"); + CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "test-db.test-table"); OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, null); jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true"); OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, null); @@ -270,7 +271,7 @@ public void testGetOrcTailForPathWithFileId() throws Exception { Path path = new Path("../data/files/alltypesorc"); Configuration jobConf = new Configuration(); Configuration daemonConf = new Configuration(); - CacheTag tag = CacheTag.build("test-table"); + CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "test-db.test-table"); FileSystem fs = FileSystem.get(daemonConf); FileStatus fileStatus = fs.getFileStatus(path); OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(fileStatus.getPath(), jobConf, tag, daemonConf, cache, new SyntheticFileId(fileStatus)); @@ -294,7 +295,7 @@ public void testGetOrcTailForPathWithFileIdChange() throws Exception { Path path = new Path("../data/files/alltypesorc"); Configuration jobConf = new Configuration(); Configuration daemonConf = new Configuration(); - CacheTag tag = CacheTag.build("test-table"); + CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "test-db.test-table"); OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, new SyntheticFileId(path, 100, 100)); jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true"); Exception ex = null; @@ -337,19 +338,23 @@ public void testProactiveEvictionMark() throws Exception { // below is of length 65 ByteBuffer bb2 = ByteBuffer.wrap("-large-meta-data-content-large-meta-data-content-large-meta-data-".getBytes()); - LlapBufferOrBuffers table1Buffers1 = cache.putFileMetadata(fn1, bb, CacheTag.build("default.table1"), isStopped); + LlapBufferOrBuffers table1Buffers1 = cache.putFileMetadata(fn1, bb, + CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1"), isStopped); assertNotNull(table1Buffers1.getSingleLlapBuffer()); - LlapBufferOrBuffers table1Buffers2 = cache.putFileMetadata(fn2, bb2, CacheTag.build("default.table1"), isStopped); + LlapBufferOrBuffers table1Buffers2 = cache.putFileMetadata(fn2, bb2, + CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1"), isStopped); assertNotNull(table1Buffers2.getMultipleLlapBuffers()); assertEquals(2, table1Buffers2.getMultipleLlapBuffers().length); // Case for when metadata consists of just 1 buffer (most of the realworld cases) ByteBuffer bb3 = ByteBuffer.wrap("small-meta-data-content-for-otherFile".getBytes()); - LlapBufferOrBuffers table2Buffers1 = cache.putFileMetadata(fn3, bb3, CacheTag.build("default.table2"), isStopped); + LlapBufferOrBuffers table2Buffers1 = cache.putFileMetadata(fn3, bb3, + CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table2"), isStopped); assertNotNull(table2Buffers1.getSingleLlapBuffer()); - Predicate predicate = tag -> "default.table1".equals(tag.getTableName()); + Predicate predicate = tag -> + (Warehouse.DEFAULT_CATALOG_NAME + ".default.table1").equals(tag.getTableName()); // Simulating eviction on some buffers table1Buffers2.getMultipleLlapBuffers()[1].decRef(); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java index 89c8f6055038..e75237b87650 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestProactiveEviction.java @@ -22,6 +22,8 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -32,7 +34,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.ProactiveEviction.Request; import org.apache.hadoop.hive.llap.ProactiveEviction.Request.Builder; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EvictEntityRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics; +import org.apache.hadoop.hive.metastore.Warehouse; import com.google.common.annotations.VisibleForTesting; @@ -42,6 +46,7 @@ import static org.apache.hadoop.hive.llap.cache.TestCacheContentsTracker.cacheTagBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; /** @@ -106,6 +111,328 @@ private static void assertMatchOnTags(Builder requestBuilder, String expected) { assertEquals(expected, sb.toString()); } + /** + * Verifies that passing an explicit catalog produces correct matching via isTagMatch. + * TEST_TAGS all belong to the default catalog, so requests for a different catalog must not match. + */ + @Test + public void testCatalogAwareCacheTagAndRequestMatching() { + // Default catalog matches as expected. + assertMatchOnTags(Builder.create().addDb("fx"), "111111111111000000"); + assertMatchOnTags(Builder.create().addTable("fx", "futures"), "000001111000000000"); + assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "futures", + buildParts("ccy", "JPY")), "000000110000000000"); + assertMatchOnTags(Builder.create().addTable("fixedincome", "bonds"), "000000000000000110"); + assertMatchOnTags(Builder.create().addPartitionOfATable("fx", "rates", + buildParts("from", "EUR", "to", "HUF")), "000010000000000000"); + + // Non-default catalog: CacheTag now carries catalog info, so none of the TEST_TAGS + // (all default-catalog) should match requests targeting a different catalog. + assertMatchOnTags(Builder.create().addDb("custom_catalog", "fx"), "000000000000000000"); + assertMatchOnTags(Builder.create().addTable("custom_catalog", "equity", "prices"), + "000000000000000000"); + assertMatchOnTags(Builder.create().addPartitionOfATable( + "custom_catalog", "equity", "prices", buildParts("ex", "NYSE")), + "000000000000000000"); + } + + /** + * Verifies that catalog_name is serialized into the proto and correctly restored via fromProtoRequest. + */ + @Test + public void testProtoRoundTripPreservesCatalog() { + // Default catalog is always serialized into the proto. + Request defaultCatRequest = Builder.create().addDb("testdb").build(); + List protos = defaultCatRequest.toProtoRequests(); + assertEquals(1, protos.size()); + EvictEntityRequestProto proto = protos.get(0); + assertEquals(Warehouse.DEFAULT_CATALOG_NAME, proto.getCatalogName()); + assertEquals("testdb", proto.getDbName()); + + Request roundTripped = Builder.create().fromProtoRequest(proto).build(); + assertTrue(roundTripped.hasDatabaseName(Warehouse.DEFAULT_CATALOG_NAME, "testdb")); + + // Custom catalog is also preserved. + Request customCatRequest = Builder.create().addTable("spark_catalog", "salesdb", "orders").build(); + protos = customCatRequest.toProtoRequests(); + assertEquals(1, protos.size()); + proto = protos.get(0); + assertEquals("spark_catalog", proto.getCatalogName()); + assertEquals("salesdb", proto.getDbName()); + + roundTripped = Builder.create().fromProtoRequest(proto).build(); + assertTrue(roundTripped.hasDatabaseName("spark_catalog", "salesdb")); + } + + /** + * Verifies that entities in different catalogs are independently scoped even when they share + * the same DB name. + */ + @Test + public void testMultiCatalogBuilderScoping() { + // Two different catalogs, each with the same DB name but different tables. + Request request = Builder.create() + .addTable("catalog_a", "shared_db", "table_a") + .addTable("catalog_b", "shared_db", "table_b") + .build(); + + assertEquals(2, request.getEntities().size()); + assertTrue(request.getEntities().containsKey(new Request.CatalogDb("catalog_a", "shared_db"))); + assertTrue(request.getEntities().containsKey(new Request.CatalogDb("catalog_b", "shared_db"))); + + // catalog_a only knows about table_a. + assertTrue(request.getEntities().get(new Request.CatalogDb("catalog_a", "shared_db")).containsKey("table_a")); + assertFalse(request.getEntities().get(new Request.CatalogDb("catalog_a", "shared_db")).containsKey("table_b")); + + // catalog_b only knows about table_b. + assertTrue(request.getEntities().get(new Request.CatalogDb("catalog_b", "shared_db")).containsKey("table_b")); + assertFalse(request.getEntities().get(new Request.CatalogDb("catalog_b", "shared_db")).containsKey("table_a")); + } + + /** + * Verifies that multiple tables and partitions added to the same catalog+DB are merged + * into a single catalog entry (no duplication). + */ + @Test + public void testSameCatalogMultipleEntitiesMergedCorrectly() { + Request request = Builder.create() + .addTable("mydb", "table1") + .addTable("mydb", "table2") + .addPartitionOfATable("mydb", "table3", buildParts("dt", "2024-01-01")) + .addPartitionOfATable("mydb", "table3", buildParts("dt", "2024-01-02")) + .build(); + + assertTrue(request.hasDatabaseName(Warehouse.DEFAULT_CATALOG_NAME, "mydb")); + // One catalog, one DB, three tables. + assertEquals(1, request.getEntities().size()); + assertEquals(3, request.getEntities() + .get(new Request.CatalogDb(Warehouse.DEFAULT_CATALOG_NAME, "mydb")).size()); + // table3 has two partition specs. + assertEquals(2, request.getEntities() + .get(new Request.CatalogDb(Warehouse.DEFAULT_CATALOG_NAME, "mydb")).get("table3").size()); + } + + /** + * Verifies that CacheTag catalog information is correctly used to isolate eviction between catalogs. + * A request targeting catalog A must not evict buffers that belong to catalog B, even when the + * DB and table names are identical. + */ + @Test + public void testCatalogIsolationInIsTagMatch() { + CacheTag defaultCatalogTag = cacheTagBuilder("fx.rates", "from=USD", "to=HUF"); + CacheTag otherCatalogTag = cacheTagBuilder("other_catalog.fx.rates", "from=USD", "to=HUF"); + + // Request for the default catalog's "fx" DB matches only default-catalog tags. + Request defaultCatalogRequest = Builder.create() + .fromProtoRequest(Builder.create() + .addDb("fx") + .build().toProtoRequests().get(0)) + .build(); + assertTrue(defaultCatalogRequest.isTagMatch(defaultCatalogTag)); + assertFalse("Must not evict buffers belonging to other_catalog", + defaultCatalogRequest.isTagMatch(otherCatalogTag)); + + // Request for a different catalog matches only tags from that catalog. + Request otherCatalogRequest = Builder.create() + .fromProtoRequest(Builder.create() + .addDb("other_catalog", "fx") + .build().toProtoRequests().get(0)) + .build(); + assertTrue(otherCatalogRequest.isTagMatch(otherCatalogTag)); + assertFalse("Must not evict buffers belonging to the default catalog", + otherCatalogRequest.isTagMatch(defaultCatalogTag)); + + // A request for a DB that doesn't exist in the tags must not match, regardless of catalog. + Request noMatchRequest = Builder.create() + .fromProtoRequest(Builder.create() + .addDb("any_catalog", "nonexistent_db") + .build().toProtoRequests().get(0)) + .build(); + assertFalse(noMatchRequest.isTagMatch(defaultCatalogTag)); + assertFalse(noMatchRequest.isTagMatch(otherCatalogTag)); + } + + /** + * Verifies that Iceberg metadata table cache tags (catalog.db.table.metaTable) are handled by + * isTagMatch and evicted when the base table is dropped. + */ + @Test + public void testIcebergMetaTableTagMatching() { + CacheTag baseTableTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders"); + CacheTag filesMetaTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders.files"); + CacheTag snapshotsMetaTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders.snapshots"); + CacheTag otherTableMetaTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.other.files"); + + Request dropTableRequest = Builder.create() + .fromProtoRequest(Builder.create() + .addTable("salesdb", "orders") + .build().toProtoRequests().get(0)) + .build(); + + assertTrue(dropTableRequest.isTagMatch(baseTableTag)); + assertTrue(dropTableRequest.isTagMatch(filesMetaTag)); + assertTrue(dropTableRequest.isTagMatch(snapshotsMetaTag)); + assertFalse(dropTableRequest.isTagMatch(otherTableMetaTag)); + + // Drop-partition requests must not evict metadata-table cache via prefix matching. + Request dropPartitionRequest = Builder.create() + .fromProtoRequest(Builder.create() + .addPartitionOfATable("salesdb", "orders", buildParts("dt", "2024-01-01")) + .build().toProtoRequests().get(0)) + .build(); + assertFalse(dropPartitionRequest.isTagMatch(filesMetaTag)); + } + + /** + * Legacy cache tags created before catalog support are 2-part (db.table) with no catalog + * component. They must be treated as belonging to the default catalog. + */ + @Test + public void testTwoPartLegacyTagMatching() { + CacheTag tableTag = CacheTag.build("salesdb.orders"); + CacheTag partitionedTag = CacheTag.build("salesdb.orders", buildParts("dt", "2024-01-01")); + + // Drop database, table and matching partition (all default catalog) evict the legacy tags. + assertTrue(dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb").isTagMatch(tableTag)); + assertTrue(dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", "orders").isTagMatch(tableTag)); + assertTrue(dropPartitionRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", "orders", + buildParts("dt", "2024-01-01")).isTagMatch(partitionedTag)); + + // A non-matching partition value must not evict. + assertFalse(dropPartitionRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", "orders", + buildParts("dt", "2024-01-02")).isTagMatch(partitionedTag)); + + // A request scoped to a custom catalog must not evict default-catalog legacy tags. + assertFalse(dropDbRequest("custom_catalog", "salesdb").isTagMatch(tableTag)); + assertFalse(dropTableRequest("custom_catalog", "salesdb", "orders").isTagMatch(tableTag)); + } + + /** + * Iceberg metadata table tags (catalog.db.table.metaTable) on a non-default catalog must be + * evicted only by requests scoped to that same catalog. + */ + @Test + public void testNonDefaultCatalogIcebergMetaTableMatching() { + CacheTag filesMetaTag = CacheTag.build("spark_catalog.salesdb.orders.files"); + CacheTag snapshotsMetaTag = CacheTag.build("spark_catalog.salesdb.orders.snapshots"); + + // Drop table on the custom catalog evicts its metadata-table cache. + Request dropTable = dropTableRequest("spark_catalog", "salesdb", "orders"); + assertTrue(dropTable.isTagMatch(filesMetaTag)); + assertTrue(dropTable.isTagMatch(snapshotsMetaTag)); + + // Drop database on the custom catalog evicts metadata-table cache too. + Request dropDb = dropDbRequest("spark_catalog", "salesdb"); + assertTrue(dropDb.isTagMatch(filesMetaTag)); + assertTrue(dropDb.isTagMatch(snapshotsMetaTag)); + + // The same logical name in the default catalog must not be evicted. + assertFalse(dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", "orders") + .isTagMatch(filesMetaTag)); + assertFalse(dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb").isTagMatch(filesMetaTag)); + } + + /** + * Dropping a database must evict both base-table and Iceberg metadata-table cache entries + * belonging to that database. + */ + @Test + public void testDropDatabaseEvictsMetaTableTags() { + CacheTag baseTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders"); + CacheTag filesMetaTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders.files"); + + Request dropDb = dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb"); + assertTrue(dropDb.isTagMatch(baseTag)); + assertTrue(dropDb.isTagMatch(filesMetaTag)); + + // A database with a different name must not match. + assertFalse(dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "otherdb").isTagMatch(filesMetaTag)); + } + + /** + * Prefix matching used for Iceberg metadata tables must not produce false positives for tables + * that merely share a name prefix with the dropped table. + */ + @Test + public void testDropTablePrefixMatchingAvoidsFalsePositives() { + CacheTag siblingTableTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders_archive"); + CacheTag siblingMetaTag = + CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders_archive.files"); + + Request dropTable = dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", "orders"); + assertFalse(dropTable.isTagMatch(siblingTableTag)); + assertFalse(dropTable.isTagMatch(siblingMetaTag)); + } + + /** + * With catalog-aware tags a 3-part name is always interpreted as catalog.db.table, never as a + * default-catalog db.table.metaTable. + */ + @Test + public void testThreePartTagInterpretedAsCatalogQualified() { + CacheTag tag = CacheTag.build("custom_catalog.salesdb.orders"); + + // Matched when the request targets the same catalog + db + table. + assertTrue(dropTableRequest("custom_catalog", "salesdb", "orders").isTagMatch(tag)); + + // Not matched when "custom_catalog" is mistaken for a database in the default catalog. + assertFalse(dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "custom_catalog", "salesdb") + .isTagMatch(tag)); + } + + /** + * Snapshot-ref tags (branch_/tag_) must be evicted when their base table is dropped, both in the + * 4-part catalog-qualified form and in the legacy 3-part db.table.ref form. + */ + @Test + public void testSnapshotRefTagMatching() { + CacheTag branchTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders.branch_main"); + CacheTag tagRefTag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".salesdb.orders.tag_v1"); + // Legacy 3-part snapshot ref without catalog prefix -> default-catalog db.table.ref. + CacheTag legacyBranchTag = CacheTag.build("salesdb.orders.branch_main"); + + Request dropTable = dropTableRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb", "orders"); + assertTrue(dropTable.isTagMatch(branchTag)); + assertTrue(dropTable.isTagMatch(tagRefTag)); + assertTrue(dropTable.isTagMatch(legacyBranchTag)); + } + + /** + * Cache tag names must have between 2 and 4 dot-separated components; anything else is rejected. + */ + @Test + public void testInvalidCacheTagLengthThrows() { + Request request = dropDbRequest(Warehouse.DEFAULT_CATALOG_NAME, "salesdb"); + + // Single-component tag is not a valid db-qualified name. + assertThrows(UnsupportedOperationException.class, + () -> request.isTagMatch(CacheTag.build("orders"))); + + // Five-component tag exceeds the supported catalog.db.table.metaTable structure. + assertThrows(UnsupportedOperationException.class, + () -> request.isTagMatch(CacheTag.build("a.b.c.d.e"))); + } + + private static Request dropDbRequest(String catalog, String db) { + return roundTrip(Builder.create().addDb(catalog, db)); + } + + private static Request dropTableRequest(String catalog, String db, String table) { + return roundTrip(Builder.create().addTable(catalog, db, table)); + } + + private static Request dropPartitionRequest(String catalog, String db, String table, + Map partSpec) { + return roundTrip(Builder.create().addPartitionOfATable(catalog, db, table, partSpec)); + } + + /** + * Marshals the request to proto and back, mirroring how the LLAP daemon receives requests. + */ + private static Request roundTrip(Builder requestBuilder) { + return Builder.create().fromProtoRequest(requestBuilder.build().toProtoRequests().get(0)).build(); + } + @Test public void testProactiveSweep() throws Exception { closeSweeperExecutorForTest(); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java index 01581dd94a71..6082f57d0657 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/io/api/impl/TestLlapCacheMetadataSerializer.java @@ -107,7 +107,7 @@ private LlapDaemonProtocolProtos.CacheEntryList createDummyMetadata() throws IOE LlapDaemonProtocolProtos.CacheEntryRange re2 = LlapDaemonProtocolProtos.CacheEntryRange.newBuilder().setStart(14L).setEnd(38L).build(); LlapDaemonProtocolProtos.CacheTag ct = - LlapDaemonProtocolProtos.CacheTag.newBuilder().setTableName("dummyTable").build(); + LlapDaemonProtocolProtos.CacheTag.newBuilder().setTableName("hive.default.dummyTable").build(); Path path = new Path(TEST_PATH); SyntheticFileId syntheticFileId = fileId(path); diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java index ba62b8d89c22..76034ca69a57 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.CacheTag; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.io.HdfsUtils; @@ -74,18 +75,21 @@ public static PartitionDesc partitionDescForPath(Path path, Map>>> entities; + public record PartitionSpec(Map spec) {} + public record CatalogDb(String catalog, String database){} + private final Map>> entities; - private Request(Map>>> entities) { + private Request(Map>> entities) { this.entities = entities; } - public Map>>> getEntities() { + public Map>> getEntities() { return entities; } @@ -172,15 +172,8 @@ public boolean isEmpty() { return entities.isEmpty(); } - /** - * Request often times only contains tables/partitions of 1 DB only. - * @return the single DB name, null if the count of DBs present is not exactly 1. - */ - public String getSingleDbName() { - if (entities.size() == 1) { - return entities.keySet().stream().findFirst().get(); - } - return null; + public boolean hasDatabaseName(String catalogName, String dbName) { + return entities.containsKey(new CatalogDb(catalogName, dbName)); } /** @@ -188,41 +181,39 @@ public String getSingleDbName() { * @return list of request instances ready to be sent over protobuf. */ public List toProtoRequests() { - - List protoRequests = new LinkedList<>(); - - for (Map.Entry>>> dbEntry : entities.entrySet()) { - String dbName = dbEntry.getKey(); - Map>> tables = dbEntry.getValue(); - - LlapDaemonProtocolProtos.EvictEntityRequestProto.Builder requestBuilder = - LlapDaemonProtocolProtos.EvictEntityRequestProto.newBuilder(); - LlapDaemonProtocolProtos.TableProto.Builder tableBuilder = null; - - requestBuilder.setDbName(dbName.toLowerCase()); - for (Map.Entry>> tableEntry : tables.entrySet()) { - String tableName = tableEntry.getKey(); - tableBuilder = LlapDaemonProtocolProtos.TableProto.newBuilder(); - tableBuilder.setTableName(tableName.toLowerCase()); - - Set> partitions = tableEntry.getValue(); - Set partitionKeys = null; - - for (Map partitionSpec : partitions) { - if (partitionKeys == null) { + return entities.entrySet().stream() + .map(entry -> { + CatalogDb catalogDb = entry.getKey(); + Map> tables = entry.getValue(); + LlapDaemonProtocolProtos.EvictEntityRequestProto.Builder requestBuilder = + LlapDaemonProtocolProtos.EvictEntityRequestProto.newBuilder(); + + requestBuilder.setCatalogName(catalogDb.catalog().toLowerCase()); + requestBuilder.setDbName(catalogDb.database().toLowerCase()); + + tables.forEach((tableName, partitions) -> { + LlapDaemonProtocolProtos.TableProto.Builder tableBuilder = + LlapDaemonProtocolProtos.TableProto.newBuilder(); + + tableBuilder.setTableName(tableName.toLowerCase()); + + Set partitionKeys = null; + + for (PartitionSpec partitionSpec : partitions) { + if (partitionKeys == null) { + partitionKeys = new LinkedHashSet<>(partitionSpec.spec().keySet()); + tableBuilder.addAllPartKey(partitionKeys); + } + for (String partKey : tableBuilder.getPartKeyList()) { + tableBuilder.addPartVal(partitionSpec.spec().get(partKey)); + } + } // For a given table the set of partition columns (keys) should not change. - partitionKeys = new LinkedHashSet<>(partitionSpec.keySet()); - tableBuilder.addAllPartKey(partitionKeys); - } - for (String partKey : tableBuilder.getPartKeyList()) { - tableBuilder.addPartVal(partitionSpec.get(partKey)); - } - } - requestBuilder.addTable(tableBuilder.build()); - } - protoRequests.add(requestBuilder.build()); - } - return protoRequests; + requestBuilder.addTable(tableBuilder.build()); + }); + return requestBuilder.build(); + }) + .toList(); } /** @@ -233,19 +224,19 @@ public List toProtoRequests() * @return true if cacheTag matches and the related buffer is eligible for proactive eviction, false otherwise. */ public boolean isTagMatch(CacheTag cacheTag) { - String db = getSingleDbName(); - if (db == null) { - // Number of DBs in the request was not exactly 1. - throw new UnsupportedOperationException("Predicate only implemented for 1 DB case."); - } - TableName tagTableName = TableName.fromString(cacheTag.getTableName(), null, null); - - // Check against DB. - if (!db.equals(tagTableName.getDb())) { + // Parse the tag once and derive catalog/db from the parsed result, so that 2-part + // (db.table), 3-part (catalog.db.table), 4-part (catalog.db.table.metaTable) and + // snapshot-ref names are all interpreted consistently. + TableName tagTableName = parseCacheTagTableName(cacheTag.getTableName()); + String catalog = tagTableName.getCat(); + String db = tagTableName.getDb(); + + // Check that the tag's catalog and database is present in the eviction request. + if (!entities.containsKey(new CatalogDb(catalog, db))) { return false; } - Map>> tables = entities.get(db); + Map> tables = entities.getOrDefault(new CatalogDb(catalog, db), Map.of()); // If true, must be a drop DB event and this cacheTag matches. if (tables.isEmpty()) { @@ -257,31 +248,60 @@ public boolean isTagMatch(CacheTag cacheTag) { tagPartDescMap = ((CacheTag.PartitionCacheTag) cacheTag).getPartitionDescMap(); } + String tagDbTable = tagTableName.getNotEmptyDbTable(); // Check against table name. - for (String tableAndDbName : tables.keySet()) { - if (tableAndDbName.equals(tagTableName.getNotEmptyDbTable())) { - - Set> partDescs = tables.get(tableAndDbName); - - // If true, must be a drop table event, and this cacheTag matches. - if (partDescs == null) { - return true; + for (Map.Entry> tableEntry : tables.entrySet()) { + String tableAndDbName = tableEntry.getKey(); + Set partDescs = tableEntry.getValue(); + if (!tableAndDbName.equals(tagDbTable)) { + // Drop-table requests use db.table; Iceberg metadata tables are tagged db.table.metaTable. + if (partDescs != null || !tagDbTable.startsWith(tableAndDbName + ".")) { + continue; } + return true; + } - // Check against partition keys and values and alas for drop partition event. - if (!(cacheTag instanceof CacheTag.PartitionCacheTag)) { - throw new IllegalArgumentException("CacheTag has no partition information, while trying" + - " to evict due to (and based on) a drop partition DDL statement.."); - } + // If true, must be a drop table event, and this cacheTag matches. + if (partDescs == null) { + return true; + } - if (partDescs.contains(tagPartDescMap)) { - return true; - } + // Check against partition keys and values and alas for drop partition event. + if (!(cacheTag instanceof CacheTag.PartitionCacheTag)) { + throw new IllegalArgumentException("CacheTag has no partition information, while trying" + + " to evict due to (and based on) a drop partition DDL statement.."); + } + + if (partDescs.contains(new PartitionSpec(tagPartDescMap))) { + return true; } } return false; } + /** + * Parses a cache-tag table name into a {@link TableName}. Supports legacy {@code db.table}, + * catalog-qualified {@code catalog.db.table}, and Iceberg metadata tables + * {@code catalog.db.table.metaTable}. + */ + private static TableName parseCacheTagTableName(String fullTableName) { + String[] names = fullTableName.split("\\."); + switch (names.length) { + case 2: + return new TableName(Warehouse.DEFAULT_CATALOG_NAME, names[0], names[1], null); + case 3: + if (TableName.SNAPSHOT_REF.matcher(names[2]).matches()) { + return new TableName(Warehouse.DEFAULT_CATALOG_NAME, names[0], names[1], names[2]); + } + return new TableName(names[0], names[1], names[2], null); + case 4: + return new TableName(names[0], names[1], names[2], names[3]); + default: + throw new UnsupportedOperationException( + "Cache tag table name must have 2-4 dot-separated components: " + fullTableName); + } + } + @Override public String toString() { return "Request { entities = " + entities + " }"; @@ -292,7 +312,7 @@ public String toString() { */ public static final class Builder { - private final Map>>> entities; + private final Map>> entities; private Builder() { this.entities = new HashMap<>(); @@ -302,45 +322,64 @@ public static Builder create() { return new Builder(); } - public Builder addPartitionOfATable(String db, String tableName, LinkedHashMap partSpec) { - ensureDb(db); - ensureTable(db, tableName); - entities.get(db).get(tableName).add(partSpec); + /** + * Add a partition of a table scoped to the given catalog. + */ + public Builder addPartitionOfATable(String catalog, String db, String tableName, + Map partSpec) { + ensureTable(catalog, db, tableName); + entities.get(new CatalogDb(catalog, db)).get(tableName).add(new PartitionSpec(partSpec)); return this; } + /** + * Add a partition of a table scoped to the default catalog. + */ + public Builder addPartitionOfATable(String db, String tableName, Map partSpec) { + return addPartitionOfATable(Warehouse.DEFAULT_CATALOG_NAME, db, tableName, partSpec); + } + + /** + * Add a database scoped to the given catalog. + */ + public Builder addDb(String catalog, String db) { + ensureDb(catalog, db); + return this; + } + + /** + * Add a database scoped to the default catalog. + */ public Builder addDb(String db) { - ensureDb(db); + return addDb(Warehouse.DEFAULT_CATALOG_NAME, db); + } + + /** + * Add a table scoped to the given catalog. + */ + public Builder addTable(String catalog, String db, String table) { + ensureTable(catalog, db, table); return this; } + /** + * Add a table scoped to the default catalog. + */ public Builder addTable(String db, String table) { - ensureDb(db); - ensureTable(db, table); - return this; + return addTable(Warehouse.DEFAULT_CATALOG_NAME, db, table); } public Request build() { return new Request(entities); } - private void ensureDb(String dbName) { - Map>> tables = entities.get(dbName); - if (tables == null) { - tables = new HashMap<>(); - entities.put(dbName, tables); - } + private void ensureDb(String catalogName, String dbName) { + entities.computeIfAbsent(new CatalogDb(catalogName, dbName), k -> new HashMap<>()); } - private void ensureTable(String dbName, String tableName) { - ensureDb(dbName); - Map>> tables = entities.get(dbName); - - Set> partitions = tables.get(tableName); - if (partitions == null) { - partitions = new HashSet<>(); - tables.put(tableName, partitions); - } + private void ensureTable(String catalogName, String dbName, String tableName) { + ensureDb(catalogName, dbName); + entities.get(new CatalogDb(catalogName, dbName)).computeIfAbsent(tableName, k -> new HashSet<>()); } /** @@ -350,9 +389,10 @@ private void ensureTable(String dbName, String tableName) { */ public Builder fromProtoRequest(LlapDaemonProtocolProtos.EvictEntityRequestProto protoRequest) { entities.clear(); + String catalogName = protoRequest.getCatalogName().toLowerCase(); String dbName = protoRequest.getDbName().toLowerCase(); - Map>> entitiesInDb = new HashMap<>(); + Map> entitiesInDb = new HashMap<>(); List tables = protoRequest.getTableList(); if (tables != null && !tables.isEmpty()) { @@ -364,8 +404,8 @@ public Builder fromProtoRequest(LlapDaemonProtocolProtos.EvictEntityRequestProto entitiesInDb.put(dbAndTableName, null); continue; } - Set> partitions = new HashSet<>(); - LinkedHashMap partDesc = new LinkedHashMap<>(); + Set partitions = new HashSet<>(); + Map partDesc = new HashMap<>(); for (int valIx = 0; valIx < table.getPartValCount(); ++valIx) { int keyIx = valIx % table.getPartKeyCount(); @@ -373,15 +413,15 @@ public Builder fromProtoRequest(LlapDaemonProtocolProtos.EvictEntityRequestProto partDesc.put(table.getPartKey(keyIx).toLowerCase(), table.getPartVal(valIx)); if (keyIx == table.getPartKeyCount() - 1) { - partitions.add(partDesc); - partDesc = new LinkedHashMap<>(); + partitions.add(new PartitionSpec(partDesc)); + partDesc = new HashMap<>(); } } entitiesInDb.put(dbAndTableName, partitions); } } - entities.put(dbName, entitiesInDb); + entities.put(new CatalogDb(catalogName, dbName), entitiesInDb); return this; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java index 11f8917334b1..256b3456ff79 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseAnalyzer.java @@ -67,6 +67,9 @@ public void analyzeInternal(ASTNode root) throws SemanticException { if (database == null) { return; } + if (catalogName == null) { + catalogName = database.getCatalogName(); + } // if cascade=true, then we need to authorize the drop table action as well, and add the tables to the outputs boolean isDbLevelLock = true; if (cascade) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java index b544b7b4a24b..fe4d153f1d28 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/database/drop/DropDatabaseOperation.java @@ -53,7 +53,7 @@ public int execute() throws HiveException { if (LlapHiveUtils.isLlapMode(context.getConf())) { ProactiveEviction.Request.Builder llapEvictRequestBuilder = ProactiveEviction.Request.Builder.create(); - llapEvictRequestBuilder.addDb(dbName); // TODO catalog. add catalog for the cache. Depend on HIVE-29281 + llapEvictRequestBuilder.addDb(catName, dbName); ProactiveEviction.evict(context.getConf(), llapEvictRequestBuilder.build()); } // Unregister the functions as well diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java index b253ec5df5ff..7e6c93ebdcad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableOperation.java @@ -115,7 +115,7 @@ public int execute() throws HiveException { if (LlapHiveUtils.isLlapMode(context.getConf())) { TableName tableName = HiveTableName.of(table); ProactiveEviction.Request.Builder llapEvictRequestBuilder = ProactiveEviction.Request.Builder.create(); - llapEvictRequestBuilder.addTable(tableName.getDb(), tableName.getTable()); + llapEvictRequestBuilder.addTable(table.getCatName(), tableName.getDb(), tableName.getTable()); ProactiveEviction.evict(context.getConf(), llapEvictRequestBuilder.build()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java index a0eda1ab4eff..89ad090efb57 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java @@ -147,7 +147,8 @@ private void dropPartitions(boolean isRepl) throws HiveException { DDLUtils.addIfAbsentByName(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK), context); if (llapEvictRequestBuilder != null) { - llapEvictRequestBuilder.addPartitionOfATable(tableName.getDb(), tableName.getTable(), partition.getSpec()); + llapEvictRequestBuilder.addPartitionOfATable( + tableName.getCat(), tableName.getDb(), tableName.getTable(), partition.getSpec()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index a29e532b113b..f58fafc85566 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -770,8 +770,7 @@ public static TableDesc getTableDesc(Table tbl) { if (tbl.getSnapshotRef() != null) { props.put(SNAPSHOT_REF, tbl.getSnapshotRef()); } - return (new TableDesc(tbl.getInputFormatClass(), tbl - .getOutputFormatClass(), props)); + return new TableDesc(tbl.getInputFormatClass(), tbl.getOutputFormatClass(), props, tbl.getCatName()); } // column names and column types are all delimited by comma diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index d46eded36e61..9248897bf745 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -275,7 +275,7 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte } PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(orcSplit.getPath(), mapWork.getPathToPartitionInfo()); - cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(orcSplit.getPath(), true, partitionDesc); + cacheTag = LlapHiveUtils.getCacheTag(orcSplit.getPath(), true, partitionDesc); } else { cacheTag = null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index fbffab44a64c..c346331325df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -343,7 +343,7 @@ public static CacheTag cacheTagOfParquetFile(Path path, Configuration cacheConf, return null; } PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, mapWork.getPathToPartitionInfo()); - return LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc); + return LlapHiveUtils.getCacheTag(path, true, partitionDesc); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java index 0dcfe72d7f5b..25f8afe6d6c7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java @@ -258,6 +258,13 @@ public String getTableName() { return tableName; } + /** + * Returns the catalog name for this partition's table. + */ + public String getCatalogName() { + return tableDesc.getCatalogName(); + } + @Explain(displayName = "input format", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getInputFileFormatClassName() { return getInputFileFormatClass().getName(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java index 58ce207c0c6c..b0e6feff7ec1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StringInternUtils; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; @@ -52,6 +53,8 @@ public class TableDesc implements Serializable, Cloneable { public static final String SECRET_PREFIX = "TABLE_SECRET"; public static final String SECRET_DELIMIT = "#"; + private String catalogName; + public TableDesc() { } @@ -59,14 +62,31 @@ public TableDesc() { * @param inputFormatClass * @param outputFormatClass * @param properties must contain serde class name associate with this table. + * @param catalogName the catalog this table belongs to; stored as a dedicated field so it does + * not appear in EXPLAIN output. Pass {@code null} for internal/intermediate + * descriptors that are not backed by a real user table; {@code null} will be + * normalized to {@link Warehouse#DEFAULT_CATALOG_NAME}. */ public TableDesc( final Class inputFormatClass, - final Class outputFormatClass, final Properties properties) { + final Class outputFormatClass, final Properties properties, + final String catalogName) { this.inputFileFormatClass = inputFormatClass; outputFileFormatClass = HiveFileFormatUtils .getOutputFormatSubstitute(outputFormatClass); setProperties(properties); + this.catalogName = catalogName == null ? Warehouse.DEFAULT_CATALOG_NAME : catalogName; + } + + /** + * @param inputFormatClass + * @param outputFormatClass + * @param properties must contain serde class name associate with this table. + */ + public TableDesc( + final Class inputFormatClass, + final Class outputFormatClass, final Properties properties) { + this(inputFormatClass, outputFormatClass, properties, null); } public Class getSerDeClass() { @@ -199,6 +219,14 @@ public int getBucketingVersion() { properties.getProperty(hive_metastoreConstants.TABLE_BUCKETING_VERSION)); } + public String getCatalogName() { + return catalogName; + } + + public void setCatalogName(String catalogName) { + this.catalogName = catalogName == null ? Warehouse.DEFAULT_CATALOG_NAME : catalogName; + } + @Override public Object clone() { TableDesc ret = new TableDesc(); @@ -215,6 +243,7 @@ public Object clone() { if (jobProperties != null) { ret.jobProperties = new LinkedHashMap(jobProperties); } + ret.catalogName = catalogName == null ? Warehouse.DEFAULT_CATALOG_NAME : catalogName; return ret; } diff --git a/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out b/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out index 257d69751fde..3dc0c6bbc39a 100644 --- a/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out +++ b/ql/src/test/results/clientpositive/llap/db_ddl_explain.q.out @@ -159,6 +159,7 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Drop Database + catalog: hive database: d if exists: false diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java index 0f5d7b915168..66ef19e1d35f 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java +++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java @@ -30,17 +30,19 @@ /** * Used for identifying the related object of the buffer stored in cache. * Comes in 3 flavours to optimize for minimal memory overhead: - * - TableCacheTag for tables without partitions: DB/table level - * - SinglePartitionCacheTag for tables with 1 partition level: DB/table/1st_partition + * - TableCacheTag for tables without partitions: catalog.DB.table level + * - SinglePartitionCacheTag for tables with 1 partition level: catalog.DB.table/1st_partition * - MultiPartitionCacheTag for tables with > 1 partition levels: - * DB/table/1st_partition/.../nth_partition . + * catalog.DB.table/1st_partition/.../nth_partition . */ public abstract class CacheTag implements Comparable { private static final String ENCODING = "UTF-8"; /** - * Prepended by DB name and '.' . + * Catalog-qualified, DB-qualified table name. Stored as {@code catalog.db.table}, e.g. + * {@code hive.salesdb.orders}. For DB-level parent tags produced by + * {@link #createParentCacheTag} this is just {@code catalog.db}. */ protected final String tableName; @@ -48,6 +50,9 @@ private CacheTag(String tableName) { this.tableName = tableName.intern(); } + /** + * Returns the full catalog-qualified, DB-qualified name, i.e. {@code catalog.db.table}. + */ public String getTableName() { return tableName; } @@ -71,8 +76,7 @@ public boolean equals(Object obj) { @Override public int hashCode() { - int res = tableName.hashCode(); - return res; + return tableName.hashCode(); } public static final CacheTag build(String tableName) { @@ -82,8 +86,16 @@ public static final CacheTag build(String tableName) { return new TableCacheTag(tableName); } - public static final CacheTag build(String tableName, LinkedHashMap partDescMap) { - if (StringUtils.isEmpty(tableName) || partDescMap == null || partDescMap.isEmpty()) { + public static final CacheTag build(String catalogName, String dbAndTableName) { + return build(catalogName + "." + dbAndTableName); + } + + public static final CacheTag build(String catalogName, String dbAndTableName, LinkedHashMap partDescMap) { + return build(catalogName + "." + dbAndTableName, partDescMap); + } + + public static final CacheTag build(String fullTableName, LinkedHashMap partDescMap) { + if (StringUtils.isEmpty(fullTableName) || partDescMap == null || partDescMap.isEmpty()) { throw new IllegalArgumentException(); } @@ -95,10 +107,10 @@ public static final CacheTag build(String tableName, LinkedHashMap1 - return new MultiPartitionCacheTag(tableName, partDescs); + return new MultiPartitionCacheTag(fullTableName, partDescs); } } @@ -118,7 +130,10 @@ public static final CacheTag build(String tableName, List partDescs) { /** * Constructs a (fake) parent CacheTag instance by walking back in the hierarchy i.e. stepping * from inner to outer partition levels, then producing a CacheTag for the table and finally - * the DB. + * the DB. The catalog prefix is preserved throughout the walk. + * + *

The walk terminates at the DB level: a tag whose {@code tableName} contains exactly one + * dot (i.e. {@code catalog.db}) has no parent, so {@code null} is returned. */ public static final CacheTag createParentCacheTag(CacheTag tag) { if (tag == null) { @@ -134,20 +149,18 @@ public static final CacheTag createParentCacheTag(CacheTag tag) { } return new MultiPartitionCacheTag(multiPartitionCacheTag.tableName, subList); } else { - return new SinglePartitionCacheTag(multiPartitionCacheTag.tableName, - multiPartitionCacheTag.partitionDesc[0]); + return new SinglePartitionCacheTag( + multiPartitionCacheTag.tableName, multiPartitionCacheTag.partitionDesc[0]); } } if (tag instanceof SinglePartitionCacheTag) { return new TableCacheTag(tag.tableName); } else { - // DB level - int ix = tag.tableName.indexOf("."); - if (ix <= 0) { + if (tag.tableName.split("\\.", 3).length < 3) { return null; } - return new TableCacheTag(tag.tableName.substring(0, ix)); + return new TableCacheTag(tag.tableName.substring(0, tag.tableName.lastIndexOf('.'))); } } @@ -381,4 +394,3 @@ private static String[] decodePartDesc(String partDesc) { } } -