File tree Expand file tree Collapse file tree 1 file changed +15
-5
lines changed Expand file tree Collapse file tree 1 file changed +15
-5
lines changed Original file line number Diff line number Diff line change @@ -439,12 +439,22 @@ pub async fn create_streams_for_querier() -> Result<(), QueryError> {
439
439
let querier_streams = PARSEABLE . streams . list ( ) ;
440
440
441
441
let querier_streams_set: HashSet < _ > = querier_streams. into_iter ( ) . collect ( ) ;
442
-
443
- let storage_streams = store. list_streams ( ) . await ?;
444
-
445
- let missing_streams: Vec < _ > = storage_streams
442
+ // fetch querier streams which have field list blank
443
+ // now missing streams should be list of streams which are in storage but not in querier
444
+ // and also have no fields in the schema
445
+ // this is to ensure that we do not create streams for querier which already exist in querier
446
+
447
+ let missing_streams: Vec < _ > = store
448
+ . list_streams ( )
449
+ . await ?
446
450
. into_iter ( )
447
- . filter ( |stream_name| !querier_streams_set. contains ( stream_name) )
451
+ . filter ( |stream_name| {
452
+ !querier_streams_set. contains ( stream_name)
453
+ && PARSEABLE
454
+ . get_stream ( stream_name)
455
+ . map ( |s| s. get_schema ( ) . fields ( ) . is_empty ( ) )
456
+ . unwrap_or ( true )
457
+ } )
448
458
. collect ( ) ;
449
459
450
460
if missing_streams. is_empty ( ) {
You can’t perform that action at this time.
0 commit comments