forked from getlantern/go-udtwrapper
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfd_rw.go
201 lines (175 loc) · 4.03 KB
/
fd_rw.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package udt
import (
"io"
"net"
"unsafe"
)
// #cgo CFLAGS: -Wall
// #cgo LDFLAGS: -lstdc++
//
// #include "udt_c.h"
// #include <errno.h>
// #include <arpa/inet.h>
// #include <string.h>
import "C"
func slice2cbuf(buf []byte) *C.char {
return (*C.char)(unsafe.Pointer(&buf[0]))
}
// udtIOError interprets the udt_getlasterror_code and returns an
// error if IO systems should stop.
func (fd *udtFD) udtIOError() error {
// switch C.udt_getlasterror_code() {
// case C.UDT_SUCCESS: // success :)
// case C.UDT_ECONNFAIL, C.UDT_ECONNLOST: // connection closed
// case C.UDT_EASYNCRCV, C.UDT_EASYNCSND: // no data to read (async)
// case C.UDT_ETIMEOUT: // timeout that we triggered
// default: // unexpected error, bail
// return lastError()
// }
// timeout and/or closing.
// TODO remove this and turn async off. This timeout is here because I'm seeing
// unexpected blocking (violating the timeout). Its not clear how the UDT async
// stuff and Goroutines mesh... this worked.
// UPDATE: async disabled for now.
select {
// case <-time.After(time.Duration(UDT_ASYNC_TIMEOUT) * time.Millisecond):
case <-fd.proc.Closing():
return io.EOF // seems to have been a graceful close
default:
}
return nil
}
func (fd *udtFD) incref() error {
select {
case <-fd.proc.Closing():
return errClosing
case <-fd.csema:
fd.proc.Children().Add(1)
fd.csema <- signal{}
return nil
}
}
func (fd *udtFD) decref() {
fd.proc.Children().Done()
}
func (fd *udtFD) readLock() error {
// first acquire control sema to add ourselves as a child
if err := fd.incref(); err != nil {
return err
}
// second acquire read sema (one reader at a time)
select {
case <-fd.proc.Closing():
fd.decref() // didnt work out. undo
return errClosing
case <-fd.rsema:
return nil
}
}
func (fd *udtFD) readUnlock() {
fd.decref()
fd.rsema <- signal{}
}
func (fd *udtFD) writeLock() error {
select {
case <-fd.proc.Closing():
return errClosing
case <-fd.csema:
<-fd.wsema
fd.proc.Children().Add(1)
fd.csema <- signal{}
return nil
}
}
func (fd *udtFD) writeUnlock() {
fd.proc.Children().Done()
fd.wsema <- signal{}
}
func (fd *udtFD) Read(buf []byte) (readcnt int, err error) {
if err = fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
readcnt = 0
for {
n := int(C.udt_recv(fd.sock, slice2cbuf(buf[readcnt:]), C.int(len(buf)-readcnt), 0))
if C.int(n) == C.ERROR {
// got problems?
if err = fd.udtIOError(); err != nil {
break
}
// nope, everything's fine. read again.
continue
}
if n > 0 {
readcnt += n
}
if err != nil { // bad things happened
break
}
if n == 0 {
err = io.EOF
}
break // return the data we have.
}
if err != nil && err != io.EOF {
err = &net.OpError{"read", fd.net, fd.laddr, err}
}
return readcnt, err
}
func (fd *udtFD) Write(buf []byte) (writecnt int, err error) {
if err = fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
writecnt = 0
for {
n := int(C.udt_send(fd.sock, slice2cbuf(buf[writecnt:]), C.int(len(buf)-writecnt), 0))
if C.int(n) == C.ERROR {
// UDT Error?
if err = fd.udtIOError(); err != nil {
break
}
// everything's fine, proceed
}
// update our running count
if n > 0 {
writecnt += n
}
if writecnt == len(buf) { // done!
break
}
if err != nil { // bad things happened
break
}
if n == 0 { // early eof?
err = io.ErrUnexpectedEOF
break
}
}
if err != nil {
err = &net.OpError{"write", fd.net, fd.raddr, err}
}
return writecnt, err
}
type socketStatus C.enum_UDTSTATUS
func getSocketStatus(sock C.UDTSOCKET) socketStatus {
return socketStatus(C.udt_getsockstate(sock))
}
func (s socketStatus) inSetup() bool {
switch C.enum_UDTSTATUS(s) {
case C.INIT, C.OPENED, C.LISTENING, C.CONNECTING:
return true
}
return false
}
func (s socketStatus) inTeardown() bool {
switch C.enum_UDTSTATUS(s) {
case C.BROKEN, C.CLOSED, C.NONEXIST: // c.CLOSING
return true
}
return false
}
func (s socketStatus) inConnected(sock C.UDTSOCKET) bool {
return C.enum_UDTSTATUS(s) == C.CONNECTED
}