diff --git a/src/elasticsearch/elasticsearch.cpp b/src/elasticsearch/elasticsearch.cpp index f500b87..b8d8f69 100644 --- a/src/elasticsearch/elasticsearch.cpp +++ b/src/elasticsearch/elasticsearch.cpp @@ -49,60 +49,74 @@ bool ElasticSearch::isActive() { return true; } -// Request the document by index/type/id. -bool ElasticSearch::getDocument(const char* index, const char* type, const char* id, Json::Object& msg){ +void ElasticSearch::readVersionString(){ + Json::Object msg; + if (200 == _http.get("", 0, &msg)){ + if (msg.member("version") && msg["version"].getObject().member("number")){ + _versionString = msg["version"].getObject()["number"].getString(); + } + } +} + +int ElasticSearch::getMajorVersion(){ + return std::stoi(getVersionString()); +} + +const std::string& ElasticSearch::getVersionString(){ + if (_versionString.empty()) { + readVersionString(); + } + return _versionString; +} + +// Request the document by index/_doc/id. +bool ElasticSearch::getDocument(const char* index, const char* id, Json::Object& msg){ std::ostringstream oss; - oss << index << "/" << type << "/" << id; + oss << index << "/_doc/" << id; _http.get(oss.str().c_str(), 0, &msg); return msg["found"]; } -// Request the document by index/type/ query key:value. -void ElasticSearch::getDocument(const std::string& index, const std::string& type, const std::string& key, const std::string& value, Json::Object& msg){ +// Request the document by index query key:value. +void ElasticSearch::getDocument(const std::string& index, const std::string& key, const std::string& value, Json::Object& msg){ std::ostringstream oss; - oss << index << "/" << type << "/_search"; + oss << index << "/_search"; std::stringstream query; query << "{\"query\":{\"match\":{\""<< key << "\":\"" << value << "\"}}}"; _http.post(oss.str().c_str(), query.str().c_str(), &msg); } -/// Delete the document by index/type/id. -bool ElasticSearch::deleteDocument(const char* index, const char* type, const char* id){ +/// Delete the document by index/_doc/id. +bool ElasticSearch::deleteDocument(const char* index, const char* id){ if(_readOnly) return false; std::ostringstream oss; - oss << index << "/" << type << "/" << id; + oss << index << "/_doc/" << id; Json::Object msg; _http.remove(oss.str().c_str(), 0, &msg); - return msg.getValue("found"); + return msg.member("result") && msg.getValue("result").getString().compare("deleted")==0; } -/// Delete the document by index/type. -bool ElasticSearch::deleteAll(const char* index, const char* type){ +/// Delete the document by index +bool ElasticSearch::deleteAll(const char* index){ if(_readOnly) return false; std::ostringstream uri, data; - uri << index << "/" << type << "/_query"; + uri << index << "/_delete_by_query?conflicts=proceed"; data << "{\"query\":{\"match_all\": {}}}"; Json::Object msg; - _http.remove(uri.str().c_str(), data.str().c_str(), &msg); - - if(!msg.member("_indices") || !msg["_indices"].getObject().member(index) || !msg["_indices"].getObject()[index].getObject().member("_shards")) - return false; + _http.post(uri.str().c_str(), data.str().c_str(), &msg); - if(!msg["_indices"].getObject()[index].getObject()["_shards"].getObject().member("failed")) - return false; - - return (msg["_indices"].getObject()[index].getObject()["_shards"].getObject()["failed"].getInt() == 0); + return (msg.member("total") && msg.member("deleted") && (msg["total"].getInt() - msg["deleted"].getInt())==0); } -// Request the document number of type T in index I. -long unsigned int ElasticSearch::getDocumentCount(const char* index, const char* type){ +// Request the number of documents in index. +long unsigned int ElasticSearch::getDocumentCount(const char* index){ std::ostringstream oss; - oss << index << "/" << type << "/_count"; + oss << index << "/_count"; Json::Object msg; _http.get(oss.str().c_str(),0,&msg); @@ -116,12 +130,12 @@ long unsigned int ElasticSearch::getDocumentCount(const char* index, const char* } // Test if document exists -bool ElasticSearch::exist(const std::string& index, const std::string& type, const std::string& id){ +bool ElasticSearch::exist(const std::string& index, const std::string& id){ std::stringstream url; - url << index << "/" << type << "/" << id; + url << index << "/_source/" << id; Json::Object result; - _http.get(url.str().c_str(), 0, &result); + _http.head(url.str().c_str(), 0, &result); if(!result.member("found")){ std::cout << result << std::endl; @@ -132,13 +146,13 @@ bool ElasticSearch::exist(const std::string& index, const std::string& type, con } /// Index a document. -bool ElasticSearch::index(const std::string& index, const std::string& type, const std::string& id, const Json::Object& jData){ +bool ElasticSearch::index(const std::string& index, const std::string& id, const Json::Object& jData){ if(_readOnly) return false; std::stringstream url; - url << index << "/" << type << "/" << id; + url << index << "/_doc/" << id; std::stringstream data; data << jData; @@ -146,13 +160,19 @@ bool ElasticSearch::index(const std::string& index, const std::string& type, con Json::Object result; _http.put(url.str().c_str(), data.str().c_str(), &result); - if(!result.member("created")) - EXCEPTION("The index induces error."); + //ES2 + if (result.member("created") && result.getValue("created")) { + return true; + } - if(result.getValue("created")) + //ES7 + if (result.member("result") && result.getValue("result").getString().compare("created")==0) { return true; + } - std::cout << "endPoint: " << index << "/" << type << "/" << id << std::endl; + EXCEPTION("The index induces error."); + + std::cout << "endPoint: " << index << "/_doc/" << id << std::endl; std::cout << "jData" << jData.pretty() << std::endl; std::cout << "result" << result.pretty() << std::endl; @@ -161,13 +181,13 @@ bool ElasticSearch::index(const std::string& index, const std::string& type, con } /// Index a document with automatic id creation -std::string ElasticSearch::index(const std::string& index, const std::string& type, const Json::Object& jData){ +std::string ElasticSearch::index(const std::string& index, const Json::Object& jData){ if(_readOnly) return ""; std::stringstream url; - url << index << "/" << type << "/"; + url << index << "/_doc/"; std::stringstream data; data << jData; @@ -175,7 +195,8 @@ std::string ElasticSearch::index(const std::string& index, const std::string& ty Json::Object result; _http.post(url.str().c_str(), data.str().c_str(), &result); - if(!result.member("created") || !result.getValue("created")){ + if ((!result.member("created") || !result.getValue("created")) && + (!result.member("result") || result.getValue("result").getString().compare("created")!=0)) { std::cout << "url: " << url.str() << std::endl; std::cout << "data: " << data.str() << std::endl; std::cout << "result: " << result.str() << std::endl; @@ -186,12 +207,12 @@ std::string ElasticSearch::index(const std::string& index, const std::string& ty } // Update a document field. -bool ElasticSearch::update(const std::string& index, const std::string& type, const std::string& id, const std::string& key, const std::string& value){ +bool ElasticSearch::update(const std::string& index, const std::string& id, const std::string& key, const std::string& value){ if(_readOnly) return false; std::stringstream url; - url << index << "/" << type << "/" << id << "/_update"; + url << index << "/_update/" << id; std::stringstream data; data << "{\"doc\":{\"" << key << "\":\""<< value << "\"}}"; @@ -206,12 +227,12 @@ bool ElasticSearch::update(const std::string& index, const std::string& type, co } // Update doccument fields. -bool ElasticSearch::update(const std::string& index, const std::string& type, const std::string& id, const Json::Object& jData){ +bool ElasticSearch::update(const std::string& index, const std::string& id, const Json::Object& jData){ if(_readOnly) return false; std::stringstream url; - url << index << "/" << type << "/" << id << "/_update"; + url << index << "/_update/" << id; std::stringstream data; data << "{\"doc\":" << jData; @@ -227,13 +248,13 @@ bool ElasticSearch::update(const std::string& index, const std::string& type, co } // Update or insert if the document does not already exists. -bool ElasticSearch::upsert(const std::string& index, const std::string& type, const std::string& id, const Json::Object& jData){ +bool ElasticSearch::upsert(const std::string& index, const std::string& id, const Json::Object& jData){ if(_readOnly) return false; std::stringstream url; - url << index << "/" << type << "/" << id << "/_update"; + url << index << "/_update/" << id; std::stringstream data; data << "{\"doc\":" << jData; @@ -249,11 +270,10 @@ bool ElasticSearch::upsert(const std::string& index, const std::string& type, co } /// Search API of ES. -long ElasticSearch::search(const std::string& index, const std::string& type, const std::string& query, Json::Object& result){ +long ElasticSearch::search(const std::string& index, const std::string& query, Json::Object& result){ std::stringstream url; - url << index << "/" << type << "/_search"; - + url << index << "/_search"; _http.post(url.str().c_str(), query.c_str(), &result); @@ -268,14 +288,7 @@ long ElasticSearch::search(const std::string& index, const std::string& type, co EXCEPTION("Search timed out."); } - return result.getValue("hits").getObject().getValue("total").getLong(); -} - -/// Delete given type (and all documents, mappings) -bool ElasticSearch::deleteType(const std::string& index, const std::string& type){ - std::ostringstream uri; - uri << index << "/" << type; - return (200 == _http.remove(uri.str().c_str(), 0, 0)); + return result.getValue("hits").getObject().getValue("total").getObject().getValue("value").getLong(); } // Test if index exists @@ -288,7 +301,7 @@ bool ElasticSearch::createIndex(const std::string& index, const char* data){ return (200 == _http.put(index.c_str(), data, 0)); } -// Delete given index (and all types, documents, mappings) +// Delete given index (and all documents, mappings) bool ElasticSearch::deleteIndex(const std::string& index){ return (200 == _http.remove(index.c_str(), 0, 0)); } @@ -302,22 +315,36 @@ void ElasticSearch::refresh(const std::string& index){ _http.get(oss.str().c_str(), 0, &msg); } -bool ElasticSearch::initScroll(std::string& scrollId, const std::string& index, const std::string& type, const std::string& query, int scrollSize) { +bool ElasticSearch::initScroll(std::string& scrollId, const std::string& index, const std::string& query, Json::Array& resultArray, int scrollSize) { std::ostringstream oss; - oss << index << "/" << type << "/_search?scroll=1m&search_type=scan&size=" << scrollSize; + oss << index << "/_search?scroll=1m&size=" << scrollSize; Json::Object msg; if (200 != _http.post(oss.str().c_str(), query.c_str(), &msg)) return false; scrollId = msg["_scroll_id"].getString(); + + appendHitsToArray(msg, resultArray); return true; } bool ElasticSearch::scrollNext(std::string& scrollId, Json::Array& resultArray) { Json::Object msg; - if (200 != _http.post("/_search/scroll?scroll=1m", scrollId.c_str(), &msg)) - return false; + + if (getMajorVersion() < 7){ + if (200 != _http.post("/_search/scroll?scroll=1m", scrollId.c_str(), &msg)) + return false; + } + else + { + Json::Object body; + body.addMemberByKey("scroll", "1m"); + body.addMemberByKey("scroll_id", scrollId); + + if (200 != _http.post("_search/scroll", body.str().c_str(), &msg)) + return false; + } scrollId = msg["_scroll_id"].getString(); @@ -329,14 +356,14 @@ void ElasticSearch::clearScroll(const std::string& scrollId) { _http.remove("/_search/scroll", scrollId.c_str(), 0); } -int ElasticSearch::fullScan(const std::string& index, const std::string& type, const std::string& query, Json::Array& resultArray, int scrollSize) { +int ElasticSearch::fullScan(const std::string& index, const std::string& query, Json::Array& resultArray, int scrollSize) { resultArray.clear(); std::string scrollId; - if (!initScroll(scrollId, index, type, query, scrollSize)) + if (!initScroll(scrollId, index, query, resultArray, scrollSize)) return 0; - size_t currentSize=0, newSize; + size_t currentSize=resultArray.size(), newSize; while (scrollNext(scrollId, resultArray)) { newSize = resultArray.size(); @@ -371,7 +398,7 @@ bool ElasticSearch::bulk(const char* data, Json::Object& jResult) { BulkBuilder::BulkBuilder() {} -void BulkBuilder::createCommand(const std::string &op, const std::string &index, const std::string &type, const std::string &id = "") { +void BulkBuilder::createCommand(const std::string &op, const std::string &index, const std::string &id = "") { Json::Object command; Json::Object commandParams; @@ -380,39 +407,38 @@ void BulkBuilder::createCommand(const std::string &op, const std::string &index, } commandParams.addMemberByKey("_index", index); - commandParams.addMemberByKey("_type", type); command.addMemberByKey(op, commandParams); operations.push_back(command); } -void BulkBuilder::index(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) { - createCommand("index", index, type, id); +void BulkBuilder::index(const std::string &index, const std::string &id, const Json::Object &fields) { + createCommand("index", index, id); operations.push_back(fields); } -void BulkBuilder::create(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) { - createCommand("create", index, type, id); +void BulkBuilder::create(const std::string &index, const std::string &id, const Json::Object &fields) { + createCommand("create", index, id); operations.push_back(fields); } -void BulkBuilder::index(const std::string &index, const std::string &type, const Json::Object &fields) { - createCommand("index", index, type); +void BulkBuilder::index(const std::string &index, const Json::Object &fields) { + createCommand("index", index); operations.push_back(fields); } -void BulkBuilder::create(const std::string &index, const std::string &type, const Json::Object &fields) { - createCommand("create", index, type); +void BulkBuilder::create(const std::string &index, const Json::Object &fields) { + createCommand("create", index); operations.push_back(fields); } -void BulkBuilder::update(const std::string &index, const std::string &type, const std::string &id, const Json::Object &body) { - createCommand("update", index, type, id); +void BulkBuilder::update(const std::string &index, const std::string &id, const Json::Object &body) { + createCommand("update", index, id); operations.push_back(body); } -void BulkBuilder::update_doc(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields, bool upsert) { - createCommand("update", index, type, id); +void BulkBuilder::update_doc(const std::string &index, const std::string &id, const Json::Object &fields, bool upsert) { + createCommand("update", index, id); Json::Object updateFields; updateFields.addMemberByKey("doc", fields); @@ -421,8 +447,8 @@ void BulkBuilder::update_doc(const std::string &index, const std::string &type, operations.push_back(updateFields); } -void BulkBuilder::del(const std::string &index, const std::string &type, const std::string &id) { - createCommand("delete", index, type, id); +void BulkBuilder::del(const std::string &index, const std::string &id) { + createCommand("delete", index, id); } std::string BulkBuilder::str() { diff --git a/src/elasticsearch/elasticsearch.h b/src/elasticsearch/elasticsearch.h index 1c49060..85003d2 100644 --- a/src/elasticsearch/elasticsearch.h +++ b/src/elasticsearch/elasticsearch.h @@ -20,52 +20,51 @@ class ElasticSearch { /// Test connection with node. bool isActive(); + int getMajorVersion(); + const std::string& getVersionString(); + /// Request document number of type T in index I. - long unsigned int getDocumentCount(const char* index, const char* type); + long unsigned int getDocumentCount(const char* index); /// Request the document by index/type/id. - bool getDocument(const char* index, const char* type, const char* id, Json::Object& msg); + bool getDocument(const char* index, const char* id, Json::Object& msg); /// Request the document by index/type/ query key:value. - void getDocument(const std::string& index, const std::string& type, const std::string& key, const std::string& value, Json::Object& msg); + void getDocument(const std::string& index, const std::string& key, const std::string& value, Json::Object& msg); /// Delete the document by index/type/id. - bool deleteDocument(const char* index, const char* type, const char* id); + bool deleteDocument(const char* index, const char* id); /// Delete the document by index/type. - bool deleteAll(const char* index, const char* type); + bool deleteAll(const char* index); /// Test if document exists - bool exist(const std::string& index, const std::string& type, const std::string& id); + bool exist(const std::string& index, const std::string& id); /// Get Id of document - bool getId(const std::string& index, const std::string& type, const std::string& key, const std::string& value, std::string& id); + bool getId(const std::string& index, const std::string& key, const std::string& value, std::string& id); /// Index a document. - bool index(const std::string& index, const std::string& type, const std::string& id, const Json::Object& jData); + bool index(const std::string& index, const std::string& id, const Json::Object& jData); /// Index a document with automatic id creation - std::string index(const std::string& index, const std::string& type, const Json::Object& jData); + std::string index(const std::string& index, const Json::Object& jData); /// Update a document field. - bool update(const std::string& index, const std::string& type, const std::string& id, const std::string& key, const std::string& value); + bool update(const std::string& index, const std::string& id, const std::string& key, const std::string& value); /// Update doccument fields. - bool update(const std::string& index, const std::string& type, const std::string& id, const Json::Object& jData); + bool update(const std::string& index, const std::string& id, const Json::Object& jData); /// Update or insert if the document does not already exists. - bool upsert(const std::string& index, const std::string& type, const std::string& id, const Json::Object& jData); + bool upsert(const std::string& index, const std::string& id, const Json::Object& jData); /// Search API of ES. Specify the doc type. - long search(const std::string& index, const std::string& type, const std::string& query, Json::Object& result); + long search(const std::string& index, const std::string& query, Json::Object& result); // Bulk API bool bulk(const char*, Json::Object& jResult); - public: - /// Delete given type (and all documents, mappings) - bool deleteType(const std::string& index, const std::string& type); - public: /// Test if index exists bool exist(const std::string& index); @@ -81,7 +80,7 @@ class ElasticSearch { public: /// Initialize a scroll search. Use the returned scroll id when calling scrollNext. Size is based on shardSize. Returns false on error - bool initScroll(std::string& scrollId, const std::string& index, const std::string& type, const std::string& query, int scrollSize = 1000); + bool initScroll(std::string& scrollId, const std::string& index, const std::string& query, Json::Array& resultArray, int scrollSize = 1000); /// Scroll to next matches of an initialized scroll search. scroll_id may be updated. End is reached when resultArray.empty() is true (in which scroll is automatically cleared). Returns false on error. bool scrollNext(std::string& scrollId, Json::Array& resultArray); @@ -90,7 +89,7 @@ class ElasticSearch { void clearScroll(const std::string& scrollId); /// Perform a scan to get all results from a query. - int fullScan(const std::string& index, const std::string& type, const std::string& query, Json::Array& resultArray, int scrollSize = 1000); + int fullScan(const std::string& index, const std::string& query, Json::Array& resultArray, int scrollSize = 1000); private: void appendHitsToArray(const Json::Object& msg, Json::Array& resultArray); @@ -98,6 +97,10 @@ class ElasticSearch { private: /// Private constructor. ElasticSearch(); + + void readVersionString(); + + std::string _versionString; /// HTTP Connexion module. HTTP _http; @@ -110,19 +113,27 @@ class BulkBuilder { private: std::vector operations; - void createCommand(const std::string &op, const std::string &index, const std::string &type, const std::string &id); + void createCommand(const std::string &op, const std::string &index, const std::string &id); public: BulkBuilder(); - void index(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields); - void create(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields); - void index(const std::string &index, const std::string &type, const Json::Object &fields); - void create(const std::string &index, const std::string &type, const Json::Object &fields); - void update(const std::string &index, const std::string &type, const std::string &id, const Json::Object &body); - void update_doc(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields, bool update = false); - void del(const std::string &index, const std::string &type, const std::string &id); - void upsert(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields); - void clear(); + void index(const std::string &index, const std::string &id, const Json::Object &fields); + + void create(const std::string &index, const std::string &id, const Json::Object &fields); + + void index(const std::string &index, const Json::Object &fields); + + void create(const std::string &index, const Json::Object &fields); + + void update(const std::string &index, const std::string &id, const Json::Object &body); + + void update_doc(const std::string &index, const std::string &id, const Json::Object &fields, bool update = false); + + void del(const std::string &index, const std::string &id); + + void upsert(const std::string &index, const std::string &id, const Json::Object &fields); + + void clear(); std::string str(); bool isEmpty(); }; diff --git a/src/http/http.cpp b/src/http/http.cpp index 7a80155..8517e94 100644 --- a/src/http/http.cpp +++ b/src/http/http.cpp @@ -734,13 +734,13 @@ unsigned int HTTP::parseMessage(std::string& output, size_t& contentLength, bool size_t headerSize = endHeader + 4 - recvline; // Extract and interpret the header. - char* contentLenghtPos = strstr(endStatus+2,"Content-Length:"); + char* contentLenghtPos = strcasestr(endStatus+2,"Content-Length:"); // If we've got the content-length. if(contentLenghtPos == NULL){ // If the message is chunked, restart the method with the flag on. - if( strstr(endStatus+2,"Transfer-Encoding: chunked") != NULL ){ + if( strcasestr(endStatus+2,"Transfer-Encoding: chunked") != NULL ){ // Set the transfer as chunked. isChunked = true; diff --git a/src/json/json.cpp b/src/json/json.cpp index a083456..31de5e2 100644 --- a/src/json/json.cpp +++ b/src/json/json.cpp @@ -390,6 +390,13 @@ void Json::Value::setArray(const Json::Array& array){ _data = array.str(); } +void Json::Value::appendArrayElement(const Json::Value& val) +{ + assert(_type == arrayType && _array != 0); + _array->addElement(val); + _data = _array->str(); +} + const char* Json::Value::read(const char* pCursor, const char* pEnd){ // Call this function only once. @@ -798,6 +805,20 @@ void Json::Object::append(const Json::Object& obj){ } } +void Json::Object::appendArrayElement(const std::string& key, const Json::Value& val) +{ + if (!member(key)) + { + Json::Array array; + array.addElement(val); + addMemberByKey(key, array); + } + else + { + const_cast(getValue(key)).appendArrayElement(val); + } +} + // Return the value of the member[key], key must exist in the map. const Json::Value& Json::Object::getValue(const std::string& key) const { @@ -825,8 +846,8 @@ std::string Json::Object::str() const { bool Json::Object::contain(const Object& o) const { - for(const std::pair< Key, Value >& p : o._memberMap){ - std::map< Key, Value >::const_iterator cit = _memberMap.find(p.first); + for(const std::pair& p : o._memberMap){ + std::map::const_iterator cit = _memberMap.find(p.first); if( cit == _memberMap.end() ){ return false; } diff --git a/src/json/json.h b/src/json/json.h index 37a4f54..9b85c1c 100644 --- a/src/json/json.h +++ b/src/json/json.h @@ -65,6 +65,7 @@ class Value { Value(); Value(const Value& val); ~Value(); + Value& operator=(const Value& other) = default; const char* showType() const; const char* read(const char* pStart, const char* pEnd); @@ -131,6 +132,9 @@ class Value { /// Set this value as an int. void setLong(long l); + /// append element to existing array. + void appendArrayElement(const Json::Value& val); + /// Give access to member for this operator. friend std::ostream& operator<<(std::ostream& os, const Value& value); @@ -213,6 +217,9 @@ class Object { /// Append another object to this one. void append(const Object& obj); + /// append element to array. + void appendArrayElement(const std::string& key, const Json::Value& val); + /// Return the value of the member[key], key must exist in the map. const Value& getValue(const std::string& key) const;