Skip to content

Commit d92ccee

Browse files
committed
Broadcasting is implemented.
1 parent 52c78a5 commit d92ccee

File tree

3 files changed

+66
-36
lines changed

3 files changed

+66
-36
lines changed

korobool/chat/Threaded.py

+30-12
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def __init__(self, observer, conn, addr):
1313
self.name = 'no_name'
1414
self.observer = observer
1515
self.commands_queue = deque() # each client has independent commands queue
16-
self.__local_sync = threading.Lock() # synchronization between main and client thread
16+
self.__local_sync = threading.RLock() # synchronization between main and client thread
1717
self.conn = conn
1818
self.addr = addr
1919
self.thread = threading.Thread(target=ServingThreadWrapper.__serve, args=(self,))
@@ -24,20 +24,25 @@ def __init__(self, observer, conn, addr):
2424
def send(self, data):
2525
with self.__local_sync:
2626
self.commands_queue.appendleft(data)
27+
#self.__process_server_messages_queue()
2728

2829
# Thread safe first idle message
2930
def post(self, data):
3031
with self.__local_sync:
3132
self.commands_queue.append(data)
33+
#self.__process_server_messages_queue()
3234

3335
# Thread safe pop command operation
3436
def pop_command(self):
3537
with self.__local_sync:
3638
if len(self.commands_queue) > 0:
3739
return self.commands_queue.pop()
40+
else:
41+
return None
3842

3943
# Thread-safe sever notifier
4044
def notify(self, message):
45+
print('try to notify observer')
4146
with ServingThreadWrapper.__global_sync:
4247
self.observer.notify(self, message)
4348

@@ -53,30 +58,35 @@ def close(self):
5358
@staticmethod
5459
def __serve(stw):
5560
print('Connection attempt', stw.addr)
61+
stw.conn.settimeout(1)
5662
while not stw.closing:
5763
ServingThreadWrapper.__process_server_messages_queue(stw) # It might be good idea to move this to new thread
5864
command = ServingThreadWrapper.__receive_and_parse_client_command(stw)
5965
ServingThreadWrapper.__process_command(stw, command)
6066

6167
@staticmethod
6268
def __process_server_messages_queue(stw):
63-
package = stw.pop_command()
64-
if package is None:
65-
return
6669

67-
data = json.dumps(package)
68-
69-
b = bytes(data, 'utf-8')
70-
stw.conn.sendall(struct.pack('i', len(data)))
70+
package = stw.pop_command()
7171

72-
stw.conn.sendall(b)
73-
print('sent data...' , b.decode('utf-8'))
72+
while not package is None:
73+
data = json.dumps(package)
74+
b = bytes(data, 'utf-8')
75+
stw.conn.sendall(struct.pack('i', len(data)))
76+
stw.conn.sendall(b)
77+
print('sent data...' , b.decode('utf-8'))
78+
package = stw.pop_command()
7479

7580
@staticmethod
7681
def __receive_and_parse_client_command(stw):
7782

78-
header = stw.conn.recv(4)
79-
if not header: return
83+
header = None
84+
try:
85+
header = stw.conn.recv(4)
86+
if not header: return
87+
except:
88+
# I know that it is bad idea to swallow exceptions, this is temporary trick
89+
return
8090

8191
size = int(struct.unpack('i', header)[0])
8292
print('receiving bytes:', size)
@@ -99,6 +109,14 @@ def __receive_and_parse_client_command(stw):
99109

100110
@staticmethod
101111
def __process_command(stw, command):
112+
if command is None: return
102113
print('new command from client received', command['cmd'])
103114
if command['cmd'] == 'CMD_PING':
104115
stw.send({'cmd': 'CMD_PONG', 'msg': 'pong from server'})
116+
117+
if command['cmd'] == 'CMD_BROADCAST':
118+
stw.notify(command)
119+
120+
if command['cmd'] == 'CMD_MESSAGE':
121+
stw.notify(command)
122+

korobool/chat/chat_client.py

+26-13
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import threading
2+
13
__author__ = 'Oleksandr Korobov'
24

35
import socket
@@ -9,39 +11,50 @@
911
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1012
s.connect((HOST, PORT))
1113

12-
def receive_and_parse_command():
1314

14-
header = s.recv(4)
15-
if not header: return
15+
def receive_and_parse_command():
16+
while True:
17+
header = s.recv(4)
18+
if not header: return
1619

17-
size = int(struct.unpack('i', header)[0])
18-
print('receiving bytes:', size)
20+
size = int(struct.unpack('i', header)[0])
21+
print('receiving bytes:', size)
1922

20-
body = s.recv(size).decode('utf-8')
23+
body = s.recv(size).decode('utf-8')
24+
command = json.loads(body)
2125

22-
#print('!DATA!\n', body)
26+
print('data from server', command)
2327

24-
command = json.loads(body)
28+
read_thread = threading.Thread(target=receive_and_parse_command)
29+
read_thread.start()
2530

26-
return command
31+
print("Enter commands: ")
2732

2833
while True:
29-
command_text = input("Enter command: ")
34+
command_text = input()
3035

3136
package = None
3237

3338
if command_text.upper() == 'PING':
3439
package = {'cmd': 'CMD_PING'}
3540

41+
if command_text.upper() == 'BROADCAST':
42+
message = input("Enter broadcast message: ")
43+
package = {'cmd': 'CMD_BROADCAST', 'msg': message}
44+
45+
if command_text.upper() == 'MESSAGE':
46+
print('Not supported by server yet')
47+
user_id = input("Enter user id: ")
48+
message = input("Enter message: ")
49+
package = {'cmd': 'CMD_MESSAGE', 'msg': message}
50+
3651
data = json.dumps(package)
3752

3853
b = bytes(data, 'utf-8')
3954
s.sendall(struct.pack('i', len(data)))
4055

4156
s.sendall(b)
4257

43-
#print('sent data...' , b.decode('utf-8'))
44-
#s.sendall(b)
45-
print(receive_and_parse_command())
4658

59+
#read_thread.join()
4760
#s.close()

korobool/chat/sever.py

+10-11
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def __close_all_clients(self):
2626
def serve(self):
2727
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2828
self.socket.bind((self.host, self.port))
29+
2930
while not self.closing:
3031
self.socket.listen(5)
3132
conn, addr = self.socket.accept()
@@ -34,25 +35,23 @@ def serve(self):
3435

3536
def stop_serve(self):
3637
self.closing = True
37-
print('Not implemented yet...')
38+
print('Proper server closing is not implemented yet...')
3839

3940
def notify(self, sender, message):
4041
print('Notification received:', message)
41-
# if message[0] == 'CMD_STOP':
42-
# self.__close_all_clients()
43-
# self.socket.close()
44-
#
45-
# if message[0] == 'CMD_GONE':
46-
# print('Not implemented yet')
47-
#
48-
# if message[0] == 'CMD_MSGE':
49-
# print('Not implemented yet')
42+
if message['cmd'] == 'CMD_BROADCAST':
43+
for client in self.__clients_pool:
44+
if not client is None and not client.closing:
45+
#print('Posting to '. client.name)
46+
client.post(message)
47+
if message['cmd'] == 'CMD_MESSAGE':
48+
client.post({'cmd': 'CMD_SEVER_WARNING', 'msg': 'message sending is not implemented yet'})
5049

5150

5251
chat_sever = ChatServer(PORT = 50007)
5352

5453
def signal_handler(signal, frame):
55-
print('Wait... Server is closing now....')
54+
print('CTRL+C caught. Wait... Server is closing now....')
5655
chat_sever.stop_serve()
5756
sys.exit(0)
5857

0 commit comments

Comments
 (0)