diff --git a/Makefile b/Makefile index bb073c0..67c950d 100644 --- a/Makefile +++ b/Makefile @@ -8,16 +8,14 @@ PYTHON=python all: build add-submodules: - -git submodule add -b v0.8.0 https://github.com/alanxz/rabbitmq-c.git + git submodule add -b v0.9.0 https://github.com/alanxz/rabbitmq-c.git submodules: git submodule init git submodule update rabbitmq-c: submodules - (cd $(RABBIT_DIR); test -f configure || autoreconf -i) - (cd $(RABBIT_DIR); test -f Makefile || automake --add-missing) - + (cd $(RABBIT_DIR); cmake .; cmake --build .) rabbitmq-clean: -(cd $(RABBIT_DIR) && make clean) @@ -50,8 +48,7 @@ distclean: pyclean rabbitmq-distclean removepyc -rm -f erl_crash.dump $(RABBIT_TARGET): - (test -f config.h || cd $(RABBIT_DIR); ./configure --disable-tools --disable-docs) - (cd $(RABBIT_DIR); make) + (cd $(RABBIT_DIR); cmake .; cmake --build .;) dist: rabbitmq-c $(RABBIT_TARGET) diff --git a/Modules/_librabbitmq/connection.c b/Modules/_librabbitmq/connection.c index 3ace2e3..5cf3c75 100644 --- a/Modules/_librabbitmq/connection.c +++ b/Modules/_librabbitmq/connection.c @@ -7,6 +7,9 @@ #include #include +#include +#include +#include #include "connection.h" #include "distmeta.h" @@ -974,6 +977,8 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self, "channel_max", "frame_max", "heartbeat", + "ssl", + "confirmed", "client_properties", NULL }; @@ -985,12 +990,15 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self, int channel_max = 0xffff; int frame_max = 131072; int heartbeat = 0; + int ssl = 0; + int confirmed = 0; int port = 5672; PyObject *client_properties = NULL; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiO", kwlist, + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiiiO", kwlist, &hostname, &userid, &password, &virtual_host, &port, - &channel_max, &frame_max, &heartbeat, &client_properties)) { + &channel_max, &frame_max, &heartbeat, &ssl, &confirmed, + &client_properties)) { return -1; } @@ -1012,6 +1020,8 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self, self->channel_max = channel_max; self->frame_max = frame_max; self->heartbeat = heartbeat; + self->ssl = ssl; + self->confirmed = confirmed; self->weakreflist = NULL; self->callbacks = PyDict_New(); if (self->callbacks == NULL) return -1; @@ -1057,7 +1067,13 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) } Py_BEGIN_ALLOW_THREADS; self->conn = amqp_new_connection(); - socket = amqp_tcp_socket_new(self->conn); + if (self->ssl == 1 ) { + socket = amqp_ssl_socket_new(self->conn); + amqp_ssl_socket_set_verify_peer(socket, 0); + amqp_ssl_socket_set_verify_hostname(socket, 0); + } else { + socket = amqp_tcp_socket_new(self->conn); + } Py_END_ALLOW_THREADS; if (!socket) { @@ -1132,14 +1148,22 @@ PyRabbitMQ_Connection_close(PyRabbitMQ_Connection *self) unsigned int PyRabbitMQ_Connection_create_channel(PyRabbitMQ_Connection *self, unsigned int channel) { - amqp_rpc_reply_t reply; + amqp_rpc_reply_t replyopen; + amqp_rpc_reply_t replyconfirm; Py_BEGIN_ALLOW_THREADS; amqp_channel_open(self->conn, channel); - reply = amqp_get_rpc_reply(self->conn); + replyopen = amqp_get_rpc_reply(self->conn); + if (self->confirmed){ + amqp_confirm_select(self->conn, (amqp_channel_t)channel); + replyconfirm = amqp_get_rpc_reply(self->conn); + } Py_END_ALLOW_THREADS; - - return PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't create channel"); + if ((replyopen.reply_type != AMQP_RESPONSE_NORMAL) || !(self->confirmed)) { + return PyRabbitMQ_HandleAMQError(self, 0, replyopen, "Couldn't create channel"); + } else { + return PyRabbitMQ_HandleAMQError(self, 0, replyconfirm, "Couldn't set confirm mode"); + } } @@ -1811,6 +1835,8 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, PyObject *exchange = NULL; PyObject *routing_key = NULL; PyObject *propdict; + amqp_frame_t frame; + unsigned int channel = 0; unsigned int mandatory = 0; unsigned int immediate = 0; @@ -1818,6 +1844,7 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, char *body_buf = NULL; Py_ssize_t body_size = 0; + int status = 0; int ret = 0; amqp_basic_properties_t props; amqp_bytes_t bytes; @@ -1852,12 +1879,20 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, (amqp_boolean_t)immediate, &props, bytes); + if (self->confirmed){ + status = amqp_simple_wait_frame_on_channel(self->conn,channel,&frame); + } amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (!PyRabbitMQ_HandleError(ret, "basic.publish")) { goto error; } + if ((self->confirmed) && (status != AMQP_STATUS_OK) && + (frame.frame_type != AMQP_FRAME_METHOD) && + (frame.payload.method.id != AMQP_BASIC_ACK_METHOD )){ + goto error; + } Py_RETURN_NONE; error: @@ -1865,8 +1900,6 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, bail: return 0; } - - /* * Connection._basic_ack */ diff --git a/Modules/_librabbitmq/connection.h b/Modules/_librabbitmq/connection.h index 273ae75..e903e0a 100644 --- a/Modules/_librabbitmq/connection.h +++ b/Modules/_librabbitmq/connection.h @@ -161,6 +161,8 @@ typedef struct { int frame_max; int channel_max; int heartbeat; + int ssl; + int confirmed; int sockfd; int connected; @@ -271,6 +273,10 @@ static PyMemberDef PyRabbitMQ_ConnectionType_members[] = { offsetof(PyRabbitMQ_Connection, port), READONLY, NULL}, {"heartbeat", T_INT, offsetof(PyRabbitMQ_Connection, heartbeat), READONLY, NULL}, + {"ssl", T_INT, + offsetof(PyRabbitMQ_Connection, ssl), READONLY, NULL}, + {"confirmed", T_INT, + offsetof(PyRabbitMQ_Connection, confirmed), READONLY, NULL}, {"server_properties", T_OBJECT_EX, offsetof(PyRabbitMQ_Connection, server_properties), READONLY, NULL}, {"connected", T_INT, diff --git a/librabbitmq/__init__.py b/librabbitmq/__init__.py index 6a4189f..56bbc63 100644 --- a/librabbitmq/__init__.py +++ b/librabbitmq/__init__.py @@ -190,14 +190,17 @@ class Connection(_librabbitmq.Connection): def __init__(self, host='localhost', userid='guest', password='guest', virtual_host='/', port=5672, channel_max=0xffff, - frame_max=131072, heartbeat=0, lazy=False, + frame_max=131072, heartbeat=0, ssl=False, confirmed=False, lazy=False, client_properties=None, **kwargs): if ':' in host: host, port = host.split(':') + if ssl: + ssl = True + confirmed = confirmed if confirmed else kwargs.pop("confirm_publish",False) super(Connection, self).__init__( hostname=host, port=int(port), userid=userid, password=password, virtual_host=virtual_host, channel_max=channel_max, - frame_max=frame_max, heartbeat=heartbeat, + frame_max=frame_max, heartbeat=heartbeat, ssl=int(ssl),confirmed=int(confirmed), client_properties=client_properties, ) self.channels = {} diff --git a/rabbitmq-c b/rabbitmq-c index caad0ef..75a21e5 160000 --- a/rabbitmq-c +++ b/rabbitmq-c @@ -1 +1 @@ -Subproject commit caad0ef1533783729c7644a226c989c79b4c497b +Subproject commit 75a21e51db5d70ea807473621141b4417d81b56f diff --git a/setup.py b/setup.py index 09b1c89..db8897e 100644 --- a/setup.py +++ b/setup.py @@ -64,6 +64,9 @@ def append_env(L, e): 'amqp_socket.c', 'amqp_table.c', 'amqp_tcp_socket.c', + 'amqp_openssl_hostname_validation.c', + 'amqp_openssl.c', + 'amqp_openssl_bio.c', 'amqp_time.c', 'amqp_url.c', ]) @@ -72,7 +75,7 @@ def append_env(L, e): if is_linux: # Issue #42 libs.append('rt') # -lrt for clock_gettime - + libs.append('ssl') librabbitmq_ext = Extension( '_librabbitmq', sources=list(PyC_files) + list(librabbit_files), @@ -96,6 +99,7 @@ def append_env(L, e): class build(_build): stdcflags = [ '-DHAVE_CONFIG_H', + '-DENABLE_SSL_SUPPORT=ON', ] if os.environ.get('PEDANTIC'): # Python.h breaks -pedantic, so can only use it while developing. @@ -123,26 +127,22 @@ def run(self): ) try: + if not os.path.isdir(os.path.join(LRMQDIST(), '.git')): - print('- pull submodule rabbitmq-c...') - if os.path.isfile('Makefile'): - os.system(' '.join([make, 'submodules'])) - else: - os.system(' '.join(['git', 'clone', '-b', 'v0.8.0', - 'https://github.com/alanxz/rabbitmq-c.git', - 'rabbitmq-c'])) + print('- pull submodule rabbitmq-c...') + if os.path.isfile('Makefile'): + os.system(' '.join([make, 'submodules'])) + else: + os.system(' '.join(['git', 'clone', '-b', 'v0.9.0', + 'https://github.com/alanxz/rabbitmq-c.git', + 'rabbitmq-c'])) os.chdir(LRMQDIST()) - if not os.path.isfile('configure'): - print('- autoreconf') - os.system('autoreconf -i') - - if not os.path.isfile('config.h'): - print('- configure rabbitmq-c...') - if os.system('/bin/sh configure --disable-tools \ - --disable-docs --disable-dependency-tracking'): - return + print('- cmake') + os.system('cmake .') + print(' -build') + os.system('cmake --build .') finally: os.environ.update(restore) finally: @@ -157,7 +157,7 @@ def run(self): return librabbitmq_ext, build -def find_make(alt=('gmake', 'gnumake', 'make', 'nmake')): +def find_make(alt=('gmake', 'gnumake', 'make', 'nmake','cmake')): for path in os.environ['PATH'].split(':'): for make in (os.path.join(path, m) for m in alt): if os.path.isfile(make):