Skip to content

... #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Dec 13, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 47 additions & 48 deletions lud4ik/command_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
import signal
import socket
import threading
import logging

from work.utils import format_reply
from work.protocol import Feeder, Packet
from work.models import cmd
from work.utils import configure_logging
from work.cmdargs import get_cmd_args
from work.exceptions import ClientFinishException

Expand All @@ -15,9 +17,9 @@ def shutdown_handler(signum, frame):
class CommandClient:

session_id = None
TIMEOUT = 10.0
reply_commands = ['connected', 'pong', 'pongd', 'ackquit', 'ackfinish']
print_reply_commands = ['pong', 'pongd']
TIMEOUT = 1.0
CHUNK_SIZE = 1024
commands = [cmd.CONNECTED, cmd.PONG, cmd.PONGD, cmd.ACKQUIT, cmd.ACKFINISH]

def __init__(self, host, port):
self.socket = socket.socket(socket.AF_INET,
Expand All @@ -29,64 +31,61 @@ def __init__(self, host, port):
def run_client(cls, host, port):
client = cls(host, port)
try:
handler = signal.signal(signal.SIGINT, shutdown_handler)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мелочь, но эту строчку нужно вынести за try.
Потому что есть signa.signal вывалится с ошибкой то в finally возвращать старый обработчик не нужно (он и так стоит), да и не из чего (handler еще не присвоет и просто не существует).
Аналогично, например, нужно делать с блокировками без with:

l = RLock()
# ....
l.acquire()
try:
  do_something()
finally:
  l.release()

client.run()
signal.signal(signal.SIGUSR1, shutdown_handler)
except ClientFinishException:
except (OSError, socket.timeout, ClientFinishException):
client.shutdown()
finally:
pass
signal.signal(signal.SIGINT, handler)

def run(self):
self.thread = threading.Thread(target=self.recv_response)
self.thread.start()
self.feeder = Feeder(self.commands)
while True:
command = input()
command_name = command.split()[0]
command = command.replace(' ', '\n')
self.socket.sendall(format_reply(command))
command = input().split()
kwargs = {}
cmd_input = getattr(cmd, command[0].upper())
if cmd_input == cmd.PINGD:
kwargs['data'] = command[1]
packet = eval('{}(**kwargs).pack()'.format(command[0]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Можешь написать это без eval?

self.socket.sendall(packet)
self.recv_response()

def recv_response(self):
tail = bytes()
while True:
msg = self.get_reply()
parts = msg.split('\n')
command_name = parts[0]
if command_name in self.print_reply_commands:
print(msg)
elif command_name == 'connected':
if parts[-1].startswith('session'):
self.session_id = parts[-1][7:]
print(msg)
elif command_name == 'ackquit':
if parts[-1] == self.session_id:
self.close()
else:
print(msg)
elif command_name == 'ackfinish':
self.close()

def get_reply(self):
msg = bytes()
msg_len = int(self.socket.recv(4))
while len(msg) < msg_len:
try:
chunk = self.socket.recv(msg_len - len(msg))
except socket.timeout:
self.close()
msg += chunk
msg = msg.decode('utf-8')
return msg

def close(self):
os.kill(os.getpid(), signal.SIGUSR1)
chunk = tail + self.socket.recv(self.CHUNK_SIZE)
packet, tail = self.feeder.feed(chunk)
if not packet:
continue
else:
getattr(self, packet.__class__.__name__.lower())(packet)
break

def connected(self, packet):
self.session = packet.session
print('{} {}'.format(packet.cmd, packet.session))

def pong(self, packet):
print(packet.cmd)

def pongd(self, packet):
print('{} {}'.format(packet.cmd, packet.data))

def ackquit(self, packet):
print('{} {}'.format(packet.cmd, packet.session))
self.shutdown()

def ackfinish(self, packet):
print(packet.cmd)
self.shutdown()

def shutdown(self):
self.socket.close()
print('socket closed')
self.thread.join()
print('thread closed')
logging.info('socket closed')
raise SystemExit()


if __name__ == '__main__':
configure_logging('Client')
args = get_cmd_args()
CommandClient.run_client(args.host, args.port)
136 changes: 73 additions & 63 deletions lud4ik/command_server.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import os
import os.path
import socket
import signal
import logging
import threading
from operator import attrgetter
from collections import namedtuple

from work.protocol import Feeder
from work.models import cmd
from work.cmdargs import get_cmd_args
from work.exceptions import ServerFinishException
from work.utils import format_reply, get_random_hash
from work.utils import (get_random_hash,
handle_timeout,
get_keyword_args,
configure_logging)


def shutdown_handler(signum, frame):
Expand All @@ -17,105 +24,108 @@ def shutdown_handler(signum, frame):
class CommandServer:

MAX_CONN = 5
TIMEOUT = 100.0
TIMEOUT = 1.0
CHUNK_SIZE = 1024
PID_FILE = 'server.pid'
clients = {}
commands = ['connect', 'ping', 'pingd', 'quit', 'finish']
single_reply_commands = ['ping', 'pingd']
commands = [cmd.CONNECT, cmd.PING, cmd.PINGD, cmd.QUIT, cmd.FINISH]
templ = namedtuple('templ', 'addr, thread, session')

def __init__(self, host, port):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.settimeout(self.TIMEOUT)
self.socket.bind((host, port))

@classmethod
def run_server(cls, host, port):
server = cls(host, port)
try:
handler = signal.signal(signal.SIGINT, shutdown_handler)
with open(cls.PID_FILE, 'w') as f:
f.write(str(os.getpid()))
server.run()
signal.signal(signal.SIGUSR1, shutdown_handler)
except ServerFinishException:
except (ServerFinishException, OSError):
server.shutdown()
finally:
pass
signal.signal(signal.SIGINT, handler)

def run(self):
self.socket.listen(self.MAX_CONN)
while True:
self.socket.listen(self.MAX_CONN)
conn, addr = self.socket.accept()
th = threading.Thread(target=self.run_client, args=(conn, ))
self.clients[conn] = self.templ(addr=addr, thread=th,
session=get_random_hash())
th.start()
with handle_timeout():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это ты удачно придумала.
handle_timeout, конечно, сейчас совсем простой — но то что попробовала такой подход это хорошо.

conn, addr = self.socket.accept()
th = threading.Thread(target=self.run_client, args=(conn, ))
self.clients[conn] = self.templ(addr=addr,
thread=th,
session=get_random_hash())
th.start()

def run_client(self, conn):
feeder = Feeder(self.commands)
tail = bytes()
while True:
msg = bytes()
msg_len = int(conn.recv(4))
while len(msg) < msg_len:
try:
chunk = conn.recv(msg_len - len(msg))
except socket.timeout:
conn.close()
del self.clients[conn]
return
msg += chunk

msg = msg.decode('utf-8').split('\n')
command_name = msg[0]
command = getattr(self, command_name)
args = [conn]
if len(msg) > 1:
args.append(msg[1])
command(*args)

def connect(self, conn):
self.condition_reply(conn, "connected", reply_templ="{}\nsession{}")

def ping(self, conn):
reply = format_reply('pong')
try:
chunk = tail + conn.recv(self.CHUNK_SIZE)
packet, tail = feeder.feed(chunk)
if not packet:
continue
process = getattr(self, packet.__class__.__name__.lower())
kwargs = {}
kw_only = get_keyword_args(process)
if 'conn' in kw_only:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Немного странный способ.
Думаю, правильней было бы создавать класс Connection (Protocol по-твистедовски или назови его как угодно) и держать в нем состояние.
Тем более что состояние может быть довольно сложным.
Впрочем, прямо сейчас можно ничего не менять.

kwargs['conn'] = conn
process(packet, **kwargs)
except (socket.timeout, OSError):
conn.close()
self.clients.pop(conn, None)
return

def connect(self, packet, *, conn):
session = self.clients[conn].session
reply = packet.reply(session)
for client in list(self.clients.keys()):
conn.sendall(reply)

def ping(self, packet, *, conn):
reply = packet.reply()
conn.sendall(reply)

def pingd(self, conn, data):
reply = format_reply('{}\n{}'.format('pongd', data))
def pingd(self, packet, *, conn):
reply = packet.reply()
conn.sendall(reply)

def quit(self, conn):
self.condition_reply(conn, "ackquit", shared_templ="{}\n{} disconnected.")
def quit(self, packet, *, conn):
session = self.clients[conn].session
reply = packet.reply(session)
for client in list(self.clients.keys()):
conn.sendall(reply)
conn.close()
del self.clients[conn]
self.clients.pop(conn, None)
raise SystemExit()

def condition_reply(self, conn, reply_command, shared_templ="{}\n{}", reply_templ="{}\n{}"):
addr = self.clients[conn].addr
shared_reply = format_reply(shared_templ.format(reply_command, addr))
session_id = self.clients[conn].session
reply = format_reply(reply_templ.format(reply_command, session_id))
for client in self.clients.keys():
if client == conn:
conn.sendall(reply)
else:
conn.sendall(shared_reply)

def finish(self, conn):
addr = self.clients[conn].addr
reply = format_reply("{}\n{} finished server.".format('ackfinish', addr))
for client in self.clients.keys():
def finish(self, packet):
reply = packet.reply()
for client in list(self.clients.keys()):
client.sendall(reply)
os.kill(os.getpid(), signal.SIGUSR1)
os.kill(os.getpid(), signal.SIGINT)
raise SystemExit()

def shutdown(self):
self.socket.close()
print('socket closed')
for conn in self.clients.keys():
logging.info('socket closed')
for conn in list(self.clients.keys()):
conn.close()
print('connections closed')
for th in map(attrgetter('thread'), self.clients.values()):
logging.info('connections closed')
for th in map(attrgetter('thread'), list(self.clients.values())):
th.join()
print('threads closed')
logging.info('threads closed')
if os.path.exists(self.PID_FILE):
os.remove(self.PID_FILE)
raise SystemExit()


if __name__ == '__main__':
configure_logging('Server')
args = get_cmd_args()
CommandServer.run_server(args.host, args.port)
15 changes: 2 additions & 13 deletions lud4ik/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,2 @@
import unittest


class BaseTestCase(unittest.TestCase):

def setUp(self):
pass

def tearDown(self):
pass

def test_connect(self):
self.assertRaises(FileNotFoundError, open, '/doesnotexist.py')
from .test_server import ServerTestCase
from .test_command import CommandTestCase
Loading