Skip to content

Commit b84e511

Browse files
committed
feat/show-process-list:
- **Refactor Row Construction**: Updated row construction in multiple files to use references for `Value` objects, improving memory efficiency. Affected files include: - `cluster_info.rs` - `columns.rs` - `flows.rs` - `key_column_usage.rs` - `partitions.rs` - `procedure_info.rs` - `process_list.rs` - `region_peers.rs` - `region_statistics.rs` - `schemata.rs` - `table_constraints.rs` - `tables.rs` - `views.rs` - `pg_class.rs` - `pg_database.rs` - `pg_namespace.rs` - **Remove Unused Code**: Deleted unused functions and error variants related to process management in `process_list.rs` and `error.rs`. - **Predicate Evaluation Update**: Modified predicate evaluation functions in `predicate.rs` to work with references, enhancing performance.
1 parent 86421f6 commit b84e511

File tree

18 files changed

+130
-189
lines changed

18 files changed

+130
-189
lines changed

src/catalog/src/system_schema/information_schema/cluster_info.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,11 @@ impl InformationSchemaClusterInfoBuilder {
178178
let peer_type = node_info.status.role_name();
179179

180180
let row = [
181-
(PEER_ID, Value::from(node_info.peer.id)),
182-
(PEER_TYPE, Value::from(peer_type)),
183-
(PEER_ADDR, Value::from(node_info.peer.addr.as_str())),
184-
(VERSION, Value::from(node_info.version.as_str())),
185-
(GIT_COMMIT, Value::from(node_info.git_commit.as_str())),
181+
(PEER_ID, &Value::from(node_info.peer.id)),
182+
(PEER_TYPE, &Value::from(peer_type)),
183+
(PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
184+
(VERSION, &Value::from(node_info.version.as_str())),
185+
(GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
186186
];
187187

188188
if !predicates.eval(&row) {

src/catalog/src/system_schema/information_schema/columns.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -315,14 +315,14 @@ impl InformationSchemaColumnsBuilder {
315315
};
316316

317317
let row = [
318-
(TABLE_CATALOG, Value::from(catalog_name)),
319-
(TABLE_SCHEMA, Value::from(schema_name)),
320-
(TABLE_NAME, Value::from(table_name)),
321-
(COLUMN_NAME, Value::from(column_schema.name.as_str())),
322-
(DATA_TYPE, Value::from(data_type.as_str())),
323-
(SEMANTIC_TYPE, Value::from(semantic_type)),
324-
(ORDINAL_POSITION, Value::from((index + 1) as i64)),
325-
(COLUMN_KEY, Value::from(column_key)),
318+
(TABLE_CATALOG, &Value::from(catalog_name)),
319+
(TABLE_SCHEMA, &Value::from(schema_name)),
320+
(TABLE_NAME, &Value::from(table_name)),
321+
(COLUMN_NAME, &Value::from(column_schema.name.as_str())),
322+
(DATA_TYPE, &Value::from(data_type.as_str())),
323+
(SEMANTIC_TYPE, &Value::from(semantic_type)),
324+
(ORDINAL_POSITION, &Value::from((index + 1) as i64)),
325+
(COLUMN_KEY, &Value::from(column_key)),
326326
];
327327

328328
if !predicates.eval(&row) {

src/catalog/src/system_schema/information_schema/flows.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,11 +273,11 @@ impl InformationSchemaFlowsBuilder {
273273
flow_stat: &Option<FlowStat>,
274274
) -> Result<()> {
275275
let row = [
276-
(FLOW_NAME, Value::from(flow_info.flow_name().to_string())),
277-
(FLOW_ID, Value::from(flow_id)),
276+
(FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
277+
(FLOW_ID, &Value::from(flow_id)),
278278
(
279279
TABLE_CATALOG,
280-
Value::from(flow_info.catalog_name().to_string()),
280+
&Value::from(flow_info.catalog_name().to_string()),
281281
),
282282
];
283283
if !predicates.eval(&row) {

src/catalog/src/system_schema/information_schema/key_column_usage.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -290,13 +290,13 @@ impl InformationSchemaKeyColumnUsageBuilder {
290290
ordinal_position: u32,
291291
) {
292292
let row = [
293-
(CONSTRAINT_SCHEMA, Value::from(constraint_schema)),
294-
(CONSTRAINT_NAME, Value::from(constraint_name)),
295-
(REAL_TABLE_CATALOG, Value::from(table_catalog)),
296-
(TABLE_SCHEMA, Value::from(table_schema)),
297-
(TABLE_NAME, Value::from(table_name)),
298-
(COLUMN_NAME, Value::from(column_name)),
299-
(ORDINAL_POSITION, Value::from(ordinal_position)),
293+
(CONSTRAINT_SCHEMA, &Value::from(constraint_schema)),
294+
(CONSTRAINT_NAME, &Value::from(constraint_name)),
295+
(REAL_TABLE_CATALOG, &Value::from(table_catalog)),
296+
(TABLE_SCHEMA, &Value::from(table_schema)),
297+
(TABLE_NAME, &Value::from(table_name)),
298+
(COLUMN_NAME, &Value::from(column_name)),
299+
(ORDINAL_POSITION, &Value::from(ordinal_position)),
300300
];
301301

302302
if !predicates.eval(&row) {

src/catalog/src/system_schema/information_schema/partitions.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,9 +311,9 @@ impl InformationSchemaPartitionsBuilder {
311311
partitions: &[PartitionInfo],
312312
) {
313313
let row = [
314-
(TABLE_CATALOG, Value::from(catalog_name)),
315-
(TABLE_SCHEMA, Value::from(schema_name)),
316-
(TABLE_NAME, Value::from(table_name)),
314+
(TABLE_CATALOG, &Value::from(catalog_name)),
315+
(TABLE_SCHEMA, &Value::from(schema_name)),
316+
(TABLE_NAME, &Value::from(table_name)),
317317
];
318318

319319
if !predicates.eval(&row) {

src/catalog/src/system_schema/information_schema/procedure_info.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,12 @@ impl InformationSchemaProcedureInfoBuilder {
187187
let lock_keys = lock_keys.join(",");
188188

189189
let row = [
190-
(PROCEDURE_ID, Value::from(pid.clone())),
191-
(PROCEDURE_TYPE, Value::from(type_name.clone())),
192-
(START_TIME, Value::from(start_time)),
193-
(END_TIME, Value::from(end_time)),
194-
(STATUS, Value::from(status.clone())),
195-
(LOCK_KEYS, Value::from(lock_keys.clone())),
190+
(PROCEDURE_ID, &Value::from(pid.clone())),
191+
(PROCEDURE_TYPE, &Value::from(type_name.clone())),
192+
(START_TIME, &Value::from(start_time)),
193+
(END_TIME, &Value::from(end_time)),
194+
(STATUS, &Value::from(status.clone())),
195+
(LOCK_KEYS, &Value::from(lock_keys.clone())),
196196
];
197197
if !predicates.eval(&row) {
198198
return;

src/catalog/src/system_schema/information_schema/process_list.rs

Lines changed: 23 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use std::sync::Arc;
1616

1717
use common_catalog::consts::INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID;
1818
use common_error::ext::BoxedError;
19-
use common_meta::key::process_list::Process;
2019
use common_recordbatch::adapter::RecordBatchStreamAdapter;
2120
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
2221
use common_time::util::current_time_millis;
@@ -134,20 +133,35 @@ async fn make_process_list(
134133
let mut start_time_builder = TimestampMillisecondVectorBuilder::with_capacity(8);
135134
let mut elapsed_time_builder = DurationMillisecondVectorBuilder::with_capacity(8);
136135

137-
let mut current_row = Vec::with_capacity(5);
138136
while let Some(process) = stream
139137
.next()
140138
.await
141139
.transpose()
142140
.context(error::ListProcessSnafu)?
143141
{
144-
process_to_row(process, current_time, &mut current_row);
145-
if predicates.eval(&current_row) {
146-
id_builder.push(current_row[0].1.as_u64());
147-
database_builder.push(current_row[1].1.as_string().as_deref());
148-
query_builder.push(current_row[2].1.as_string().as_deref());
149-
start_time_builder.push(current_row[3].1.as_timestamp().map(|t| t.value().into()));
150-
elapsed_time_builder.push(current_row[4].1.as_duration().map(|d| d.value().into()));
142+
let row = [
143+
(ID, &Value::from(process.query_id())),
144+
(DATABASE, &Value::from(process.database())),
145+
(QUERY, &Value::from(process.query_string())),
146+
(
147+
START_TIMESTAMP,
148+
&Value::from(Timestamp::new_millisecond(
149+
process.query_start_timestamp_ms(),
150+
)),
151+
),
152+
(
153+
ELAPSED_TIME,
154+
&Value::from(Duration::new_millisecond(
155+
current_time - process.query_start_timestamp_ms(),
156+
)),
157+
),
158+
];
159+
if predicates.eval(&row) {
160+
id_builder.push(row[0].1.as_u64());
161+
database_builder.push(row[1].1.as_string().as_deref());
162+
query_builder.push(row[2].1.as_string().as_deref());
163+
start_time_builder.push(row[3].1.as_timestamp().map(|t| t.value().into()));
164+
elapsed_time_builder.push(row[4].1.as_duration().map(|d| d.value().into()));
151165
}
152166
}
153167

@@ -162,34 +176,3 @@ async fn make_process_list(
162176
RecordBatch::new(InformationSchemaProcessList::schema(), columns)
163177
.context(error::CreateRecordBatchSnafu)
164178
}
165-
166-
// Convert [Process] structs to rows.
167-
fn process_to_row(
168-
process: Process,
169-
current_time_ms: i64,
170-
current_row: &mut Vec<(&'static str, Value)>,
171-
) {
172-
current_row.clear();
173-
current_row.push((ID, Value::UInt64(process.query_id())));
174-
current_row.push((
175-
DATABASE,
176-
Value::String(process.database().to_string().into()),
177-
));
178-
current_row.push((
179-
QUERY,
180-
Value::String(process.query_string().to_string().into()),
181-
));
182-
183-
current_row.push((
184-
START_TIMESTAMP,
185-
Value::Timestamp(Timestamp::new_millisecond(
186-
process.query_start_timestamp_ms(),
187-
)),
188-
));
189-
current_row.push((
190-
ELAPSED_TIME,
191-
Value::Duration(Duration::new_millisecond(
192-
current_time_ms - process.query_start_timestamp_ms(),
193-
)),
194-
));
195-
}

src/catalog/src/system_schema/information_schema/region_peers.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,10 @@ impl InformationSchemaRegionPeersBuilder {
263263
};
264264

265265
let row = [
266-
(TABLE_CATALOG, Value::from(table_catalog)),
267-
(TABLE_SCHEMA, Value::from(table_schema)),
268-
(TABLE_NAME, Value::from(table_name)),
269-
(REGION_ID, Value::from(region_id)),
266+
(TABLE_CATALOG, &Value::from(table_catalog)),
267+
(TABLE_SCHEMA, &Value::from(table_schema)),
268+
(TABLE_NAME, &Value::from(table_name)),
269+
(REGION_ID, &Value::from(region_id)),
270270
];
271271

272272
if !predicates.eval(&row) {

src/catalog/src/system_schema/information_schema/region_statistics.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -189,17 +189,17 @@ impl InformationSchemaRegionStatisticsBuilder {
189189

190190
fn add_region_statistic(&mut self, predicate: &Predicates, region_stat: RegionStat) {
191191
let row = [
192-
(REGION_ID, Value::from(region_stat.id.as_u64())),
193-
(TABLE_ID, Value::from(region_stat.id.table_id())),
194-
(REGION_NUMBER, Value::from(region_stat.id.region_number())),
195-
(REGION_ROWS, Value::from(region_stat.num_rows)),
196-
(DISK_SIZE, Value::from(region_stat.approximate_bytes)),
197-
(MEMTABLE_SIZE, Value::from(region_stat.memtable_size)),
198-
(MANIFEST_SIZE, Value::from(region_stat.manifest_size)),
199-
(SST_SIZE, Value::from(region_stat.sst_size)),
200-
(INDEX_SIZE, Value::from(region_stat.index_size)),
201-
(ENGINE, Value::from(region_stat.engine.as_str())),
202-
(REGION_ROLE, Value::from(region_stat.role.to_string())),
192+
(REGION_ID, &Value::from(region_stat.id.as_u64())),
193+
(TABLE_ID, &Value::from(region_stat.id.table_id())),
194+
(REGION_NUMBER, &Value::from(region_stat.id.region_number())),
195+
(REGION_ROWS, &Value::from(region_stat.num_rows)),
196+
(DISK_SIZE, &Value::from(region_stat.approximate_bytes)),
197+
(MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
198+
(MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
199+
(SST_SIZE, &Value::from(region_stat.sst_size)),
200+
(INDEX_SIZE, &Value::from(region_stat.index_size)),
201+
(ENGINE, &Value::from(region_stat.engine.as_str())),
202+
(REGION_ROLE, &Value::from(region_stat.role.to_string())),
203203
];
204204

205205
if !predicate.eval(&row) {

src/catalog/src/system_schema/information_schema/schemata.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,11 @@ impl InformationSchemaSchemataBuilder {
204204
schema_options: &str,
205205
) {
206206
let row = [
207-
(CATALOG_NAME, Value::from(catalog_name)),
208-
(SCHEMA_NAME, Value::from(schema_name)),
209-
(DEFAULT_CHARACTER_SET_NAME, Value::from("utf8")),
210-
(DEFAULT_COLLATION_NAME, Value::from("utf8_bin")),
211-
(SCHEMA_OPTS, Value::from(schema_options)),
207+
(CATALOG_NAME, &Value::from(catalog_name)),
208+
(SCHEMA_NAME, &Value::from(schema_name)),
209+
(DEFAULT_CHARACTER_SET_NAME, &Value::from("utf8")),
210+
(DEFAULT_COLLATION_NAME, &Value::from("utf8_bin")),
211+
(SCHEMA_OPTS, &Value::from(schema_options)),
212212
];
213213

214214
if !predicates.eval(&row) {

0 commit comments

Comments
 (0)