Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public class MetricsStep {
@JsonProperty("miss_cache_millisecond")
protected long missCacheMillisecond;

@JsonProperty("parquet_metadata_cache_hits")
protected long parquetMetadataCacheHits;

@JsonProperty("parquet_metadata_cache_misses")
protected long parquetMetadataCacheMisses;

/**
 * @return the name of this metrics step.
 */
public String getName() {
  return this.name;
}
Expand Down Expand Up @@ -148,4 +154,20 @@ public long getMissCacheMillisecond() {
/**
 * Sets the time (in milliseconds) spent reading from the cache source on a miss.
 * Deserialized from the {@code miss_cache_millisecond} JSON field.
 *
 * @param missCacheMillisecond miss time in milliseconds
 */
public void setMissCacheMillisecond(long missCacheMillisecond) {
  this.missCacheMillisecond = missCacheMillisecond;
}

/**
 * @return the number of Parquet metadata cache hits reported by the native
 *         engine (serialized as {@code parquet_metadata_cache_hits}).
 */
public long getParquetMetadataCacheHits() {
  return this.parquetMetadataCacheHits;
}

/**
 * Sets the Parquet metadata cache hit count.
 * Deserialized from the {@code parquet_metadata_cache_hits} JSON field.
 *
 * @param hits the hit count
 */
public void setParquetMetadataCacheHits(long hits) {
  this.parquetMetadataCacheHits = hits;
}

/**
 * @return the number of Parquet metadata cache misses reported by the native
 *         engine (serialized as {@code parquet_metadata_cache_misses}).
 */
public long getParquetMetadataCacheMisses() {
  return this.parquetMetadataCacheMisses;
}

/**
 * Sets the Parquet metadata cache miss count.
 * Deserialized from the {@code parquet_metadata_cache_misses} JSON field.
 *
 * @param misses the miss count
 */
public void setParquetMetadataCacheMisses(long misses) {
  this.parquetMetadataCacheMisses = misses;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class CHBackend extends SubstraitBackend {
// Identifier of this backend, delegated to the CHConfig constant.
override def name(): String = CHConfig.BACKEND_NAME
override def info(): Map[String, String] = {
Map(
"ch_org" -> CH_ORG,
"ch_branch" -> CH_BRANCH,
"ch_revision" -> CH_COMMIT
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,13 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"Time reading from filesystem cache"),
"missCacheMillisecond" -> SQLMetrics.createTimingMetric(
sparkContext,
"Time reading from filesystem cache source (from remote filesystem, etc)")
"Time reading from filesystem cache source (from remote filesystem, etc)"),
"parquetMetadataCacheHits" -> SQLMetrics.createMetric(
sparkContext,
"Number of times parquet metadata has been found in the cache"),
"parquetMetadataCacheMisses" -> SQLMetrics.createMetric(
sparkContext,
"Number of times parquet metadata has not been found in the cache")
)

override def genFileSourceScanTransformerMetricsUpdater(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
val readMissBytes: SQLMetric = metrics("readMissBytes")
val readCacheMillisecond: SQLMetric = metrics("readCacheMillisecond")
val missCacheMillisecond: SQLMetric = metrics("missCacheMillisecond")
val parquetMetadataCacheHits: SQLMetric = metrics("parquetMetadataCacheHits")
val parquetMetadataCacheMisses: SQLMetric = metrics("parquetMetadataCacheMisses")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
// inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
Expand Down Expand Up @@ -71,6 +73,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
readMissBytes += step.readMissBytes
readCacheMillisecond += step.readCacheMillisecond
missCacheMillisecond += step.missCacheMillisecond
parquetMetadataCacheHits += step.parquetMetadataCacheHits
parquetMetadataCacheMisses += step.parquetMetadataCacheMisses
})

MetricsUtil.updateExtraTimeMetric(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.gluten.execution.tpch

import org.apache.gluten.backendsapi.clickhouse.CHConfig
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution._

Expand All @@ -34,9 +33,6 @@ class GlutenClickHouseTPCHSaltNullNativeParquetSuite
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set(GlutenConfig.GLUTEN_SUPPORTED_SCALA_UDFS.key, "my_add")
.set(
CHConfig.runtimeSettings("input_format_parquet_use_native_reader_with_filter_push_down"),
"true")
}

final override val testCases: Seq[Int] = Seq(
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20260425-25.12.10.7
CH_COMMIT=0b1a0ba066e
CH_COMMIT=54c5bf9a97b
1 change: 1 addition & 0 deletions cpp-ch/local-engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ include_directories(
${ClickHouse_SOURCE_DIR}/contrib/azure/sdk/storage/azure-storage-blobs/inc
${ClickHouse_SOURCE_DIR}/contrib/azure/sdk/core/azure-core/inc
${ClickHouse_SOURCE_DIR}/contrib/azure/sdk/storage/azure-storage-common/inc
${ClickHouse_SOURCE_DIR}/contrib/azure/sdk/identity/azure-identity/inc
${ClickHouse_SOURCE_DIR}/contrib/llvm-project/llvm/include
${ClickHouse_SOURCE_DIR}/contrib/llvm-project/utils/bazel/llvm-project-overlay/llvm/include
${ClickHouse_SOURCE_DIR}/contrib/libdivide
Expand Down
13 changes: 13 additions & 0 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ extern const ServerSettingsString vector_similarity_index_cache_policy;
extern const ServerSettingsUInt64 vector_similarity_index_cache_size;
extern const ServerSettingsUInt64 vector_similarity_index_cache_max_entries;
extern const ServerSettingsDouble vector_similarity_index_cache_size_ratio;
extern const ServerSettingsString parquet_metadata_cache_policy;
extern const ServerSettingsUInt64 parquet_metadata_cache_size;
extern const ServerSettingsUInt64 parquet_metadata_cache_max_entries;
extern const ServerSettingsDouble parquet_metadata_cache_size_ratio;
}
namespace Setting
{
Expand Down Expand Up @@ -733,6 +737,8 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
settings.set("input_format_orc_skip_columns_with_unsupported_types_in_schema_inference", true);
settings.set("input_format_parquet_allow_missing_columns", true);
settings.set("input_format_parquet_case_insensitive_column_matching", true);
// TODO: remove this setting later; enabling it currently causes errors
settings.set("input_format_parquet_use_native_reader_with_filter_push_down", false);
settings.set("input_format_parquet_import_nested", true);
settings.set("input_format_json_read_numbers_as_strings", true);
settings.set("input_format_json_read_bools_as_numbers", false);
Expand Down Expand Up @@ -854,6 +860,13 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)

// We must set the application type to CLIENT to avoid ServerUUID::get() throw exception
global_context->setApplicationType(Context::ApplicationType::CLIENT);

// Create the Parquet metadata cache from the server settings below.
String parquet_metadata_cache_policy = server_settings[ServerSetting::parquet_metadata_cache_policy];
size_t parquet_metadata_cache_size = server_settings[ServerSetting::parquet_metadata_cache_size];
size_t parquet_metadata_cache_max_entries = server_settings[ServerSetting::parquet_metadata_cache_max_entries];
double parquet_metadata_cache_size_ratio = server_settings[ServerSetting::parquet_metadata_cache_size_ratio];
global_context->setParquetMetadataCache(parquet_metadata_cache_policy, parquet_metadata_cache_size, parquet_metadata_cache_max_entries, parquet_metadata_cache_size_ratio);
}
else
{
Expand Down
9 changes: 9 additions & 0 deletions cpp-ch/local-engine/Parser/RelMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ extern const Event CachedReadBufferCreateBufferMicroseconds;

extern const Event CachedReadBufferReadFromCacheHits;
extern const Event CachedReadBufferReadFromCacheMisses;

extern const Event ParquetMetadataCacheHits;
extern const Event ParquetMetadataCacheMisses;
}

namespace local_engine
Expand All @@ -57,6 +60,8 @@ static void writeCacheHits(Writer<StringBuffer> & writer)
auto read_miss_bytes = counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load();
auto read_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 1000;
auto miss_cache_millisecond = counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 1000;
auto parquet_metadata_cache_hits = counters[ProfileEvents::ParquetMetadataCacheHits].load();
auto parquet_metadata_cache_misses = counters[ProfileEvents::ParquetMetadataCacheMisses].load();

writer.Key("read_cache_hits");
writer.Uint64(read_cache_hits);
Expand All @@ -70,6 +75,10 @@ static void writeCacheHits(Writer<StringBuffer> & writer)
writer.Uint64(read_cache_millisecond);
writer.Key("miss_cache_millisecond");
writer.Uint64(miss_cache_millisecond);
writer.Key("parquet_metadata_cache_hits");
writer.Uint64(parquet_metadata_cache_hits);
writer.Key("parquet_metadata_cache_misses");
writer.Uint64(parquet_metadata_cache_misses);
}

RelMetric::RelMetric(size_t id_, const String & name_, std::vector<DB::IQueryPlanStep *> & steps_) : id(id_), name(name_), steps(steps_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,19 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr
LOG_TRACE(
&Poco::Logger::get("ParquetFormatFile"),
"Using native parquet reader v3");
auto input = std::make_shared<ParquetV3BlockInputFormat>(*read_buffer_, read_header, format_settings, parser_shared_resources, parser_group, min_bytes_for_seek);
// Obtain the Parquet metadata cache and build this file's metadata (path, size, modification time) for it.
ParquetMetadataCachePtr metadata_cache = context->getParquetMetadataCache();
Poco::URI file_uri(file_info.uri_file());
const std::optional<RelativePathWithMetadata> object_with_metadata = RelativePathWithMetadata(
String(file_uri.getPath()),
ObjectMetadata{
0,
static_cast<uint64_t>(file_info.properties().filesize()),
file_info.properties().modificationtime(),
file_uri.getScheme(),
{},
{}});
auto input = std::make_shared<ParquetV3BlockInputFormat>(*read_buffer_, read_header, format_settings, parser_shared_resources, parser_group, min_bytes_for_seek, metadata_cache, object_with_metadata);
/// ParquetV3 Reader ignores `format_settings.parquet.skip_row_groups` (unlike ParquetBlockInputFormat).
/// Spark splits are expressed via Substrait file start/length; metaBuilder selects row groups, and
/// we must pass them as buckets so each task only reads its slice of the file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ trait SubstraitBackend extends Backend with Logging {
SoftAffinityListener.register(sc)
}

postBuildInfoEvent(sc)
postBuildInfoEvent(sc, info())
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhouyuan @zhztheplayer please help to review, thanks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add version info for each backend:
image

setBuildInfoConfig(conf)

setPredefinedConfigs(conf)
Expand Down Expand Up @@ -96,7 +96,7 @@ trait SubstraitBackend extends Backend with Logging {
object SubstraitBackend extends Logging {

/** Since https://github.com/apache/gluten/pull/2247. */
private def postBuildInfoEvent(sc: SparkContext): Unit = {
private def postBuildInfoEvent(sc: SparkContext, infos: Map[String, String]): Unit = {
// export gluten version to property to spark
System.setProperty("gluten.version", GlutenBuildInfo.VERSION)

Expand All @@ -113,6 +113,7 @@ object SubstraitBackend extends Logging {
glutenBuildInfo.put("Gluten Revision Time", GlutenBuildInfo.REVISION_TIME)
glutenBuildInfo.put("Gluten Build Time", GlutenBuildInfo.BUILD_DATE)
glutenBuildInfo.put("Gluten Repo URL", GlutenBuildInfo.REPO_URL)
infos.foreach { case (key, value) => glutenBuildInfo.put(key, value) }

val loggingInfo = glutenBuildInfo
.map { case (name, value) => s"$name: $value" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,5 @@ object GlutenBuildInfo {
val VELOX_REVISION_TIME: String = props.getProperty("velox_revision_time", unknown)
val CH_BRANCH: String = props.getProperty("ch_branch", unknown)
val CH_COMMIT: String = props.getProperty("ch_commit", unknown)
val CH_ORG: String = props.getProperty("ch_org", unknown)
}
Loading