Skip to content

Commit 579ef3e

Browse files
committed
⚙️ improve localbox error handling
1 parent 23f77dd commit 579ef3e

File tree

1 file changed

+129
-62
lines changed

1 file changed

+129
-62
lines changed

src/codeboxapi/box/localbox.py

Lines changed: 129 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -108,16 +108,40 @@ def start(self) -> CodeBoxStatus:
108108
return CodeBoxStatus(status="started")
109109

110110
def _connect(self) -> None:
111-
response = requests.post(
112-
f"{self.kernel_url}/kernels",
113-
headers={"Content-Type": "application/json"},
114-
timeout=270,
115-
)
116-
self.kernel_id = response.json()["id"]
111+
# Implement retry logic for kernel connection
112+
for attempt in range(5):
113+
try:
114+
response = requests.post(
115+
f"{self.kernel_url}/kernels",
116+
headers={"Content-Type": "application/json"},
117+
timeout=60,
118+
)
119+
if response.status_code == 201:
120+
self.kernel_id = response.json().get("id", None)
121+
if self.kernel_id:
122+
break
123+
except requests.RequestException as e:
124+
print(f"Could not connect to kernel. {e}")
125+
time.sleep(5) # Wait for 5 seconds before retrying
126+
117127
if self.kernel_id is None:
118-
raise Exception("Could not start kernel")
128+
raise Exception("Could not start kernel after multiple attempts")
119129

120-
self.ws = ws_connect_sync(f"{self.ws_url}/kernels/{self.kernel_id}/channels")
130+
# Connect to WebSocket with retry logic
131+
for attempt in range(5):
132+
try:
133+
self.ws = ws_connect_sync(
134+
f"{self.ws_url}/kernels/{self.kernel_id}/channels",
135+
open_timeout=60,
136+
close_timeout=60,
137+
)
138+
break # Break the loop if connection is successful
139+
except (ConnectionClosedError, TimeoutError) as e:
140+
print(f"Attempt {attempt + 1}: WebSocket connection failed. Error: {e}")
141+
time.sleep(5) # Wait for 5 seconds before retrying
142+
143+
if not self.ws:
144+
raise Exception("Could not connect to WebSocket after multiple attempts")
121145

122146
def _check_port(self) -> None:
123147
try:
@@ -189,14 +213,45 @@ async def astart(self) -> CodeBoxStatus:
189213

190214
async def _aconnect(self) -> None:
191215
if self.aiohttp_session is None:
192-
self.aiohttp_session = aiohttp.ClientSession()
193-
response = await self.aiohttp_session.post(
194-
f"{self.kernel_url}/kernels", headers={"Content-Type": "application/json"}
195-
)
196-
self.kernel_id = (await response.json())["id"]
216+
timeout = aiohttp.ClientTimeout(total=270)
217+
self.aiohttp_session = aiohttp.ClientSession(timeout=timeout)
218+
219+
# Implement retry logic for kernel connection
220+
for attempt in range(5):
221+
try:
222+
response = await self.aiohttp_session.post(
223+
f"{self.kernel_url}/kernels",
224+
headers={"Content-Type": "application/json"},
225+
)
226+
if response.status == 201:
227+
self.kernel_id = (await response.json()).get("id", None)
228+
if self.kernel_id:
229+
break
230+
except aiohttp.ClientError as e:
231+
print(f"Attempt {attempt + 1}: Could not connect to kernel. Error: {e}")
232+
await asyncio.sleep(5) # Wait for 5 seconds before retrying
233+
197234
if self.kernel_id is None:
198-
raise Exception("Could not start kernel")
199-
self.ws = await ws_connect(f"{self.ws_url}/kernels/{self.kernel_id}/channels")
235+
raise Exception("Could not start kernel after multiple attempts")
236+
237+
# Connect to WebSocket with increased timeout and retry logic
238+
for attempt in range(5):
239+
try:
240+
self.ws = await ws_connect(
241+
f"{self.ws_url}/kernels/{self.kernel_id}/channels",
242+
timeout=60,
243+
open_timeout=60,
244+
close_timeout=60,
245+
)
246+
break # Break the loop if connection is successful
247+
except asyncio.TimeoutError as e:
248+
print(
249+
f"Attempt {attempt + 1}: WebSocket connection timeout. Error: {e}"
250+
)
251+
await asyncio.sleep(5) # Wait for 5 seconds before retrying
252+
253+
if not self.ws:
254+
raise Exception("Could not connect to WebSocket after multiple attempts")
200255

201256
async def _acheck_port(self) -> None:
202257
try:
@@ -294,44 +349,49 @@ def run(
294349
self.start()
295350
return self.run(code, file_path, retry - 1)
296351

352+
msg_header = received_msg.get("header", {})
353+
msg_parent_header = received_msg.get("parent_header", {})
354+
msg_content = received_msg.get("content", {})
355+
msg_data = msg_content.get("data", {})
356+
297357
if (
298-
received_msg["header"]["msg_type"] == "stream"
299-
and received_msg["parent_header"]["msg_id"] == msg_id
358+
msg_header["msg_type"] == "stream"
359+
and msg_parent_header["msg_id"] == msg_id
300360
):
301-
msg = received_msg["content"]["text"].strip()
361+
msg = msg_content["text"].strip()
302362
if "Requirement already satisfied:" in msg:
303363
continue
304364
result += msg + "\n"
305365
if settings.VERBOSE:
306366
print("Output:\n", result)
307367

308368
elif (
309-
received_msg["header"]["msg_type"] == "execute_result"
310-
and received_msg["parent_header"]["msg_id"] == msg_id
369+
msg_header["msg_type"] == "execute_result"
370+
and msg_parent_header["msg_id"] == msg_id
311371
):
312-
result += received_msg["content"]["data"]["text/plain"].strip() + "\n"
372+
result += msg_data["text/plain"].strip() + "\n"
313373
if settings.VERBOSE:
314374
print("Output:\n", result)
315375

316-
elif received_msg["header"]["msg_type"] == "display_data":
317-
if "image/png" in received_msg["content"]["data"]:
376+
elif msg_header["msg_type"] == "display_data":
377+
if "image/png" in msg_data:
318378
return CodeBoxOutput(
319379
type="image/png",
320-
content=received_msg["content"]["data"]["image/png"],
380+
content=msg_data["image/png"],
321381
)
322-
if "text/plain" in received_msg["content"]["data"]:
382+
if "text/plain" in msg_data:
323383
return CodeBoxOutput(
324384
type="text",
325-
content=received_msg["content"]["data"]["text/plain"],
385+
content=msg_data["text/plain"],
326386
)
327387
return CodeBoxOutput(
328388
type="error",
329389
content="Could not parse output",
330390
)
331391
elif (
332-
received_msg["header"]["msg_type"] == "status"
333-
and received_msg["parent_header"]["msg_id"] == msg_id
334-
and received_msg["content"]["execution_state"] == "idle"
392+
msg_header["msg_type"] == "status"
393+
and msg_parent_header["msg_id"] == msg_id
394+
and msg_content["execution_state"] == "idle"
335395
):
336396
if len(result) > 500:
337397
result = "[...]\n" + result[-500:]
@@ -340,13 +400,10 @@ def run(
340400
)
341401

342402
elif (
343-
received_msg["header"]["msg_type"] == "error"
344-
and received_msg["parent_header"]["msg_id"] == msg_id
403+
msg_header["msg_type"] == "error"
404+
and msg_parent_header["msg_id"] == msg_id
345405
):
346-
error = (
347-
f"{received_msg['content']['ename']}: "
348-
f"{received_msg['content']['evalue']}"
349-
)
406+
error = f"{msg_content['ename']}: " f"{msg_content['evalue']}"
350407
if settings.VERBOSE:
351408
print("Error:\n", error)
352409
return CodeBoxOutput(type="error", content=error)
@@ -367,8 +424,6 @@ async def arun(
367424
raise RuntimeError("Could not connect to kernel")
368425
if not self.ws:
369426
await self._aconnect()
370-
if not self.ws:
371-
raise RuntimeError("Jupyter not running. Make sure to start it first.")
372427

373428
if settings.VERBOSE:
374429
print("Running code:\n", code)
@@ -406,40 +461,45 @@ async def arun(
406461
await self.astart()
407462
return await self.arun(code, file_path, retry - 1)
408463

464+
msg_header = received_msg.get("header", {})
465+
msg_parent_header = received_msg.get("parent_header", {})
466+
msg_content = received_msg.get("content", {})
467+
msg_data = msg_content.get("data", {})
468+
409469
if (
410-
received_msg["header"]["msg_type"] == "stream"
411-
and received_msg["parent_header"]["msg_id"] == msg_id
470+
msg_header["msg_type"] == "stream"
471+
and msg_parent_header["msg_id"] == msg_id
412472
):
413-
msg = received_msg["content"]["text"].strip()
473+
msg = msg_content["text"].strip()
414474
if "Requirement already satisfied:" in msg:
415475
continue
416476
result += msg + "\n"
417477
if settings.VERBOSE:
418478
print("Output:\n", result)
419479

420480
elif (
421-
received_msg["header"]["msg_type"] == "execute_result"
422-
and received_msg["parent_header"]["msg_id"] == msg_id
481+
msg_header["msg_type"] == "execute_result"
482+
and msg_parent_header["msg_id"] == msg_id
423483
):
424-
result += received_msg["content"]["data"]["text/plain"].strip() + "\n"
484+
result += msg_data["text/plain"].strip() + "\n"
425485
if settings.VERBOSE:
426486
print("Output:\n", result)
427487

428-
elif received_msg["header"]["msg_type"] == "display_data":
429-
if "image/png" in received_msg["content"]["data"]:
488+
elif msg_header["msg_type"] == "display_data":
489+
if "image/png" in msg_data:
430490
return CodeBoxOutput(
431491
type="image/png",
432-
content=received_msg["content"]["data"]["image/png"],
492+
content=msg_data["image/png"],
433493
)
434-
if "text/plain" in received_msg["content"]["data"]:
494+
if "text/plain" in msg_data:
435495
return CodeBoxOutput(
436496
type="text",
437-
content=received_msg["content"]["data"]["text/plain"],
497+
content=msg_data["text/plain"],
438498
)
439499
elif (
440-
received_msg["header"]["msg_type"] == "status"
441-
and received_msg["parent_header"]["msg_id"] == msg_id
442-
and received_msg["content"]["execution_state"] == "idle"
500+
msg_header["msg_type"] == "status"
501+
and msg_parent_header["msg_id"] == msg_id
502+
and msg_content["execution_state"] == "idle"
443503
):
444504
if len(result) > 500:
445505
result = "[...]\n" + result[-500:]
@@ -448,13 +508,10 @@ async def arun(
448508
)
449509

450510
elif (
451-
received_msg["header"]["msg_type"] == "error"
452-
and received_msg["parent_header"]["msg_id"] == msg_id
511+
msg_header["msg_type"] == "error"
512+
and msg_parent_header["msg_id"] == msg_id
453513
):
454-
error = (
455-
f"{received_msg['content']['ename']}: "
456-
f"{received_msg['content']['evalue']}"
457-
)
514+
error = f"{msg_content['ename']}: " f"{msg_content['evalue']}"
458515
if settings.VERBOSE:
459516
print("Error:\n", error)
460517
return CodeBoxOutput(type="error", content=error)
@@ -500,18 +557,27 @@ async def alist_files(self) -> List[CodeBoxFile]:
500557
return await asyncio.to_thread(self.list_files)
501558

502559
def restart(self) -> CodeBoxStatus:
560+
# self.stop()
561+
# self.start()
503562
return CodeBoxStatus(status="restarted")
504563

505564
async def arestart(self) -> CodeBoxStatus:
565+
# await self.astop()
566+
# await self.astart()
506567
return CodeBoxStatus(status="restarted")
507568

508569
def stop(self) -> CodeBoxStatus:
509570
try:
510571
if self.jupyter is not None:
511-
self.jupyter.terminate()
512-
self.jupyter.wait()
513-
self.jupyter = None
514-
time.sleep(2)
572+
if isinstance(self.jupyter, subprocess.Popen):
573+
self.jupyter.terminate()
574+
self.jupyter.wait()
575+
self.jupyter = None
576+
time.sleep(2)
577+
elif isinstance(self.jupyter, Process):
578+
self.jupyter.terminate()
579+
self.jupyter = None
580+
time.sleep(5)
515581
else:
516582
for pid in self._jupyter_pids:
517583
os.kill(pid, signal.SIGTERM)
@@ -534,8 +600,9 @@ def stop(self) -> CodeBoxStatus:
534600
async def astop(self) -> CodeBoxStatus:
535601
if self.jupyter is not None:
536602
self.jupyter.terminate()
603+
await asyncio.create_subprocess_exec("kill", "-9", str(self.jupyter.pid))
604+
await asyncio.sleep(5)
537605
self.jupyter = None
538-
await asyncio.sleep(2)
539606

540607
if self.ws is not None:
541608
try:

0 commit comments

Comments
 (0)