Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions src/common/base/src/runtime/profile/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ pub enum ProfileStatisticsName {
OutputRows,
OutputBytes,
ScanBytes,
ScanCacheBytes,
ScanPartitions,
ScanBytesFromRemote,
ScanBytesFromLocal,
ScanBytesFromMemory,

RemoteSpillWriteCount,
RemoteSpillWriteBytes,
Expand Down Expand Up @@ -190,20 +192,34 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
unit: StatisticsUnit::Bytes,
plain_statistics: true,
}),
(ProfileStatisticsName::ScanCacheBytes, ProfileDesc {
display_name: "bytes scanned from cache",
desc: "The bytes scanned from cache of query",
index: ProfileStatisticsName::ScanCacheBytes as usize,
unit: StatisticsUnit::Bytes,
plain_statistics: true,
}),
(ProfileStatisticsName::ScanPartitions, ProfileDesc {
display_name: "partitions scanned",
desc: "The partitions scanned of query",
index: ProfileStatisticsName::ScanPartitions as usize,
unit: StatisticsUnit::Count,
plain_statistics: true,
}),
(ProfileStatisticsName::ScanBytesFromRemote, ProfileDesc {
display_name: "bytes scanned from remote",
desc: "The bytes scanned from remote storage (compressed)",
index: ProfileStatisticsName::ScanBytesFromRemote as usize,
unit: StatisticsUnit::Bytes,
plain_statistics: true,
}),
(ProfileStatisticsName::ScanBytesFromLocal, ProfileDesc {
display_name: "bytes scanned from local cache",
desc: "The bytes scanned from local disk cache (compressed)",
index: ProfileStatisticsName::ScanBytesFromLocal as usize,
unit: StatisticsUnit::Bytes,
plain_statistics: true,
}),
(ProfileStatisticsName::ScanBytesFromMemory, ProfileDesc {
display_name: "bytes scanned from memory cache",
desc: "The bytes scanned from memory cache (compressed)",
index: ProfileStatisticsName::ScanBytesFromMemory as usize,
unit: StatisticsUnit::Bytes,
plain_statistics: true,
}),
(ProfileStatisticsName::RemoteSpillWriteCount, ProfileDesc {
display_name: "numbers remote spilled by write",
desc: "The number of remote spilled by write",
Expand Down
60 changes: 53 additions & 7 deletions src/query/sql/src/executor/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use databend_common_ast::ast::FormatTreeNode;
use databend_common_base::base::format_byte_size;
use databend_common_base::runtime::profile::get_statistics_desc;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_catalog::plan::PartStatistics;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down Expand Up @@ -612,14 +613,54 @@ fn append_profile_info(
plan_id: u32,
) {
if let Some(prof) = profs.get(&plan_id) {
for (_, desc) in get_statistics_desc().iter() {
if prof.statistics[desc.index] != 0 {
children.push(FormatTreeNode::new(format!(
// Calculate total scan IO bytes for percentage
let total_scan_io = prof.statistics[ProfileStatisticsName::ScanBytesFromRemote as usize]
+ prof.statistics[ProfileStatisticsName::ScanBytesFromLocal as usize]
+ prof.statistics[ProfileStatisticsName::ScanBytesFromMemory as usize];

for (stat_name, desc) in get_statistics_desc().iter() {
let value = prof.statistics[desc.index];
let always_show = matches!(
stat_name,
ProfileStatisticsName::ScanBytes
| ProfileStatisticsName::ScanBytesFromRemote
| ProfileStatisticsName::ScanBytesFromLocal
| ProfileStatisticsName::ScanBytesFromMemory
);

if value == 0 && !always_show {
continue;
}

// Add percentage for cache-related statistics
let display_text = if total_scan_io > 0 {
match stat_name {
ProfileStatisticsName::ScanBytesFromRemote
| ProfileStatisticsName::ScanBytesFromLocal
| ProfileStatisticsName::ScanBytesFromMemory => {
let percentage = (value as f64 / total_scan_io as f64) * 100.0;
format!(
"{}: {} ({:.2}%)",
desc.display_name.to_lowercase(),
desc.human_format(value),
percentage
)
}
_ => format!(
"{}: {}",
desc.display_name.to_lowercase(),
desc.human_format(value)
),
}
} else {
format!(
"{}: {}",
desc.display_name.to_lowercase(),
desc.human_format(prof.statistics[desc.index])
)));
}
desc.human_format(value)
)
};

children.push(FormatTreeNode::new(display_text));
}
}
}
Expand Down Expand Up @@ -863,7 +904,12 @@ fn table_scan_to_format_tree(
context: &mut FormatContext,
) -> Result<FormatTreeNode<String>> {
if plan.table_index == Some(DUMMY_TABLE_INDEX) {
return Ok(FormatTreeNode::new("DummyTableScan".to_string()));
let mut children = vec![];
append_profile_info(&mut children, profs, plan.plan_id);
return Ok(FormatTreeNode::with_children(
"DummyTableScan".to_string(),
children,
));
}

let table_name = match plan.table_index {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ impl CacheAccessor for DiskCacheAccessor {
metrics_inc_cache_access_count(1, &self.name);
let k = k.as_ref();
if let Some(item) = self.lru_disk_cache.get(k) {
Profile::record_usize_profile(ProfileStatisticsName::ScanCacheBytes, item.len());
let size = item.len();
Profile::record_usize_profile(ProfileStatisticsName::ScanBytesFromLocal, size);
metrics_inc_cache_hit_count(1, &self.name);
Some(item)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,14 @@ where

fn get<Q: AsRef<str>>(&self, k: Q) -> Option<Arc<Self::V>> {
if let Some(item) = self.memory_cache.get(k.as_ref()) {
// Record memory cache hit
// Note: The actual size recording happens in memory_cache.get()
// try putting it back into the on-disk cache if necessary
self.insert_to_disk_cache_if_necessary(k.as_ref(), item.as_ref());
Some(item)
} else if let Some(bytes) = self.disk_cache.get(k.as_ref()) {
// Record disk cache hit
// Note: The actual size recording happens in disk_cache.get()
let bytes = bytes.as_ref().clone();
match bytes.try_into() {
Ok(v) => Some(self.memory_cache.insert(k.as_ref().to_owned(), v)),
Expand All @@ -169,7 +173,7 @@ where
}
}
} else {
// Cache Miss
// Cache Miss - will need to read from remote
None
}
}
Expand Down
19 changes: 15 additions & 4 deletions src/query/storages/common/cache/src/providers/memory_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ impl<V: Into<CacheValue<V>>> InMemoryLruCache<V> {
mod impls {
use std::sync::Arc;

use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_cache::MemSized;
use databend_common_metrics::cache::metrics_inc_cache_access_count;
use databend_common_metrics::cache::metrics_inc_cache_hit_count;
use databend_common_metrics::cache::metrics_inc_cache_miss_bytes;
Expand All @@ -93,16 +96,24 @@ mod impls {

fn get<Q: AsRef<str>>(&self, k: Q) -> Option<Arc<V>> {
metrics_inc_cache_access_count(1, self.name());
let v = {
let (v, mem_bytes) = {
let mut guard = self.inner.write();
guard
.get(k.as_ref())
.map(|cache_value: &CacheValue<V>| cache_value.get_inner())
match guard.get(k.as_ref()) {
Some(cache_value) => {
(Some(cache_value.get_inner()), Some(cache_value.mem_bytes()))
}
None => (None, None),
}
};

if v.is_none() {
metrics_inc_cache_miss_count(1, &self.name);
} else {
metrics_inc_cache_hit_count(1, &self.name);
// Record bytes scanned from memory cache
if let Some(size) = mem_bytes {
Profile::record_usize_profile(ProfileStatisticsName::ScanBytesFromMemory, size);
}
}
v
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::collections::HashMap;
use std::collections::HashSet;

use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_exception::Result;
use databend_common_expression::ColumnId;
use databend_common_metrics::storage::*;
Expand Down Expand Up @@ -87,6 +89,12 @@ impl BlockReader {
metrics_inc_remote_io_seeks(1);
metrics_inc_remote_io_read_bytes(len);
}

// Record bytes scanned from remote storage
Profile::record_usize_profile(
ProfileStatisticsName::ScanBytesFromRemote,
len as usize,
);
}
}

Expand Down
20 changes: 19 additions & 1 deletion tests/sqllogictests/suites/mode/cluster/explain_analyze.test
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ EvalScalar
├── cpu time: <slt:ignore>
├── output rows: 1
├── output bytes: 1.00 B
└── bytes scanned: 1.00 B
├── bytes scanned: 1.00 B
├── bytes scanned from local cache: <slt:ignore>
├── bytes scanned from memory cache: <slt:ignore>
└── bytes scanned from remote: <slt:ignore>

query T
settings (max_threads = 4) EXPLAIN ANALYZE select avg(number) from numbers(1000000);
Expand Down Expand Up @@ -42,6 +45,9 @@ EvalScalar
├── output rows: 1 million
├── output bytes: 7.63 MiB
├── bytes scanned: 7.63 MiB
├── bytes scanned from local cache: <slt:ignore>
├── bytes scanned from memory cache: <slt:ignore>
├── bytes scanned from remote: <slt:ignore>
├── table: default.system.numbers
├── scan id: 0
├── output columns: [number (#0)]
Expand Down Expand Up @@ -128,6 +134,9 @@ Exchange
├── output rows: 6
├── output bytes: 120.00 B
├── bytes scanned: 120.00 B
├── bytes scanned from local cache: <slt:ignore>
├── bytes scanned from memory cache: <slt:ignore>
├── bytes scanned from remote: <slt:ignore>
├── runtime filter inlist/min-max time: <slt:ignore>
├── table: default.default.article
├── scan id: 0
Expand Down Expand Up @@ -159,6 +168,9 @@ Exchange
├── output rows: 4
├── output bytes: 80.00 B
├── bytes scanned: 80.00 B
├── bytes scanned from local cache: <slt:ignore>
├── bytes scanned from memory cache: <slt:ignore>
├── bytes scanned from remote: <slt:ignore>
├── runtime filter inlist/min-max time: <slt:ignore>
├── table: default.default.article
├── scan id: 0
Expand Down Expand Up @@ -208,6 +220,9 @@ Exchange
│ ├── output rows: 1
│ ├── output bytes: 31.00 B
│ ├── bytes scanned: 31.00 B
│ ├── bytes scanned from local cache: <slt:ignore>
│ ├── bytes scanned from memory cache: <slt:ignore>
│ ├── bytes scanned from remote: <slt:ignore>
│ ├── runtime filter inlist/min-max time: <slt:ignore>
│ ├── table: default.default.author
│ ├── scan id: 1
Expand All @@ -225,6 +240,9 @@ Exchange
├── output rows: 6
├── output bytes: 120.00 B
├── bytes scanned: 120.00 B
├── bytes scanned from local cache: <slt:ignore>
├── bytes scanned from memory cache: <slt:ignore>
├── bytes scanned from remote: <slt:ignore>
├── runtime filter inlist/min-max time: <slt:ignore>
├── table: default.default.article
├── scan id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ EvalScalar
├── cpu time: <slt:ignore>
├── output rows: 1
├── output bytes: 1.00 B
└── bytes scanned: 1.00 B
├── bytes scanned: 1.00 B
├── bytes scanned from local cache: <slt:ignore>
├── bytes scanned from memory cache: <slt:ignore>
└── bytes scanned from remote: <slt:ignore>

query T
settings (max_threads = 4) EXPLAIN ANALYZE select avg(number) from numbers(1000000);
Expand Down Expand Up @@ -38,6 +41,9 @@ EvalScalar
├── output rows: 1 million
├── output bytes: 7.63 MiB
├── bytes scanned: 7.63 MiB
├── bytes scanned from local cache: <slt:ignore>
├── bytes scanned from memory cache: <slt:ignore>
├── bytes scanned from remote: <slt:ignore>
├── table: default.system.numbers
├── scan id: 0
├── output columns: [number (#0)]
Expand Down Expand Up @@ -138,6 +144,9 @@ Filter
├── output rows: 4
├── output bytes: 80.00 B
├── bytes scanned: 80.00 B
├── bytes scanned from local cache: <slt:ignore>
├── bytes scanned from memory cache: <slt:ignore>
├── bytes scanned from remote: <slt:ignore>
├── runtime filter inlist/min-max time: <slt:ignore>
├── table: default.default.article
├── scan id: 0
Expand Down Expand Up @@ -176,6 +185,9 @@ HashJoin
│ ├── output rows: 1
│ ├── output bytes: 31.00 B
│ ├── bytes scanned: 31.00 B
│ ├── bytes scanned from local cache: <slt:ignore>
│ ├── bytes scanned from memory cache: <slt:ignore>
│ ├── bytes scanned from remote: <slt:ignore>
│ ├── runtime filter inlist/min-max time: <slt:ignore>
│ ├── table: default.default.author
│ ├── scan id: 1
Expand All @@ -193,6 +205,9 @@ HashJoin
├── output rows: 6
├── output bytes: 120.00 B
├── bytes scanned: 120.00 B
├── bytes scanned from local cache: <slt:ignore>
├── bytes scanned from memory cache: <slt:ignore>
├── bytes scanned from remote: <slt:ignore>
├── runtime filter inlist/min-max time: <slt:ignore>
├── table: default.default.article
├── scan id: 0
Expand Down
Loading