Skip to content

Commit 40c39d4

Browse files
committed
Implement L2CAP service
1 parent 6616477 commit 40c39d4

File tree

3 files changed

+326
-1
lines changed

3 files changed

+326
-1
lines changed

bumble/pandora/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
from .device import PandoraDevice
2727
from .host import HostService
2828
from .security import SecurityService, SecurityStorageService
29+
from .l2cap import L2CAPService
2930
from pandora.host_grpc_aio import add_HostServicer_to_server
3031
from pandora.security_grpc_aio import (
3132
add_SecurityServicer_to_server,
3233
add_SecurityStorageServicer_to_server,
3334
)
35+
from pandora.l2cap_grpc_aio import add_L2CAPServicer_to_server
3436
from typing import Callable, List, Optional
3537

3638
# public symbols
@@ -77,6 +79,7 @@ async def serve(
7779
add_SecurityStorageServicer_to_server(
7880
SecurityStorageService(bumble.device, config), server
7981
)
82+
add_L2CAPServicer_to_server(L2CAPService(bumble.device, config), server)
8083

8184
# call hooks if any.
8285
for hook in _SERVICERS_HOOKS:

bumble/pandora/l2cap.py

Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import abc
16+
import asyncio
17+
import collections
18+
import dataclasses
19+
import grpc
20+
import logging
21+
import struct
22+
23+
from bumble import device
24+
from bumble import l2cap
25+
from bumble.utils import EventWatcher
26+
from bumble.pandora import config
27+
from bumble.pandora import utils
28+
from google.protobuf import any_pb2 # pytype: disable=pyi-error
29+
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
30+
from pandora import l2cap_pb2
31+
from pandora import l2cap_grpc_aio
32+
from typing import AsyncGenerator, Dict, Union, Optional, DefaultDict
33+
34+
35+
class ChannelProxy(abc.ABC):
36+
up_queue: asyncio.Queue[bytes] = asyncio.Queue()
37+
38+
def send(self, sdu: bytes) -> None:
39+
...
40+
41+
async def receive(self) -> bytes:
42+
return await self.up_queue.get()
43+
44+
def on_data(self, pdu: bytes) -> None:
45+
self.up_queue.put_nowait(pdu)
46+
47+
48+
class CocChannelProxy(ChannelProxy):
49+
def __init__(
50+
self, channel: Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]
51+
) -> None:
52+
super().__init__()
53+
self.channel = channel
54+
channel.sink = self.on_data
55+
self.disconnection_result = asyncio.get_event_loop().create_future()
56+
57+
@channel.once('close')
58+
def on_close() -> None:
59+
self.disconnection_result.set_result(None)
60+
61+
def send(self, data: bytes) -> None:
62+
if isinstance(self.channel, l2cap.ClassicChannel):
63+
self.channel.send_pdu(data)
64+
else:
65+
self.channel.write(data)
66+
67+
@property
68+
def closed(self):
69+
if isinstance(self.channel, l2cap.ClassicChannel):
70+
return self.channel.state == self.channel.State.CLOSED
71+
else:
72+
return self.channel.state == self.channel.State.DISCONNECTED
73+
74+
async def disconnect(self) -> None:
75+
if self.closed:
76+
return
77+
78+
await self.channel.disconnect()
79+
80+
async def wait_disconnect(self) -> None:
81+
if self.closed:
82+
return
83+
84+
await self.disconnection_result
85+
86+
87+
@dataclasses.dataclass
88+
class FixedChannelProxy(ChannelProxy):
89+
connection_handle: int
90+
cid: int
91+
device: device.Device
92+
93+
def send(self, data: bytes) -> None:
94+
self.device.send_l2cap_pdu(self.connection_handle, self.cid, data)
95+
96+
97+
class L2CAPService(l2cap_grpc_aio.L2CAPServicer):
98+
channels: DefaultDict[int, Dict[int, ChannelProxy]]
99+
100+
def __init__(self, device: device.Device, config: config.Config) -> None:
101+
self.log = utils.BumbleServerLoggerAdapter(
102+
logging.getLogger(), {'service_name': 'L2CAP', 'device': device}
103+
)
104+
self.device = device
105+
self.config = config
106+
self.channels = collections.defaultdict(dict)
107+
self.device.on('connection', self.on_acl)
108+
109+
def on_acl(self, connection: device.Connection) -> None:
110+
def on_disconnection(_reason) -> None:
111+
del self.channels[connection.handle]
112+
113+
connection.once('disconnection', on_disconnection)
114+
115+
def get_channel(self, channel: l2cap_pb2.Channel) -> ChannelProxy:
116+
connection_handle, cid = struct.unpack('>HH', channel.cookie.value)
117+
if cid not in self.channels[connection_handle]:
118+
raise RuntimeError('No valid cid or handle')
119+
return self.channels[connection_handle][cid]
120+
121+
@utils.rpc
122+
async def Connect(
123+
self, request: l2cap_pb2.ConnectRequest, context: grpc.ServicerContext
124+
) -> l2cap_pb2.ConnectResponse:
125+
self.log.debug('Connect')
126+
channel: Union[
127+
FixedChannelProxy, l2cap.ClassicChannel, l2cap.LeCreditBasedChannel
128+
]
129+
connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
130+
131+
connection = self.device.lookup_connection(connection_handle)
132+
if connection is None:
133+
raise RuntimeError('Connection not exist')
134+
135+
if request.type_variant() == 'fixed':
136+
# For fixed channel connection, do nothing because it's connectionless
137+
assert request.fixed
138+
cid = request.fixed.cid
139+
l2cap_cookie = any_pb2.Any(value=struct.pack('>HH', connection_handle, cid))
140+
self.channels[connection_handle][cid] = FixedChannelProxy(
141+
connection_handle=connection_handle,
142+
cid=cid,
143+
device=self.device,
144+
)
145+
146+
def on_fixed_pdu(connection_handle: int, pdu: bytes) -> None:
147+
self.channels[connection_handle][cid].on_data(pdu)
148+
149+
self.device.l2cap_channel_manager.register_fixed_channel(cid, on_fixed_pdu)
150+
return l2cap_pb2.ConnectResponse(
151+
channel=l2cap_pb2.Channel(cookie=l2cap_cookie)
152+
)
153+
154+
if request.type_variant() == 'basic':
155+
assert request.basic
156+
channel = await connection.create_l2cap_channel(
157+
spec=l2cap.ClassicChannelSpec(
158+
psm=request.basic.psm, mtu=request.basic.mtu
159+
)
160+
)
161+
elif request.type_variant() == 'le_credit_based':
162+
assert request.le_credit_based
163+
channel = await connection.create_l2cap_channel(
164+
spec=l2cap.LeCreditBasedChannelSpec(
165+
psm=request.le_credit_based.spsm,
166+
max_credits=request.le_credit_based.initial_credit,
167+
mtu=request.le_credit_based.mtu,
168+
mps=request.le_credit_based.mps,
169+
)
170+
)
171+
else:
172+
raise NotImplementedError()
173+
174+
self.channels[connection_handle][channel.source_cid] = CocChannelProxy(channel)
175+
l2cap_cookie = any_pb2.Any(
176+
value=struct.pack('>HH', connection_handle, channel.source_cid)
177+
)
178+
return l2cap_pb2.ConnectResponse(channel=l2cap_pb2.Channel(cookie=l2cap_cookie))
179+
180+
@utils.rpc
181+
async def OnConnection(
182+
self, request: l2cap_pb2.OnConnectionRequest, context: grpc.ServicerContext
183+
) -> AsyncGenerator[l2cap_pb2.OnConnectionResponse, None]:
184+
self.log.debug('WaitConnection')
185+
186+
queue: asyncio.Queue[l2cap_pb2.OnConnectionResponse] = asyncio.Queue()
187+
188+
watcher = EventWatcher()
189+
server: Union[
190+
l2cap.ClassicChannelServer, l2cap.LeCreditBasedChannelServer, None
191+
] = None
192+
fixed_cid: Optional[int] = None
193+
194+
# Fixed channels are connectionless, so it should produce a response immediately.
195+
if request.type_variant() == 'fixed':
196+
assert request.fixed
197+
fixed_cid = request.fixed.cid
198+
199+
def on_fixed_pdu(connection_handle: int, pdu: bytes) -> None:
200+
self.channels[connection_handle][fixed_cid].on_data(pdu)
201+
202+
channel_proxy = FixedChannelProxy(
203+
connection_handle=connection_handle,
204+
cid=fixed_cid,
205+
device=self.device,
206+
)
207+
self.channels[connection_handle][fixed_cid] = channel_proxy
208+
l2cap_cookie = any_pb2.Any(
209+
value=struct.pack('>HH', connection_handle, fixed_cid)
210+
)
211+
212+
queue.put_nowait(
213+
l2cap_pb2.OnConnectionResponse(
214+
channel=l2cap_pb2.Channel(cookie=l2cap_cookie)
215+
)
216+
)
217+
218+
# Register CID and callback
219+
self.device.l2cap_channel_manager.register_fixed_channel(
220+
fixed_cid, on_fixed_pdu
221+
)
222+
else:
223+
224+
def on_connected(
225+
channel: Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]
226+
) -> None:
227+
connection_handle = channel.connection.handle
228+
229+
# Save channel instances
230+
cid = channel.source_cid
231+
self.channels[connection_handle][cid] = CocChannelProxy(channel)
232+
233+
# Produce connection responses
234+
l2cap_cookie = any_pb2.Any(
235+
value=struct.pack('>HH', connection_handle, cid)
236+
)
237+
queue.put_nowait(
238+
l2cap_pb2.OnConnectionResponse(
239+
channel=l2cap_pb2.Channel(cookie=l2cap_cookie)
240+
)
241+
)
242+
243+
# Listen disconnections
244+
@watcher.on(channel, 'close')
245+
def on_close():
246+
del self.channels[connection_handle][cid]
247+
248+
if request.type_variant() == 'basic':
249+
assert request.basic
250+
server = self.device.create_l2cap_server(
251+
spec=l2cap.ClassicChannelSpec(psm=request.basic.psm),
252+
handler=on_connected,
253+
)
254+
elif request.type_variant() == 'le_credit_based':
255+
assert request.le_credit_based
256+
server = self.device.create_l2cap_server(
257+
spec=l2cap.LeCreditBasedChannelSpec(
258+
psm=request.le_credit_based.spsm,
259+
max_credits=request.le_credit_based.initial_credit,
260+
mtu=request.le_credit_based.mtu,
261+
mps=request.le_credit_based.mps,
262+
),
263+
handler=on_connected,
264+
)
265+
else:
266+
raise NotImplementedError()
267+
268+
try:
269+
# Produce event stream
270+
while event := await queue.get():
271+
yield event
272+
finally:
273+
watcher.close()
274+
if server:
275+
server.close()
276+
if fixed_cid:
277+
self.device.l2cap_channel_manager.deregister_fixed_channel(fixed_cid)
278+
279+
@utils.rpc
280+
async def Disconnect(
281+
self, request: l2cap_pb2.DisconnectRequest, context: grpc.ServicerContext
282+
) -> l2cap_pb2.DisconnectResponse:
283+
self.log.debug('Disconnect')
284+
channel = self.get_channel(request.channel)
285+
if isinstance(channel, FixedChannelProxy):
286+
raise ValueError('Fixed channel cannot be disconnected')
287+
288+
assert isinstance(channel, CocChannelProxy)
289+
await channel.disconnect()
290+
return l2cap_pb2.DisconnectResponse(success=empty_pb2.Empty())
291+
292+
@utils.rpc
293+
async def WaitDisconnection(
294+
self, request: l2cap_pb2.WaitDisconnectionRequest, context: grpc.ServicerContext
295+
) -> l2cap_pb2.WaitDisconnectionResponse:
296+
self.log.debug('WaitDisconnection')
297+
channel = self.get_channel(request.channel)
298+
if isinstance(channel, FixedChannelProxy):
299+
raise RuntimeError('Fixed channel cannot be disconnected')
300+
301+
assert isinstance(channel, CocChannelProxy)
302+
await channel.wait_disconnect()
303+
return l2cap_pb2.WaitDisconnectionResponse(success=empty_pb2.Empty())
304+
305+
@utils.rpc
306+
async def Receive(
307+
self, request: l2cap_pb2.ReceiveRequest, context: grpc.ServicerContext
308+
) -> AsyncGenerator[l2cap_pb2.ReceiveResponse, None]:
309+
self.log.debug('Receive')
310+
channel = self.get_channel(request.channel)
311+
312+
while packet := await channel.receive():
313+
yield l2cap_pb2.ReceiveResponse(data=packet)
314+
315+
@utils.rpc
316+
async def Send(
317+
self, request: l2cap_pb2.SendRequest, context: grpc.ServicerContext
318+
) -> l2cap_pb2.SendResponse:
319+
self.log.debug('Send')
320+
channel = self.get_channel(request.channel)
321+
channel.send(request.data)
322+
return l2cap_pb2.SendResponse(success=empty_pb2.Empty())

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ include_package_data = True
3333
install_requires =
3434
aiohttp ~= 3.8; platform_system!='Emscripten'
3535
appdirs >= 1.4; platform_system!='Emscripten'
36-
bt-test-interfaces >= 0.0.2; platform_system!='Emscripten'
36+
bt-test-interfaces >= 0.0.4; platform_system!='Emscripten'
3737
click == 8.1.3; platform_system!='Emscripten'
3838
cryptography == 39; platform_system!='Emscripten'
3939
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the

0 commit comments

Comments
 (0)