diff --git a/src/main/java/org/elasticsearch/river/kafka/IndexDocumentProducer.java b/src/main/java/org/elasticsearch/river/kafka/IndexDocumentProducer.java index b40a379..caea84d 100644 --- a/src/main/java/org/elasticsearch/river/kafka/IndexDocumentProducer.java +++ b/src/main/java/org/elasticsearch/river/kafka/IndexDocumentProducer.java @@ -53,7 +53,8 @@ public void addMessagesToBulkProcessor(final MessageAndMetadata messageAndMetada String message = null; IndexRequest request = null; - + String sdate = Calendar.getInstance().get(Calendar.YEAR) + "-" + (Calendar.getInstance().get(Calendar.MONTH) + 1)+ "-" + Calendar.getInstance().get(Calendar.DATE); + switch (riverConfig.getMessageType()) { case STRING: message = XContentFactory.jsonBuilder() @@ -61,13 +62,13 @@ public void addMessagesToBulkProcessor(final MessageAndMetadata messageAndMetada .field("value", new String(messageBytes, "UTF-8")) .endObject() .string(); - request = Requests.indexRequest(riverConfig.getIndexName()). + request = Requests.indexRequest(riverConfig.getIndexName()+sdate). type(riverConfig.getTypeName()). source(message); break; case JSON: final Map messageMap = reader.readValue(messageBytes); - request = Requests.indexRequest(riverConfig.getIndexName()). + request = Requests.indexRequest(riverConfig.getIndexName()+sdate). type(riverConfig.getTypeName()). source(messageMap); }