The scheduling uses the java.util.concurrent Scheduled Executor Service. It + * is started and stopped based on Spring life cycle events. + */ +public class BookingRequestFeeder extends Feeder implements InitializingBean, DisposableBean { + + public FeederTask feederTask; + private IBookingRequestDAO bookingRequestDAO; + + public void afterPropertiesSet() throws Exception { + log.info("--- STARTING BOOKING REQUEST FEEDER WITH CYCLE [" + defaultDelay + "]"); + Assert.notNull(bookingRequestDAO, "****** bookingRequestDAO is a required property ******"); + + executorService = Executors.newScheduledThreadPool(3); + feederTask = new FeederTask(); + sf = executorService.scheduleAtFixedRate(feederTask, defaultDelay, defaultDelay, TimeUnit.MILLISECONDS); + } + + public void destroy() throws Exception { + sf.cancel(false); + sf = null; + executorService.shutdown(); + } + + public class FeederTask implements Runnable { + private long counter = 1; + + public void run() { + try { + long time = System.currentTimeMillis(); + BookingRequest bookingRequest = new BookingRequest("FEEDER " + Long.toString(time)); + + SourceDestinationAirports airports = AirportDataUtils.generateRandomSourceDestinationAirPorts(); + bookingRequest.setSourceDestinationId(airports.getId()); + bookingRequest.setDestinationAirport(airports.getDesintationAirport()); + bookingRequest.setSourceAirport(airports.getSourceAirport()); + bookingRequest.setAirline(AirportDataUtils.generateRandomAirport()); + + bookingRequestDAO.save(bookingRequest); + //log.info("--- FEEDER WROTE BOOKING REQUEST " + bookingRequest); + }catch(SpaceInterruptedException e) { + // ignore, we are being shutdown + }catch(Exception e) { + e.printStackTrace(); + } + counter++; + } + + public long getCounter() { + return counter; + } + } + + public long getFeedCount() { + return feederTask.getCounter(); + } + + public void setDefaultDelay(long defaultDelay) { + this.defaultDelay = defaultDelay; + } + + public void setBookingRequestDAO(IBookingRequestDAO bookingRequestDAO) { + this.bookingRequestDAO = bookingRequestDAO; + } + +} \ No newline at end of file diff --git a/realTimeAnalyticsTimeSeries/feeder/src/main/java/com/mycompany/app/feeder/Feeder.java b/realTimeAnalyticsTimeSeries/feeder/src/main/java/com/mycompany/app/feeder/Feeder.java new file mode 100644 index 0000000..8f7c942 --- /dev/null +++ b/realTimeAnalyticsTimeSeries/feeder/src/main/java/com/mycompany/app/feeder/Feeder.java @@ -0,0 +1,20 @@ +package com.mycompany.app.feeder; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.logging.Logger; + +public abstract class Feeder { + + protected Logger log = Logger.getLogger(this.getClass().getName()); + + protected ScheduledExecutorService executorService; + protected ScheduledFuture> sf; + + protected long defaultDelay = 100; + + public void setDefaultDelay(long defaultDelay) { + this.defaultDelay = defaultDelay; + } + +} \ No newline at end of file diff --git a/realTimeAnalyticsTimeSeries/feeder/src/main/java/com/mycompany/app/feeder/SearchRequestFeeder.java b/realTimeAnalyticsTimeSeries/feeder/src/main/java/com/mycompany/app/feeder/SearchRequestFeeder.java new file mode 100644 index 0000000..9af6c9f --- /dev/null +++ b/realTimeAnalyticsTimeSeries/feeder/src/main/java/com/mycompany/app/feeder/SearchRequestFeeder.java @@ -0,0 +1,83 @@ +package com.mycompany.app.feeder; + +import com.mycompany.app.common.dao.ISearchRequestDAO; +import com.mycompany.app.common.domain.SearchRequest; +import com.mycompany.app.common.utils.AirportDataUtils; +import com.mycompany.app.common.vo.SourceDestinationAirports; + +import org.openspaces.core.SpaceInterruptedException; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.util.Assert; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * A feeder bean starts a scheduled task that writes a new Data objects to the space (in an unprocessed state). + * + *
The scheduling uses the java.util.concurrent Scheduled Executor Service. It
+ * is started and stopped based on Spring life cycle events.
+ */
+public class SearchRequestFeeder extends Feeder implements InitializingBean, DisposableBean {
+
+ private FeederTask feederTask;
+ private ISearchRequestDAO searchRequestDAO;
+
+ public void afterPropertiesSet() throws Exception {
+ log.info("--- STARTING SEARCH REQUEST FEEDER WITH CYCLE [" + defaultDelay + "]");
+ Assert.notNull(searchRequestDAO, "****** searchRequestDAO is a required property ******");
+
+ executorService = Executors.newScheduledThreadPool(1);
+ feederTask = new FeederTask();
+ sf = executorService.scheduleAtFixedRate(feederTask, defaultDelay, defaultDelay, TimeUnit.MILLISECONDS);
+ }
+
+ public void destroy() throws Exception {
+ sf.cancel(false);
+ sf = null;
+ executorService.shutdown();
+ }
+
+ public class FeederTask implements Runnable {
+ private long counter = 1;
+
+ public void run() {
+ try {
+ long time = System.currentTimeMillis();
+ SearchRequest searchRequest = new SearchRequest("FEEDER " + Long.toString(time));
+
+ SourceDestinationAirports airports = AirportDataUtils.generateRandomSourceDestinationAirPorts();
+ searchRequest.setSourceDestinationId(airports.getId());
+ searchRequest.setDestinationAirport(airports.getDesintationAirport());
+ searchRequest.setSourceAirport(airports.getSourceAirport());
+ searchRequest.setAirline(AirportDataUtils.generateRandomAirport());
+
+ searchRequestDAO.save(searchRequest);
+ //log.info("--- FEEDER WROTE SEARCH REQUEST " + searchRequest);
+ }catch(SpaceInterruptedException e) {
+ // ignore, we are being shutdown
+ }catch(Exception e) {
+ e.printStackTrace();
+ }
+ counter++;
+ }
+
+ public long getCounter() {
+ return counter;
+ }
+ }
+
+ public long getFeedCount() {
+ return feederTask.getCounter();
+ }
+
+ public void setDefaultDelay(long defaultDelay) {
+ this.defaultDelay = defaultDelay;
+ }
+
+ public void setSearchRequestDAO(ISearchRequestDAO searchRequestDAO) {
+ this.searchRequestDAO = searchRequestDAO;
+ }
+
+}
\ No newline at end of file
diff --git a/realTimeAnalyticsTimeSeries/feeder/src/main/java/com/mycompany/app/init/InitalizeFeeders.java b/realTimeAnalyticsTimeSeries/feeder/src/main/java/com/mycompany/app/init/InitalizeFeeders.java
new file mode 100644
index 0000000..fd0152d
--- /dev/null
+++ b/realTimeAnalyticsTimeSeries/feeder/src/main/java/com/mycompany/app/init/InitalizeFeeders.java
@@ -0,0 +1,32 @@
+package com.mycompany.app.init;
+
+import org.openspaces.core.cluster.ClusterInfo;
+import org.openspaces.pu.container.ProcessingUnitContainer;
+import org.openspaces.pu.container.standalone.StandaloneProcessingUnitContainerProvider;
+
+public class InitalizeFeeders {
+
+ public static void main(String[] args) throws Exception {
+ StandaloneProcessingUnitContainerProvider provider = new StandaloneProcessingUnitContainerProvider("target/my-app-feeder.jar");
+
+ //Provide cluster information for the specific PU instance
+ ClusterInfo clusterInfo = new ClusterInfo();
+ clusterInfo.setSchema("partitioned-sync2backup");
+ clusterInfo.setNumberOfInstances(2);
+ clusterInfo.setNumberOfBackups(1);
+ clusterInfo.setInstanceId(1);
+ provider.setClusterInfo(clusterInfo);
+
+ //Build the Spring application context and "start" it
+ ProcessingUnitContainer container = provider.createContainer();
+
+ //Stop the Feeder after 20 minutes in case the demonstrator forget to manually stop the process
+ try {
+ Thread.sleep(1200000);
+ container.close();
+ }catch(Exception e) {
+ System.out.println(e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/realTimeAnalyticsTimeSeries/feeder/src/main/resources/META-INF/spring/pu.xml b/realTimeAnalyticsTimeSeries/feeder/src/main/resources/META-INF/spring/pu.xml
new file mode 100644
index 0000000..2aa953a
--- /dev/null
+++ b/realTimeAnalyticsTimeSeries/feeder/src/main/resources/META-INF/spring/pu.xml
@@ -0,0 +1,50 @@
+
+ =o.length)return n;var r=[],u=a[e++];return n.forEach(function(n,u){r.push({key:n,values:t(u,e)})}),u?r.sort(function(n,t){return u(n.key,t.key)}):r}var e,r,i={},o=[],a=[];return i.map=function(t,e){return n(e,t,0)},i.entries=function(e){return t(n(mo.map,e,0),0)},i.key=function(n){return o.push(n),i},i.sortKeys=function(n){return a[o.length-1]=n,i},i.sortValues=function(n){return e=n,i},i.rollup=function(n){return r=n,i},i},mo.set=function(n){var t=new i;if(n)for(var e=0,r=n.length;r>e;++e)t.add(n[e]);return t},r(i,{has:function(n){return zo+n in this},add:function(n){return this[zo+n]=!0,n},remove:function(n){return n=zo+n,n in this&&delete this[n]},values:function(){var n=[];return this.forEach(function(t){n.push(t)}),n},forEach:function(n){for(var t in this)t.charCodeAt(0)===Co&&n.call(this,t.substring(1))}}),mo.behavior={},mo.rebind=function(n,t){for(var e,r=1,u=arguments.length;++r=0&&(r=n.substring(e+1),n=n.substring(0,e)),n)return arguments.length<2?this[n].on(r):this[n].on(r,t);if(2===arguments.length){if(null==t)for(n in this)this.hasOwnProperty(n)&&this[n].on(r,null);return this}},mo.event=null,mo.requote=function(n){return n.replace(jo,"\\$&")};var jo=/[\\\^\$\*\+\?\|\[\]\(\)\.\{\}]/g,Lo={}.__proto__?function(n,t){n.__proto__=t}:function(n,t){for(var e in t)n[e]=t[e]},Ho=function(n,t){return t.querySelector(n)},Fo=function(n,t){return t.querySelectorAll(n)},Po=bo[a(bo,"matchesSelector")],Oo=function(n,t){return Po.call(n,t)};"function"==typeof Sizzle&&(Ho=function(n,t){return Sizzle(n,t)[0]||null},Fo=function(n,t){return Sizzle.uniqueSort(Sizzle(n,t))},Oo=Sizzle.matchesSelector),mo.selection=function(){return Uo};var Ro=mo.selection.prototype=[];Ro.select=function(n){var t,e,r,u,i=[];n=d(n);for(var o=-1,a=this.length;++o=0&&(e=n.substring(0,t),n=n.substring(t+1)),Yo.hasOwnProperty(e)?{space:Yo[e],local:n}:n}},Ro.attr=function(n,t){if(arguments.length<2){if("string"==typeof n){var e=this.node();return n=mo.ns.qualify(n),n.local?e.getAttributeNS(n.space,n.local):e.getAttribute(n)}for(t in n)this.each(m(t,n[t]));return this}return this.each(m(n,t))},Ro.classed=function(n,t){if(arguments.length<2){if("string"==typeof n){var e=this.node(),r=(n=n.trim().split(/^|\s+/g)).length,u=-1;if(t=e.classList){for(;++u =0?n.substring(0,t):n,r=t>=0?n.substring(t+1):"in";return e=bc.get(e)||xc,r=_c.get(r)||dt,Ar(r(e.apply(null,Array.prototype.slice.call(arguments,1))))},mo.interpolateHcl=Rr,mo.interpolateHsl=Yr,mo.interpolateLab=Ir,mo.interpolateRound=Ur,mo.transform=function(n){var t=xo.createElementNS(mo.ns.prefix.svg,"g");return(mo.transform=function(n){if(null!=n){t.setAttribute("transform",n);var e=t.transform.baseVal.consolidate()}return new Zr(e?e.matrix:wc)})(n)},Zr.prototype.toString=function(){return"translate("+this.translate+")rotate("+this.rotate+")skewX("+this.skew+")scale("+this.scale+")"};var wc={a:1,b:0,c:0,d:1,e:0,f:0};mo.interpolateTransform=Br,mo.layout={},mo.layout.bundle=function(){return function(n){for(var t=[],e=-1,r=n.length;++eFlight Operations Dashboard
+
+
+
+ 0;h--)o.push(i(l)*h);for(l=0;o[l]c;s--);o=o.slice(l,s)}return o},o.tickFormat=function(n,t){if(!arguments.length)return qc;arguments.length<2?t=qc:"function"!=typeof t&&(t=mo.format(t));var r,a=Math.max(.1,n/o.ticks().length),c=e?(r=1e-12,Math.ceil):(r=-1e-12,Math.floor);return function(n){return n/i(c(u(n)+r))<=a?t(n):""}},o.copy=function(){return ii(n.copy(),t,e,r)},ni(o,n)}function oi(n,t,e){function r(t){return n(u(t))}var u=ai(t),i=ai(1/t);return r.invert=function(t){return i(n.invert(t))},r.domain=function(t){return arguments.length?(n.domain((e=t.map(Number)).map(u)),r):e},r.ticks=function(n){return ri(e,n)},r.tickFormat=function(n,t){return ui(e,n,t)},r.nice=function(n){return r.domain(ti(e,n))},r.exponent=function(o){return arguments.length?(u=ai(t=o),i=ai(1/t),n.domain(e.map(u)),r):t},r.copy=function(){return oi(n.copy(),t,e)},ni(r,n)}function ai(n){return function(t){return 0>t?-Math.pow(-t,n):Math.pow(t,n)}}function ci(n,t){function e(e){return o[((i.get(e)||"range"===t.t&&i.set(e,n.push(e)))-1)%o.length]}function r(t,e){return mo.range(n.length).map(function(n){return t+e*n})}var i,o,a;return e.domain=function(r){if(!arguments.length)return n;n=[],i=new u;for(var o,a=-1,c=r.length;++a