Skip to content

Move ingest creation to context object #655

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions src/main/java/org/xbib/elasticsearch/jdbc/strategy/Sink.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,6 @@ public interface Sink<C extends Context> {
*/
void delete(IndexableObject object) throws IOException;

/**
* Flush data to the sink
*
* @throws IOException when flush fails
*/
void flushIngest() throws IOException;

/**
* Shutdown and release all resources, e.g. bulk processor and client
* @throws IOException when shutdown fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class StandardContext<S extends JDBCSource> implements Context<S, Sink> {

private Throwable throwable;

private Ingest ingest;

private final static List<Future> futures = new LinkedList<>();

private final static SourceMetric sourceMetric = new SourceMetric().start();
Expand Down Expand Up @@ -177,6 +179,24 @@ public Throwable getThrowable() {
return throwable;
}

public Ingest getIngest() {
return ingest;
}

public Ingest getOrCreateIngest(Metric metric) throws IOException {
if (ingest == null) {
if (getIngestFactory() != null) {
ingest = getIngestFactory().create();
if (ingest != null) {
ingest.setMetric(metric);
}
} else {
logger.warn("no ingest factory found");
}
}
return ingest;
}

public DateTime getDateOfThrowable() {
return dateOfThrowable;
}
Expand Down Expand Up @@ -230,7 +250,18 @@ public void afterFetch() throws Exception {
logger.error(e.getMessage(), e);
}
try {

logger.debug("afterFetch: flush ingest");
flushIngest();

getSink().afterFetch();

logger.debug("afterFetch: before ingest shutdown");
if(ingest != null) {
ingest.shutdown();
ingest = null;
}

} catch (Throwable e) {
setThrowable(e);
logger.error(e.getMessage(), e);
Expand All @@ -250,17 +281,45 @@ public void shutdown() {
logger.error(e.getMessage(), e);
}
}

try {
flushIngest();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}

if (sink != null) {
try {
sink.shutdown();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

if (ingest != null) {
logger.info("shutdown in progress");
ingest.shutdown();
ingest = null;
}

logger.info("shutdown completed");
writeState();
}

public void flushIngest() throws IOException {
if (ingest == null) {
return;
}
ingest.flushIngest();
// wait for all outstanding bulk requests before continuing. Estimation is 60 seconds
try {
ingest.waitForResponses(TimeValue.timeValueSeconds(60));
} catch (InterruptedException e) {
logger.warn("interrupted while waiting for responses");
Thread.currentThread().interrupt();
}
}

@Override
public void resetCounter() {
if (sourceMetric != null) {
Expand Down Expand Up @@ -401,6 +460,8 @@ protected void prepareContext(S source, Sink sink) throws IOException {
sink.setContext(this);
}



private IngestFactory createIngestFactory(final Settings settings) {
return new IngestFactory() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public class StandardSink<C extends StandardContext> implements Sink<C> {

protected C context;

protected Ingest ingest;

protected Settings indexSettings;

protected Map<String, String> indexMappings;
Expand Down Expand Up @@ -89,16 +87,8 @@ public Metric getMetric() {

@Override
public synchronized void beforeFetch() throws IOException {
if (ingest == null) {
if (context.getIngestFactory() != null) {
ingest = context.getIngestFactory().create();
if(ingest != null) {
ingest.setMetric(metric);
}
} else {
logger.warn("no ingest factory found");
}
}
Ingest ingest = context.getOrCreateIngest(metric);

if (ingest == null) {
logger.warn("no ingest found");
return;
Expand All @@ -121,31 +111,26 @@ public synchronized void beforeFetch() throws IOException {

@Override
public synchronized void afterFetch() throws IOException {
Ingest ingest = context.getIngest();
if (ingest == null) {
return;
}
logger.debug("afterFetch: flush ingest");
flushIngest();
logger.debug("afterFetch: stop bulk");
ingest.stopBulk(index);
logger.debug("afterFetch: refresh index");
ingest.refreshIndex(index);
logger.debug("afterFetch: before ingest shutdown");
ingest.shutdown();
ingest = null;

logger.debug("afterFetch: after ingest shutdown");
}

@Override
public synchronized void shutdown() {
if (ingest == null) {
Ingest ingest = context.getIngest();
if(ingest == null) {
return;
}
try {
logger.info("shutdown in progress");
flushIngest();
ingest.stopBulk(index);
ingest.shutdown();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
Expand Down Expand Up @@ -198,6 +183,7 @@ public String getId() {

@Override
public void index(IndexableObject object, boolean create) throws IOException {
Ingest ingest = context.getIngest();
if (ingest == null) {
return;
}
Expand Down Expand Up @@ -238,6 +224,8 @@ public void index(IndexableObject object, boolean create) throws IOException {

@Override
public void delete(IndexableObject object) {
Ingest ingest = context.getIngest();

if (ingest == null) {
return;
}
Expand Down Expand Up @@ -270,19 +258,4 @@ public void delete(IndexableObject object) {
ingest.bulkDelete(request);
}

@Override
public void flushIngest() throws IOException {
if (ingest == null) {
return;
}
ingest.flushIngest();
// wait for all outstanding bulk requests before continuing. Estimation is 60 seconds
try {
ingest.waitForResponses(TimeValue.timeValueSeconds(60));
} catch (InterruptedException e) {
logger.warn("interrupted while waiting for responses");
Thread.currentThread().interrupt();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ public String getId() {
return null;
}

@Override
public void flushIngest() throws IOException {
}

@Override
public void shutdown() throws IOException {
}
Expand Down