diff --git a/.TODO b/.TODO deleted file mode 100644 index 9e7894d..0000000 --- a/.TODO +++ /dev/null @@ -1,12 +0,0 @@ -DONE # set publishers instances (p1,p2,..) to get topic and OS as an argument ( for scripting purposes ) -DONE # subscriber's register mechanism -DONE # generate / decide network topology for mininet. ( to test and showcase our code ) - # add a logging feature to be able to see outputs ( for when we run it in mininet ) -DONE # we should assume that publishers might fail and prepare for it. implement heartbeat -DONE # unregister method and recalculation of new dominant publishers - -# write a readme for the grader to see -# Do end-to-end measurements (time between publication and receipt of info; since the clock is the same on all emulated hosts, we do not have the issue of clocks drifting apart from each other). -DONE # Complete the history container, which is able to hold message of different topic respectively - -*take a look at the next assignment. diff --git a/classes/publisher.py b/classes/publisher.py index 5443f7f..2ac422f 100644 --- a/classes/publisher.py +++ b/classes/publisher.py @@ -30,27 +30,20 @@ def __init__(self, knownEsAddress, strength ,topic): self.strength = strength def register(self,serverAddress): - # TODO address = lookup(self.topic) self.socket.disconnect("tcp://" + self.knownEsAddress) self.socket.connect("tcp://" + serverAddress) heartbeatClient(self.pId,serverAddress).start() msg = {'msgType':'publisherRegisterReq','pId':self.pId,'address':self.addr, 'topic':self.topic,'os':self.strength} self.socket.send_string(json.dumps(msg)) self.socket.recv() - # self.socket.send_string("rp{}-{}, {}, {}".format(self.pId, self.addr,self.topic,self.strength)) logger.info('register request sent') def lookup(self,key): - # TODO call to any known eventservice to findout where it should register. - # return: ES address (ip:port) msg = {'msgType':'nodeLookup', 'key': key} self.socket.send_string(json.dumps(msg)) designatedServer = self.socket.recv() print('designated server:' , designatedServer) - - self.register(designatedServer) return designatedServer - # TODO go register to the designate diff --git a/classes/subscriber.py b/classes/subscriber.py index 2126229..db67c8f 100644 --- a/classes/subscriber.py +++ b/classes/subscriber.py @@ -29,24 +29,19 @@ def __init__(self,esAddr = "127.0.0.1:5555"): # logging.basicConfig(filename="log/{}.log".format('S' + self.addr),level=logging.DEBUG) def register(self,topic, serverAddress): - # TODO address = lookup(topic) self.socket.disconnect("tcp://" + getPubFromAddress(self.knownEsAddress)) self.socket.connect("tcp://" + serverAddress) msg = {'msgType':'subscriberRegisterReq','sId':self.sId,'address':self.addr, 'topic':topic} self.reqSocket.send_string(json.dumps(msg)) self.reqSocket.recv() - # self.reqSocket.send_string("rs{}-{}, {}".format(self.sId, self.addr,topic)) logger.info( 'register req sent') def lookup(self,key): - # TODO call to any known eventservice to findout where it should register. - # return: ES address (ip:port) msg = {'msgType':'nodeLookup', 'key': key} self.reqSocket.send_string(json.dumps(msg)) designatedServer = self.reqSocket.recv() print('designated server:' , designatedServer) return designatedServer - # TODO go register to the designate def subscribe(self, sFilter): # any subscriber must use the SUBSCRIBE to set a subscription, i.e., tell the diff --git a/p1.py b/p1.py index 1e6f99d..a0e8780 100644 --- a/p1.py +++ b/p1.py @@ -17,7 +17,8 @@ topic = 'book' p1 = Publisher(eventserver_address,owner_strength,topic) -p1.lookup(topic) +srvAddr = p1.lookup(topic) +p1.register(srvAddr) # keep publishing while True: body = "{}".format(randint(0,9))