27
27
import org .elasticsearch .common .lang3 .StringUtils ;
28
28
import org .elasticsearch .common .unit .TimeValue ;
29
29
import org .xbib .elasticsearch .common .util .IndexableObject ;
30
+ import org .xbib .elasticsearch .support .client .Ingest ;
30
31
31
32
import java .io .File ;
32
33
import java .io .IOException ;
@@ -98,14 +99,8 @@ public RebuildSink<C> setContext(StandardContext context) {
98
99
99
100
@ Override
100
101
public synchronized void beforeFetch () throws IOException {
101
- if (ingest == null ) {
102
- if (context .getIngestFactory () != null ) {
103
- ingest = context .getIngestFactory ().create ();
104
- ingest .setMetric (getMetric ());
105
- } else {
106
- logger .warn ("no ingest factory found" );
107
- }
108
- }
102
+ Ingest ingest = context .getOrCreateIngest (getMetric ());
103
+
109
104
if (ingest == null ) {
110
105
logger .warn ("no ingest found" );
111
106
return ;
@@ -120,6 +115,7 @@ public synchronized void beforeFetch() throws IOException {
120
115
}
121
116
122
117
private void createIndex () throws IOException {
118
+ Ingest ingest = context .getOrCreateIngest (getMetric ());
123
119
rebuild_index = generateNewIndexName ();
124
120
index = rebuild_index ;
125
121
@@ -153,11 +149,11 @@ private String generateNewIndexName() {
153
149
154
150
@ Override
155
151
public synchronized void afterFetch () throws IOException {
156
- if (ingest == null ) {
157
- ingest = context .getIngestFactory ().create ();
158
- }
152
+ Ingest ingest = context .getIngest ();
159
153
160
- flushIngest ();
154
+ if (ingest == null ) {
155
+ return ;
156
+ }
161
157
162
158
ingest .stopBulk (rebuild_index );
163
159
ingest .refreshIndex (rebuild_index );
@@ -177,11 +173,6 @@ public synchronized void afterFetch() throws IOException {
177
173
allExistingIndices .remove (rebuild_index );
178
174
179
175
removeOldIndices (allExistingIndices );
180
-
181
- if (ingest != null ) {
182
- ingest .shutdown ();
183
- }
184
- ingest = null ;
185
176
}
186
177
187
178
@ Override
@@ -218,6 +209,7 @@ public int compare(DateTime o1, DateTime o2) {
218
209
}
219
210
220
211
private List <String > getAllExistingIndices () {
212
+ Ingest ingest = context .getIngest ();
221
213
try {
222
214
GetIndexResponse indexResponse = ingest .client ().admin ().indices ().prepareGetIndex ().addIndices (index_prefix + "*" ).execute ().get ();
223
215
@@ -237,6 +229,7 @@ private List<String> getAllExistingIndices() {
237
229
}
238
230
239
231
private List <String > getCurrentActiveIndices () {
232
+ Ingest ingest = context .getIngest ();
240
233
GetAliasesResponse getAliasesResponse = ingest .client ().admin ().indices ().prepareGetAliases (alias ).execute ().actionGet ();
241
234
242
235
List <String > existingIndices = new ArrayList <>();
@@ -250,6 +243,7 @@ private List<String> getCurrentActiveIndices() {
250
243
}
251
244
252
245
private void removeOldIndices (List <String > indices ) {
246
+ Ingest ingest = context .getIngest ();
253
247
List <String > orderedIndexList = getOrderedIndexList (indices );
254
248
for (int i = 0 ; i < (orderedIndexList .size () - keep_last_indices ); i ++) {
255
249
String existingIndex = orderedIndexList .get (i );
@@ -259,6 +253,7 @@ private void removeOldIndices(List<String> indices) {
259
253
}
260
254
261
255
private void switchAliasToNewIndex (List <String > existingIndices ) {
256
+ Ingest ingest = context .getIngest ();
262
257
IndicesAliasesRequestBuilder prepareAliasesRequest = ingest .client ().admin ().indices ().prepareAliases ();
263
258
prepareAliasesRequest .addAlias (rebuild_index , alias ).execute ().actionGet ().isAcknowledged ();
264
259
0 commit comments