From efdf67aad7ef475a1cf6c7e497617436b0724f9d Mon Sep 17 00:00:00 2001 From: John Date: Mon, 18 Jan 2016 11:59:47 +0800 Subject: [PATCH] Update IndexDocumentProducer.java Add date format --- .../elasticsearch/river/kafka/IndexDocumentProducer.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/elasticsearch/river/kafka/IndexDocumentProducer.java b/src/main/java/org/elasticsearch/river/kafka/IndexDocumentProducer.java index b40a379..caea84d 100644 --- a/src/main/java/org/elasticsearch/river/kafka/IndexDocumentProducer.java +++ b/src/main/java/org/elasticsearch/river/kafka/IndexDocumentProducer.java @@ -53,7 +53,8 @@ public void addMessagesToBulkProcessor(final MessageAndMetadata messageAndMetada String message = null; IndexRequest request = null; - + String sdate = Calendar.getInstance().get(Calendar.YEAR) + "-" + (Calendar.getInstance().get(Calendar.MONTH) + 1)+ "-" + Calendar.getInstance().get(Calendar.DATE); + switch (riverConfig.getMessageType()) { case STRING: message = XContentFactory.jsonBuilder() @@ -61,13 +62,13 @@ public void addMessagesToBulkProcessor(final MessageAndMetadata messageAndMetada .field("value", new String(messageBytes, "UTF-8")) .endObject() .string(); - request = Requests.indexRequest(riverConfig.getIndexName()). + request = Requests.indexRequest(riverConfig.getIndexName()+sdate). type(riverConfig.getTypeName()). source(message); break; case JSON: final Map messageMap = reader.readValue(messageBytes); - request = Requests.indexRequest(riverConfig.getIndexName()). + request = Requests.indexRequest(riverConfig.getIndexName()+sdate). type(riverConfig.getTypeName()). source(messageMap); }