@@ -386,6 +386,20 @@ Baton KafkaConsumer::Position(std::vector<RdKafka::TopicPartition*> &toppars) {
386386 return Baton (err);
387387}
388388
389+
390+ Baton KafkaConsumer::AssigmentLost () {
391+ if (!IsConnected ()) {
392+ return Baton (RdKafka::ERR__STATE, " KafkaConsumer is not connected" );
393+ }
394+
395+ RdKafka::KafkaConsumer* consumer =
396+ dynamic_cast <RdKafka::KafkaConsumer*>(m_client);
397+
398+ // XXX: Returning a bool by casting it to a pointer,
399+ return Baton (reinterpret_cast <void *>(
400+ static_cast <uintptr_t >(consumer->assignment_lost () ? true : false )));
401+ }
402+
389403Baton KafkaConsumer::Subscription () {
390404 if (!IsConnected ()) {
391405 return Baton (RdKafka::ERR__STATE, " Consumer is not connected" );
@@ -585,6 +599,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
585599
586600 Nan::SetPrototypeMethod (tpl, " committed" , NodeCommitted);
587601 Nan::SetPrototypeMethod (tpl, " position" , NodePosition);
602+ Nan::SetPrototypeMethod (tpl, " assignemntLost" , NodeAssignmentLost);
588603 Nan::SetPrototypeMethod (tpl, " assign" , NodeAssign);
589604 Nan::SetPrototypeMethod (tpl, " incrementalAssign" , NodeIncrementalAssign);
590605 Nan::SetPrototypeMethod (tpl, " unassign" , NodeUnassign);
@@ -745,6 +760,20 @@ NAN_METHOD(KafkaConsumer::NodePosition) {
745760 RdKafka::TopicPartition::destroy (toppars);
746761}
747762
763+ NAN_METHOD (KafkaConsumer::NodeAssignmentLost) {
764+
765+ Nan::HandleScope scope;
766+
767+ KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This ());
768+
769+ Baton b = consumer->AssigmentLost ();
770+ if (b.err () != RdKafka::ERR_NO_ERROR) {
771+ Nan::ThrowError (RdKafka::err2str (b.err ()).c_str ());
772+ }
773+ bool result = static_cast <bool >(reinterpret_cast <uintptr_t >(b.data <void *>()));
774+ info.GetReturnValue ().Set (Nan::New<v8::Boolean>(result));
775+ }
776+
748777NAN_METHOD (KafkaConsumer::NodeAssignments) {
749778 Nan::HandleScope scope;
750779
0 commit comments