Skip to content

Commit f22a478

Browse files
committed
1) Priority support
2) External config files 3) Upgraded java-apns to latest available build
1 parent b92b1a2 commit f22a478

25 files changed

+185
-109
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,7 @@
1818
* Adaptive Loadbalancing on akka cluster
1919
* Scala 2.11.2 update
2020
* Java 8 update
21+
22+
## 2.2 (December 30, 2014)
23+
* Priority support
24+
* External configuration files (In config directory)

README.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Support for MongoDB datasource is provided out-of-the-box.
1818
| Version 1.0.1 | May 2014 | Upgrade to Spray 1.3 & Scala 2.11.0 |
1919
| Version 2.0 | December 2014 | Move to akka.io cluster & distributed pubsub |
2020
| Version 2.1 | December 2014 | Adaptive Loadbalancing on akka cluster |
21+
| Version 2.2 | December 2014 | Priority support for push messages |
2122

2223
## Changelog
2324
Changelog can be viewed in [CHANGELOG.md](https://github.com/Flipkart/flipcast/blob/master/CHANGELOG.md) file
@@ -41,9 +42,9 @@ Changelog can be viewed in [CHANGELOG.md](https://github.com/Flipkart/flipcast/b
4142
## Library Dependencies
4243
--------------------
4344
* [spray](http://spray.io) 1.3.2
44-
* [Scala](http://www.scala-lang.org) 2.11.0
45+
* [Scala](http://www.scala-lang.org) 2.11.2
4546
* [akka.io](http://akka.io) 2.3.6
46-
* [java-apns](https://github.com/notnoop/java-apns) - 0.2.3
47+
* [java-apns](https://github.com/notnoop/java-apns) - 1.0.0.Beta4
4748
* [casbah](http://mongodb.github.io/casbah) - 2.8.0-RC0
4849

4950
## Infrastructure Dependencies

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ organization := "com.flipcast"
77

88
name := "flipcast"
99

10-
version := "2.1"
10+
version := "2.2"
1111

1212
scalaVersion := "2.11.2"
1313

src/main/resources/application.conf config/application.conf

+21-13
Original file line numberDiff line numberDiff line change
@@ -137,27 +137,35 @@ flipcast.config {
137137
source = "push_configuration"
138138
}
139139
}
140-
queue {
140+
worker {
141141
configs = [ "gcm", "apns", "mpns" ]
142142
gcm {
143-
workerName = "gcmRequestConsumer"
144-
sidelineWorkerName = "flipcastSidelineConsumer"
145-
workerInstances = 8
143+
priorityTags = ["default"]
144+
default {
145+
sidelineWorkerName = "flipcastSidelineConsumer"
146+
workerInstances = 8
147+
}
146148
}
147149
apns {
148-
workerName = "apnsRequestConsumer"
149-
sidelineWorkerName = "flipcastSidelineConsumer"
150-
workerInstances = 4
150+
priorityTags = ["default"]
151+
default {
152+
sidelineWorkerName = "flipcastSidelineConsumer"
153+
workerInstances = 4
154+
}
151155
}
152156
mpns {
153-
workerName = "mpnsRequestConsumer"
154-
sidelineWorkerName = "flipcastSidelineConsumer"
155-
workerInstances = 4
157+
priorityTags = ["default"]
158+
default {
159+
sidelineWorkerName = "flipcastSidelineConsumer"
160+
workerInstances = 4
161+
}
156162
}
157163
bulk {
158-
workerName = "bulkRequestConsumer"
159-
sidelineWorkerName = "flipcastSidelineConsumer"
160-
workerInstances = 2
164+
priorityTags = ["default"]
165+
default {
166+
sidelineWorkerName = "flipcastSidelineConsumer"
167+
workerInstances = 2
168+
}
161169
}
162170
}
163171
}
File renamed without changes.

flipcast-start

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ JAVA_OPTS="${JAVA_OPTS} -Dcom.sun.management.jmxremote.port=29005"
6060
JAVA_OPTS="${JAVA_OPTS} -Dcom.sun.management.jmxremote.authenticate=false"
6161
JAVA_OPTS="${JAVA_OPTS} -Dcom.sun.management.jmxremote.ssl=false"
6262

63+
#Provide application configuration & log configuration file path
64+
JAVA_OPTS="${JAVA_OPTS} -Dapp.config=config/application.conf"
65+
JAVA_OPTS="${JAVA_OPTS} -Dlogback.configurationFile=config/logback.xml"
66+
6367
#Java Executable
6468
JAVA=`which java`
6569

flipcast-start.cmd

+4
Original file line numberDiff line numberDiff line change
@@ -56,5 +56,9 @@ set JAVA_OPTS="%JAVA_OPTS% -Dcom.sun.management.jmxremote.port=29005"
5656
set JAVA_OPTS="%JAVA_OPTS% -Dcom.sun.management.jmxremote.authenticate=false"
5757
set JAVA_OPTS="%JAVA_OPTS% -Dcom.sun.management.jmxremote.ssl=false"
5858

59+
REM Provide application configuration & log configuration file path
60+
set JAVA_OPTS="%JAVA_OPTS% -Dapp.config=config\\application.conf"
61+
set JAVA_OPTS="%JAVA_OPTS% -Dlogback.configurationFile=config\\logback.xml"
62+
5963
REM Execute main application JAR
6064
java -jar "%JAVA_OPTS%" "%APP_JAR%"

src/main/scala/com/flipcast/Flipcast.scala

+21-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.flipcast
22

3+
import java.io.File
34
import java.net.InetAddress
45
import java.util.concurrent.TimeUnit
56

@@ -53,7 +54,7 @@ object Flipcast extends App {
5354
/**
5455
* Load application configuration
5556
*/
56-
lazy val config = ConfigFactory.load()
57+
lazy val config = ConfigFactory.parseFile(new File(System.getProperty("app.config"))).resolve()
5758

5859
/**
5960
* Actor system for flipcast service
@@ -136,18 +137,25 @@ object Flipcast extends App {
136137
* Start all the message consumers
137138
*/
138139
def startMessageConsumers(isLocal: Boolean) {
139-
serviceRegistry.register[FlipcastGcmRequestConsumer]("gcmRequestConsumer",
140-
instances = QueueConfigurationManager.config("gcm").workerInstances,
141-
dispatcher = "akka.actor.gcm-dispatcher", isLocal = isLocal)
142-
serviceRegistry.register[FlipcastApnsRequestConsumer]("apnsRequestConsumer",
143-
instances = QueueConfigurationManager.config("apns").workerInstances,
144-
dispatcher = "akka.actor.apns-dispatcher", isLocal = isLocal)
145-
serviceRegistry.register[FlipcastMpnsRequestConsumer]("mpnsRequestConsumer",
146-
instances = QueueConfigurationManager.config("mpns").workerInstances,
147-
dispatcher = "akka.actor.mpns-dispatcher", isLocal = isLocal)
148-
serviceRegistry.register[BulkMessageConsumer]("bulkMessageConsumer",
149-
instances = QueueConfigurationManager.config("bulk").workerInstances,
150-
isLocal = isLocal)
140+
WorkerConfigurationManager.config("gcm").priorityConfigs.foreach{ case (w, c) => {
141+
serviceRegistry.register[FlipcastGcmRequestConsumer](c.workerName,
142+
instances = c.workerInstances,
143+
dispatcher = "akka.actor.gcm-dispatcher", isLocal = isLocal)
144+
}}
145+
WorkerConfigurationManager.config("apns").priorityConfigs.foreach{ case (w, c) => {
146+
serviceRegistry.register[FlipcastApnsRequestConsumer](c.workerName,
147+
instances = c.workerInstances,
148+
dispatcher = "akka.actor.apns-dispatcher", isLocal = isLocal)
149+
}}
150+
WorkerConfigurationManager.config("mpns").priorityConfigs.foreach{ case (w, c) => {
151+
serviceRegistry.register[FlipcastMpnsRequestConsumer](c.workerName,
152+
instances = c.workerInstances,
153+
dispatcher = "akka.actor.mpns-dispatcher", isLocal = isLocal)
154+
}}
155+
WorkerConfigurationManager.config("bulk").priorityConfigs.foreach{ case (w, c) => {
156+
serviceRegistry.register[BulkMessageConsumer](c.workerName,
157+
instances = c.workerInstances, isLocal = isLocal)
158+
}}
151159
}
152160

153161
def startMetrics() {

src/main/scala/com/flipcast/protocol/BulkMessageRequestProtocol.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ trait BulkMessageRequestProtocol extends DefaultJsonProtocol with SprayJsonSuppo
7777
}
7878
val message = json.asJsObject.fields.contains("message") match {
7979
case true => json.asJsObject.fields("message").convertTo[PushMessage]
80-
case _ => PushMessage("{}", Option(0), Option(true))
80+
case _ => PushMessage("{}", Option(0), Option(true), Option("default"))
8181
}
8282
BulkMessageRequest(configName, query, message, start, end)
8383
}

src/main/scala/com/flipcast/push/common/FlipcastRequestConsumer.scala

+9-7
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import akka.actor.{Actor, ActorSystem}
77
import akka.event.slf4j.Logger
88
import akka.util.Timeout
99
import com.flipcast.Flipcast
10-
import com.flipcast.push.config.QueueConfigurationManager
10+
import com.flipcast.push.config.WorkerConfigurationManager
1111
import com.flipcast.push.model.SidelinedMessage
1212
import com.flipcast.push.model.requests.{FlipcastPushRequest, FlipcastRequest}
1313
import com.flipcast.push.protocol.FlipcastPushProtocol
@@ -33,19 +33,19 @@ abstract class FlipcastRequestConsumer[T <: FlipcastRequest: ClassTag] extends
3333

3434
def consume(message: T) : Boolean
3535

36-
def config = QueueConfigurationManager.config(configType())
36+
def config = WorkerConfigurationManager.config(configType())
3737

3838
def init()
3939

4040
lazy val log = Logger(configType())
4141

4242
override def preStart() {
4343
init()
44-
log.info("Starting message consumer on: " +config.workerName +" Worker: " +self.path)
44+
log.info("Starting message consumer on: " +config.configName +" Worker: " +self.path)
4545
}
4646

4747
override def postStop(): Unit = {
48-
log.info("Stopping message consumer on: " +config.workerName +" Worker: " +self.path)
48+
log.info("Stopping message consumer on: " +config.configName +" Worker: " +self.path)
4949
}
5050

5151
def receive = {
@@ -67,14 +67,16 @@ abstract class FlipcastRequestConsumer[T <: FlipcastRequest: ClassTag] extends
6767
private def sideline(message: T) {
6868
message match {
6969
case x: FlipcastPushRequest =>
70-
Flipcast.serviceRegistry.actor(config.sidelineWorkerName) !
70+
Flipcast.serviceRegistry.actorLookup(config.priorityConfigs(x.priority.getOrElse("default")).sidelineWorkerName) !
7171
SidelinedMessage(UUID.randomUUID().toString,
7272
x.configName, configType(), x.toJson.compactPrint, new Date())
7373
}
7474
}
7575

7676
def resend(message: T) {
77-
Flipcast.serviceRegistry.actor(config.workerName) ! message
77+
message match {
78+
case x: FlipcastPushRequest =>
79+
Flipcast.serviceRegistry.actorLookup(config.priorityConfigs(x.priority.getOrElse("default")).workerName) ! message
80+
}
7881
}
79-
8082
}

src/main/scala/com/flipcast/push/config/QueueConfig.scala

-8
This file was deleted.

src/main/scala/com/flipcast/push/config/QueueConfigurationManager.scala

-30
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.flipcast.push.config
2+
3+
4+
case class PriorityConfig(workerInstances: Int = 1, workerName: String, sidelineWorkerName: String)
5+
6+
/**
7+
* Config to hold worker configuration
8+
*
9+
* @author Phaneesh Nagaraja
10+
*/
11+
case class WorkerConfig(configName: String, priorityConfigs: Map[String, PriorityConfig])
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.flipcast.push.config
2+
3+
import java.util.concurrent.ConcurrentHashMap
4+
5+
import com.flipcast.Flipcast
6+
import collection.JavaConverters._
7+
8+
/**
9+
* Provides queue configuration for different push service providers
10+
*
11+
* @author Phaneesh Nagaraja
12+
*/
13+
object WorkerConfigurationManager {
14+
15+
val workerConfigurationCache = new ConcurrentHashMap[String, WorkerConfig]()
16+
17+
def configs() : List[String] = {
18+
Flipcast.config.getStringList("flipcast.config.queue.configs").asScala.toList
19+
}
20+
21+
def config(configType: String) : WorkerConfig = {
22+
if(!workerConfigurationCache.containsKey(configType)) {
23+
val c = Flipcast.config.getConfig("flipcast.config.worker." + configType)
24+
val priorityTags = c.getStringList("priorityTags").asScala
25+
workerConfigurationCache.put(configType, WorkerConfig(
26+
configType,
27+
priorityTags.map( pt => {
28+
pt -> PriorityConfig(c.getInt(pt +".workerInstances"), String.format("%sMessageConsumer-%s", configType, pt),
29+
c.getString( pt +".sidelineWorkerName"))
30+
}).toMap))
31+
}
32+
workerConfigurationCache.get(configType)
33+
}
34+
35+
def worker(configType: String, priorityTag: String) = {
36+
config(configType).priorityConfigs(priorityTag).workerName
37+
}
38+
39+
def bulkConfig() = {
40+
config("bulk")
41+
}
42+
43+
}

src/main/scala/com/flipcast/push/gcm/service/FlipcastGcmRequestConsumer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class FlipcastGcmRequestConsumer extends FlipcastRequestConsumer[FlipcastPushReq
109109
failedIds.isEmpty match {
110110
case true => None
111111
case false =>
112-
resend(FlipcastPushRequest(request.configName, failedIds.toList, request.data, request.ttl, request.delayWhileIdle))
112+
resend(FlipcastPushRequest(request.configName, failedIds.toList, request.data, request.ttl, request.delayWhileIdle, request.priority))
113113
}
114114
//Record history for all successful devices
115115
request.registration_ids.diff(failedIds.toList).par.foreach( r => {

src/main/scala/com/flipcast/push/model/PushMessage.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ package com.flipcast.push.model
55
*
66
* @author Phaneesh Nagaraja
77
*/
8-
case class PushMessage(message: String, ttl: Option[Int], delayWhileIdle: Option[Boolean])
8+
case class PushMessage(message: String, ttl: Option[Int], delayWhileIdle: Option[Boolean], priority: Option[String])

src/main/scala/com/flipcast/push/model/requests/FlipcastPushRequest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ case class FlipcastPushRequest(configName: String,
1414
registration_ids: List[String],
1515
data: String,
1616
ttl: Option[Int],
17-
delayWhileIdle: Option[Boolean]) extends FlipcastRequest
17+
delayWhileIdle: Option[Boolean], priority: Option[String]) extends FlipcastRequest

src/main/scala/com/flipcast/push/mpns/service/FlipcastMpnsRequestConsumer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class FlipcastMpnsRequestConsumer extends FlipcastRequestConsumer[FlipcastPushRe
8282
Flipcast.serviceRegistry.actor("deviceHouseKeepingManager") ! DeviceHousekeepingRequest(request.configName, r)
8383
})
8484
if(failedIds.size > 0) {
85-
resend(FlipcastPushRequest(request.configName, failedIds.toList, request.data, request.ttl, request.delayWhileIdle))
85+
resend(FlipcastPushRequest(request.configName, failedIds.toList, request.data, request.ttl, request.delayWhileIdle, request.priority))
8686
}
8787
true
8888
}

src/main/scala/com/flipcast/push/protocol/FlipcastPushProtocol.scala

+10-1
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,21 @@ trait FlipcastPushProtocol extends DefaultJsonProtocol with SprayJsonSupport {
6868
}
6969
case _ => false
7070
}
71+
val priority = json.asJsObject.fields.contains("priority") match {
72+
case true =>
73+
json.asJsObject.fields("priority") match {
74+
case a: JsString => Option(a.value)
75+
case _ => None
76+
}
77+
case _ => None
78+
}
79+
7180
val data = json.asJsObject.fields.contains("data") match {
7281
case true => json.asJsObject.fields("data").compactPrint
7382
case _ => "{}"
7483
}
7584
FlipcastPushRequest(configName, registration_ids.filter( _.trim.length > 0), data, Option(ttl),
76-
Option(delayWhileIdle))
85+
Option(delayWhileIdle), priority)
7786
}
7887
}
7988

0 commit comments

Comments
 (0)