1
- package javaxt .express .services ;
2
- import javaxt .express .utils .*;
3
- import javaxt .express .*;
4
-
5
- import java .util .*;
6
- import java .util .concurrent .ConcurrentHashMap ;
7
- import java .io .StringReader ;
8
- import java .math .BigDecimal ;
9
- import java .sql .SQLException ;
10
-
11
- import javaxt .sql .*;
12
- import javaxt .json .*;
13
- import static javaxt .utils .Console .console ;
14
-
15
- import net .sf .jsqlparser .parser .*;
16
- import net .sf .jsqlparser .statement .select .*;
17
- import net .sf .jsqlparser .statement .Statement ;
18
- import net .sf .jsqlparser .statement .Statements ;
19
- import net .sf .jsqlparser .statement .create .table .CreateTable ;
20
- import net .sf .jsqlparser .expression .LongValue ;
21
-
22
- //******************************************************************************
23
- //** QueryService
24
- //******************************************************************************
25
- /**
26
- * Provides a set of web methods used to query the database. Loosely based
27
- * on the CartoDB SQL API: https://carto.com/developers/sql-api/reference/
28
- *
29
- ******************************************************************************/
30
-
31
- public class QueryService extends WebService {
32
-
33
- private Database database ;
34
- private javaxt .io .Directory jobDir ;
35
- private javaxt .io .Directory logDir ;
36
- private Map <String , QueryJob > jobs = new ConcurrentHashMap <>();
37
- private List <String > pendingJobs = new LinkedList <>();
38
- private List <String > completedJobs = new LinkedList <>();
39
- private java .util .List <SelectItem > selectCount ;
40
-
41
-
42
- //**************************************************************************
43
- //** Constructor
44
- //**************************************************************************
45
- public QueryService (Database database , javaxt .io .Directory jobDir , javaxt .io .Directory logDir ){
46
- this .database = database ;
47
-
48
- //Set path to the jobs directory
49
- if (jobDir !=null ) if (!jobDir .exists ()) jobDir .create ();
50
- if (jobDir ==null || !jobDir .exists ()){
51
- throw new IllegalArgumentException ("Invalid \" jobDir\" " );
52
- }
53
- this .jobDir = jobDir ;
54
-
55
-
56
-
57
- //Set path to the log directory
58
- if (logDir !=null ) if (!logDir .exists ()) logDir .create ();
59
- if (logDir !=null && logDir .exists ()){
60
- this .logDir = logDir ;
61
- }
62
-
63
-
64
-
65
- //Delete any orphan sql jobs
66
- for (javaxt .io .Directory dir : jobDir .getSubDirectories ()){
67
- dir .delete ();
68
- }
69
-
70
-
71
-
72
- //Test whether JSqlParser is in the classpath and parse a default "select count(*)" statement
73
- try {
74
- CCJSqlParserManager parserManager = new CCJSqlParserManager ();
75
- Select select = (Select ) parserManager .parse (new StringReader ("SELECT count(*) FROM T" ));
76
- PlainSelect plainSelect = (PlainSelect ) select .getSelectBody ();
77
- selectCount = plainSelect .getSelectItems ();
78
- }
79
- catch (Throwable t ){
80
- throw new IllegalArgumentException ("Failed to instantiate JSqlParser" );
81
- }
82
-
83
-
84
-
85
- //Spawn threads used to execute queries
86
- int numThreads = 1 ; //TODO: Make configurable...
87
- for (int i =0 ; i <numThreads ; i ++){
88
- new Thread (new QueryProcessor (database , this )).start ();
89
- }
90
- }
91
-
92
-
93
- //**************************************************************************
94
- //** getServiceResponse
95
- //**************************************************************************
96
- /** Used to generate a response to an HTTP request. The default routes are
97
- * as follows:
98
- * <ul>
99
- * <li>POST /job - Used to create a new query job and return a jobID</li>
100
- * <li>GET /job/{jobID} - Returns query results or job status for a given jobID</li>
101
- * <li>DELETE /job/{jobID} - Used to cancel query for a given jobID </li>
102
- * <li>GET /jobs - Returns a list of all query jobs associated with the user</li>
103
- * <li>GET /tables - Returns a list of all the tables in the database</li>
104
- * </ul>
105
- */
106
- public ServiceResponse getServiceResponse (ServiceRequest request , Database database ) {
107
- if (database ==null ) database = this .database ;
108
-
109
- String path = request .getPath (0 ).toString ();
110
- if (path !=null ){
111
- if (path .equals ("jobs" )){
112
- return list (request );
113
- }
114
- else if (path .equals ("job" )){
115
- String method = request .getRequest ().getMethod ();
116
- if (method .equals ("GET" )){
117
- return getJob (request );
118
- }
119
- else if (method .equals ("POST" )){
120
- return query (request , true );
121
- }
122
- else if (method .equals ("DELETE" )){
123
- return cancel (request , database );
124
- }
125
- else {
126
- return new ServiceResponse (501 , "Not implemented" );
127
- }
128
- }
129
- else if (path .equals ("tables" )){
130
- return getTables (request , database );
131
- }
132
- else {
133
- return new ServiceResponse (501 , "Not implemented" );
134
- }
135
- }
136
- else {
137
- return query (request , false );
138
- }
139
- }
140
-
141
-
142
- //**************************************************************************
143
- //** notify
144
- //**************************************************************************
145
- public void notify (QueryJob job ){}
146
-
147
-
148
- //**************************************************************************
149
- //** query
150
- //**************************************************************************
151
- private ServiceResponse query (ServiceRequest request , boolean async ) {
152
- try {
153
-
154
- //Get query
155
- String query = getParameter ("q" , request ).toString ();
156
- if (query ==null ) query = getParameter ("query" , request ).toString ();
157
- if (query ==null ) throw new IllegalArgumentException ("Query is required" );
158
-
159
-
160
- //Get Offset and Limit
161
- Long offset = getParameter ("offset" , request ).toLong ();
162
- Long limit = getParameter ("limit" , request ).toLong ();
163
- if (limit ==null ) limit = 25L ;
164
- if (offset ==null ){
165
- Long page = getParameter ("page" , request ).toLong ();
166
- if (page !=null ) offset = (page *limit )-limit ;
167
- }
168
-
169
-
170
-
171
- //Parse sql statement using JSQLParser
172
- Select select = null ;
173
- CreateTable createTempTable = null ;
174
- Statements statements = CCJSqlParserUtil .parseStatements (query );
175
- if (statements !=null ){
176
-
177
- Iterator <Statement > it = statements .getStatements ().iterator ();
178
- while (it .hasNext ()){
179
- Statement statement = it .next ();
180
- if (statement instanceof CreateTable ){
181
- CreateTable createTable = (CreateTable ) statement ;
182
- boolean isTemporaryTable = false ;
183
- Iterator <String > i2 = createTable .getCreateOptionsStrings ().iterator ();
184
- while (i2 .hasNext ()){
185
- String option = i2 .next ();
186
- if (option .equalsIgnoreCase ("TEMPORARY" )) isTemporaryTable = true ;
187
- break ;
188
- }
189
-
190
- if (isTemporaryTable ){
191
- if (select !=null ) throw new IllegalArgumentException ("Temporary table must be created before the SELECT statement" );
192
- if (createTempTable !=null ) throw new IllegalArgumentException ("Only 1 temp table allowed" );
193
- createTempTable = createTable ;
194
- }
195
- else {
196
- throw new IllegalArgumentException ("CREATE TABLE statements not allowed" );
197
- }
198
- }
199
- else if (statement instanceof Select ){
200
- if (select !=null ) throw new IllegalArgumentException ("Only 1 SELECT statement allowed" );
201
- select = (Select ) statement ;
202
- }
203
- else {
204
- throw new IllegalArgumentException (statement .getClass ().getSimpleName () + " statements not allowed" );
205
- }
206
- }
207
- }
208
-
209
-
210
-
211
-
212
- //Check whether the select statement has illegal or unsupported functions
213
- checkSelect ((PlainSelect ) select .getSelectBody ());
214
-
215
-
216
-
217
- //Collect misc params
218
- JSONObject params = new JSONObject ();
219
- params .set ("format" , getParameter ("format" , request ).toString ());
220
- Boolean addMetadata = getParameter ("metadata" , request ).toBoolean ();
221
- if (addMetadata !=null && addMetadata ==true ){
222
- params .set ("metadata" , true );
223
- }
224
- Boolean count = getParameter ("count" , request ).toBoolean ();
225
- if (count !=null && count ==true ){
226
- params .set ("count" , true );
227
- }
228
-
229
-
230
-
231
- //Create job
232
- User user = (User ) request .getUser ();
233
- QueryJob job = new QueryJob (user .getID (), select , offset , limit , params );
234
- if (createTempTable !=null ) job .addTempTable (createTempTable );
235
- String key = job .getKey ();
236
- job .log ();
237
- notify (job );
238
-
239
-
240
- //Update list of jobs
241
- synchronized (jobs ) {
242
- jobs .put (key , job );
243
- jobs .notify ();
244
- }
245
-
246
-
247
- //Update pendingJobs
248
- synchronized (pendingJobs ) {
249
- pendingJobs .add (key );
250
- pendingJobs .notify ();
251
- }
252
-
253
-
254
- //Generate response
255
- if (async ){
256
- return new ServiceResponse (job .toJson ());
257
- }
258
- else {
259
- synchronized (completedJobs ) {
260
- while (!completedJobs .contains (key )) {
261
- try {
262
- completedJobs .wait ();
263
- }
264
- catch (InterruptedException e ) {
265
- break ;
266
- }
267
- }
268
- }
269
- return getJobResponse (job );
270
- }
271
- }
272
- catch (Exception e ){
273
- if (e instanceof net .sf .jsqlparser .JSQLParserException ){
274
- e = new Exception ("Unsupported or Invalid SQL Statement" );
275
- }
276
- return new ServiceResponse (e );
277
- }
278
- }
279
-
280
-
281
- //**************************************************************************
282
- //** checkSelect
283
- //**************************************************************************
284
- /** Used to check whether the select statement has illegal or unsupported
285
- * functions
286
- */
287
- protected void checkSelect (PlainSelect plainSelect ){}
288
-
289
-
290
- //**************************************************************************
291
- //** Writer
292
- //**************************************************************************
293
- /** Used to generate json, csv, tsv, etc using records from the database
294
- */
295
- private class Writer {
296
-
297
- private String format ;
298
- private StringBuilder str ;
299
- private long x = 0 ;
300
- private Long elapsedTime ;
301
- private Long count ;
302
- private JSONArray metadata ;
303
- private boolean addMetadata = false ;
304
- private boolean isClosed = false ;
305
-
306
- public Writer (String format , boolean addMetadata ){
307
- str = new StringBuilder ();
308
- this .format = format ;
309
- this .addMetadata = addMetadata ;
310
- }
311
-
312
- public void write (Recordset rs ){
313
- if (isClosed ) return ; //throw exception?
314
-
315
- Field [] fields = rs .getFields ();
316
- if (x ==0 ){
317
-
318
- metadata = new JSONArray ();
319
- int count = 1 ;
320
- for (Field field : fields ) {
321
- JSONObject json = new JSONObject ();
322
- json .set ("id" , count );
323
- json .set ("name" , field .getName ());
324
- json .set ("type" , field .getType ());
325
- json .set ("class" , field .getClassName ());
326
- json .set ("table" , field .getTableName ());
327
- metadata .add (json );
328
- count ++;
329
- }
330
-
331
- if (format .equals ("tsv" ) || format .equals ("csv" )){
332
- String s = format .equals ("tsv" ) ? "\t " : "," ;
333
- for (int i =0 ; i <fields .length ; i ++){
334
- if (i >0 ) str .append (s );
335
- str .append (fields [i ].getName ());
336
- }
337
- str .append ("\r \n " );
338
- }
339
- else if (format .equals ("jsv" )){
340
- str .append ("{\" cols\" :[" );
341
- for (int i =0 ; i <fields .length ; i ++){
342
- if (i >0 ) str .append ("," );
343
- str .append ("\" " );
344
- str .append (fields [i ].getName ());
345
- str .append ("\" " );
346
- }
347
- str .append ("]," );
348
- str .append ("\" rows\" :[" );
349
- }
350
- else if (format .equals ("json" )){
351
- str .append ("{\" rows\" :[" );
352
- }
353
- }
354
-
355
-
356
- if (format .equals ("json" )){
357
- JSONObject json = new JSONObject ();
358
- JSONObject values = DbUtils .getJson (fields );
359
- for (Field field : fields ){
360
- String fieldName = field .getName ();
361
- String key = StringUtils .underscoreToCamelCase (fieldName .toLowerCase ());
362
- Object val = field .getValue ().toObject ();
363
- if (val ==null ){
364
- json .set (fieldName , "null" );
365
- }
366
- else {
367
- json .set (fieldName , values .get (key ));
368
- }
369
- }
370
-
371
- if (x >0 ) str .append ("," );
372
- str .append (json .toString ().replace ("\" null\" " , "null" )); //<-- this is a bit of a hack...
373
-
374
- }
375
- else if (format .equals ("jsv" )){
376
- JSONArray arr = new JSONArray ();
377
- JSONObject values = DbUtils .getJson (fields );
378
- for (Field field : fields ){
379
- String fieldName = field .getName ();
380
- String key = StringUtils .underscoreToCamelCase (fieldName .toLowerCase ());
381
- arr .add (values .get (key ));
382
- }
383
-
384
- if (x >0 ) str .append ("," );
385
- str .append (arr .toString ().replace ("\" null\" " , "null" )); //<-- same logic as json...
386
-
387
- }
388
- else if (format .equals ("tsv" ) || format .equals ("csv" )){
389
- String s = format .equals ("tsv" ) ? "\t " : "," ;
390
- for (int i =0 ; i <fields .length ; i ++){
391
- if (i >0 ) str .append (s );
392
- Object value = fields [i ].getValue ().toObject ();
393
- if (value ==null ){
394
- value = "" ;
395
- }
396
- else {
397
- if (value instanceof String ){
398
- String v = (String ) value ;
399
- if (v .contains (s )){
400
- value = "\" " + v + "\" " ;
401
- }
402
- }
403
- else if (value instanceof javaxt .utils .Date ) {
404
- value = ((javaxt .utils .Date ) value ).toISOString ();
405
- }
406
- else if (value instanceof java .util .Date ) {
407
- value = new javaxt .utils .Date (((java .util .Date ) value )).toISOString ();
408
- }
409
- else if (value instanceof java .util .Calendar ) {
410
- value = new javaxt .utils .Date (((java .util .Calendar ) value )).toISOString ();
411
- }
412
-
413
- }
414
- str .append (value );
415
- }
416
- str .append ("\r \n " );
417
- }
418
-
419
- x ++;
420
- }
421
-
422
-
423
- public void includeMetadata (boolean b ){
424
- addMetadata = b ;
425
- }
426
-
427
-
428
- public void setElapsedTime (long elapsedTime ){
429
- this .elapsedTime = elapsedTime ;
430
- }
431
-
432
-
433
- public void setCount (long count ){
434
- this .count = count ;
435
- }
436
-
437
-
438
- public void close (){
439
- isClosed = true ;
440
-
441
- if (format .equals ("json" )){
442
- if (x >0 ){
443
- str .append ("]" );
444
-
445
-
446
- if (addMetadata ){
447
- if (metadata !=null ){
448
- str .append (",\" metadata\" :" );
449
- str .append (metadata );
450
- }
451
- }
452
-
453
-
454
- if (count !=null ){
455
- str .append (",\" total_rows\" :" );
456
- str .append (count );
457
- }
458
-
459
- if (this .elapsedTime !=null ){
460
- double elapsedTime = (double )(this .elapsedTime )/1000d ;
461
- BigDecimal time = new BigDecimal (elapsedTime ).setScale (3 , BigDecimal .ROUND_HALF_UP );
462
- str .append (",\" time\" :" );
463
- str .append (time );
464
- }
465
-
466
- str .append ("}" );
467
-
468
- }
469
- else {
470
- str .append ("{}" );
471
- }
472
- }
473
- else if (format .equals ("jsv" )){
474
- if (x >0 ){
475
- str .append ("]}" );
476
- }
477
- else {
478
- str .append ("{}" );
479
- }
480
- }
481
-
482
- }
483
-
484
-
485
- public String toString (){
486
- if (!isClosed ) close ();
487
- return str .toString ();
488
- }
489
- }
490
-
491
-
492
- //**************************************************************************
493
- //** list
494
- //**************************************************************************
495
- /** Returns an unordered list of jobs
496
- */
497
- private ServiceResponse list (ServiceRequest request ) {
498
-
499
- User user = (User ) request .getUser ();
500
- JSONArray arr = new JSONArray ();
501
- synchronized (jobs ) {
502
- Iterator <String > it = jobs .keySet ().iterator ();
503
- while (it .hasNext ()){
504
- String key = it .next ();
505
- QueryJob job = jobs .get (key );
506
- if (job .userID ==user .getID ()){
507
- arr .add (job .toJson ());
508
- }
509
- }
510
- }
511
-
512
- return new ServiceResponse (arr );
513
- }
514
-
515
-
516
- //**************************************************************************
517
- //** getJob
518
- //**************************************************************************
519
- /** Used to return the status or results for a given jobID. Example:
520
- * [GET] sql/job/{jobID}
521
- */
522
- private ServiceResponse getJob (ServiceRequest request ) {
523
- String id = request .getPath (1 ).toString ();
524
- User user = (User ) request .getUser ();
525
- QueryJob job = getJob (id , user );
526
- if (job ==null ) return new ServiceResponse (404 );
527
- return getJobResponse (job );
528
- }
529
-
530
-
531
- //**************************************************************************
532
- //** getJob
533
- //**************************************************************************
534
- /** Returns a job for a given jobID and user. Checks both the pending and
535
- * completed job queues.
536
- */
537
- private QueryJob getJob (String jobID , User user ){
538
- synchronized (jobs ) {
539
- return jobs .get (user .getID () + ":" + jobID );
540
- }
541
- }
542
-
543
-
544
- //**************************************************************************
545
- //** getJobResponse
546
- //**************************************************************************
547
- /** Used to generate a ServiceResponse for a given job. If a job has failed
548
- * or is complete, returns the output of the job. If the job is pending or
549
- * running, simply returns the job status.
550
- */
551
- private ServiceResponse getJobResponse (QueryJob job ){
552
- ServiceResponse response ;
553
- if (job .status .equals ("failed" )){
554
- javaxt .io .File file = job .getOutput ();
555
- String str = file .getText ();
556
- response = new ServiceResponse (500 , str );
557
- deleteJob (job );
558
- }
559
- else if (job .status .equals ("complete" )){
560
- javaxt .io .File file = job .getOutput ();
561
- String str = file .getText ();
562
- response = new ServiceResponse (str );
563
- response .setContentType (file .getContentType ());
564
- deleteJob (job );
565
- }
566
- else {
567
- response = new ServiceResponse (job .status );
568
- }
569
- return response ;
570
- }
571
-
572
-
573
- //**************************************************************************
574
- //** deleteJob
575
- //**************************************************************************
576
- /** Removes a job from the queue and deletes any output files that might
577
- * have been created with the job.
578
- */
579
- private void deleteJob (QueryJob job ){
580
-
581
- String key = job .getKey ();
582
- synchronized (pendingJobs ){
583
- pendingJobs .remove (key );
584
- pendingJobs .notify ();
585
- }
586
-
587
- synchronized (completedJobs ) {
588
- completedJobs .remove (key );
589
- completedJobs .notify ();
590
- }
591
-
592
- synchronized (jobs ) {
593
- jobs .remove (key );
594
- jobs .notify ();
595
- }
596
-
597
- javaxt .io .File file = job .getOutput ();
598
- file .delete ();
599
- }
600
-
601
-
602
- //**************************************************************************
603
- //** cancel
604
- //**************************************************************************
605
- /** Used to cancel a pending or running job.
606
- */
607
- private ServiceResponse cancel (ServiceRequest request , Database database ) {
608
- String id = request .getPath (1 ).toString ();
609
- User user = (User ) request .getUser ();
610
- QueryJob job = getJob (id , user );
611
- if (job ==null ) return new ServiceResponse (404 );
612
-
613
-
614
- String key = job .getKey ();
615
- synchronized (pendingJobs ){
616
- pendingJobs .remove (key );
617
- pendingJobs .notify ();
618
- }
619
-
620
-
621
-
622
- try (Connection conn = database .getConnection ()) {
623
-
624
- //Update job status
625
- job .status = "canceled" ;
626
- job .updated = new javaxt .utils .Date ();
627
- notify (job );
628
-
629
-
630
- //Cancel the query in the database
631
- if (database .getDriver ().equals ("PostgreSQL" )){
632
- Integer pid = getPid (job .getKey (), conn );
633
- if (pid !=null ){
634
- boolean jobCanceled = false ;
635
-
636
- javaxt .sql .Record record = conn .getRecord ("SELECT pg_cancel_backend(" + pid + ")" );
637
- if (record !=null ) jobCanceled = record .get (0 ).toBoolean ();
638
-
639
- if (!jobCanceled ){
640
- record = conn .getRecord ("SELECT pg_terminate_backend(" + pid + ")" );
641
- if (record !=null ) jobCanceled = record .get (0 ).toBoolean ();
642
- }
643
-
644
-
645
- if (!jobCanceled ){
646
- throw new Exception ();
647
- }
648
- }
649
- }
650
-
651
- //Update queue
652
- deleteJob (job );
653
-
654
-
655
- //return response
656
- return new ServiceResponse (job .toJson ());
657
- }
658
- catch (Exception e ){
659
- return new ServiceResponse (500 , "failed to cancel query" );
660
- }
661
- }
662
-
663
-
664
- //**************************************************************************
665
- //** getPid
666
- //**************************************************************************
667
- /** Returns process id for a given jobId
668
- */
669
- private Integer getPid (String key , Connection conn ) throws SQLException {
670
- javaxt .sql .Record record = conn .getRecord (
671
- "SELECT pid from pg_stat_activity where query like '--" + key + "%'" );
672
- return record ==null ? null : record .get (0 ).toInteger ();
673
- }
674
-
675
-
676
- //**************************************************************************
677
- //** getTables
678
- //**************************************************************************
679
- /** Returns a list of tables and columns
680
- */
681
- public ServiceResponse getTables (ServiceRequest request , Database database ) {
682
- try {
683
- JSONArray arr = new JSONArray ();
684
- for (Table table : database .getTables ()){
685
-
686
-
687
- //Get schema
688
- String schema = table .getSchema ();
689
- if (schema !=null ){
690
-
691
- //Skip PostgreSQL metadata tables
692
- if (schema .equalsIgnoreCase ("information_schema" )) continue ;
693
- if (schema .toLowerCase ().startsWith ("pg_" )) continue ;
694
- }
695
-
696
-
697
- //Get columns
698
- JSONArray columns = new JSONArray ();
699
- for (Column column : table .getColumns ()){
700
-
701
- JSONObject col = new JSONObject ();
702
- col .set ("name" , column .getName ());
703
- col .set ("type" , column .getType ());
704
- if (column .isPrimaryKey ()){
705
- col .set ("primaryKey" , true );
706
- }
707
- columns .add (col );
708
- }
709
-
710
-
711
- //Update array
712
- JSONObject json = new JSONObject ();
713
- json .set ("name" , table .getName ());
714
- json .set ("schema" , schema );
715
- json .set ("columns" , columns );
716
- arr .add (json );
717
- }
718
-
719
-
720
- JSONObject json = new JSONObject ();
721
- json .set ("tables" , arr );
722
- return new ServiceResponse (json );
723
- }
724
- catch (Exception e ){
725
- return new ServiceResponse (e );
726
- }
727
- }
728
-
729
-
730
- //**************************************************************************
731
- //** getParameter
732
- //**************************************************************************
733
- /** Used to extract a parameter either from the URL query string or the json
734
- * in the request payload.
735
- */
736
- private javaxt .utils .Value getParameter (String name , ServiceRequest request ){
737
- if (request .getRequest ().getMethod ().equals ("GET" )){
738
- return request .getParameter (name );
739
- }
740
- else {
741
- JSONObject json = request .getJson ();
742
- if (json .has (name )){
743
- return new javaxt .utils .Value (json .get (name ).toObject ());
744
- }
745
- else {
746
- return request .getParameter (name );
747
- }
748
- }
749
- }
750
-
751
-
752
- //**************************************************************************
753
- //** QueryJob
754
- //**************************************************************************
755
- public class QueryJob {
756
-
757
- private String id ;
758
- private long userID ;
759
- private Select select ;
760
- private Long offset ;
761
- private LongValue limit ;
762
- private javaxt .utils .Date created ;
763
- private javaxt .utils .Date updated ;
764
- private String status ;
765
- private String format ;
766
- private boolean countTotal = false ;
767
- private boolean addMetadata = false ;
768
- private CreateTable tempTable ;
769
-
770
-
771
- public QueryJob (long userID , Select select , Long offset , Long limit , JSONObject params ) {
772
- this .id = UUID .randomUUID ().toString ();
773
- this .userID = userID ;
774
- this .select = select ;
775
- this .offset = offset ;
776
- this .limit = limit ==null ? null : new LongValue (limit );
777
- this .created = new javaxt .utils .Date ();
778
- this .updated = this .created .clone ();
779
- this .status = "pending" ;
780
-
781
- String format = params .get ("format" ).toString ();
782
- if (format ==null ) format ="" ;
783
- format = format .trim ().toLowerCase ();
784
- if (format .equals ("csv" ) || format .equals ("tsv" ) || format .equals ("jsv" )){
785
- this .format = format ;
786
- }
787
- else this .format = "json" ;
788
-
789
-
790
- if (params .has ("count" )){
791
- countTotal = params .get ("count" ).toBoolean ();
792
- }
793
-
794
- if (params .has ("metadata" )){
795
- addMetadata = params .get ("metadata" ).toBoolean ();
796
- }
797
- }
798
-
799
- public String getID (){
800
- return id ;
801
- }
802
-
803
- public long getUserID (){
804
- return userID ;
805
- }
806
-
807
- public String getStatus (){
808
- return status ;
809
- }
810
-
811
- public void addTempTable (CreateTable stmt ){
812
- tempTable = stmt ;
813
- }
814
-
815
- public CreateTable getTempTable (){
816
- return tempTable ;
817
- }
818
-
819
- public String getKey (){
820
- return userID + ":" + id ;
821
- }
822
-
823
- public boolean isCanceled (){
824
- return status .equals ("canceled" );
825
- }
826
-
827
- public String getQuery (){
828
- PlainSelect plainSelect = (PlainSelect ) select .getSelectBody ();
829
-
830
- //Update offset and limit
831
- if (offset !=null ){
832
- Offset o = plainSelect .getOffset ();
833
- if (o ==null ) o = new Offset ();
834
- o .setOffset (offset );
835
- plainSelect .setOffset (o );
836
- }
837
- if (limit !=null ){
838
- Limit l = plainSelect .getLimit ();
839
- if (l ==null ){
840
- l = new Limit ();
841
- l .setRowCount (limit );
842
- plainSelect .setLimit (l );
843
- }
844
- }
845
- String query = plainSelect .toString ();
846
-
847
-
848
-
849
-
850
- //Prepend any "with" clause that might be present
851
- if (select .getWithItemsList ()!=null ){
852
- java .util .Iterator <WithItem > i2 = select .getWithItemsList ().iterator ();
853
- query = "with " + i2 .next () + " \r \n " + query ;
854
- }
855
-
856
- return query ;
857
- }
858
-
859
- public String getCountQuery (){
860
-
861
- PlainSelect plainSelect = (PlainSelect ) select .getSelectBody ();
862
- plainSelect .setSelectItems (selectCount );
863
- String query = plainSelect .toString ();
864
-
865
-
866
- //Add prepend any "with" clause that might be present
867
- if (!select .getWithItemsList ().isEmpty ()){
868
- java .util .Iterator <WithItem > i2 = select .getWithItemsList ().iterator ();
869
- query = "with " + i2 .next () + " \r \n " + query ;
870
- }
871
-
872
- return query ;
873
- }
874
-
875
- public boolean countTotal (){
876
- if (countTotal ){
877
- if (format .equals ("json" )) return true ;
878
- }
879
- return false ;
880
- }
881
-
882
- public boolean addMetadata (){
883
- return addMetadata ;
884
- }
885
-
886
- public String getOutputFormat (){
887
- return format ;
888
- }
889
-
890
- public javaxt .io .File getOutput (){
891
- return new javaxt .io .File (jobDir .toString () + userID + "/" + id + "." + format );
892
- }
893
-
894
-
895
- public String getContentType (){
896
- if (format .equals ("tsv" )){
897
- return "text/plain" ;
898
- }
899
- else if (format .equals ("csv" ))
900
- return "text/csv" ;
901
- else {
902
- return "application/json" ;
903
- }
904
- }
905
-
906
-
907
- public void log (){
908
- if (logDir !=null ){
909
- javaxt .io .File file = new javaxt .io .File (logDir .toString () + userID + "/" + id + ".json" );
910
- file .write (toJson ().toString ());
911
- }
912
- }
913
-
914
- public JSONObject toJson () {
915
- JSONObject json = new JSONObject ();
916
- json .set ("user_id" , userID );
917
- json .set ("job_id" , id );
918
- json .set ("status" , status );
919
- json .set ("query" , getQuery ());
920
- json .set ("created_at" , created );
921
- json .set ("updated_at" , updated );
922
- return json ;
923
- }
924
- }
925
-
926
-
927
- //**************************************************************************
928
- //** QueryProcessor
929
- //**************************************************************************
930
- /** Thread used to execute queries
931
- */
932
- private class QueryProcessor implements Runnable {
933
- private Database database ;
934
- private QueryService queryService ;
935
-
936
- public QueryProcessor (Database database , QueryService queryService ){
937
- this .database = database ;
938
- this .queryService = queryService ;
939
- }
940
-
941
- public void run () {
942
-
943
- while (true ) {
944
-
945
- Object obj = null ;
946
- synchronized (pendingJobs ) {
947
- while (pendingJobs .isEmpty ()) {
948
- try {
949
- pendingJobs .wait ();
950
- }
951
- catch (InterruptedException e ) {
952
- return ;
953
- }
954
- }
955
- obj = pendingJobs .get (0 );
956
- if (obj !=null ) pendingJobs .remove (0 );
957
- pendingJobs .notifyAll ();
958
- }
959
-
960
- if (obj !=null ){
961
-
962
- //Find query job
963
- String key = (String ) obj ;
964
- QueryJob job = null ;
965
- synchronized (jobs ) {
966
- job = jobs .get (key );
967
- }
968
-
969
- if (job !=null && !job .isCanceled ()){
970
- Connection conn = null ;
971
- try {
972
-
973
- //Update job status and set start time
974
- job .status = "running" ;
975
- job .updated = new javaxt .utils .Date ();
976
- long startTime = System .currentTimeMillis ();
977
- queryService .notify (job );
978
-
979
-
980
- //Open database connection
981
- conn = database .getConnection ();
982
-
983
-
984
- //Create temp table as needed
985
- CreateTable createTempTable = job .getTempTable ();
986
- if (createTempTable !=null ){
987
- conn .execute ("--" + job .getKey () + "\n " + createTempTable .toString ());
988
- if (job .isCanceled ()){
989
- conn .execute ("DROP TABLE " + createTempTable .getTable ().getName ());
990
- throw new Exception ();
991
- }
992
- }
993
-
994
-
995
- //Execute query and generate response
996
- String query = job .getQuery ();
997
- Writer writer = new Writer (job .getOutputFormat (), job .addMetadata ());
998
- Recordset rs = conn .getRecordset ("--" + job .getKey () + "\n " + query );
999
- while (rs .next ()){
1000
- writer .write (rs );
1001
- }
1002
- rs .close ();
1003
- if (job .isCanceled ()) throw new Exception ();
1004
-
1005
-
1006
- //Count total records as needed
1007
- if (job .countTotal ()){
1008
- javaxt .sql .Record record = conn .getRecord (job .getCountQuery ());
1009
- if (record !=null ){
1010
- Long ttl = record .get (0 ).toLong ();
1011
- if (ttl !=null ){
1012
- writer .setCount (ttl );
1013
- }
1014
- }
1015
- }
1016
- if (job .isCanceled ()) throw new Exception ();
1017
-
1018
-
1019
-
1020
- //Drop temp table
1021
- if (createTempTable !=null ){
1022
- conn .execute ("DROP TABLE " + createTempTable .getTable ().getName ());
1023
- }
1024
- if (job .isCanceled ()) throw new Exception ();
1025
-
1026
-
1027
- //Close database connection
1028
- conn .close ();
1029
-
1030
-
1031
-
1032
- //Set elapsed time
1033
- writer .setElapsedTime (System .currentTimeMillis ()-startTime );
1034
-
1035
-
1036
- //Write output to a file
1037
- javaxt .io .File file = job .getOutput ();
1038
- file .write (writer .toString ());
1039
-
1040
-
1041
- //Update job status
1042
- job .status = "complete" ;
1043
- job .updated = new javaxt .utils .Date ();
1044
- queryService .notify (job );
1045
- }
1046
- catch (Exception e ){
1047
- if (conn !=null ) conn .close ();
1048
- javaxt .io .File file = job .getOutput ();
1049
- if (job .isCanceled ()){
1050
- file .delete ();
1051
- }
1052
- else {
1053
- job .status = "failed" ;
1054
- job .updated = new javaxt .utils .Date ();
1055
- queryService .notify (job );
1056
-
1057
-
1058
- java .io .PrintStream ps = null ;
1059
- try {
1060
- file .create ();
1061
- ps = new java .io .PrintStream (file .toFile ());
1062
- e .printStackTrace (ps );
1063
- ps .close ();
1064
- }
1065
- catch (Exception ex ) {
1066
- if (ps !=null ) ps .close ();
1067
- file .write (e .getMessage ());
1068
- }
1069
- }
1070
- }
1071
-
1072
-
1073
- //Add job to the completedJobs
1074
- if (!job .isCanceled ()){
1075
- synchronized (completedJobs ){
1076
- completedJobs .add (job .getKey ());
1077
- completedJobs .notify ();
1078
- }
1079
- }
1080
- }
1081
- }
1082
- else {
1083
- return ;
1084
- }
1085
- }
1086
- }
1087
- }
1
+ package javaxt .express .services ;
2
+ import javaxt .express .utils .*;
3
+ import javaxt .express .*;
4
+
5
+ import java .util .*;
6
+ import java .util .concurrent .ConcurrentHashMap ;
7
+ import java .io .StringReader ;
8
+ import java .math .BigDecimal ;
9
+ import java .sql .SQLException ;
10
+
11
+ import javaxt .sql .*;
12
+ import javaxt .json .*;
13
+ import static javaxt .utils .Console .console ;
14
+
15
+ import net .sf .jsqlparser .parser .*;
16
+ import net .sf .jsqlparser .statement .select .*;
17
+ import net .sf .jsqlparser .statement .Statement ;
18
+ import net .sf .jsqlparser .statement .Statements ;
19
+ import net .sf .jsqlparser .statement .create .table .CreateTable ;
20
+ import net .sf .jsqlparser .expression .LongValue ;
21
+
22
+ //******************************************************************************
23
+ //** QueryService
24
+ //******************************************************************************
25
+ /**
26
+ * WebService used to query a database. Queries are executed asynchronously.
27
+ * Clients submit a query and by executing an HTTP POST request to "/job"
28
+ * which, in turn returns a job ID. Clients can get job status and, when
29
+ * ready, the query results by executinig an HTTP GET request to "/job/{ID}".
30
+ * Query results maybe encapsulated using csv, tsv, jsv, or json (default).
31
+ * The format is specified when submitting a new job. Additional routes and
32
+ * capabilties are documented in the getServiceResponse() method.
33
+ * <p/>
34
+ * Note that this class requires JSqlParser which is used to validate
35
+ * queries and mitigate SQL injection by only allowing "select" statements.
36
+ * The validation logic is not foolproof and it is therefore recommended to
37
+ * restrict access to this service to authorized, trusted users (e.g. admins).
38
+ *
39
+ ******************************************************************************/
40
+
41
+ public class QueryService extends WebService {
42
+
43
+ private Database database ;
44
+ private javaxt .io .Directory jobDir ;
45
+ private javaxt .io .Directory logDir ;
46
+ private Map <String , QueryJob > jobs = new ConcurrentHashMap <>();
47
+ private List <String > pendingJobs = new LinkedList <>();
48
+ private List <String > completedJobs = new LinkedList <>();
49
+ private java .util .List <SelectItem > selectCount ;
50
+
51
+
52
+ //**************************************************************************
53
+ //** Constructor
54
+ //**************************************************************************
55
+ /** Used to create a new instance of this class
56
+ * @param database Database connection info
57
+ * @param jobDir Temp directory used to save query results (required)
58
+ * @param logDir Directory used to log queries (optional)
59
+ */
60
+ public QueryService (Database database , javaxt .io .Directory jobDir , javaxt .io .Directory logDir ){
61
+ this .database = database ;
62
+
63
+ //Set path to the jobs directory
64
+ if (jobDir !=null ) if (!jobDir .exists ()) jobDir .create ();
65
+ if (jobDir ==null || !jobDir .exists ()){
66
+ throw new IllegalArgumentException ("Invalid \" jobDir\" " );
67
+ }
68
+ this .jobDir = jobDir ;
69
+
70
+
71
+
72
+ //Set path to the log directory
73
+ if (logDir !=null ) if (!logDir .exists ()) logDir .create ();
74
+ if (logDir !=null && logDir .exists ()){
75
+ this .logDir = logDir ;
76
+ }
77
+
78
+
79
+
80
+ //Delete any orphan sql jobs
81
+ for (javaxt .io .Directory dir : jobDir .getSubDirectories ()){
82
+ dir .delete ();
83
+ }
84
+
85
+
86
+
87
+ //Test whether JSqlParser is in the classpath and parse a default "select count(*)" statement
88
+ try {
89
+ CCJSqlParserManager parserManager = new CCJSqlParserManager ();
90
+ Select select = (Select ) parserManager .parse (new StringReader ("SELECT count(*) FROM T" ));
91
+ PlainSelect plainSelect = (PlainSelect ) select .getSelectBody ();
92
+ selectCount = plainSelect .getSelectItems ();
93
+ }
94
+ catch (Throwable t ){
95
+ throw new IllegalArgumentException ("Failed to instantiate JSqlParser" );
96
+ }
97
+
98
+
99
+
100
+ //Spawn threads used to execute queries
101
+ int numThreads = 1 ; //TODO: Make configurable...
102
+ for (int i =0 ; i <numThreads ; i ++){
103
+ new Thread (new QueryProcessor (database , this )).start ();
104
+ }
105
+ }
106
+
107
+
108
+ //**************************************************************************
109
+ //** getServiceResponse
110
+ //**************************************************************************
111
+ /** Used to generate a response to an HTTP request. The default routes are
112
+ * as follows:
113
+ * <ul>
114
+ * <li>POST /job - Used to create a new query job and return a jobID</li>
115
+ * <li>GET /job/{jobID} - Returns query results or job status for a given jobID</li>
116
+ * <li>DELETE /job/{jobID} - Used to cancel query for a given jobID </li>
117
+ * <li>GET /jobs - Returns a list of all query jobs associated with the user</li>
118
+ * <li>GET /tables - Returns a list of all the tables in the database</li>
119
+ * </ul>
120
+ * If no path is provided in the request, the method will wait until the
121
+ * query is executed before returning a response. This may result in long
122
+ * wait times and the client might hang up or disconnect before the query
123
+ * is done (not recommended).
124
+ */
125
+ public ServiceResponse getServiceResponse (ServiceRequest request , Database database ) {
126
+ if (database ==null ) database = this .database ;
127
+
128
+ String path = request .getPath (0 ).toString ();
129
+ if (path !=null ){
130
+ if (path .equals ("jobs" )){
131
+ return list (request );
132
+ }
133
+ else if (path .equals ("job" )){
134
+ String method = request .getRequest ().getMethod ();
135
+ if (method .equals ("GET" )){
136
+ return getJob (request );
137
+ }
138
+ else if (method .equals ("POST" )){
139
+ return query (request , true );
140
+ }
141
+ else if (method .equals ("DELETE" )){
142
+ return cancel (request , database );
143
+ }
144
+ else {
145
+ return new ServiceResponse (501 , "Not implemented" );
146
+ }
147
+ }
148
+ else if (path .equals ("tables" )){
149
+ return getTables (request , database );
150
+ }
151
+ else {
152
+ return new ServiceResponse (501 , "Not implemented" );
153
+ }
154
+ }
155
+ else {
156
+ return query (request , false );
157
+ }
158
+ }
159
+
160
+
161
+ //**************************************************************************
162
+ //** notify
163
+ //**************************************************************************
164
+ /** Called whenever a job is created, updated, or deleted. You can override
165
+ * this method in your application (e.g. relay job status via websockets).
166
+ */
167
+ public void notify (QueryJob job ){}
168
+
169
+
170
+ //**************************************************************************
171
+ //** query
172
+ //**************************************************************************
173
+ /** Used to create a query job.
174
+ * @param request Parameters include
175
+ * <ul>
176
+ * <li>query or q - SQL select statement (required)</li>
177
+ * <li>offset - Used to specify a start row (optional)</li>
178
+ * <li>limit - Used to specify number of rows to return (optional). Default is 25.</li>
179
+ * <li>format - Used to specify output format (optional). Default is json.</li>
180
+ * <li>metadata - Returns column info (optional). Default is false.</li>
181
+ * <li>count - Returns total number of records regardless of offset and/or
182
+ * limit (optional). Default is false.
183
+ * </li>
184
+ * </ul>
185
+ */
186
+ private ServiceResponse query (ServiceRequest request , boolean async ) {
187
+ try {
188
+
189
+ //Get query
190
+ String query = getParameter ("q" , request ).toString ();
191
+ if (query ==null ) query = getParameter ("query" , request ).toString ();
192
+ if (query ==null ) throw new IllegalArgumentException ("Query is required" );
193
+
194
+
195
+ //Get Offset and Limit
196
+ Long offset = getParameter ("offset" , request ).toLong ();
197
+ Long limit = getParameter ("limit" , request ).toLong ();
198
+ if (limit ==null ) limit = 25L ;
199
+ if (offset ==null ){
200
+ Long page = getParameter ("page" , request ).toLong ();
201
+ if (page !=null ) offset = (page *limit )-limit ;
202
+ }
203
+
204
+
205
+
206
+ //Parse sql statement using JSQLParser
207
+ Select select = null ;
208
+ CreateTable createTempTable = null ;
209
+ Statements statements = CCJSqlParserUtil .parseStatements (query );
210
+ if (statements !=null ){
211
+
212
+ Iterator <Statement > it = statements .getStatements ().iterator ();
213
+ while (it .hasNext ()){
214
+ Statement statement = it .next ();
215
+ if (statement instanceof CreateTable ){
216
+ CreateTable createTable = (CreateTable ) statement ;
217
+ boolean isTemporaryTable = false ;
218
+ Iterator <String > i2 = createTable .getCreateOptionsStrings ().iterator ();
219
+ while (i2 .hasNext ()){
220
+ String option = i2 .next ();
221
+ if (option .equalsIgnoreCase ("TEMPORARY" )) isTemporaryTable = true ;
222
+ break ;
223
+ }
224
+
225
+ if (isTemporaryTable ){
226
+ if (select !=null ) throw new IllegalArgumentException ("Temporary table must be created before the SELECT statement" );
227
+ if (createTempTable !=null ) throw new IllegalArgumentException ("Only 1 temp table allowed" );
228
+ createTempTable = createTable ;
229
+ }
230
+ else {
231
+ throw new IllegalArgumentException ("CREATE TABLE statements not allowed" );
232
+ }
233
+ }
234
+ else if (statement instanceof Select ){
235
+ if (select !=null ) throw new IllegalArgumentException ("Only 1 SELECT statement allowed" );
236
+ select = (Select ) statement ;
237
+ }
238
+ else {
239
+ throw new IllegalArgumentException (statement .getClass ().getSimpleName () + " statements not allowed" );
240
+ }
241
+ }
242
+ }
243
+
244
+
245
+
246
+
247
+ //Check whether the select statement has illegal or unsupported functions
248
+ checkSelect ((PlainSelect ) select .getSelectBody ());
249
+
250
+
251
+
252
+ //Collect misc params
253
+ JSONObject params = new JSONObject ();
254
+ params .set ("format" , getParameter ("format" , request ).toString ());
255
+ Boolean addMetadata = getParameter ("metadata" , request ).toBoolean ();
256
+ if (addMetadata !=null && addMetadata ==true ){
257
+ params .set ("metadata" , true );
258
+ }
259
+ Boolean count = getParameter ("count" , request ).toBoolean ();
260
+ if (count !=null && count ==true ){
261
+ params .set ("count" , true );
262
+ }
263
+
264
+
265
+
266
+ //Create job
267
+ User user = (User ) request .getUser ();
268
+ Long userID = user ==null ? 0 : user .getID ();
269
+ QueryJob job = new QueryJob (userID , select , offset , limit , params );
270
+ if (createTempTable !=null ) job .addTempTable (createTempTable );
271
+ String key = job .getKey ();
272
+ job .log ();
273
+ notify (job );
274
+
275
+
276
+ //Update list of jobs
277
+ synchronized (jobs ) {
278
+ jobs .put (key , job );
279
+ jobs .notify ();
280
+ }
281
+
282
+
283
+ //Update pendingJobs
284
+ synchronized (pendingJobs ) {
285
+ pendingJobs .add (key );
286
+ pendingJobs .notify ();
287
+ }
288
+
289
+
290
+ //Generate response
291
+ if (async ){
292
+ return new ServiceResponse (job .toJson ());
293
+ }
294
+ else {
295
+ synchronized (completedJobs ) {
296
+ while (!completedJobs .contains (key )) {
297
+ try {
298
+ completedJobs .wait ();
299
+ }
300
+ catch (InterruptedException e ) {
301
+ break ;
302
+ }
303
+ }
304
+ }
305
+ return getJobResponse (job );
306
+ }
307
+ }
308
+ catch (Exception e ){
309
+ if (e instanceof net .sf .jsqlparser .JSQLParserException ){
310
+ e = new Exception ("Unsupported or Invalid SQL Statement" );
311
+ }
312
+ return new ServiceResponse (e );
313
+ }
314
+ }
315
+
316
+
317
+ //**************************************************************************
318
+ //** checkSelect
319
+ //**************************************************************************
320
+ /** Used to check whether the select statement has illegal or unsupported
321
+ * functions
322
+ */
323
+ protected void checkSelect (PlainSelect plainSelect ){}
324
+
325
+
326
+ //**************************************************************************
327
+ //** Writer
328
+ //**************************************************************************
329
+ /** Used to generate json, csv, tsv, etc using records from the database
330
+ */
331
+ private class Writer {
332
+
333
+ private String format ;
334
+ private StringBuilder str ;
335
+ private long x = 0 ;
336
+ private Long elapsedTime ;
337
+ private Long count ;
338
+ private JSONArray metadata ;
339
+ private boolean addMetadata = false ;
340
+ private boolean isClosed = false ;
341
+
342
+ public Writer (String format , boolean addMetadata ){
343
+ str = new StringBuilder ();
344
+ this .format = format ;
345
+ this .addMetadata = addMetadata ;
346
+ }
347
+
348
+ public void write (Recordset rs ){
349
+ if (isClosed ) return ; //throw exception?
350
+
351
+ Field [] fields = rs .getFields ();
352
+ if (x ==0 ){
353
+
354
+ metadata = new JSONArray ();
355
+ int count = 1 ;
356
+ for (Field field : fields ) {
357
+ JSONObject json = new JSONObject ();
358
+ json .set ("id" , count );
359
+ json .set ("name" , field .getName ());
360
+ json .set ("type" , field .getType ());
361
+ json .set ("class" , field .getClassName ());
362
+ json .set ("table" , field .getTableName ());
363
+ metadata .add (json );
364
+ count ++;
365
+ }
366
+
367
+ if (format .equals ("tsv" ) || format .equals ("csv" )){
368
+ String s = format .equals ("tsv" ) ? "\t " : "," ;
369
+ for (int i =0 ; i <fields .length ; i ++){
370
+ if (i >0 ) str .append (s );
371
+ str .append (fields [i ].getName ());
372
+ }
373
+ str .append ("\r \n " );
374
+ }
375
+ else if (format .equals ("jsv" )){
376
+ str .append ("{\" cols\" :[" );
377
+ for (int i =0 ; i <fields .length ; i ++){
378
+ if (i >0 ) str .append ("," );
379
+ str .append ("\" " );
380
+ str .append (fields [i ].getName ());
381
+ str .append ("\" " );
382
+ }
383
+ str .append ("]," );
384
+ str .append ("\" rows\" :[" );
385
+ }
386
+ else if (format .equals ("json" )){
387
+ str .append ("{\" rows\" :[" );
388
+ }
389
+ }
390
+
391
+
392
+ if (format .equals ("json" )){
393
+ JSONObject json = new JSONObject ();
394
+ JSONObject values = DbUtils .getJson (fields );
395
+ for (Field field : fields ){
396
+ String fieldName = field .getName ();
397
+ String key = StringUtils .underscoreToCamelCase (fieldName .toLowerCase ());
398
+ Object val = field .getValue ().toObject ();
399
+ if (val ==null ){
400
+ json .set (fieldName , "null" );
401
+ }
402
+ else {
403
+ json .set (fieldName , values .get (key ));
404
+ }
405
+ }
406
+
407
+ if (x >0 ) str .append ("," );
408
+ str .append (json .toString ().replace ("\" null\" " , "null" )); //<-- this is a bit of a hack...
409
+
410
+ }
411
+ else if (format .equals ("jsv" )){
412
+ JSONArray arr = new JSONArray ();
413
+ JSONObject values = DbUtils .getJson (fields );
414
+ for (Field field : fields ){
415
+ String fieldName = field .getName ();
416
+ String key = StringUtils .underscoreToCamelCase (fieldName .toLowerCase ());
417
+ arr .add (values .get (key ));
418
+ }
419
+
420
+ if (x >0 ) str .append ("," );
421
+ str .append (arr .toString ().replace ("\" null\" " , "null" )); //<-- same logic as json...
422
+
423
+ }
424
+ else if (format .equals ("tsv" ) || format .equals ("csv" )){
425
+ String s = format .equals ("tsv" ) ? "\t " : "," ;
426
+ for (int i =0 ; i <fields .length ; i ++){
427
+ if (i >0 ) str .append (s );
428
+ Object value = fields [i ].getValue ().toObject ();
429
+ if (value ==null ){
430
+ value = "" ;
431
+ }
432
+ else {
433
+ if (value instanceof String ){
434
+ String v = (String ) value ;
435
+ if (v .contains (s )){
436
+ value = "\" " + v + "\" " ;
437
+ }
438
+ }
439
+ else if (value instanceof javaxt .utils .Date ) {
440
+ value = ((javaxt .utils .Date ) value ).toISOString ();
441
+ }
442
+ else if (value instanceof java .util .Date ) {
443
+ value = new javaxt .utils .Date (((java .util .Date ) value )).toISOString ();
444
+ }
445
+ else if (value instanceof java .util .Calendar ) {
446
+ value = new javaxt .utils .Date (((java .util .Calendar ) value )).toISOString ();
447
+ }
448
+
449
+ }
450
+ str .append (value );
451
+ }
452
+ str .append ("\r \n " );
453
+ }
454
+
455
+ x ++;
456
+ }
457
+
458
+
459
+ public void includeMetadata (boolean b ){
460
+ addMetadata = b ;
461
+ }
462
+
463
+
464
+ public void setElapsedTime (long elapsedTime ){
465
+ this .elapsedTime = elapsedTime ;
466
+ }
467
+
468
+
469
+ public void setCount (long count ){
470
+ this .count = count ;
471
+ }
472
+
473
+
474
+ public void close (){
475
+ isClosed = true ;
476
+
477
+ if (format .equals ("json" )){
478
+ if (x >0 ){
479
+ str .append ("]" );
480
+
481
+
482
+ if (addMetadata ){
483
+ if (metadata !=null ){
484
+ str .append (",\" metadata\" :" );
485
+ str .append (metadata );
486
+ }
487
+ }
488
+
489
+
490
+ if (count !=null ){
491
+ str .append (",\" total_rows\" :" );
492
+ str .append (count );
493
+ }
494
+
495
+ if (this .elapsedTime !=null ){
496
+ double elapsedTime = (double )(this .elapsedTime )/1000d ;
497
+ BigDecimal time = new BigDecimal (elapsedTime ).setScale (3 , BigDecimal .ROUND_HALF_UP );
498
+ str .append (",\" time\" :" );
499
+ str .append (time );
500
+ }
501
+
502
+ str .append ("}" );
503
+
504
+ }
505
+ else {
506
+ str .append ("{}" );
507
+ }
508
+ }
509
+ else if (format .equals ("jsv" )){
510
+ if (x >0 ){
511
+ str .append ("]}" );
512
+ }
513
+ else {
514
+ str .append ("{}" );
515
+ }
516
+ }
517
+
518
+ }
519
+
520
+
521
+ public String toString (){
522
+ if (!isClosed ) close ();
523
+ return str .toString ();
524
+ }
525
+ }
526
+
527
+
528
+ //**************************************************************************
529
+ //** list
530
+ //**************************************************************************
531
+ /** Returns an unordered list of jobs
532
+ */
533
+ private ServiceResponse list (ServiceRequest request ) {
534
+
535
+ User user = (User ) request .getUser ();
536
+ JSONArray arr = new JSONArray ();
537
+ synchronized (jobs ) {
538
+ for (String key : jobs .keySet ()){
539
+ QueryJob job = jobs .get (key );
540
+ if (job .userID ==user .getID ()){
541
+ arr .add (job .toJson ());
542
+ }
543
+ }
544
+ }
545
+
546
+ return new ServiceResponse (arr );
547
+ }
548
+
549
+
550
+ //**************************************************************************
551
+ //** getJob
552
+ //**************************************************************************
553
+ /** Used to return the status or results for a given jobID. Example:
554
+ * [GET] sql/job/{jobID}
555
+ */
556
+ private ServiceResponse getJob (ServiceRequest request ) {
557
+ String id = request .getPath (1 ).toString ();
558
+ User user = (User ) request .getUser ();
559
+ Long userID = user ==null ? 0 : user .getID ();
560
+ QueryJob job = getJob (id , userID );
561
+ if (job ==null ) return new ServiceResponse (404 );
562
+ return getJobResponse (job );
563
+ }
564
+
565
+
566
+ //**************************************************************************
567
+ //** getJob
568
+ //**************************************************************************
569
+ /** Returns a job for a given jobID and user. Checks both the pending and
570
+ * completed job queues.
571
+ */
572
+ private QueryJob getJob (String jobID , long userID ){
573
+ synchronized (jobs ) {
574
+ return jobs .get (userID + ":" + jobID );
575
+ }
576
+ }
577
+
578
+
579
+ //**************************************************************************
580
+ //** getJobResponse
581
+ //**************************************************************************
582
+ /** Used to generate a ServiceResponse for a given job. If a job has failed
583
+ * or is complete, returns the output of the job. If the job is pending or
584
+ * running, simply returns the job status.
585
+ */
586
+ private ServiceResponse getJobResponse (QueryJob job ){
587
+ ServiceResponse response ;
588
+ if (job .status .equals ("failed" )){
589
+ javaxt .io .File file = job .getOutput ();
590
+ String str = file .getText ();
591
+ response = new ServiceResponse (500 , str );
592
+ deleteJob (job );
593
+ }
594
+ else if (job .status .equals ("complete" )){
595
+ javaxt .io .File file = job .getOutput ();
596
+ String str = file .getText ();
597
+ response = new ServiceResponse (str );
598
+ response .setContentType (file .getContentType ());
599
+ deleteJob (job );
600
+ }
601
+ else {
602
+ response = new ServiceResponse (job .status );
603
+ }
604
+ return response ;
605
+ }
606
+
607
+
608
+ //**************************************************************************
609
+ //** deleteJob
610
+ //**************************************************************************
611
+ /** Removes a job from the queue and deletes any output files that might
612
+ * have been created with the job.
613
+ */
614
+ private void deleteJob (QueryJob job ){
615
+
616
+ String key = job .getKey ();
617
+ synchronized (pendingJobs ){
618
+ pendingJobs .remove (key );
619
+ pendingJobs .notify ();
620
+ }
621
+
622
+ synchronized (completedJobs ) {
623
+ completedJobs .remove (key );
624
+ completedJobs .notify ();
625
+ }
626
+
627
+ synchronized (jobs ) {
628
+ jobs .remove (key );
629
+ jobs .notify ();
630
+ }
631
+
632
+ javaxt .io .File file = job .getOutput ();
633
+ file .delete ();
634
+ }
635
+
636
+
637
+ //**************************************************************************
638
+ //** cancel
639
+ //**************************************************************************
640
+ /** Used to cancel a pending or running job.
641
+ */
642
+ private ServiceResponse cancel (ServiceRequest request , Database database ) {
643
+ String id = request .getPath (1 ).toString ();
644
+ User user = (User ) request .getUser ();
645
+ Long userID = user ==null ? 0 : user .getID ();
646
+ QueryJob job = getJob (id , userID );
647
+ if (job ==null ) return new ServiceResponse (404 );
648
+
649
+
650
+ String key = job .getKey ();
651
+ synchronized (pendingJobs ){
652
+ pendingJobs .remove (key );
653
+ pendingJobs .notify ();
654
+ }
655
+
656
+
657
+
658
+ try (Connection conn = database .getConnection ()) {
659
+
660
+ //Update job status
661
+ job .status = "canceled" ;
662
+ job .updated = new javaxt .utils .Date ();
663
+ notify (job );
664
+
665
+
666
+ //Cancel the query in the database
667
+ if (database .getDriver ().equals ("PostgreSQL" )){
668
+ Integer pid = getPid (job .getKey (), conn );
669
+ if (pid !=null ){
670
+ boolean jobCanceled = false ;
671
+
672
+ javaxt .sql .Record record = conn .getRecord ("SELECT pg_cancel_backend(" + pid + ")" );
673
+ if (record !=null ) jobCanceled = record .get (0 ).toBoolean ();
674
+
675
+ if (!jobCanceled ){
676
+ record = conn .getRecord ("SELECT pg_terminate_backend(" + pid + ")" );
677
+ if (record !=null ) jobCanceled = record .get (0 ).toBoolean ();
678
+ }
679
+
680
+
681
+ if (!jobCanceled ){
682
+ throw new Exception ();
683
+ }
684
+ }
685
+ }
686
+
687
+ //Update queue
688
+ deleteJob (job );
689
+
690
+
691
+ //return response
692
+ return new ServiceResponse (job .toJson ());
693
+ }
694
+ catch (Exception e ){
695
+ return new ServiceResponse (500 , "failed to cancel query" );
696
+ }
697
+ }
698
+
699
+
700
+ //**************************************************************************
701
+ //** getPid
702
+ //**************************************************************************
703
+ /** Returns process id for a given jobId
704
+ */
705
+ private Integer getPid (String key , Connection conn ) throws SQLException {
706
+ javaxt .sql .Record record = conn .getRecord (
707
+ "SELECT pid from pg_stat_activity where query like '--" + key + "%'" );
708
+ return record ==null ? null : record .get (0 ).toInteger ();
709
+ }
710
+
711
+
712
+ //**************************************************************************
713
+ //** getTables
714
+ //**************************************************************************
715
+ /** Returns a list of tables and columns
716
+ */
717
+ public ServiceResponse getTables (ServiceRequest request , Database database ) {
718
+ try {
719
+ JSONArray arr = new JSONArray ();
720
+ for (Table table : database .getTables ()){
721
+
722
+
723
+ //Get schema
724
+ String schema = table .getSchema ();
725
+ if (schema !=null ){
726
+
727
+ //Skip PostgreSQL metadata tables
728
+ if (schema .equalsIgnoreCase ("information_schema" )) continue ;
729
+ if (schema .toLowerCase ().startsWith ("pg_" )) continue ;
730
+ }
731
+
732
+
733
+ //Get columns
734
+ JSONArray columns = new JSONArray ();
735
+ for (Column column : table .getColumns ()){
736
+
737
+ JSONObject col = new JSONObject ();
738
+ col .set ("name" , column .getName ());
739
+ col .set ("type" , column .getType ());
740
+ if (column .isPrimaryKey ()){
741
+ col .set ("primaryKey" , true );
742
+ }
743
+ columns .add (col );
744
+ }
745
+
746
+
747
+ //Update array
748
+ JSONObject json = new JSONObject ();
749
+ json .set ("name" , table .getName ());
750
+ json .set ("schema" , schema );
751
+ json .set ("columns" , columns );
752
+ arr .add (json );
753
+ }
754
+
755
+
756
+ JSONObject json = new JSONObject ();
757
+ json .set ("tables" , arr );
758
+ return new ServiceResponse (json );
759
+ }
760
+ catch (Exception e ){
761
+ return new ServiceResponse (e );
762
+ }
763
+ }
764
+
765
+
766
+ //**************************************************************************
767
+ //** getParameter
768
+ //**************************************************************************
769
+ /** Used to extract a parameter either from the URL query string or the json
770
+ * in the request payload.
771
+ */
772
+ private javaxt .utils .Value getParameter (String name , ServiceRequest request ){
773
+ if (request .getRequest ().getMethod ().equals ("GET" )){
774
+ return request .getParameter (name );
775
+ }
776
+ else {
777
+ JSONObject json = request .getJson ();
778
+ if (json .has (name )){
779
+ return new javaxt .utils .Value (json .get (name ).toObject ());
780
+ }
781
+ else {
782
+ return request .getParameter (name );
783
+ }
784
+ }
785
+ }
786
+
787
+
788
+ //**************************************************************************
789
+ //** QueryJob
790
+ //**************************************************************************
791
+ public class QueryJob {
792
+
793
+ private String id ;
794
+ private long userID ;
795
+ private Select select ;
796
+ private Long offset ;
797
+ private LongValue limit ;
798
+ private javaxt .utils .Date created ;
799
+ private javaxt .utils .Date updated ;
800
+ private String status ;
801
+ private String format ;
802
+ private boolean countTotal = false ;
803
+ private boolean addMetadata = false ;
804
+ private CreateTable tempTable ;
805
+
806
+
807
+ public QueryJob (long userID , Select select , Long offset , Long limit , JSONObject params ) {
808
+ this .id = UUID .randomUUID ().toString ();
809
+ this .userID = userID ;
810
+ this .select = select ;
811
+ this .offset = offset ;
812
+ this .limit = limit ==null ? null : new LongValue (limit );
813
+ this .created = new javaxt .utils .Date ();
814
+ this .updated = this .created .clone ();
815
+ this .status = "pending" ;
816
+
817
+ String format = params .get ("format" ).toString ();
818
+ if (format ==null ) format ="" ;
819
+ format = format .trim ().toLowerCase ();
820
+ if (format .equals ("csv" ) || format .equals ("tsv" ) || format .equals ("jsv" )){
821
+ this .format = format ;
822
+ }
823
+ else this .format = "json" ;
824
+
825
+
826
+ if (params .has ("count" )){
827
+ countTotal = params .get ("count" ).toBoolean ();
828
+ }
829
+
830
+ if (params .has ("metadata" )){
831
+ addMetadata = params .get ("metadata" ).toBoolean ();
832
+ }
833
+ }
834
+
835
+ public String getID (){
836
+ return id ;
837
+ }
838
+
839
+ public long getUserID (){
840
+ return userID ;
841
+ }
842
+
843
+ public String getStatus (){
844
+ return status ;
845
+ }
846
+
847
+ public void addTempTable (CreateTable stmt ){
848
+ tempTable = stmt ;
849
+ }
850
+
851
+ public CreateTable getTempTable (){
852
+ return tempTable ;
853
+ }
854
+
855
+ public String getKey (){
856
+ return userID + ":" + id ;
857
+ }
858
+
859
+ public boolean isCanceled (){
860
+ return status .equals ("canceled" );
861
+ }
862
+
863
+ public String getQuery (){
864
+ PlainSelect plainSelect = (PlainSelect ) select .getSelectBody ();
865
+
866
+ //Update offset and limit
867
+ if (offset !=null ){
868
+ Offset o = plainSelect .getOffset ();
869
+ if (o ==null ) o = new Offset ();
870
+ o .setOffset (offset );
871
+ plainSelect .setOffset (o );
872
+ }
873
+ if (limit !=null ){
874
+ Limit l = plainSelect .getLimit ();
875
+ if (l ==null ){
876
+ l = new Limit ();
877
+ l .setRowCount (limit );
878
+ plainSelect .setLimit (l );
879
+ }
880
+ }
881
+ String query = plainSelect .toString ();
882
+
883
+
884
+
885
+
886
+ //Prepend any "with" clause that might be present
887
+ if (select .getWithItemsList ()!=null ){
888
+ java .util .Iterator <WithItem > i2 = select .getWithItemsList ().iterator ();
889
+ query = "with " + i2 .next () + " \r \n " + query ;
890
+ }
891
+
892
+ return query ;
893
+ }
894
+
895
+ public String getCountQuery (){
896
+
897
+ PlainSelect plainSelect = (PlainSelect ) select .getSelectBody ();
898
+ plainSelect .setSelectItems (selectCount );
899
+ String query = plainSelect .toString ();
900
+
901
+
902
+ //Add prepend any "with" clause that might be present
903
+ if (!select .getWithItemsList ().isEmpty ()){
904
+ java .util .Iterator <WithItem > i2 = select .getWithItemsList ().iterator ();
905
+ query = "with " + i2 .next () + " \r \n " + query ;
906
+ }
907
+
908
+ return query ;
909
+ }
910
+
911
+ public boolean countTotal (){
912
+ if (countTotal ){
913
+ if (format .equals ("json" )) return true ;
914
+ }
915
+ return false ;
916
+ }
917
+
918
+ public boolean addMetadata (){
919
+ return addMetadata ;
920
+ }
921
+
922
+ public String getOutputFormat (){
923
+ return format ;
924
+ }
925
+
926
+ public javaxt .io .File getOutput (){
927
+ return new javaxt .io .File (jobDir .toString () + userID + "/" + id + "." + format );
928
+ }
929
+
930
+
931
+ public String getContentType (){
932
+ if (format .equals ("tsv" )){
933
+ return "text/plain" ;
934
+ }
935
+ else if (format .equals ("csv" ))
936
+ return "text/csv" ;
937
+ else {
938
+ return "application/json" ;
939
+ }
940
+ }
941
+
942
+
943
+ public void log (){
944
+ if (logDir !=null ){
945
+ javaxt .io .File file = new javaxt .io .File (logDir .toString () + userID + "/" + id + ".json" );
946
+ file .write (toJson ().toString ());
947
+ }
948
+ }
949
+
950
+ public JSONObject toJson () {
951
+ JSONObject json = new JSONObject ();
952
+ json .set ("user_id" , userID );
953
+ json .set ("job_id" , id );
954
+ json .set ("status" , status );
955
+ json .set ("query" , getQuery ());
956
+ json .set ("created_at" , created );
957
+ json .set ("updated_at" , updated );
958
+ return json ;
959
+ }
960
+ }
961
+
962
+
963
+ //**************************************************************************
964
+ //** QueryProcessor
965
+ //**************************************************************************
966
+ /** Thread used to execute queries
967
+ */
968
+ private class QueryProcessor implements Runnable {
969
+ private Database database ;
970
+ private QueryService queryService ;
971
+
972
+ public QueryProcessor (Database database , QueryService queryService ){
973
+ this .database = database ;
974
+ this .queryService = queryService ;
975
+ }
976
+
977
+ public void run () {
978
+
979
+ while (true ) {
980
+
981
+ Object obj = null ;
982
+ synchronized (pendingJobs ) {
983
+ while (pendingJobs .isEmpty ()) {
984
+ try {
985
+ pendingJobs .wait ();
986
+ }
987
+ catch (InterruptedException e ) {
988
+ return ;
989
+ }
990
+ }
991
+ obj = pendingJobs .get (0 );
992
+ if (obj !=null ) pendingJobs .remove (0 );
993
+ pendingJobs .notifyAll ();
994
+ }
995
+
996
+ if (obj !=null ){
997
+
998
+ //Find query job
999
+ String key = (String ) obj ;
1000
+ QueryJob job = null ;
1001
+ synchronized (jobs ) {
1002
+ job = jobs .get (key );
1003
+ }
1004
+
1005
+ if (job !=null && !job .isCanceled ()){
1006
+ Connection conn = null ;
1007
+ try {
1008
+
1009
+ //Update job status and set start time
1010
+ job .status = "running" ;
1011
+ job .updated = new javaxt .utils .Date ();
1012
+ long startTime = System .currentTimeMillis ();
1013
+ queryService .notify (job );
1014
+
1015
+
1016
+ //Open database connection
1017
+ conn = database .getConnection ();
1018
+
1019
+
1020
+ //Create temp table as needed
1021
+ CreateTable createTempTable = job .getTempTable ();
1022
+ if (createTempTable !=null ){
1023
+ conn .execute ("--" + job .getKey () + "\n " + createTempTable .toString ());
1024
+ if (job .isCanceled ()){
1025
+ conn .execute ("DROP TABLE " + createTempTable .getTable ().getName ());
1026
+ throw new Exception ();
1027
+ }
1028
+ }
1029
+
1030
+
1031
+ //Execute query and generate response
1032
+ String query = job .getQuery ();
1033
+ Writer writer = new Writer (job .getOutputFormat (), job .addMetadata ());
1034
+ Recordset rs = conn .getRecordset ("--" + job .getKey () + "\n " + query );
1035
+ while (rs .next ()){
1036
+ writer .write (rs );
1037
+ }
1038
+ rs .close ();
1039
+ if (job .isCanceled ()) throw new Exception ();
1040
+
1041
+
1042
+ //Count total records as needed
1043
+ if (job .countTotal ()){
1044
+ javaxt .sql .Record record = conn .getRecord (job .getCountQuery ());
1045
+ if (record !=null ){
1046
+ Long ttl = record .get (0 ).toLong ();
1047
+ if (ttl !=null ){
1048
+ writer .setCount (ttl );
1049
+ }
1050
+ }
1051
+ }
1052
+ if (job .isCanceled ()) throw new Exception ();
1053
+
1054
+
1055
+
1056
+ //Drop temp table
1057
+ if (createTempTable !=null ){
1058
+ conn .execute ("DROP TABLE " + createTempTable .getTable ().getName ());
1059
+ }
1060
+ if (job .isCanceled ()) throw new Exception ();
1061
+
1062
+
1063
+ //Close database connection
1064
+ conn .close ();
1065
+
1066
+
1067
+
1068
+ //Set elapsed time
1069
+ writer .setElapsedTime (System .currentTimeMillis ()-startTime );
1070
+
1071
+
1072
+ //Write output to a file
1073
+ javaxt .io .File file = job .getOutput ();
1074
+ file .write (writer .toString ());
1075
+
1076
+
1077
+ //Update job status
1078
+ job .status = "complete" ;
1079
+ job .updated = new javaxt .utils .Date ();
1080
+ queryService .notify (job );
1081
+ }
1082
+ catch (Exception e ){
1083
+ if (conn !=null ) conn .close ();
1084
+ javaxt .io .File file = job .getOutput ();
1085
+ if (job .isCanceled ()){
1086
+ file .delete ();
1087
+ }
1088
+ else {
1089
+ job .status = "failed" ;
1090
+ job .updated = new javaxt .utils .Date ();
1091
+ queryService .notify (job );
1092
+
1093
+
1094
+ java .io .PrintStream ps = null ;
1095
+ try {
1096
+ file .create ();
1097
+ ps = new java .io .PrintStream (file .toFile ());
1098
+ e .printStackTrace (ps );
1099
+ ps .close ();
1100
+ }
1101
+ catch (Exception ex ) {
1102
+ if (ps !=null ) ps .close ();
1103
+ file .write (e .getMessage ());
1104
+ }
1105
+ }
1106
+ }
1107
+
1108
+
1109
+ //Add job to the completedJobs
1110
+ if (!job .isCanceled ()){
1111
+ synchronized (completedJobs ){
1112
+ completedJobs .add (job .getKey ());
1113
+ completedJobs .notify ();
1114
+ }
1115
+ }
1116
+ }
1117
+ }
1118
+ else {
1119
+ return ;
1120
+ }
1121
+ }
1122
+ }
1123
+ }
1088
1124
}
0 commit comments