@@ -22,8 +22,8 @@ use crate::oxql::ast::table_ops::limit::LimitKind;
2222use crate :: oxql:: query:: QueryAuthzScope ;
2323use crate :: query:: field_table_name;
2424use oximeter:: Measurement ;
25- use oximeter:: TimeseriesSchema ;
2625use oximeter:: schema:: TimeseriesKey ;
26+ use oximeter:: { FieldSchema , TimeseriesSchema } ;
2727use slog:: Logger ;
2828use slog:: debug;
2929use slog:: trace;
@@ -949,98 +949,118 @@ impl Client {
949949 Ok ( query)
950950 }
951951
952+ // We store fields in long tables of (field_name, field_value, ...) with
953+ // separate tables by field type. Here, we need to pivot to wide tables,
954+ // collecting all field (name, value) pairs for a given timeseries key. To
955+ // do this relatively efficiently, we use the `merge` table function to
956+ // scan multiple field tables in parallel, and pivot results using maps.
957+ // Due to a bug in the version of ClickHouse we're using at the moment, we
958+ // can't merge all field tables together at once, since ClickHouse attempts
959+ // to cast values to the wrong types as part of the the merge. To work
960+ // around this, we group field tables into sets that ClickHouse can merge
961+ // correctly, then join them together to collect the results. See #9256 for
962+ // details.
963+ //
964+ // TODO(jmcarp): simplify the merge logic after upgrading to a ClickHouse
965+ // version with a fix for this bug, or refactor further with a different
966+ // approach. We may be able to do better by aggregating results at the
967+ // application layer, or by materializing fields into a wide table on
968+ // write.
952969 fn all_fields_query_raw (
953970 & self ,
954971 schema : & TimeseriesSchema ,
955972 ) -> ( bool , String ) {
956973 match schema. field_schema . len ( ) {
957974 0 => unreachable ! ( ) ,
958- 1 => {
959- let field_schema = schema. field_schema . first ( ) . unwrap ( ) ;
960- (
961- true ,
962- format ! (
963- "SELECT DISTINCT timeseries_key, field_value AS {field_name} \
964- FROM {db_name}.{field_table} \
965- WHERE \
966- timeseries_name = '{timeseries_name}' AND \
967- field_name = '{field_name}'",
968- field_name = field_schema. name,
969- db_name = crate :: DATABASE_NAME ,
970- field_table = field_table_name( field_schema. field_type) ,
971- timeseries_name = schema. timeseries_name,
972- ) ,
973- )
974- }
975975 _ => {
976- let mut top_level_columns =
977- Vec :: with_capacity ( schema. field_schema . len ( ) ) ;
978- let mut field_subqueries =
979- Vec :: with_capacity ( schema. field_schema . len ( ) ) ;
976+ let mut intish_fields: Vec < & FieldSchema > = Vec :: new ( ) ;
977+ let mut textish_fields: Vec < & FieldSchema > = Vec :: new ( ) ;
980978
981- // Select each field value, aliasing it to its field name.
979+ let mut selects: Vec < String > =
980+ vec ! [ String :: from( "timeseries_key" ) ] ;
981+ let mut froms: Vec < String > = Vec :: new ( ) ;
982982 for field_schema in schema. field_schema . iter ( ) {
983- top_level_columns. push ( format ! (
984- "filter_on_{}.field_value AS {}" ,
985- field_schema. name, field_schema. name
986- ) ) ;
987- field_subqueries. push ( (
988- format ! (
989- "SELECT DISTINCT timeseries_key, field_value \
990- FROM {db_name}.{field_table} \
991- WHERE \
992- timeseries_name = '{timeseries_name}' AND \
993- field_name = '{field_name}' \
994- ",
995- db_name = crate :: DATABASE_NAME ,
996- field_table =
997- field_table_name( field_schema. field_type) ,
998- timeseries_name = schema. timeseries_name,
999- field_name = field_schema. name,
1000- ) ,
1001- format ! ( "filter_on_{}" , field_schema. name) ,
1002- ) ) ;
1003- }
1004-
1005- // Write the top-level select statement, starting by selecting
1006- // the timeseries key from the first field schema.
1007- let mut out = format ! (
1008- "SELECT {}.timeseries_key AS timeseries_key, {} FROM " ,
1009- field_subqueries[ 0 ] . 1 ,
1010- top_level_columns. join( ", " ) ,
1011- ) ;
1012-
1013- // Then add all the subqueries selecting each field.
1014- //
1015- // We need to add these, along with their aliases. The first
1016- // such subquery has no join conditions, but the later ones all
1017- // refer to the previous via:
1018- //
1019- // `ON <previous_filter_name>.timeseries_key = <current_filter_name>.timeseries_key`
1020- for ( i, ( subq, alias) ) in field_subqueries. iter ( ) . enumerate ( ) {
1021- // Push the subquery itself, aliased.
1022- out. push ( '(' ) ;
1023- out. push_str ( subq) ;
1024- out. push_str ( ") AS " ) ;
1025- out. push_str ( alias) ;
1026-
1027- // Push the join conditions.
1028- if i > 0 {
1029- let previous_alias = & field_subqueries[ i - 1 ] . 1 ;
1030- out. push_str ( " ON " ) ;
1031- out. push_str ( alias) ;
1032- out. push_str ( ".timeseries_key = " ) ;
1033- out. push_str ( previous_alias) ;
1034- out. push_str ( ".timeseries_key" ) ;
983+ match field_schema. field_type {
984+ oximeter:: FieldType :: I8
985+ | oximeter:: FieldType :: I16
986+ | oximeter:: FieldType :: I32
987+ | oximeter:: FieldType :: I64
988+ | oximeter:: FieldType :: U8
989+ | oximeter:: FieldType :: U16
990+ | oximeter:: FieldType :: U32
991+ | oximeter:: FieldType :: U64
992+ | oximeter:: FieldType :: Bool => {
993+ intish_fields. push ( field_schema) ;
994+ selects. push ( format ! (
995+ "intish_fields.fields['{}']::{} AS {}" ,
996+ field_schema. name,
997+ ch_type_for_field( field_schema. field_type) ,
998+ field_schema. name
999+ ) )
1000+ }
1001+ oximeter:: FieldType :: String
1002+ | oximeter:: FieldType :: Uuid
1003+ | oximeter:: FieldType :: IpAddr => {
1004+ textish_fields. push ( field_schema) ;
1005+ selects. push ( format ! (
1006+ "textish_fields.fields['{}']::{} AS {}" ,
1007+ field_schema. name,
1008+ ch_type_for_field( field_schema. field_type) ,
1009+ field_schema. name
1010+ ) )
1011+ }
10351012 }
1013+ }
1014+ let intish_tables: Vec < String > = intish_fields
1015+ . iter ( )
1016+ . map ( |field| field_table_name ( field. field_type ) )
1017+ . collect ( ) ;
1018+ let textish_tables: Vec < String > = textish_fields
1019+ . iter ( )
1020+ . map ( |field| field_table_name ( field. field_type ) )
1021+ . collect ( ) ;
1022+ let mut ctes: Vec < String > = Vec :: new ( ) ;
1023+ if !intish_tables. is_empty ( ) {
1024+ ctes. push ( format ! (
1025+ "intish_fields AS (
1026+ SELECT timeseries_name, timeseries_key, mapFromArrays(groupArray(field_name), groupArray(field_value)) AS fields \
1027+ FROM merge('oximeter', '{field_table_regex}') \
1028+ WHERE timeseries_name = '{timeseries_name}' \
1029+ GROUP BY timeseries_name, timeseries_key \
1030+ )",
1031+ field_table_regex = intish_tables. join( "|" ) ,
1032+ timeseries_name = schema. timeseries_name) ) ;
1033+ froms. push ( String :: from ( "intish_fields" ) ) ;
1034+ }
1035+ if !textish_tables. is_empty ( ) {
1036+ ctes. push ( format ! (
1037+ "textish_fields AS (
1038+ SELECT timeseries_name, timeseries_key, mapFromArrays(groupArray(field_name), groupArray(field_value)) AS fields \
1039+ FROM merge('oximeter', '{field_table_regex}') \
1040+ WHERE timeseries_name = '{timeseries_name}' \
1041+ GROUP BY timeseries_name, timeseries_key \
1042+ )",
1043+ field_table_regex = textish_tables. join( "|" ) ,
1044+ timeseries_name = schema. timeseries_name) ) ;
1045+ froms. push ( String :: from ( "textish_fields" ) ) ;
1046+ }
10361047
1037- // Push the "INNER JOIN" expression itself, for all but the
1038- // last subquery.
1039- if i < field_subqueries. len ( ) - 1 {
1040- out. push_str ( " INNER JOIN " ) ;
1041- }
1048+ let mut from = froms[ 0 ] . clone ( ) ;
1049+ for from_item in froms. iter ( ) . skip ( 1 ) {
1050+ from = format ! (
1051+ "{} JOIN {} USING (timeseries_key)" ,
1052+ from, from_item
1053+ ) ;
10421054 }
1043- ( false , out)
1055+ let query = format ! (
1056+ "WITH {ctes} \
1057+ SELECT {select} \
1058+ FROM {from}",
1059+ ctes = ctes. join( ", " ) ,
1060+ select = selects. join( ",\n " ) ,
1061+ from = from,
1062+ ) ;
1063+ ( false , query)
10441064 }
10451065 }
10461066 }
@@ -1179,6 +1199,23 @@ fn update_total_rows_and_check(
11791199 Ok ( ( ) )
11801200}
11811201
1202+ fn ch_type_for_field ( field : oximeter:: FieldType ) -> String {
1203+ match field {
1204+ oximeter:: FieldType :: String => String :: from ( "String" ) ,
1205+ oximeter:: FieldType :: Bool => String :: from ( "Bool" ) ,
1206+ oximeter:: FieldType :: Uuid => String :: from ( "UUID" ) ,
1207+ oximeter:: FieldType :: IpAddr => String :: from ( "IpV6" ) ,
1208+ oximeter:: FieldType :: I8 => String :: from ( "Int8" ) ,
1209+ oximeter:: FieldType :: I16 => String :: from ( "Int16" ) ,
1210+ oximeter:: FieldType :: I32 => String :: from ( "Int32" ) ,
1211+ oximeter:: FieldType :: I64 => String :: from ( "Int64" ) ,
1212+ oximeter:: FieldType :: U8 => String :: from ( "UInt8" ) ,
1213+ oximeter:: FieldType :: U16 => String :: from ( "UInt16" ) ,
1214+ oximeter:: FieldType :: U32 => String :: from ( "UInt32" ) ,
1215+ oximeter:: FieldType :: U64 => String :: from ( "UInt64" ) ,
1216+ }
1217+ }
1218+
11821219#[ cfg( test) ]
11831220mod tests {
11841221 use super :: ConsistentKeyGroup ;
0 commit comments