-
Notifications
You must be signed in to change notification settings - Fork 2
/
mux.go
182 lines (162 loc) · 5.04 KB
/
mux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package mux
import (
"crypto/ed25519"
"errors"
"fmt"
"io"
"net"
"time"
muxv1 "go.sia.tech/mux/v1"
muxv2 "go.sia.tech/mux/v2"
)
// A Mux multiplexes multiple duplex Streams onto a single net.Conn.
type Mux struct {
m1 *muxv1.Mux
m2 *muxv2.Mux
}
// Close closes the underlying net.Conn.
func (m *Mux) Close() error {
if m.m1 != nil {
return m.m1.Close()
}
return m.m2.Close()
}
// AcceptStream waits for and returns the next peer-initiated Stream.
func (m *Mux) AcceptStream() (*Stream, error) {
if m.m1 != nil {
s, err := m.m1.AcceptStream()
return &Stream{s1: s}, err
}
s, err := m.m2.AcceptStream()
return &Stream{s2: s}, err
}
// DialStream creates a new Stream.
//
// Unlike e.g. net.Dial, this does not perform any I/O; the peer will not be
// aware of the new Stream until Write is called.
func (m *Mux) DialStream() *Stream {
if m.m1 != nil {
return &Stream{s1: m.m1.DialStream()}
}
return &Stream{s2: m.m2.DialStream()}
}
// Dial initiates a mux protocol handshake on the provided conn.
func Dial(conn net.Conn, theirKey ed25519.PublicKey) (*Mux, error) {
// exchange versions
var theirVersion [1]byte
if _, err := conn.Write([]byte{2}); err != nil {
return nil, fmt.Errorf("could not write our version: %w", err)
} else if _, err := io.ReadFull(conn, theirVersion[:]); err != nil {
return nil, fmt.Errorf("could not read peer version: %w", err)
} else if theirVersion[0] == 0 {
return nil, errors.New("peer sent invalid version")
}
if theirVersion[0] == 1 {
m, err := muxv1.Dial(conn, theirKey)
return &Mux{m1: m}, err
}
m, err := muxv2.Dial(conn, theirKey)
return &Mux{m2: m}, err
}
// Accept reciprocates a mux protocol handshake on the provided conn.
func Accept(conn net.Conn, ourKey ed25519.PrivateKey) (*Mux, error) {
// exchange versions
var theirVersion [1]byte
if _, err := io.ReadFull(conn, theirVersion[:]); err != nil {
return nil, fmt.Errorf("could not read peer version: %w", err)
} else if _, err := conn.Write([]byte{2}); err != nil {
return nil, fmt.Errorf("could not write our version: %w", err)
} else if theirVersion[0] == 0 {
return nil, errors.New("peer sent invalid version")
}
if theirVersion[0] == 1 {
m, err := muxv1.Accept(conn, ourKey)
return &Mux{m1: m}, err
}
m, err := muxv2.Accept(conn, ourKey)
return &Mux{m2: m}, err
}
var anonPrivkey = ed25519.NewKeyFromSeed(make([]byte, 32))
var anonPubkey = anonPrivkey.Public().(ed25519.PublicKey)
// DialAnonymous initiates a mux protocol handshake to a party without a
// pre-established identity. The counterparty must reciprocate the handshake with
// AcceptAnonymous.
func DialAnonymous(conn net.Conn) (*Mux, error) { return Dial(conn, anonPubkey) }
// AcceptAnonymous reciprocates a mux protocol handshake without a
// pre-established identity. The counterparty must initiate the handshake with
// DialAnonymous.
func AcceptAnonymous(conn net.Conn) (*Mux, error) { return Accept(conn, anonPrivkey) }
// A Stream is a duplex connection multiplexed over a net.Conn. It implements
// the net.Conn interface.
type Stream struct {
s1 *muxv1.Stream
s2 *muxv2.Stream
}
// LocalAddr returns the underlying connection's LocalAddr.
func (s *Stream) LocalAddr() net.Addr {
if s.s1 != nil {
return s.s1.LocalAddr()
}
return s.s2.LocalAddr()
}
// RemoteAddr returns the underlying connection's RemoteAddr.
func (s *Stream) RemoteAddr() net.Addr {
if s.s1 != nil {
return s.s1.RemoteAddr()
}
return s.s2.RemoteAddr()
}
// SetDeadline sets the read and write deadlines associated with the Stream. It
// is equivalent to calling both SetReadDeadline and SetWriteDeadline.
//
// This implementation does not entirely conform to the net.Conn interface:
// setting a new deadline does not affect pending Read or Write calls, only
// future calls.
func (s *Stream) SetDeadline(t time.Time) error {
if s.s1 != nil {
return s.s1.SetDeadline(t)
}
return s.s2.SetDeadline(t)
}
// SetReadDeadline sets the read deadline associated with the Stream.
//
// This implementation does not entirely conform to the net.Conn interface:
// setting a new deadline does not affect pending Read calls, only future calls.
func (s *Stream) SetReadDeadline(t time.Time) error {
if s.s1 != nil {
return s.s1.SetReadDeadline(t)
}
return s.s2.SetReadDeadline(t)
}
// SetWriteDeadline sets the write deadline associated with the Stream.
//
// This implementation does not entirely conform to the net.Conn interface:
// setting a new deadline does not affect pending Write calls, only future
// calls.
func (s *Stream) SetWriteDeadline(t time.Time) error {
if s.s1 != nil {
return s.s1.SetWriteDeadline(t)
}
return s.s2.SetWriteDeadline(t)
}
// Read reads data from the Stream.
func (s *Stream) Read(p []byte) (int, error) {
if s.s1 != nil {
return s.s1.Read(p)
}
return s.s2.Read(p)
}
// Write writes data to the Stream.
func (s *Stream) Write(p []byte) (int, error) {
if s.s1 != nil {
return s.s1.Write(p)
}
return s.s2.Write(p)
}
// Close closes the Stream. The underlying connection is not closed.
func (s *Stream) Close() error {
if s.s1 != nil {
return s.s1.Close()
}
return s.s2.Close()
}