77
88# numpy
99import numpy as np
10- import shapely .geometry
1110
1211# messages
1312import zmq
2019import tornado .ioloop
2120
2221# mmi
23- from . import send_array , recv_array
22+ from . import send_array
23+ from .tracker_views import views
2424
2525ioloop .install ()
2626
3131 "PUB" , "REQ" , "REP" , "PAIR" }
3232}
3333
34- HAVE_GDAL = False
35- try :
36- import osgeo .osr
37- HAVE_GDAL = True
38- except ImportError :
39- pass
40-
4134
4235logging .basicConfig ()
4336logger = logging .getLogger (__name__ )
4437
4538
46- class Views (object ):
47- @staticmethod
48- def grid (context ):
49- meta = context ["value" ]
50- # Get connection info
51- node = meta ['node' ]
52- node = 'localhost'
53- req_port = meta ["ports" ]["REQ" ]
54- ctx = context ["ctx" ]
55- req = ctx .socket (zmq .REQ )
56- req .connect ("tcp://%s:%s" % (node , req_port ))
57- # Get grid variables
58- send_array (req , A = None , metadata = {"get_var" : "xk" })
59- xk , A = recv_array (req )
60- send_array (req , A = None , metadata = {"get_var" : "yk" })
61- yk , A = recv_array (req )
62- send_array (req , A = None , metadata = {"get_var" : "flowelemnode" })
63- yk , A = recv_array (req )
64- # Spatial transform
65- points = np .c_ [xk , yk ]
66- logger .info ("points shape: %s, values: %s" , points .shape , points )
67- if HAVE_GDAL :
68- src_srs = osgeo .osr .SpatialReference ()
69- src_srs .ImportFromEPSG (meta ["epsg" ])
70- dst_srs = osgeo .osr .SpatialReference ()
71- dst_srs .ImportFromEPSG (4326 )
72- transform = osgeo .osr .CoordinateTransformation (src_srs , dst_srs )
73- wkt_points = transform .TransformPoints (points [:1000 ])
74- else :
75- wkt_points = points
76- geom = shapely .geometry .MultiPoint (wkt_points )
77-
78- geojson = shapely .geometry .mapping (geom )
79- return geojson
80-
81-
82- views = Views ()
8339
8440
8541class WebSocket (tornado .websocket .WebSocketHandler ):
@@ -90,6 +46,8 @@ def __init__(self, application, request, **kwargs):
9046 self .metadata = None
9147
9248 def initialize (self , database , ctx ):
49+ # TODO: use database that supports timeout, persistency, logging, key value store
50+ # perhaps redis
9351 self .database = database
9452 self .ctx = ctx
9553
@@ -134,6 +92,7 @@ def on_message(self, message):
13492 # use the zmqstream as a socket (send, send_json)
13593 socket = self .pushstream
13694
95+ # TODO: can we just forward the bytes without deserializing?
13796 if isinstance (message , six .text_type ):
13897 metadata = json .loads (message )
13998 logger .debug ("got metadata %s" , metadata )
0 commit comments