forked from franzinc/nfs
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsunrpc-service.cl
363 lines (313 loc) · 11.3 KB
/
sunrpc-service.cl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
;; -*- mode: common-lisp -*-
;; See the file LICENSE for the full license governing this code.
;; Service stuff
(in-package :sunrpc)
;; Request speed-optimised code for the rest of this file.  The declamation
;; is only processed while the file is being compiled.
(eval-when (:compile-toplevel)
  (declaim (optimize (speed 3))))
(defmacro with-portmapper-mappings ((service prog versions udpport tcpport)
                                    &body body)
  ;; Runs BODY with program PROG registered with the portmapper.
  ;;
  ;; VERSIONS may be a single version or a list of versions.  For every
  ;; version any existing mapping is removed first, then the version is
  ;; registered on UDPPORT and on TCPPORT (a nil port is skipped).  If a
  ;; registration returns nil, user::bailout is called with SERVICE in the
  ;; message.  When BODY exits, normally or not, the mappings are removed
  ;; again, ignoring errors.
  ;;
  ;; Nothing is registered or unregistered when PROG equals
  ;; portmap:*pmap-prog*.
  (let ((name (gensym))
        (prognum (gensym))
        (vlist (gensym))
        (udp (gensym))
        (tcp (gensym))
        (vers (gensym)))
    `(let ((,name ,service)
           (,prognum ,prog)
           (,vlist ,versions)
           (,udp ,udpport)
           (,tcp ,tcpport))
       ;; Accept a bare version as well as a list of versions.
       (unless (listp ,vlist)
         (setf ,vlist (list ,vlist)))
       (unless (= ,prognum portmap:*pmap-prog*)
         ;; Cleanup first
         (dolist (,vers ,vlist)
           (portmap-unset ,prognum ,vers))
         ;; Now register.
         (dolist (,vers ,vlist)
           (when (and ,udp
                      (not (portmap-set ,prognum ,vers :udp ,udp)))
             (user::bailout "~a: Failed to register with portmapper.~%" ,name))
           (when (and ,tcp
                      (not (portmap-set ,prognum ,vers :tcp ,tcp)))
             (user::bailout "~a: Failed to register with portmapper.~%" ,name))))
       (unwind-protect
            (progn ,@body)
         ;; Unregister
         (unless (= ,prognum portmap:*pmap-prog*)
           (dolist (,vers ,vlist)
             (ignore-errors (portmap-unset ,prognum ,vers))))))))
;; valid means:
;; message type is *call*
;; rpcvers is 2
(defmacro with-valid-call ((msg peer cbody) &body body)
  ;; Runs BODY with CBODY bound to the call body of MSG, provided that MSG
  ;; is an RPC call (its mtype is *call*) and its rpcvers is 2.
  ;;
  ;; If MSG is a call with any other rpcvers, an RPC mismatch reply is sent
  ;; to PEER instead of running BODY.  Messages that are not calls are
  ;; ignored.
  ;;
  ;; The message and its body union are kept in separate variables: the
  ;; mtype/cbody accessors take the body, while rpc-msg-xid takes the
  ;; message itself (as it does everywhere else in this file).
  (let ((m (gensym))
        (u (gensym))
        (p (gensym)))
    `(let* ((,m ,msg)
            (,u (rpc-msg-body ,m))
            (,p ,peer)
            ,cbody)
       (when (eq (rpc-msg-body-u-mtype ,u) #.*call*)
         (setf ,cbody (rpc-msg-body-u-cbody ,u))
         (if* (not (eq (call-body-rpcvers ,cbody) 2))
            then (send-rpc-mismatch-reply ,p (rpc-msg-xid ,m) 2 2)
            else ,@body)))))
(defun make-rpc-socket (service &rest params)
  ;; Creates a socket by applying socket:make-socket to PARAMS.  If socket
  ;; creation signals an error, it is reported through user::bailout with
  ;; SERVICE identifying who asked for the socket.
  (handler-case (apply (function socket:make-socket) params)
    (error (condition)
      (user::bailout "
~a: Error while creating socket: ~a~%" service condition))))
(defmacro with-rpc-sockets ((service usock tsock &key port) &body body)
  ;; Runs BODY with USOCK bound to a datagram socket and TSOCK bound to a
  ;; passive :hiper socket, both created by make-rpc-socket on local port
  ;; PORT.  SERVICE is passed to make-rpc-socket for error reporting.
  ;; Both sockets are closed (ignoring errors) when BODY exits.
  (let* ((name (gensym))
         (local-port (gensym))
         (udp-form `(make-rpc-socket ,name :type :datagram
                                     :local-port ,local-port))
         (tcp-form `(make-rpc-socket ,name :type :hiper :connect :passive
                                     :local-port ,local-port)))
    `(let* ((,name ,service)
            (,local-port ,port)
            (,usock ,udp-form)
            (,tsock ,tcp-form))
       (unwind-protect
            (progn ,@body)
         (ignore-errors (close ,tsock))
         (ignore-errors (close ,usock))))))
(defmacro def-rpc-program-1 ((program prognum versions usock tsock
                              &key port)
                             &body body)
  ;; Sets up the environment in which an RPC program runs: opens the UDP
  ;; and TCP sockets (bound to USOCK and TSOCK) on PORT, logs the local
  ;; port each one ended up on, and runs BODY with PROGNUM registered with
  ;; the portmapper for VERSIONS on those ports.  VERSIONS is inserted
  ;; quoted, i.e. it is a literal and is not evaluated.
  (let ((udp-port-form `(socket:local-port ,usock))
        (tcp-port-form `(socket:local-port ,tsock)))
    `(with-rpc-sockets (,program ,usock ,tsock :port ,port)
       (user::logit-stamp "~a: Using UDP port ~d~%"
                          ,program ,udp-port-form)
       (user::logit-stamp "~a: Using TCP port ~d~%"
                          ,program ,tcp-port-form)
       (with-portmapper-mappings (,program ,prognum ',versions
                                  ,udp-port-form
                                  ,tcp-port-form)
         ,@body))))
(defun prepend-xdr (sym)
  ;; Returns the symbol whose name is the name of SYM prefixed with "xdr-",
  ;; interned in the current package.  The prefix is taken from the name of
  ;; the symbol XDR rather than from a string literal, so it has whatever
  ;; case the reader gave that symbol.
  (let ((prefix (symbol-name 'xdr))
        (suffix (symbol-name sym)))
    (intern (concatenate 'string prefix "-" suffix))))
;; 'program' is a string.
(defmacro def-rpc-program-main (program prognum proc-versions usock tsock
                                lowest-version highest-version)
  ;; Expands into the main service loop of an RPC program.
  ;;
  ;; PROGRAM is the program name as a string and PROGNUM its program
  ;; number.  PROC-VERSIONS is a list of (versions . procdefs) entries;
  ;; `versions' is used directly as a CASE key and each procdef has the
  ;; form (procnum function arg-type result-type).  The arg and result
  ;; types are turned into xdr function names with prepend-xdr; a result
  ;; type of :ignore means the function's result is not sent back.
  ;; USOCK and TSOCK are the UDP and TCP sockets to serve.
  ;; LOWEST-VERSION and HIGHEST-VERSION are reported in program version
  ;; mismatch replies.
  ;;
  ;; The expansion calls the function named <program>-init, if one is
  ;; defined, opens START-GATE (a variable bound by def-rpc-program), and
  ;; then loops forever reading and dispatching messages.
  (let ((server (gensym))
        (msgxdr (gensym))
        (peer (gensym))
        (msg (gensym))
        (cbody (gensym))
        (vers (gensym))
        (res (gensym))
        ;; The "-init" suffix is taken from a symbol name, the same way
        ;; prepend-xdr obtains its prefix, so that its case matches the
        ;; case the reader gives to ordinary symbols.  A literal "-init"
        ;; would produce a mixed-case name under an upper-casing reader and
        ;; the init function would silently never be found.
        (init-func (intern (concatenate 'string program
                                        (symbol-name '#:-init))))
        version-cases)
    (dolist (vdef proc-versions)
      (let (proc-cases)
        (dolist (procdef (cdr vdef))
          (push
           (let ((encoder (fourth procdef)))
             (if (not (eq encoder :ignore))
                 (setf encoder (prepend-xdr encoder)))
             `(,(first procdef)
               (setf func (quote ,(second procdef)))
               (setf args-decoder (quote ,(prepend-xdr (third procdef))))
               (setf res-encoder (quote ,encoder))))
           proc-cases))
        ;; Fallback clause for procedure numbers this version lacks.  It
        ;; is pushed last so that it ends up last after the nreverse.
        (push `(t
                (user::logit-stamp "~
~a: ~a requested procedure ~d, version ~a, which is unavailable.~%"
                                   ,program
                                   (peer-dotted ,peer)
                                   (call-body-proc ,cbody) (call-body-vers ,cbody))
                (send-proc-unavail-reply ,peer (rpc-msg-xid ,msg)
                                         *nullverf*)
                (return))
              proc-cases)
        (setf proc-cases (nreverse proc-cases))
        (push
         `(,(car vdef)
           (case procnum
             ,@proc-cases))
         version-cases)))
    (setf version-cases (nreverse version-cases))
    `(let ((,server (make-rpc-server :tcpsock ,tsock
                                     :udpsock ,usock)))
       (if (fboundp ',init-func)
           (funcall ',init-func))
       ;; Indicate successful initialization
       (mp:open-gate start-gate) ;; start-gate is bound by def-rpc-program
       (loop
         ;; (return) inside this block abandons the current message and
         ;; goes on to the next one.
         (block nil
           (let* ((,msgxdr (rpc-get-message ,server))
                  (,msg (xdr-rpc-msg ,msgxdr))
                  (,peer (rpc-server-peer ,server)))
             (with-valid-call (,msg ,peer ,cbody)
               ;; sanity checks first
               (if* (/= (call-body-prog ,cbody) ,prognum)
                  then (user::logit-stamp "~
~a: Sending program unavailable response for prog=~D to ~A~%"
                                          ,program
                                          (call-body-prog ,cbody)
                                          (peer-dotted ,peer))
                       (send-prog-unavail-reply ,peer (rpc-msg-xid ,msg)
                                                *nullverf*)
                       (return))
               (let ((,vers (call-body-vers ,cbody))
                     (procnum (call-body-proc ,cbody))
                     func args-decoder res-encoder)
                 (case ,vers
                   ,@version-cases)
                 ;; func is still nil when no version clause matched.
                 (if* (null func)
                    then (user::logit-stamp "~
~a: Sending program version mismatch response (requested version was ~D) to ~A~%"
                                            ,program
                                            ,vers
                                            (peer-dotted ,peer))
                         (send-prog-mismatch-reply
                          ,peer (rpc-msg-xid ,msg) *nullverf*
                          ,lowest-version ,highest-version)
                         (return))
                 (if* (eq res-encoder :ignore)
                    then (funcall func
                                  (funcall args-decoder ,msgxdr)
                                  ,vers
                                  ,peer
                                  ,cbody)
                    else (with-successful-reply (,res ,peer
                                                 (rpc-msg-xid ,msg)
                                                 *nullverf*)
                           (funcall res-encoder ,res
                                    (funcall func
                                             (funcall args-decoder ,msgxdr)
                                             ,vers
                                             ,peer
                                             ,cbody))))))))))))
(defmacro def-rpc-program ((prgname prognum &key port) definitions)
  ;; Defines a function named PRGNAME, taking one argument START-GATE, that
  ;; runs the RPC program with program number PROGNUM on PORT.
  ;;
  ;; DEFINITIONS is a list of entries of the form (versions . procdefs),
  ;; where `versions' is either a single version number or a list of
  ;; version numbers.  The procdefs are handed on unchanged to
  ;; def-rpc-program-main.  The sorted list of all versions is used for
  ;; portmapper registration, and its first and last elements are passed
  ;; on as the lowest and highest supported versions.
  (let ((program (symbol-name prgname))
        (usock (gensym))
        (tsock (gensym))
        all-versions    ;; for use in portmapper call
        proc-versions)  ;; for use in main loop
    (dolist (vdef definitions)
      (let ((versions (first vdef)))
        (if (not (listp versions))
            (setf versions (list versions)))
        (setf all-versions (append all-versions versions))
        (push (cons versions (rest vdef)) proc-versions)))
    ;; APPEND does not copy its last argument, so all-versions shares
    ;; structure with the most recent `versions' list, which is also stored
    ;; in proc-versions and may be part of the macro call's source form.
    ;; SORT is destructive, so sort a copy to leave those lists intact.
    (setf all-versions (sort (copy-list all-versions) #'<))
    (setf proc-versions (nreverse proc-versions))
    `(defun ,prgname (start-gate)
       (declare (optimize (speed 3)))
       (def-rpc-program-1 (,program ,prognum ,all-versions ,usock ,tsock
                           :port ,port)
         (def-rpc-program-main ,program ,prognum ,proc-versions
           ,usock ,tsock
           ,(first all-versions)
           ,(car (last all-versions)))))))
;; State of one RPC server: its sockets, the TCP connections accepted so
;; far, a receive buffer, and the peer of the most recent message.
(defstruct rpc-server
  tcpsock        ;; socket on which rpc-get-message accepts new TCP connections
  udpsock        ;; socket from which rpc-get-message receives datagrams
  tcpclientlist  ;; list of accepted TCP client streams
  ;; Receive buffer of *rpc-buffer-size* octets.  The size is read at read
  ;; time (#.), so *rpc-buffer-size* must be defined before this form is
  ;; read.
  (buffer (make-array #.*rpc-buffer-size* :element-type '(unsigned-byte 8)))
  (peer (make-rpc-peer))) ;; peer associated with the last message received.
(defun cleanup-tcp-client-connection (server stream &key condition)
  ;; Shuts down the TCP connection STREAM and drops it from SERVER's list
  ;; of TCP clients.  When *rpc-debug* is set the event is logged, as an
  ;; abort if CONDITION is supplied and as a disconnect otherwise.
  (when *rpc-debug*
    (if condition
        (user::logit-stamp "client abort (~a) ~S~%" condition stream)
        (user::logit-stamp "client disconnected from ~S~%" stream)))
  ;; Try a normal close first, then an aborting close; errors from either
  ;; are ignored.
  (ignore-errors (close stream))
  (ignore-errors (close stream :abort t))
  (let ((remaining (delete stream (rpc-server-tcpclientlist server))))
    (setf (rpc-server-tcpclientlist server) remaining)))
(defun rpc-receive-and-handle-message (server handler)
  ;; Fetches the next RPC message for SERVER and calls HANDLER with the
  ;; message and the peer it came from.  errno-stream-errors signalled
  ;; while HANDLER runs are dealt with here when they concern one of the
  ;; server's TCP clients or its UDP socket; any other condition is left
  ;; for outer handlers.  The return value is undefined.
  (flet ((on-stream-error (err)
           ;; errno-stream-error should cover socket-error as well.
           (let ((failed (stream-error-stream err)))
             (cond ((member failed (rpc-server-tcpclientlist server))
                    ;; Assume some problem sending a response to a TCP
                    ;; client.  Drop the connection and consider the
                    ;; error handled.
                    (cleanup-tcp-client-connection server failed
                                                   :condition err)
                    (return-from rpc-receive-and-handle-message))
                   ((eq failed (rpc-server-udpsock server))
                    ;; Assume some problem sending a response to a UDP
                    ;; client.  Log and move on.
                    (let ((peer (rpc-server-peer server)))
                      (user::logit-stamp "Failed to send response to ~a:~a. ~a~%"
                                         (peer-dotted peer)
                                         (rpc-peer-port peer)
                                         err))
                    (return-from rpc-receive-and-handle-message))))))
    (let ((message (rpc-get-message server)))
      (handler-bind ((errno-stream-error #'on-stream-error))
        (funcall handler message (rpc-server-peer server))))))
;; Returns an xdr
;; Also fills in 'peer' slot of 'server'.
(defun rpc-get-message (server)
  ;; Blocks until a complete RPC message arrives on SERVER's UDP socket or
  ;; on one of its TCP client connections, and returns it as an xdr.  The
  ;; server's peer structure is updated to describe the sender.
  ;;
  ;; Along the way, new TCP connections are accepted and added to the
  ;; server's client list, and TCP clients for which read-record returns
  ;; nil are cleaned up and removed.
  (symbol-macrolet ((clientlist (rpc-server-tcpclientlist server)))
    (let ((tcpsock (rpc-server-tcpsock server))
          (udpsock (rpc-server-udpsock server))
          (buffer (rpc-server-buffer server))
          (peer (rpc-server-peer server))
          waitlist
          readylist
          record)
      (loop
        (setf waitlist clientlist)
        (if tcpsock
            (push tcpsock waitlist))
        (if udpsock
            (push udpsock waitlist))
        ;;(logit "waiting for input.~%")
        ;;(logit "waitlist is ~S~%" waitlist)
        ;; The assignment is made from the value of the handler-case so
        ;; that readylist becomes nil after a connection reset.  If the
        ;; setf were inside the protected form, readylist would keep the
        ;; streams of the previous iteration, some of which may already
        ;; have been closed.
        (setf readylist
          (handler-case (mp:wait-for-input-available waitlist)
            (socket-error (c)
              (case (stream-error-identifier c)
                (:connection-reset
                 (let ((stream (stream-error-stream c)))
                   (cleanup-tcp-client-connection server stream :condition c)
                   nil))
                (t
                 (error c))))))
        ;;(logit "readylist is ~A~%" readylist)
        (when (member tcpsock readylist)
          (if *rpc-debug*
              (user::logit-stamp "~
Accepting new tcp connection and adding it to the client list.~%"))
          (push (socket:accept-connection tcpsock) clientlist)
          (setf readylist (delete tcpsock readylist)))
        (when (member udpsock readylist)
          (multiple-value-bind (vec count addr port)
              (handler-case (socket:receive-from udpsock (length buffer)
                                                 :buffer buffer)
                (socket-error (c)
                  (if *rpc-debug*
                      (user::logit-stamp "Ignoring socket error: ~a~%" c))
                  nil)
                (errno-stream-error (c)
                  (case (stream-error-code c)
                    (10035 ;; WSAEWOULDBLOCK.. ignore
                     (if *rpc-debug*
                         (user::logit-stamp "Ignoring stream error: ~a~%" c))
                     nil)
                    (t
                     (user::logit-stamp "rpc-get-message: Unexpected stream error during receive-from: ~a~%" c)
                     (error c))))
                (t (c)
                  (user::logit-stamp "rpc-get-message: Unexpected error during receive-from: ~a~%" c)
                  (error c)))
            ;; vec is nil when the receive error was ignored above.
            (when vec
              (setf (rpc-peer-type peer) :datagram)
              (setf (rpc-peer-socket peer) udpsock)
              (setf (rpc-peer-addr peer) addr)
              (setf (rpc-peer-port peer) port)
              (return-from rpc-get-message (create-xdr :vec vec :size count))))
          (setf readylist (delete udpsock readylist)))
        ;; all remaining entries on readylist will be tcp clients
        (dolist (s readylist)
          (setf record (read-record s buffer))
          (if* (null record)
             then (cleanup-tcp-client-connection server s)
             else (setf (rpc-peer-type peer) :stream)
                  (setf (rpc-peer-socket peer) s)
                  (setf (rpc-peer-addr peer) (socket:remote-host s))
                  (return-from rpc-get-message (create-xdr :vec record))))))))
;; Symbols exported from this file.  The export is performed at compile
;; time, at load time and when the form is evaluated.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (export '(def-rpc-program
            make-rpc-server
            rpc-server-peer
            rpc-get-message
            rpc-receive-and-handle-message
            with-rpc-sockets
            with-portmapper-mappings
            with-valid-call)))