diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java index 39dd94965835..dc8424de8285 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java @@ -53,6 +53,12 @@ public class MetricsStep { @JsonProperty("miss_cache_millisecond") protected long missCacheMillisecond; + @JsonProperty("parquet_metadata_cache_hits") + protected long parquetMetadataCacheHits; + + @JsonProperty("parquet_metadata_cache_misses") + protected long parquetMetadataCacheMisses; + public String getName() { return name; } @@ -148,4 +154,20 @@ public long getMissCacheMillisecond() { public void setMissCacheMillisecond(long missCacheMillisecond) { this.missCacheMillisecond = missCacheMillisecond; } + + public long getParquetMetadataCacheHits() { + return parquetMetadataCacheHits; + } + + public void setParquetMetadataCacheHits(long parquetMetadataCacheHits) { + this.parquetMetadataCacheHits = parquetMetadataCacheHits; + } + + public long getParquetMetadataCacheMisses() { + return parquetMetadataCacheMisses; + } + + public void setParquetMetadataCacheMisses(long parquetMetadataCacheMisses) { + this.parquetMetadataCacheMisses = parquetMetadataCacheMisses; + } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 7268ea0e6ffd..ba540914fbc3 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -54,6 +54,7 @@ class CHBackend extends SubstraitBackend { override def name(): String = CHConfig.BACKEND_NAME override def info(): Map[String, String] = { Map( + "ch_org" -> CH_ORG, "ch_branch" -> CH_BRANCH, "ch_revision" -> CH_COMMIT ) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala index a437665e0081..33368185f253 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala @@ -168,7 +168,13 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { "Time reading from filesystem cache"), "missCacheMillisecond" -> SQLMetrics.createTimingMetric( sparkContext, - "Time reading from filesystem cache source (from remote filesystem, etc)") + "Time reading from filesystem cache source (from remote filesystem, etc)"), + "parquetMetadataCacheHits" -> SQLMetrics.createMetric( + sparkContext, + "Number of times parquet metadata has been found in the cache"), + "parquetMetadataCacheMisses" -> SQLMetrics.createMetric( + sparkContext, + "Number of times parquet metadata has not been found in the cache") ) override def genFileSourceScanTransformerMetricsUpdater( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala index 437194873bdc..7af660201370 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala @@ -44,6 +44,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric val readMissBytes: SQLMetric = metrics("readMissBytes") val readCacheMillisecond: SQLMetric = metrics("readCacheMillisecond") val missCacheMillisecond: SQLMetric = metrics("missCacheMillisecond") + val parquetMetadataCacheHits: SQLMetric = metrics("parquetMetadataCacheHits") + val parquetMetadataCacheMisses: SQLMetric = metrics("parquetMetadataCacheMisses") override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { // inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value) @@ -71,6 +73,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric readMissBytes += step.readMissBytes readCacheMillisecond += step.readCacheMillisecond missCacheMillisecond += step.missCacheMillisecond + parquetMetadataCacheHits += step.parquetMetadataCacheHits + parquetMetadataCacheMisses += step.parquetMetadataCacheMisses }) MetricsUtil.updateExtraTimeMetric( diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullNativeParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullNativeParquetSuite.scala index 805afabc5c35..633ede1a730f 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullNativeParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullNativeParquetSuite.scala @@ -16,7 +16,6 @@ */ package org.apache.gluten.execution.tpch -import org.apache.gluten.backendsapi.clickhouse.CHConfig import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution._ @@ -34,9 +33,6 @@ class GlutenClickHouseTPCHSaltNullNativeParquetSuite .set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set(GlutenConfig.GLUTEN_SUPPORTED_SCALA_UDFS.key, "my_add") - .set( - CHConfig.runtimeSettings("input_format_parquet_use_native_reader_with_filter_push_down"), - "true") } final override val testCases: Seq[Int] = Seq( diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version index 79d1ca908ce9..b91935abe06c 100644 --- a/cpp-ch/clickhouse.version +++ b/cpp-ch/clickhouse.version @@ -1,3 +1,3 @@ CH_ORG=Kyligence CH_BRANCH=rebase_ch/20260425-25.12.10.7 -CH_COMMIT=0b1a0ba066e +CH_COMMIT=54c5bf9a97b diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt index 2cd14ba2efe5..1955d10bc613 100644 --- a/cpp-ch/local-engine/CMakeLists.txt +++ b/cpp-ch/local-engine/CMakeLists.txt @@ -85,6 +85,7 @@ include_directories( ${ClickHouse_SOURCE_DIR}/contrib/azure/sdk/storage/azure-storage-blobs/inc ${ClickHouse_SOURCE_DIR}/contrib/azure/sdk/core/azure-core/inc ${ClickHouse_SOURCE_DIR}/contrib/azure/sdk/storage/azure-storage-common/inc + ${ClickHouse_SOURCE_DIR}/contrib/azure/sdk/identity/azure-identity/inc ${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm/include ${ClickHouse_SOURCE_DIR}/contrib/llvm-project/utils/bazel/llvm-project-overlay/llvm/include ${ClickHouse_SOURCE_DIR}/contrib/libdivide diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index 851e9fb061dc..9487b87ef150 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -92,6 +92,10 @@ extern const ServerSettingsString vector_similarity_index_cache_policy; extern const ServerSettingsUInt64 vector_similarity_index_cache_size; extern const ServerSettingsUInt64 vector_similarity_index_cache_max_entries; extern const ServerSettingsDouble vector_similarity_index_cache_size_ratio; +extern const ServerSettingsString parquet_metadata_cache_policy; +extern const ServerSettingsUInt64 parquet_metadata_cache_size; +extern const ServerSettingsUInt64 parquet_metadata_cache_max_entries; +extern const ServerSettingsDouble parquet_metadata_cache_size_ratio; } namespace Setting { @@ -733,6 +737,8 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_ settings.set("input_format_orc_skip_columns_with_unsupported_types_in_schema_inference", true); settings.set("input_format_parquet_allow_missing_columns", true); settings.set("input_format_parquet_case_insensitive_column_matching", true); + // TODO: will remove this parameter later, there are some errors when it's true + settings.set("input_format_parquet_use_native_reader_with_filter_push_down", false); settings.set("input_format_parquet_import_nested", true); settings.set("input_format_json_read_numbers_as_strings", true); settings.set("input_format_json_read_bools_as_numbers", false); @@ -854,6 +860,13 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config) // We must set the application type to CLIENT to avoid ServerUUID::get() throw exception global_context->setApplicationType(Context::ApplicationType::CLIENT); + + // create parquet meta data cache + String parquet_metadata_cache_policy = server_settings[ServerSetting::parquet_metadata_cache_policy]; + size_t parquet_metadata_cache_size = server_settings[ServerSetting::parquet_metadata_cache_size]; + size_t parquet_metadata_cache_max_entries = server_settings[ServerSetting::parquet_metadata_cache_max_entries]; + double parquet_metadata_cache_size_ratio = server_settings[ServerSetting::parquet_metadata_cache_size_ratio]; + global_context->setParquetMetadataCache(parquet_metadata_cache_policy, parquet_metadata_cache_size, parquet_metadata_cache_max_entries, parquet_metadata_cache_size_ratio); } else { diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp index 4de98fdde426..98dd10f61c94 100644 --- a/cpp-ch/local-engine/Parser/RelMetric.cpp +++ b/cpp-ch/local-engine/Parser/RelMetric.cpp @@ -42,6 +42,9 @@ extern const Event CachedReadBufferCreateBufferMicroseconds; extern const Event CachedReadBufferReadFromCacheHits; extern const Event CachedReadBufferReadFromCacheMisses; + +extern const Event ParquetMetadataCacheHits; +extern const Event ParquetMetadataCacheMisses; } namespace local_engine @@ -57,6 +60,8 @@ static void writeCacheHits(Writer & writer) auto read_miss_bytes = counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load(); auto read_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 1000; auto miss_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 1000; + auto parquet_metadata_cache_hits = counters[ProfileEvents::ParquetMetadataCacheHits].load(); + auto parquet_metadata_cache_misses = counters[ProfileEvents::ParquetMetadataCacheMisses].load(); writer.Key("read_cache_hits"); writer.Uint64(read_cache_hits); @@ -70,6 +75,10 @@ static void writeCacheHits(Writer & writer) writer.Uint64(read_cache_millisecond); writer.Key("miss_cache_millisecond"); writer.Uint64(miss_cache_millisecond); + writer.Key("parquet_metadata_cache_hits"); + writer.Uint64(parquet_metadata_cache_hits); + writer.Key("parquet_metadata_cache_misses"); + writer.Uint64(parquet_metadata_cache_misses); } RelMetric::RelMetric(size_t id_, const String & name_, std::vector & steps_) : id(id_), name(name_), steps(steps_) diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp index 48b67c8337c0..9c97fed9fdd8 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp @@ -226,7 +226,19 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr LOG_TRACE( &Poco::Logger::get("ParquetFormatFile"), "Using native parquet reader v3"); - auto input = std::make_shared(*read_buffer_, read_header, format_settings, parser_shared_resources, parser_group, min_bytes_for_seek); + // generate parquet meta data cache + ParquetMetadataCachePtr metadata_cache = context->getParquetMetadataCache(); + Poco::URI file_uri(file_info.uri_file()); + const std::optional object_with_metadata = RelativePathWithMetadata( + String(file_uri.getPath()), + ObjectMetadata{ + 0, + static_cast(file_info.properties().filesize()), + file_info.properties().modificationtime(), + file_uri.getScheme(), + {}, + {}}); + auto input = std::make_shared(*read_buffer_, read_header, format_settings, parser_shared_resources, parser_group, min_bytes_for_seek, metadata_cache, object_with_metadata); /// ParquetV3 Reader ignores `format_settings.parquet.skip_row_groups` (unlike ParquetBlockInputFormat). /// Spark splits are expressed via Substrait file start/length; metaBuilder selects row groups, and /// we must pass them as buckets so each task only reads its slice of the file. diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala index 1acf40cb435f..70adceab79ad 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala @@ -51,7 +51,7 @@ trait SubstraitBackend extends Backend with Logging { SoftAffinityListener.register(sc) } - postBuildInfoEvent(sc) + postBuildInfoEvent(sc, info()) setBuildInfoConfig(conf) setPredefinedConfigs(conf) @@ -96,7 +96,7 @@ trait SubstraitBackend extends Backend with Logging { object SubstraitBackend extends Logging { /** Since https://github.com/apache/gluten/pull/2247. */ - private def postBuildInfoEvent(sc: SparkContext): Unit = { + private def postBuildInfoEvent(sc: SparkContext, infos: Map[String, String]): Unit = { // export gluten version to property to spark System.setProperty("gluten.version", GlutenBuildInfo.VERSION) @@ -113,6 +113,7 @@ object SubstraitBackend extends Logging { glutenBuildInfo.put("Gluten Revision Time", GlutenBuildInfo.REVISION_TIME) glutenBuildInfo.put("Gluten Build Time", GlutenBuildInfo.BUILD_DATE) glutenBuildInfo.put("Gluten Repo URL", GlutenBuildInfo.REPO_URL) + infos.foreach { case (key, value) => glutenBuildInfo.put(key, value) } val loggingInfo = glutenBuildInfo .map { case (name, value) => s"$name: $value" } diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenBuildInfo.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenBuildInfo.scala index f96920103933..b204dca09b1d 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenBuildInfo.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenBuildInfo.scala @@ -55,4 +55,5 @@ object GlutenBuildInfo { val VELOX_REVISION_TIME: String = props.getProperty("velox_revision_time", unknown) val CH_BRANCH: String = props.getProperty("ch_branch", unknown) val CH_COMMIT: String = props.getProperty("ch_commit", unknown) + val CH_ORG: String = props.getProperty("ch_org", unknown) }