Skip to content

Commit

Permalink
Merge pull request #16 from esri-es/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
ntkog authored May 20, 2019
2 parents 2bd0b3f + 3e9ec0c commit 1264e2c
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 49 deletions.
64 changes: 64 additions & 0 deletions streamserver/pipelines/custom.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
This is where you can customize the transformation of your data from websockets
If you want to add more steps in the final pipeline:
- implement a function like this and add it to the array on the module.exports
function your_step( context ) {
return data => {
...do your stuff here
return result // result has to be an object like data or null to skip it
}
}
context =
{
geo : { lat : fieldLat, lon : fieldLon } || null,
service : <serviceConf>
}
data = {
key : <from the parser>
value : <your actual payload>
}
*/

const proj4 = require('proj4');

module.exports = [adaptPayload];

function adaptPayload (context) {
return data => {
if(context.geo !== null) {
let data_lat = data.value[context.geo.lat];
let data_lon = data.value[context.geo.lon];
if (data_lat !== 0 && data_lon !== 0 ) {
// Reprojection according to conf.
try {
let [lon,lat] = proj4(proj4.defs(`EPSG:${context.service.out_sr.latestWkid}`),[data_lon,data_lat])
let fixed = {
geometry : {
x : lon, y : lat,
spatialReference : context.service.out_sr
},
attributes : data.value
};
fixed.attributes.FltId = data.value.id_str;
data.value = fixed;

return data;
} catch(err) {
console.error(`Failed re-projection [${err}]`);
console.log(`${data_lat} || ${data_lon}`);
return null;
}
} else {
return null
}
} else {
return null
}
};
}
33 changes: 33 additions & 0 deletions streamserver/pipelines/default.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const {parser} = require('stream-json/Parser');
const {streamValues} = require('stream-json/streamers/StreamValues');
const {chain} = require('stream-chain');
const CUSTOM_PIPELINE = require('./custom.js');

function _injectCtx (arr,ctx) {
return arr.map(fn => fn(ctx));
}

function sanityCheck(arr) {
let length = arr.length;
return Array.isArray(arr) && arr.filter(fn => typeof(fn) === "function").length === length;
}

function compose(ctx) {
let pipeline = [
parser({jsonStreaming: true}),
streamValues()
];
if (sanityCheck(CUSTOM_PIPELINE)) {
pipeline.push(..._injectCtx(CUSTOM_PIPELINE,ctx));
} else {
console.log(`Default Pipeline setup...[Skipping custom pipeline]`);
if (CUSTOM_PIPELINE.length > 0) {
console.warn(`Something is wrong : Please review your custom pipeline`);
process.exit(12);
}
}

return pipeline;
}

module.exports = compose;
73 changes: 24 additions & 49 deletions streamserver/streamserver_simple.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ const websocket = require('websocket-stream');
const http = require('http');
const Router = require('router');
const finalhandler = require('finalhandler');
const {parser} = require('stream-json/Parser');
const {streamValues} = require('stream-json/streamers/StreamValues');
const {chain} = require('stream-chain');
const uuidv4 = require('uuid/v4');
const esriTypes = require('./utils/esri_types.js');
const streamServerFilter = require('./utils/filter_utils.js');
const proj4 = require('proj4');
const defaultPipeline = require('./pipelines/default.js');

const JSAPI_VERSION = process.argv[3] || "4.11";

Expand Down Expand Up @@ -120,16 +117,16 @@ function _setupHTTPServer(serviceConf){

function _setupSource(obj) {
//console.log( `WS Server ready at [${conf.ws.client.wsUrl}/${BASE_URL}/subscribe]`);

return function handle(stream, request) {
var serverRef = this;
stream.binary = false;
stream.socket.uuid = uuidv4();
console.log(`client [${stream.socket.uuid}] connected`);
stream.socket.challenge = false;
var filter = false;
stream.on('data', function(buf){
let data = buf.toString();
console.log(`${data} from [${stream.socket.uuid}]`);
if (!obj.connections[stream.socket.uuid].challenge && _doChallenge()) {
if (!stream.socket.challenge && _doChallenge()) {
// Challenge
try {
stream.write(JSON.stringify({
Expand All @@ -140,58 +137,37 @@ function _setupSource(obj) {
console.log(`bad payload[${data}]`);
}
console.log("Challenge done!");
obj.connections[stream.socket.uuid].challenge = true;
stream.socket.challenge = true;
} else {
// Filters
try{
obj.connections[stream.socket.uuid].filter = JSON.parse(data).filter.where;
filter = true;
stream.socket.filter = JSON.parse(data).filter.where;
}catch(err){
console.log(`Invalid filter received from ${stream.socket.uuid}: ${data}`);
};
}
});
stream.on('close',function(){
console.log(`client [${stream.socket.uuid}] disconnected`);
delete obj.connections[stream.socket.uuid];
})
serverRef.clients.delete(stream.socket);
stream.end();
});

let pipeline = chain([
parser({jsonStreaming: true}),
streamValues(),
...defaultPipeline({ geo : obj.geo, service : obj.service}),
data => {
return filter
? streamServerFilter(data.value,obj.connections[stream.socket.uuid].filter)
return stream.socket.hasOwnProperty("filter")
? streamServerFilter(data.value,stream.socket.filter)
? data
: null
: data;
},
data => {

if (data.value[obj.geo.lat] !== 0 && data.value[obj.geo.lon] !== 0 && obj.geo !== null) {
// Reprojection according to conf.
let [lon,lat] = proj4(proj4.defs(`EPSG:${obj.service.out_sr.latestWkid}`),[data.value[obj.geo.lon],data.value[obj.geo.lat]])
let fixed = {
geometry : {
x : lon, y : lat,
spatialReference : obj.service.out_sr
},
attributes : data.value
};
fixed.attributes.FltId = data.value.id_str;
data.value = fixed;

return data;
} else {
return null
}
},
data => {
return JSON.stringify(data.value)
}
data => JSON.stringify(data.value)
]);

obj.connections[stream.socket.uuid] = stream.socket;
pipeline.on("error", function(err){
console.error(err);
})

obj.pullStream
.pipe(pipeline)
Expand All @@ -211,13 +187,18 @@ function start(cfg){
} else {
console.log(`Found spatial information in fields [${lonField},${latField}]`);
}
var connections = {};
let HTTPServer = _setupHTTPServer(conf.service);

let HTTPServer = _setupHTTPServer(conf.service);
let wsRemoteClient = websocket(conf.ws.client.wsUrl, {
perMessageDeflate: false
});

var fieldGeo = avoidGeo
? null
: {
lat : latField,
lon : lonField
};

var wss = websocket.createServer({
server: HTTPServer,
Expand All @@ -226,13 +207,7 @@ function start(cfg){
_setupSource({
pullStream : wsRemoteClient,
service: conf.service,
connections : connections,
geo : avoidGeo
? null
: {
lat : latField,
lon : lonField
}
geo : fieldGeo
}))
}

Expand Down

0 comments on commit 1264e2c

Please sign in to comment.