-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathloadelastic-gmeta.py
More file actions
93 lines (79 loc) · 2.38 KB
/
Copy pathloadelastic-gmeta.py
File metadata and controls
93 lines (79 loc) · 2.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# This script bulk-loads Globus GMeta-formatted JSON files from a directory into Elasticsearch (ES).
import requests, json, os
import argparse
import pandas as pd
import ijson
import time
# Elasticsearch python libs
from elasticsearch import Elasticsearch
from elasticsearch import helpers
# Module-level settings read by loadit(); the __main__ block may override them
# from the command line (-d, -i, -t, -thres).
directory = ""  # directory that is scanned for *.json files
indexName, typeName = "pathology", "reports"  # target ES index and document type
# Number of documents accumulated before they are sent as one bulk request.
THRESHOLD = 20000
def _send_batch(es, actions, loaded, json_filename):
    """Send one batch of bulk index actions and report the running total.

    Returns the number of documents loaded from *json_filename* so far:
    *loaded* plus ``len(actions)`` when the bulk request succeeds, or *loaded*
    unchanged when ``helpers.bulk`` raises. In that case the error is printed
    and the caller discards the batch, so it is not re-sent with every
    following record.
    """
    try:
        helpers.bulk(es, actions)
    except Exception as ex:
        # str() is required: 'Error:' + ex itself raises TypeError and hides the real error.
        print('Error:' + str(ex))
        return loaded
    loaded += len(actions)
    print('Imported data ' + str(loaded) + ' successfully from ' + json_filename)
    return loaded


def loadit():
    """Bulk-load every ``*.json`` GMeta file in ``directory`` into Elasticsearch.

    Each file is streamed with ijson. Every entry of the top-level
    ``ingest_data.gmeta`` list becomes one document in index ``indexName``
    with type ``typeName``, using the entry's ``subject`` as the document id.
    Documents are sent in batches of ``THRESHOLD``, and any remainder is sent
    once the file is exhausted. Reads the module-level settings ``directory``,
    ``indexName``, ``typeName`` and ``THRESHOLD``.
    """
    es = Elasticsearch([{'host': 'localhost', 'port': '9200'}])
    # sorted() makes the load order (and the progress output) deterministic.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".json"):
            continue
        json_filename = os.path.join(directory, filename)
        print("Loading " + json_filename)
        bulk_action = []
        loaded = 0
        # ijson parses bytes natively; binary mode avoids a decode/re-encode pass.
        with open(json_filename, 'rb') as input_file:
            # Stream gmeta entries one at a time so memory is bounded by the
            # batch size rather than by the size of the whole gmeta list.
            for gm in ijson.items(input_file, 'ingest_data.gmeta.item'):
                bulk_action.append({
                    "_index": indexName,
                    # NOTE(review): mapping types are deprecated in ES 7 and removed in ES 8.
                    "_type": typeName,
                    "_id": gm['subject'],
                    "_source": gm,
                })
                if len(bulk_action) >= THRESHOLD:
                    loaded = _send_batch(es, bulk_action, loaded, json_filename)
                    bulk_action = []
        # The tail must be sent unconditionally. Gating it on the total record
        # count would drop it for any file holding THRESHOLD or more records.
        if bulk_action:
            _send_batch(es, bulk_action, loaded, json_filename)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Load Globus GMeta-formatted JSON files into Elasticsearch.")
    parser.add_argument("-d", required=True, help="dir path to json file(s)")
    # type=int lets argparse report a bad value instead of crashing with a traceback.
    parser.add_argument("-thres", type=int, help="set the batch threshold")
    parser.add_argument("-i", help="set the index name")
    parser.add_argument("-t", help="set the type")
    args = parser.parse_args()
    # loadit() reads these module-level settings, so they are rebound at module scope.
    if args.d:
        directory = args.d
        # loadit() may build file paths by plain concatenation, so ensure a separator.
        if not directory.endswith('/'):
            directory = directory + '/'
    # Compare against None: a parsed 0 is falsy but is still an explicit (invalid) value.
    if args.thres is not None:
        if args.thres < 1:
            parser.error("-thres must be a positive integer")
        THRESHOLD = args.thres
        print("Batch threshold: " + str(THRESHOLD))
    if args.i:
        indexName = args.i
    if args.t:
        typeName = args.t
    # perf_counter is monotonic, so the elapsed time is immune to wall-clock adjustments.
    start = time.perf_counter()
    loadit()
    end = time.perf_counter()
    print("Elapsed time: {}".format(end - start))