1
1
package com .flipcast .rmq
2
2
3
- import scala .collection .mutable
4
- import com .github .sstone .amqp .{Amqp , RabbitMQConnection }
3
+ import com .github .sstone .amqp .{ChannelOwner , Consumer , ConnectionOwner , Amqp }
5
4
import java .util .concurrent .atomic .AtomicInteger
6
5
import com .flipcast .Flipcast
7
- import akka .actor .{ActorRef , ActorSystem }
8
- import com .rabbitmq .client .{AMQP , Address }
9
- import scala .concurrent .duration .Duration
10
- import java .util .concurrent .TimeUnit ._
6
+ import akka .actor .{PoisonPill , ActorRef , ActorSystem }
7
+ import com .rabbitmq .client .{ConnectionFactory , AMQP , Address }
11
8
import com .github .sstone .amqp .Amqp ._
12
9
import akka .event .slf4j .Logger
13
10
import com .github .sstone .amqp .Amqp .DeclareQueue
@@ -16,6 +13,8 @@ import com.github.sstone.amqp.Amqp.QueueParameters
16
13
import com .github .sstone .amqp .Amqp .DeclareExchange
17
14
import com .github .sstone .amqp .Amqp .QueueBind
18
15
import java .util .concurrent .{LinkedBlockingQueue , TimeUnit , ThreadPoolExecutor }
16
+ import scala .concurrent .duration ._
17
+ import collection .JavaConverters ._
19
18
20
19
/**
21
20
* RabbitMQ connection helper
@@ -37,14 +36,14 @@ object ConnectionHelper {
37
36
38
37
39
38
/**
40
- * Keep track of open connections
39
+ * RMQ Connection
41
40
*/
42
- val connections : mutable. HashMap [ String , RabbitMQConnection ] = new mutable. HashMap [ String , RabbitMQConnection ]() with mutable. SynchronizedMap [ String , RabbitMQConnection ]
41
+ var rmqConnection : ActorRef = null
43
42
44
43
/**
45
44
* Client properties that we need to set (in case of rmq cluster)
46
45
*/
47
- val clientProps = Map ( " x-ha-policy" -> " all" )
46
+ val clientProps = Map [ String , AnyRef ]( " ha-mode " -> " all " , " x-ha-policy" -> " all" , " x-priority " -> new Integer ( 10 ) )
48
47
49
48
/**
50
49
* Keep a count of connection
@@ -59,23 +58,18 @@ object ConnectionHelper {
59
58
* @return A RabbitMQ connection
60
59
*/
61
60
private def createConnection () (implicit system : ActorSystem ) = {
62
- val name = " flipcast-%s" .format(connectionCount.incrementAndGet())
63
- val reconnectDelay = Duration (Flipcast .rmqConfig.reconnectDelay, MILLISECONDS )
64
- val executor = Option (executorService)
65
- val addresses = Flipcast .rmqConfig.hosts.map( h => {
66
- val tokens = h.split(" :" )
67
- new Address (tokens(0 ), tokens(1 ).toInt)
68
- }).toArray
69
- val conn = new RabbitMQConnection (name = name,
70
- vhost = Flipcast .rmqConfig.vhost,
71
- user = Flipcast .rmqConfig.user,
72
- password = Flipcast .rmqConfig.pass,
73
- reconnectionDelay = reconnectDelay,
74
- executor = executor,
75
- addresses = Option (addresses))
76
- conn.waitForConnection.await()
77
- connections.put(name, conn)
78
- conn
61
+ if (rmqConnection == null ) {
62
+ val connectionFactory = new ConnectionFactory ()
63
+ connectionFactory.setClientProperties(clientProps.asJava)
64
+ connectionFactory.setAutomaticRecoveryEnabled(true )
65
+ connectionFactory.setNetworkRecoveryInterval(10 )
66
+ val addresses = Flipcast .rmqConfig.hosts.map( h => {
67
+ val tokens = h.split(" :" )
68
+ new Address (tokens(0 ), tokens(1 ).toInt)
69
+ }).toArray
70
+ rmqConnection = system.actorOf(ConnectionOwner .props(connectionFactory, 1 second, addresses = Option (addresses), executor = Option (executorService)))
71
+ }
72
+ rmqConnection
79
73
}
80
74
81
75
/**
@@ -87,14 +81,13 @@ object ConnectionHelper {
87
81
* @return ActorRef of consumer
88
82
*/
89
83
def createConsumer (queueName : String , exchange : String , listener : ActorRef , qos : Int ) (implicit system : ActorSystem ) = {
90
- val channelParameters = Option (ChannelParameters (qos))
91
84
val exchangeParams = ExchangeParameters (name = exchange, passive = false ,
92
85
exchangeType = " direct" , durable = true , autodelete = false , clientProps)
86
+ val cParams = Option (ChannelParameters (qos))
87
+ val queueParams = QueueParameters (queueName, passive = false , durable = true , exclusive = false , autodelete = false , clientProps)
93
88
val connection = createConnection()
94
- val queueParams = QueueParameters (queueName, passive = false , durable = true , exclusive = false ,
95
- autodelete = false , clientProps)
96
- val consumer = connection.createConsumer(exchangeParams, queueParams, queueName, listener,
97
- channelParams = channelParameters, autoack = false )
89
+ val consumer = ConnectionOwner .createChildActor(connection, Consumer .props(listener, exchangeParams,queueParams, queueName,
90
+ cParams, autoack = false ))
98
91
Amqp .waitForConnection(system, consumer).await()
99
92
consumer ! DeclareExchange (exchangeParams)
100
93
consumer ! DeclareQueue (queueParams)
@@ -114,10 +107,10 @@ object ConnectionHelper {
114
107
val channelParameters = Option (ChannelParameters (1 ))
115
108
val exchangeParams = ExchangeParameters (name = exchange, passive = false ,
116
109
exchangeType = " direct" , durable = true , autodelete = false , clientProps)
117
- val connection = createConnection()
118
110
val queueParams = QueueParameters (queueName, passive = false , durable = true , exclusive = false , autodelete = false ,
119
111
clientProps)
120
- val producer = connection.createChannelOwner(channelParameters)
112
+ val connection = createConnection()
113
+ val producer = ConnectionOwner .createChildActor(connection, ChannelOwner .props(channelParams = channelParameters))
121
114
Amqp .waitForConnection(system, producer).await()
122
115
producer ! DeclareExchange (exchangeParams)
123
116
producer ! DeclareQueue (queueParams)
@@ -129,10 +122,9 @@ object ConnectionHelper {
129
122
* Close all connections
130
123
*/
131
124
def stop () {
132
- connections.foreach( c => {
133
- log.info(" Closing connection: " + c._1)
134
- c._2.stop
135
- })
125
+ if (rmqConnection != null ) {
126
+ rmqConnection ! PoisonPill
127
+ }
136
128
}
137
129
138
130
}
0 commit comments