1
1
import RepositoryInterface , { QueryOptions , Repository } from '../../repository' ;
2
- import ReportQuery , { TimeseriesInitialRequest } from '../../../../domain_objects/data_warehouse/data/report_query' ;
2
+ import ReportQuery , { ReportQueryMetadata , TimeseriesInitialRequest } from '../../../../domain_objects/data_warehouse/data/report_query' ;
3
3
import Result from '../../../../common_classes/result' ;
4
4
import ReportQueryMapper from '../../../mappers/data_warehouse/data/report_query_mapper' ;
5
5
import { PoolClient } from 'pg' ;
@@ -72,17 +72,13 @@ export default class ReportQueryRepository extends Repository implements Reposit
72
72
return Promise . resolve ( Result . Success ( true ) ) ;
73
73
}
74
74
75
+ // perform all necessary checks before kicking off the query including verifying
76
+ // all files are timeseries and checking for previously executed/select * queries
75
77
async initiateQuery ( containerID : string , dataSourceID : string , request : TimeseriesInitialRequest , user : User , describe : boolean ) : Promise < Result < string > > {
76
78
// check that all files exist and are timeseries, return an error if not
77
79
const isTimeseries = await this . #fileRepo. checkTimeseries ( request . file_ids ! ) ;
78
80
if ( isTimeseries . isError ) { return Promise . resolve ( Result . Pass ( isTimeseries ) ) }
79
81
80
- // create a new report object
81
- const report = new Report ( { container_id : containerID } ) ;
82
- const reportSaved = await this . #reportMapper. Create ( user . id ! , report ) ;
83
- if ( reportSaved . isError ) { return Promise . resolve ( Result . Pass ( reportSaved ) ) }
84
- const reportID = reportSaved . value . id !
85
-
86
82
// formulate query if describe, check for presence of table name if regular query
87
83
if ( describe ) {
88
84
const describeQueries : string [ ] = [ ] ;
@@ -98,18 +94,70 @@ export default class ReportQueryRepository extends Repository implements Reposit
98
94
}
99
95
}
100
96
101
- // create a report query based on the timeseries rust module query request
97
+ // create a new report object to return the ID if a SELECT * or repeated query is found
98
+ const reportSaved = await this . #reportMapper. Create ( user . id ! , new Report ( { container_id : containerID } ) ) ;
99
+ if ( reportSaved . isError ) { return Promise . resolve ( Result . Pass ( reportSaved ) ) }
100
+ const reportID = reportSaved . value . id !
101
+
102
+ // check if the query text was already successfully used in a previous query
103
+ // if so return the result file from that original query
104
+ const previousQueryResults = await this . #mapper. CheckQueryExists ( request . query ! ) ;
105
+ // if an error is found, simply log and move on
106
+ if ( previousQueryResults . isError ) {
107
+ Logger . error ( previousQueryResults . error . error ) ;
108
+ }
109
+
110
+ if ( previousQueryResults . value ) {
111
+ // grab and use the previous status message for this report
112
+ void this . #reportRepo. setStatus ( reportID , 'completed' , previousQueryResults . value . status_message ) ;
113
+
114
+ return Promise . resolve ( Result . Success ( reportID ) ) ;
115
+ }
116
+
117
+ // create a query object if a previous query was not found
102
118
const reportQuery = new ReportQuery ( { query : request . query ! , report_id : reportID } ) ;
103
119
const querySaved = await this . #mapper. Create ( user . id ! , reportQuery ) ;
104
120
if ( querySaved . isError ) { return Promise . resolve ( Result . Pass ( querySaved ) ) }
105
121
const queryID = querySaved . value . id !
106
122
107
- // fetch file metadata
108
- const fileInfo = await this . #fileRepo. listPathMetadata ( ...request . file_ids ! ) ;
109
- if ( fileInfo . isError ) { return Promise . resolve ( Result . Failure ( 'unable to find file information' ) ) }
110
- const files = fileInfo . value ;
123
+ // check if the query is a SELECT * query; if so return original file instead of querying
124
+ // verify there's only one file being queried
125
+ if ( request . file_ids ! . length === 1 ) {
126
+ const fileID = request . file_ids ! [ 0 ] ;
127
+ // trim and case densensitize query to eliminate any syntax variance
128
+ const normalizedQuery = request . query ?. trim ( ) . replace ( / \s + / g, ' ' ) . replace ( ';' , '' ) . toLowerCase ( ) ;
129
+ if ( normalizedQuery === `select * from table_${ fileID } ` ) {
130
+ // set the original file as the report file and return report ID
131
+ const resultSet = await this . setResultFile ( reportID , queryID , fileID ) ;
132
+ if ( resultSet . isError ) {
133
+ const errorMessage = `error attaching record to report ${ reportID } : ${ resultSet . error . error } ` ;
134
+ void this . #reportRepo. setStatus ( reportID , 'error' , errorMessage ) ;
135
+ Logger . error ( errorMessage ) ;
136
+ }
137
+
138
+ // if everything was successful, set the report status to completed
139
+ const successMessage = `results now available. Download them at "/containers/${ containerID } /files/${ fileID } /download"` ;
140
+ void this . #reportRepo. setStatus ( reportID , 'completed' , successMessage ) ;
141
+ return Promise . resolve ( Result . Success ( reportID ) ) ;
142
+ }
143
+ }
144
+
145
+ const queryMetadata : ReportQueryMetadata = {
146
+ container_id : containerID ,
147
+ data_source_id : dataSourceID ,
148
+ request : request ,
149
+ user : user ,
150
+ report_id : reportID ,
151
+ query : reportQuery ,
152
+ query_id : queryID
153
+ }
111
154
112
- // create a connection string based on the type of storage being used
155
+ // kickoff the query itself if there are no early return scenarios
156
+ return this . kickoffQuery ( queryMetadata , describe ) ;
157
+ }
158
+
159
+ // create a connection string based on the type of storage being used
160
+ async createConnectionString ( containerID : string , dataSourceID : string ) : Promise < Result < string > > {
113
161
const uploadPath = `containers/${ containerID } /datasources/${ dataSourceID } ` ;
114
162
let storageConnection : string ;
115
163
if ( Config . file_storage_method === 'filesystem' ) {
@@ -141,28 +189,46 @@ export default class ReportQueryRepository extends Repository implements Reposit
141
189
return Promise . resolve ( Result . Failure ( `error: unsupported or unimplemented file storage method being used` ) ) ;
142
190
}
143
191
192
+ return Promise . resolve ( Result . Success ( storageConnection ) ) ;
193
+ }
194
+
195
+ async kickoffQuery ( queryMetadata : ReportQueryMetadata , describe : boolean ) : Promise < Result < string > > {
196
+ // fetch file metadata
197
+ const fileInfo = await this . #fileRepo. listPathMetadata ( ...queryMetadata . request . file_ids ! ) ;
198
+ if ( fileInfo . isError ) { return Promise . resolve ( Result . Pass ( fileInfo ) ) }
199
+ const files = fileInfo . value ;
200
+
201
+ const getConnString = await this . createConnectionString ( queryMetadata . container_id , queryMetadata . data_source_id ) ;
202
+ if ( getConnString . isError ) { return Promise . resolve ( Result . Pass ( getConnString ) ) }
203
+ const storageConnection = getConnString . value ;
204
+
205
+ // set report status to "processing"
206
+ let statusSet = await this . #reportRepo. setStatus (
207
+ queryMetadata . report_id , 'processing' ,
208
+ `executing query ${ queryMetadata . query_id } : "${ queryMetadata . query . query } " as part of report ${ queryMetadata . report_id } `
209
+ ) ;
210
+ if ( statusSet . isError ) { return Promise . resolve ( Result . Failure ( `unable to set report status` ) ) }
211
+
212
+ // kick off the describe or query process
144
213
if ( describe ) {
145
- this . processDescribe ( reportID , request . query ! , storageConnection , files as FileMetadata [ ] ) ;
214
+ this . processDescribe (
215
+ queryMetadata . report_id ,
216
+ queryMetadata . request . query ! ,
217
+ storageConnection ,
218
+ files as FileMetadata [ ] ) ;
146
219
} else {
147
220
this . processQuery (
148
- reportID ,
149
- request . query ! ,
221
+ queryMetadata . report_id ,
222
+ queryMetadata . request . query ! ,
150
223
storageConnection ,
151
224
files as FileMetadata [ ] ,
152
- queryID ,
153
- user
225
+ queryMetadata . query_id ,
226
+ queryMetadata . user
154
227
) ;
155
228
}
156
229
157
- // set report status to "processing"
158
- let statusSet = await this . #reportRepo. setStatus (
159
- reportID , 'processing' ,
160
- `executing query ${ queryID } : "${ reportQuery . query } " as part of report ${ reportID } `
161
- ) ;
162
- if ( statusSet . isError ) { return Promise . resolve ( Result . Failure ( `unable to set report status` ) ) }
163
-
164
230
// return report ID to the user so they can poll for results
165
- return Promise . resolve ( Result . Success ( reportID ) ) ;
231
+ return Promise . resolve ( Result . Success ( queryMetadata . report_id ) ) ;
166
232
}
167
233
168
234
async processQuery (
0 commit comments