diff --git a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/Sink.java b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/Sink.java index d651557b..5c16982b 100644 --- a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/Sink.java +++ b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/Sink.java @@ -143,13 +143,6 @@ public interface Sink { */ void delete(IndexableObject object) throws IOException; - /** - * Flush data to the sink - * - * @throws IOException when flush fails - */ - void flushIngest() throws IOException; - /** * Shutdown and release all resources, e.g. bulk processor and client * @throws IOException when shutdown fails diff --git a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardContext.java b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardContext.java index abc23cee..42d01c98 100644 --- a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardContext.java +++ b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardContext.java @@ -78,6 +78,8 @@ public class StandardContext implements Context { private Throwable throwable; + private Ingest ingest; + private final static List futures = new LinkedList<>(); private final static SourceMetric sourceMetric = new SourceMetric().start(); @@ -177,6 +179,24 @@ public Throwable getThrowable() { return throwable; } + public Ingest getIngest() { + return ingest; + } + + public Ingest getOrCreateIngest(Metric metric) throws IOException { + if (ingest == null) { + if (getIngestFactory() != null) { + ingest = getIngestFactory().create(); + if (ingest != null) { + ingest.setMetric(metric); + } + } else { + logger.warn("no ingest factory found"); + } + } + return ingest; + } + public DateTime getDateOfThrowable() { return dateOfThrowable; } @@ -230,7 +250,18 @@ public void afterFetch() throws Exception { logger.error(e.getMessage(), e); } try { + + logger.debug("afterFetch: flush ingest"); + flushIngest(); + getSink().afterFetch(); + + logger.debug("afterFetch: before ingest shutdown"); + if(ingest != null) { + ingest.shutdown(); + ingest = null; + } + } catch (Throwable e) { setThrowable(e); logger.error(e.getMessage(), e); @@ -250,6 +281,13 @@ public void shutdown() { logger.error(e.getMessage(), e); } } + + try { + flushIngest(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + if (sink != null) { try { sink.shutdown(); @@ -257,10 +295,31 @@ public void shutdown() { logger.error(e.getMessage(), e); } } + + if (ingest != null) { + logger.info("shutdown in progress"); + ingest.shutdown(); + ingest = null; + } + logger.info("shutdown completed"); writeState(); } + public void flushIngest() throws IOException { + if (ingest == null) { + return; + } + ingest.flushIngest(); + // wait for all outstanding bulk requests before continuing. Estimation is 60 seconds + try { + ingest.waitForResponses(TimeValue.timeValueSeconds(60)); + } catch (InterruptedException e) { + logger.warn("interrupted while waiting for responses"); + Thread.currentThread().interrupt(); + } + } + @Override public void resetCounter() { if (sourceMetric != null) { @@ -401,6 +460,8 @@ protected void prepareContext(S source, Sink sink) throws IOException { sink.setContext(this); } + + private IngestFactory createIngestFactory(final Settings settings) { return new IngestFactory() { @Override diff --git a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java index cb6503bd..6a34384a 100644 --- a/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java +++ b/src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java @@ -46,8 +46,6 @@ public class StandardSink implements Sink { protected C context; - protected Ingest ingest; - protected Settings indexSettings; protected Map indexMappings; @@ -89,16 +87,8 @@ public Metric getMetric() { @Override public synchronized void beforeFetch() throws IOException { - if (ingest == null) { - if (context.getIngestFactory() != null) { - ingest = context.getIngestFactory().create(); - if(ingest != null) { - ingest.setMetric(metric); - } - } else { - logger.warn("no ingest factory found"); - } - } + Ingest ingest = context.getOrCreateIngest(metric); + if (ingest == null) { logger.warn("no ingest found"); return; @@ -121,31 +111,26 @@ public synchronized void beforeFetch() throws IOException { @Override public synchronized void afterFetch() throws IOException { + Ingest ingest = context.getIngest(); if (ingest == null) { return; } - logger.debug("afterFetch: flush ingest"); - flushIngest(); logger.debug("afterFetch: stop bulk"); ingest.stopBulk(index); logger.debug("afterFetch: refresh index"); ingest.refreshIndex(index); - logger.debug("afterFetch: before ingest shutdown"); - ingest.shutdown(); - ingest = null; + logger.debug("afterFetch: after ingest shutdown"); } @Override public synchronized void shutdown() { - if (ingest == null) { + Ingest ingest = context.getIngest(); + if(ingest == null) { return; } try { - logger.info("shutdown in progress"); - flushIngest(); ingest.stopBulk(index); - ingest.shutdown(); } catch (IOException e) { logger.error(e.getMessage(), e); } @@ -198,6 +183,7 @@ public String getId() { @Override public void index(IndexableObject object, boolean create) throws IOException { + Ingest ingest = context.getIngest(); if (ingest == null) { return; } @@ -238,6 +224,8 @@ public void index(IndexableObject object, boolean create) throws IOException { @Override public void delete(IndexableObject object) { + Ingest ingest = context.getIngest(); + if (ingest == null) { return; } @@ -270,19 +258,4 @@ public void delete(IndexableObject object) { ingest.bulkDelete(request); } - @Override - public void flushIngest() throws IOException { - if (ingest == null) { - return; - } - ingest.flushIngest(); - // wait for all outstanding bulk requests before continuing. Estimation is 60 seconds - try { - ingest.waitForResponses(TimeValue.timeValueSeconds(60)); - } catch (InterruptedException e) { - logger.warn("interrupted while waiting for responses"); - Thread.currentThread().interrupt(); - } - } - } diff --git a/src/test/java/org/xbib/elasticsearch/jdbc/strategy/mock/MockSink.java b/src/test/java/org/xbib/elasticsearch/jdbc/strategy/mock/MockSink.java index a025c6e7..617726d8 100644 --- a/src/test/java/org/xbib/elasticsearch/jdbc/strategy/mock/MockSink.java +++ b/src/test/java/org/xbib/elasticsearch/jdbc/strategy/mock/MockSink.java @@ -123,10 +123,6 @@ public String getId() { return null; } - @Override - public void flushIngest() throws IOException { - } - @Override public void shutdown() throws IOException { }