Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 103 additions & 77 deletions src/elasticsearch/elasticsearch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,60 +49,74 @@ bool ElasticSearch::isActive() {
return true;
}

// Request the document by index/type/id.
bool ElasticSearch::getDocument(const char* index, const char* type, const char* id, Json::Object& msg){
void ElasticSearch::readVersionString(){
Json::Object msg;
if (200 == _http.get("", 0, &msg)){
if (msg.member("version") && msg["version"].getObject().member("number")){
_versionString = msg["version"].getObject()["number"].getString();
}
}
}

int ElasticSearch::getMajorVersion(){
return std::stoi(getVersionString());
}

const std::string& ElasticSearch::getVersionString(){
if (_versionString.empty()) {
readVersionString();
}
return _versionString;
}

// Request the document by index/_doc/id.
bool ElasticSearch::getDocument(const char* index, const char* id, Json::Object& msg){
std::ostringstream oss;
oss << index << "/" << type << "/" << id;
oss << index << "/_doc/" << id;
_http.get(oss.str().c_str(), 0, &msg);
return msg["found"];
}

// Request the document by index/type/ query key:value.
void ElasticSearch::getDocument(const std::string& index, const std::string& type, const std::string& key, const std::string& value, Json::Object& msg){
// Request the document by index query key:value.
void ElasticSearch::getDocument(const std::string& index, const std::string& key, const std::string& value, Json::Object& msg){
std::ostringstream oss;
oss << index << "/" << type << "/_search";
oss << index << "/_search";
std::stringstream query;
query << "{\"query\":{\"match\":{\""<< key << "\":\"" << value << "\"}}}";
_http.post(oss.str().c_str(), query.str().c_str(), &msg);
}

/// Delete the document by index/type/id.
bool ElasticSearch::deleteDocument(const char* index, const char* type, const char* id){
/// Delete the document by index/_doc/id.
bool ElasticSearch::deleteDocument(const char* index, const char* id){
if(_readOnly)
return false;

std::ostringstream oss;
oss << index << "/" << type << "/" << id;
oss << index << "/_doc/" << id;
Json::Object msg;
_http.remove(oss.str().c_str(), 0, &msg);

return msg.getValue("found");
return msg.member("result") && msg.getValue("result").getString().compare("deleted")==0;
}

/// Delete the document by index/type.
bool ElasticSearch::deleteAll(const char* index, const char* type){
/// Delete the document by index
bool ElasticSearch::deleteAll(const char* index){
if(_readOnly)
return false;

std::ostringstream uri, data;
uri << index << "/" << type << "/_query";
uri << index << "/_delete_by_query?conflicts=proceed";
data << "{\"query\":{\"match_all\": {}}}";
Json::Object msg;
_http.remove(uri.str().c_str(), data.str().c_str(), &msg);

if(!msg.member("_indices") || !msg["_indices"].getObject().member(index) || !msg["_indices"].getObject()[index].getObject().member("_shards"))
return false;
_http.post(uri.str().c_str(), data.str().c_str(), &msg);

if(!msg["_indices"].getObject()[index].getObject()["_shards"].getObject().member("failed"))
return false;

return (msg["_indices"].getObject()[index].getObject()["_shards"].getObject()["failed"].getInt() == 0);
return (msg.member("total") && msg.member("deleted") && (msg["total"].getInt() - msg["deleted"].getInt())==0);
}

// Request the document number of type T in index I.
long unsigned int ElasticSearch::getDocumentCount(const char* index, const char* type){
// Request the number of documents in index.
long unsigned int ElasticSearch::getDocumentCount(const char* index){
std::ostringstream oss;
oss << index << "/" << type << "/_count";
oss << index << "/_count";
Json::Object msg;
_http.get(oss.str().c_str(),0,&msg);

Expand All @@ -116,12 +130,12 @@ long unsigned int ElasticSearch::getDocumentCount(const char* index, const char*
}

// Test if document exists
bool ElasticSearch::exist(const std::string& index, const std::string& type, const std::string& id){
bool ElasticSearch::exist(const std::string& index, const std::string& id){
std::stringstream url;
url << index << "/" << type << "/" << id;
url << index << "/_source/" << id;

Json::Object result;
_http.get(url.str().c_str(), 0, &result);
_http.head(url.str().c_str(), 0, &result);

if(!result.member("found")){
std::cout << result << std::endl;
Expand All @@ -132,27 +146,33 @@ bool ElasticSearch::exist(const std::string& index, const std::string& type, con
}

/// Index a document.
bool ElasticSearch::index(const std::string& index, const std::string& type, const std::string& id, const Json::Object& jData){
bool ElasticSearch::index(const std::string& index, const std::string& id, const Json::Object& jData){

if(_readOnly)
return false;

std::stringstream url;
url << index << "/" << type << "/" << id;
url << index << "/_doc/" << id;

std::stringstream data;
data << jData;

Json::Object result;
_http.put(url.str().c_str(), data.str().c_str(), &result);

if(!result.member("created"))
EXCEPTION("The index induces error.");
//ES2
if (result.member("created") && result.getValue("created")) {
return true;
}

if(result.getValue("created"))
//ES7
if (result.member("result") && result.getValue("result").getString().compare("created")==0) {
return true;
}

std::cout << "endPoint: " << index << "/" << type << "/" << id << std::endl;
EXCEPTION("The index induces error.");

std::cout << "endPoint: " << index << "/_doc/" << id << std::endl;
std::cout << "jData" << jData.pretty() << std::endl;
std::cout << "result" << result.pretty() << std::endl;

Expand All @@ -161,21 +181,22 @@ bool ElasticSearch::index(const std::string& index, const std::string& type, con
}

/// Index a document with automatic id creation
std::string ElasticSearch::index(const std::string& index, const std::string& type, const Json::Object& jData){
std::string ElasticSearch::index(const std::string& index, const Json::Object& jData){

if(_readOnly)
return "";

std::stringstream url;
url << index << "/" << type << "/";
url << index << "/_doc/";

std::stringstream data;
data << jData;

Json::Object result;
_http.post(url.str().c_str(), data.str().c_str(), &result);

if(!result.member("created") || !result.getValue("created")){
if ((!result.member("created") || !result.getValue("created")) &&
(!result.member("result") || result.getValue("result").getString().compare("created")!=0)) {
std::cout << "url: " << url.str() << std::endl;
std::cout << "data: " << data.str() << std::endl;
std::cout << "result: " << result.str() << std::endl;
Expand All @@ -186,12 +207,12 @@ std::string ElasticSearch::index(const std::string& index, const std::string& ty
}

// Update a document field.
bool ElasticSearch::update(const std::string& index, const std::string& type, const std::string& id, const std::string& key, const std::string& value){
bool ElasticSearch::update(const std::string& index, const std::string& id, const std::string& key, const std::string& value){
if(_readOnly)
return false;

std::stringstream url;
url << index << "/" << type << "/" << id << "/_update";
url << index << "/_update/" << id;

std::stringstream data;
data << "{\"doc\":{\"" << key << "\":\""<< value << "\"}}";
Expand All @@ -206,12 +227,12 @@ bool ElasticSearch::update(const std::string& index, const std::string& type, co
}

// Update doccument fields.
bool ElasticSearch::update(const std::string& index, const std::string& type, const std::string& id, const Json::Object& jData){
bool ElasticSearch::update(const std::string& index, const std::string& id, const Json::Object& jData){
if(_readOnly)
return false;

std::stringstream url;
url << index << "/" << type << "/" << id << "/_update";
url << index << "/_update/" << id;

std::stringstream data;
data << "{\"doc\":" << jData;
Expand All @@ -227,13 +248,13 @@ bool ElasticSearch::update(const std::string& index, const std::string& type, co
}

// Update or insert if the document does not already exists.
bool ElasticSearch::upsert(const std::string& index, const std::string& type, const std::string& id, const Json::Object& jData){
bool ElasticSearch::upsert(const std::string& index, const std::string& id, const Json::Object& jData){

if(_readOnly)
return false;

std::stringstream url;
url << index << "/" << type << "/" << id << "/_update";
url << index << "/_update/" << id;

std::stringstream data;
data << "{\"doc\":" << jData;
Expand All @@ -249,11 +270,10 @@ bool ElasticSearch::upsert(const std::string& index, const std::string& type, co
}

/// Search API of ES.
long ElasticSearch::search(const std::string& index, const std::string& type, const std::string& query, Json::Object& result){
long ElasticSearch::search(const std::string& index, const std::string& query, Json::Object& result){

std::stringstream url;
url << index << "/" << type << "/_search";

url << index << "/_search";

_http.post(url.str().c_str(), query.c_str(), &result);

Expand All @@ -268,14 +288,7 @@ long ElasticSearch::search(const std::string& index, const std::string& type, co
EXCEPTION("Search timed out.");
}

return result.getValue("hits").getObject().getValue("total").getLong();
}

/// Delete given type (and all documents, mappings)
bool ElasticSearch::deleteType(const std::string& index, const std::string& type){
std::ostringstream uri;
uri << index << "/" << type;
return (200 == _http.remove(uri.str().c_str(), 0, 0));
return result.getValue("hits").getObject().getValue("total").getObject().getValue("value").getLong();
}

// Test if index exists
Expand All @@ -288,7 +301,7 @@ bool ElasticSearch::createIndex(const std::string& index, const char* data){
return (200 == _http.put(index.c_str(), data, 0));
}

// Delete given index (and all types, documents, mappings)
// Delete given index (and all documents, mappings)
bool ElasticSearch::deleteIndex(const std::string& index){
return (200 == _http.remove(index.c_str(), 0, 0));
}
Expand All @@ -302,22 +315,36 @@ void ElasticSearch::refresh(const std::string& index){
_http.get(oss.str().c_str(), 0, &msg);
}

bool ElasticSearch::initScroll(std::string& scrollId, const std::string& index, const std::string& type, const std::string& query, int scrollSize) {
bool ElasticSearch::initScroll(std::string& scrollId, const std::string& index, const std::string& query, Json::Array& resultArray, int scrollSize) {
std::ostringstream oss;
oss << index << "/" << type << "/_search?scroll=1m&search_type=scan&size=" << scrollSize;
oss << index << "/_search?scroll=1m&size=" << scrollSize;

Json::Object msg;
if (200 != _http.post(oss.str().c_str(), query.c_str(), &msg))
return false;

scrollId = msg["_scroll_id"].getString();

appendHitsToArray(msg, resultArray);
return true;
}

bool ElasticSearch::scrollNext(std::string& scrollId, Json::Array& resultArray) {
Json::Object msg;
if (200 != _http.post("/_search/scroll?scroll=1m", scrollId.c_str(), &msg))
return false;

if (getMajorVersion() < 7){
if (200 != _http.post("/_search/scroll?scroll=1m", scrollId.c_str(), &msg))
return false;
}
else
{
Json::Object body;
body.addMemberByKey("scroll", "1m");
body.addMemberByKey("scroll_id", scrollId);

if (200 != _http.post("_search/scroll", body.str().c_str(), &msg))
return false;
}

scrollId = msg["_scroll_id"].getString();

Expand All @@ -329,14 +356,14 @@ void ElasticSearch::clearScroll(const std::string& scrollId) {
_http.remove("/_search/scroll", scrollId.c_str(), 0);
}

int ElasticSearch::fullScan(const std::string& index, const std::string& type, const std::string& query, Json::Array& resultArray, int scrollSize) {
int ElasticSearch::fullScan(const std::string& index, const std::string& query, Json::Array& resultArray, int scrollSize) {
resultArray.clear();

std::string scrollId;
if (!initScroll(scrollId, index, type, query, scrollSize))
if (!initScroll(scrollId, index, query, resultArray, scrollSize))
return 0;

size_t currentSize=0, newSize;
size_t currentSize=resultArray.size(), newSize;
while (scrollNext(scrollId, resultArray))
{
newSize = resultArray.size();
Expand Down Expand Up @@ -371,7 +398,7 @@ bool ElasticSearch::bulk(const char* data, Json::Object& jResult) {

BulkBuilder::BulkBuilder() {}

void BulkBuilder::createCommand(const std::string &op, const std::string &index, const std::string &type, const std::string &id = "") {
void BulkBuilder::createCommand(const std::string &op, const std::string &index, const std::string &id = "") {
Json::Object command;
Json::Object commandParams;

Expand All @@ -380,39 +407,38 @@ void BulkBuilder::createCommand(const std::string &op, const std::string &index,
}

commandParams.addMemberByKey("_index", index);
commandParams.addMemberByKey("_type", type);

command.addMemberByKey(op, commandParams);
operations.push_back(command);
}

void BulkBuilder::index(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) {
createCommand("index", index, type, id);
void BulkBuilder::index(const std::string &index, const std::string &id, const Json::Object &fields) {
createCommand("index", index, id);
operations.push_back(fields);
}

void BulkBuilder::create(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) {
createCommand("create", index, type, id);
void BulkBuilder::create(const std::string &index, const std::string &id, const Json::Object &fields) {
createCommand("create", index, id);
operations.push_back(fields);
}

void BulkBuilder::index(const std::string &index, const std::string &type, const Json::Object &fields) {
createCommand("index", index, type);
void BulkBuilder::index(const std::string &index, const Json::Object &fields) {
createCommand("index", index);
operations.push_back(fields);
}

void BulkBuilder::create(const std::string &index, const std::string &type, const Json::Object &fields) {
createCommand("create", index, type);
void BulkBuilder::create(const std::string &index, const Json::Object &fields) {
createCommand("create", index);
operations.push_back(fields);
}

void BulkBuilder::update(const std::string &index, const std::string &type, const std::string &id, const Json::Object &body) {
createCommand("update", index, type, id);
void BulkBuilder::update(const std::string &index, const std::string &id, const Json::Object &body) {
createCommand("update", index, id);
operations.push_back(body);
}

void BulkBuilder::update_doc(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields, bool upsert) {
createCommand("update", index, type, id);
void BulkBuilder::update_doc(const std::string &index, const std::string &id, const Json::Object &fields, bool upsert) {
createCommand("update", index, id);

Json::Object updateFields;
updateFields.addMemberByKey("doc", fields);
Expand All @@ -421,8 +447,8 @@ void BulkBuilder::update_doc(const std::string &index, const std::string &type,
operations.push_back(updateFields);
}

void BulkBuilder::del(const std::string &index, const std::string &type, const std::string &id) {
createCommand("delete", index, type, id);
void BulkBuilder::del(const std::string &index, const std::string &id) {
createCommand("delete", index, id);
}

std::string BulkBuilder::str() {
Expand Down
Loading