11import { cloneDeep , first as _first , map as _map , groupBy } from 'lodash' ;
22import { Observable , lastValueFrom , from , isObservable , of } from 'rxjs' ;
3- import { catchError , mergeMap , map } from 'rxjs/operators' ;
3+ import { catchError , mergeMap , map , shareReplay } from 'rxjs/operators' ;
44
55import {
66 AbstractQuery ,
@@ -27,6 +27,7 @@ import {
2727 QueryFixAction ,
2828 ScopedVars ,
2929 SupplementaryQueryType ,
30+ TestDataSourceResponse ,
3031 TimeRange ,
3132} from '@grafana/data' ;
3233import { BucketAggregation , DataLinkConfig , ElasticsearchQuery , Field as QuickwitField , FieldMapping , IndexMetadata , TermsQuery , FieldCapabilitiesResponse } from './types' ;
@@ -61,6 +62,15 @@ type FieldCapsSpec = {
6162 _range ?: TimeRange
6263}
6364
65+ function getTimeFieldInfoFromIndexMetadata ( indexMetadata : any ) {
66+ let fields = getAllFields ( indexMetadata . index_config . doc_mapping . field_mappings ) ;
67+ let timestampFieldName = indexMetadata . index_config . doc_mapping . timestamp_field
68+ let timestampField = fields . find ( ( field ) => field . json_path === timestampFieldName ) ;
69+ let timestampFormat = timestampField ? timestampField . field_mapping . output_format || '' : ''
70+ let timestampFieldInfos = { 'field' : timestampFieldName , 'format' : timestampFormat }
71+ return timestampFieldInfos
72+ }
73+
6474export class QuickwitDataSource
6575 extends DataSourceWithBackend < ElasticsearchQuery , QuickwitOptions >
6676 implements
@@ -73,12 +83,17 @@ export class QuickwitDataSource
7383 timeOutputFormat : string ;
7484 logMessageField ?: string ;
7585 logLevelField ?: string ;
76- queryBuilder : ElasticQueryBuilder ;
7786 dataLinks : DataLinkConfig [ ] ;
7887 languageProvider : ElasticsearchLanguageProvider ;
7988
8089 private logContextProvider : LogContextProvider ;
8190
91+
92+ // Observables from index metadata
93+ indexMetadata$ : any ;
94+ timeFieldInfo$ : Observable < { field : string , format : string } >
95+ queryBuilder$ : Observable < ElasticQueryBuilder > ;
96+
8297 constructor (
8398 instanceSettings : DataSourceInstanceSettings < QuickwitOptions > ,
8499 private readonly templateSrv : TemplateSrv = getTemplateSrv ( )
@@ -88,37 +103,32 @@ export class QuickwitDataSource
88103 this . index = settingsData . index || '' ;
89104 this . timeField = ''
90105 this . timeOutputFormat = ''
91- this . queryBuilder = new ElasticQueryBuilder ( {
92- timeField : this . timeField ,
93- } ) ;
94- from ( this . getResource ( 'indexes/' + this . index ) ) . pipe (
95- map ( ( indexMetadata ) => {
96- let fields = getAllFields ( indexMetadata . index_config . doc_mapping . field_mappings ) ;
97- let timestampFieldName = indexMetadata . index_config . doc_mapping . timestamp_field
98- let timestampField = fields . find ( ( field ) => field . json_path === timestampFieldName ) ;
99- let timestampFormat = timestampField ? timestampField . field_mapping . output_format || '' : ''
100- let timestampFieldInfos = { 'field' : timestampFieldName , 'format' : timestampFormat }
101- return timestampFieldInfos
102- } ) ,
106+
107+ this . indexMetadata$ = from ( this . getResource ( 'indexes/' + this . index ) )
108+ this . timeFieldInfo$ = this . indexMetadata$ . pipe (
109+ map ( getTimeFieldInfoFromIndexMetadata ) ,
103110 catchError ( ( err ) => {
104111 if ( ! err . data || ! err . data . error ) {
105112 let err_source = extractJsonPayload ( err . data . error )
106113 if ( ! err_source ) {
107114 throw err
108115 }
109116 }
110-
111117 // the error will be handle in the testDatasource function
112118 return of ( { 'field' : '' , 'format' : '' } )
113- } )
114- ) . subscribe ( result => {
115- this . timeField = result . field ;
116- this . timeOutputFormat = result . format ;
117- this . queryBuilder = new ElasticQueryBuilder ( {
118- timeField : this . timeField ,
119- } ) ;
120- } ) ;
121-
119+ } ) ,
120+ shareReplay ( 1 ) ,
121+ )
122+ this . timeFieldInfo$ . subscribe ( ( timeFieldInfo ) => {
123+ this . timeField = timeFieldInfo . field ;
124+ this . timeOutputFormat = timeFieldInfo . format
125+ } )
126+ this . queryBuilder$ = this . timeFieldInfo$ . pipe (
127+ map ( ( timeFieldInfo ) => {
128+ return new ElasticQueryBuilder ( { timeField :timeFieldInfo . field } )
129+ } ) ,
130+ shareReplay ( 1 ) ,
131+ )
122132 this . logMessageField = settingsData . logMessageField || '' ;
123133 this . logLevelField = settingsData . logLevelField || '' ;
124134 this . dataLinks = settingsData . dataLinks || [ ] ;
@@ -147,10 +157,8 @@ export class QuickwitDataSource
147157 message : 'Cannot save datasource, `index` is required' ,
148158 } ;
149159 }
150-
151- return lastValueFrom (
152- from ( this . getResource ( 'indexes/' + this . index ) ) . pipe (
153- mergeMap ( ( indexMetadata ) => {
160+ const validation$ : Observable < TestDataSourceResponse > = this . indexMetadata$ . pipe (
161+ mergeMap ( ( indexMetadata : IndexMetadata ) : Observable < TestDataSourceResponse > => {
154162 let error = this . validateIndexConfig ( indexMetadata ) ;
155163 if ( error ) {
156164 return of ( {
@@ -160,7 +168,7 @@ export class QuickwitDataSource
160168 }
161169 return of ( { status : 'success' , message : `Index OK. Time field name OK` } ) ;
162170 } ) ,
163- catchError ( ( err ) => {
171+ catchError ( ( err ) : Observable < TestDataSourceResponse > => {
164172 if ( err . data && err . data . error ) {
165173 let err_source = extractJsonPayload ( err . data . error )
166174 if ( err_source ) {
@@ -177,7 +185,8 @@ export class QuickwitDataSource
177185 }
178186 } )
179187 )
180- ) ;
188+
189+ return lastValueFrom ( validation$ ) ;
181190 }
182191
183192 validateIndexConfig ( indexMetadata : IndexMetadata ) : string | undefined {
@@ -347,18 +356,18 @@ export class QuickwitDataSource
347356 index : this . index ,
348357 } ) ;
349358
350- let esQuery = JSON . stringify ( this . queryBuilder . getTermsQuery ( queryDef ) ) ;
351- esQuery = esQuery . replace ( / \$ t i m e F r o m / g, range . from . valueOf ( ) . toString ( ) ) ;
352- esQuery = esQuery . replace ( / \$ t i m e T o / g, range . to . valueOf ( ) . toString ( ) ) ;
353- esQuery = header + '\n' + esQuery + '\n' ;
354359 const resourceOptions = {
355- headers : {
356- 'content-type' : 'application/x-ndjson'
357- }
360+ headers : { 'content-type' : 'application/x-ndjson' }
358361 } ;
359- const termsObservable = from ( this . postResource ( "_elastic/_msearch" , esQuery , resourceOptions ) ) ;
360362
361- return termsObservable . pipe (
363+ return this . queryBuilder$ . pipe (
364+ mergeMap ( ( queryBuilder ) => {
365+ let esQuery = JSON . stringify ( queryBuilder . getTermsQuery ( queryDef ) ) ;
366+ esQuery = esQuery . replace ( / \$ t i m e F r o m / g, range . from . valueOf ( ) . toString ( ) ) ;
367+ esQuery = esQuery . replace ( / \$ t i m e T o / g, range . to . valueOf ( ) . toString ( ) ) ;
368+ esQuery = header + '\n' + esQuery + '\n' ;
369+ return from ( this . postResource ( "_elastic/_msearch" , esQuery , resourceOptions ) )
370+ } ) ,
362371 map ( ( res ) => {
363372 if ( ! res . responses [ 0 ] . aggregations ) {
364373 return [ ] ;
0 commit comments