Skip to content

Commit 18ff8c7

Browse files
committed
Built-in esptool flash (via Pythonx)
Optional use built-in esptool install via Pythonx. Signed-off-by: Peter M <[email protected]>
1 parent cbd2fb6 commit 18ff8c7

File tree

6 files changed

+563
-5
lines changed

6 files changed

+563
-5
lines changed

lib/esptool_helper.ex

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
defmodule ExAtomVM.EsptoolHelper do
2+
@moduledoc """
3+
Module for setting up and using esptool through Pythonx.
4+
"""
5+
6+
@doc """
7+
Initializes Python environment with project configuration.
8+
We use locked main branch esptool version, pending a stable 5.x release,
9+
as we need the read to memory (instead of only to file) features.
10+
"""
11+
def setup do
12+
case Code.ensure_loaded(Pythonx) do
13+
{:module, Pythonx} ->
14+
Application.ensure_all_started(:pythonx)
15+
16+
Pythonx.uv_init("""
17+
[project]
18+
name = "project"
19+
version = "0.0.0"
20+
requires-python = "==3.13.*"
21+
dependencies = [
22+
"esptool @ git+https://github.com/espressif/esptool.git@6f0d779"
23+
]
24+
""")
25+
26+
_ ->
27+
{:error, :pythonx_not_available,
28+
"The :pythonx dependency is not available. Please add it to your mix.exs dependencies.\n{:pythonx, \"~> 0.4.0\"}"}
29+
end
30+
end
31+
32+
def flash_pythonx(tool_args) do
33+
# https://github.com/espressif/esptool/blob/master/docs/en/esptool/scripting.rst
34+
35+
tool_args =
36+
if not Enum.member?(tool_args, "--port") do
37+
selected_device = select_device()
38+
39+
["--port", selected_device["port"]] ++ tool_args
40+
else
41+
tool_args
42+
end
43+
44+
{_result, globals} =
45+
try do
46+
Pythonx.eval(
47+
"""
48+
import esptool
49+
import sys
50+
51+
command = [arg.decode('utf-8') for arg in tool_args]
52+
53+
def flash_esp():
54+
esptool.main(command)
55+
56+
if __name__ == "__main__":
57+
try:
58+
result = flash_esp()
59+
result = True
60+
except SystemExit as e:
61+
exit_code = int(str(e))
62+
result = exit_code == 0
63+
except Exception as e:
64+
print(f"Warning: {e}")
65+
result = True
66+
67+
""",
68+
%{"tool_args" => tool_args}
69+
)
70+
rescue
71+
e in Pythonx.Error ->
72+
IO.inspect("Pythonx error occurred: #{inspect(e)}")
73+
exit({:shutdown, 1})
74+
end
75+
76+
Pythonx.decode(globals["result"])
77+
end
78+
79+
@moduledoc """
80+
Erases flash of an ESP32 device.
81+
--after "no-reset" is needed for keeping USB-OTG devices like esp32-S2 in a good state.
82+
"""
83+
def erase_flash(tool_args \\ ["--chip", "auto", "--after", "no-reset"]) do
84+
tool_args =
85+
if not Enum.member?(tool_args, "--port") do
86+
selected_device = select_device()
87+
88+
confirmation =
89+
IO.gets(
90+
"\nAre you sure you want to erase the flash of\n#{selected_device["chip_family_name"]} - Port: #{selected_device["port"]} MAC: #{selected_device["mac_address"]} ? [N/y]: "
91+
)
92+
93+
case String.trim(confirmation) do
94+
input when input in ["Y", "y"] ->
95+
IO.puts("Erasing..")
96+
97+
_ ->
98+
IO.puts("Flash erase cancelled.")
99+
exit({:shutdown, 0})
100+
end
101+
102+
["--port", selected_device["port"]] ++ tool_args ++ ["erase-flash"]
103+
else
104+
tool_args ++ ["erase-flash"]
105+
end
106+
107+
{_result, globals} =
108+
try do
109+
Pythonx.eval(
110+
"""
111+
import esptool
112+
113+
command = [arg.decode('utf-8') for arg in tool_args]
114+
115+
def flash_esp():
116+
esptool.main(command)
117+
118+
if __name__ == "__main__":
119+
try:
120+
result = flash_esp()
121+
result = True
122+
except SystemExit as e:
123+
exit_code = int(str(e))
124+
result = exit_code == 0
125+
except Exception as e:
126+
print(f"Warning: {e}")
127+
result = False
128+
""",
129+
%{"tool_args" => tool_args}
130+
)
131+
rescue
132+
e in Pythonx.Error ->
133+
IO.inspect("Pythonx error occurred: #{inspect(e)}")
134+
exit({:shutdown, 1})
135+
end
136+
137+
Pythonx.decode(globals["result"])
138+
end
139+
140+
def connected_devices do
141+
{_result, globals} =
142+
try do
143+
Pythonx.eval(
144+
"""
145+
from esptool.cmds import (detect_chip, read_flash, attach_flash)
146+
import serial.tools.list_ports as list_ports
147+
import re
148+
149+
ports = []
150+
for port in list_ports.comports():
151+
if port.vid is None:
152+
continue
153+
ports.append(port.device)
154+
155+
result = []
156+
for port in ports:
157+
try:
158+
with detect_chip(port) as esp:
159+
description = esp.get_chip_description()
160+
features = esp.get_chip_features()
161+
mac_addr = ':'.join(['%02X' % b for b in esp.read_mac()])
162+
163+
# chips like esp32-s2 can have more specific names, so we call this chip family
164+
# https://github.com/espressif/esptool/blob/807d02b0c5eb07ba46f871a492c84395fb9f37be/esptool/targets/esp32s2.py#L167
165+
chip_family_name = esp.CHIP_NAME
166+
167+
# read 128 bytes at 0x10030
168+
attach_flash(esp)
169+
app_header = read_flash(esp, 0x10030, 128, None)
170+
app_header_strings = [s for s in re.split('\\x00', app_header.decode('utf-8', errors='replace')) if s]
171+
172+
usb_mode = esp.get_usb_mode()
173+
174+
# this is needed to keep USB-OTG boards like esp32-S2 in a good state
175+
esp.run_stub()
176+
177+
result.append({"port": port, "chip_family_name": chip_family_name,
178+
"features": features, "build_info": app_header_strings,
179+
"mac_address": mac_addr, "usb_mode": usb_mode
180+
})
181+
except Exception as e:
182+
print(f"Error: {e}")
183+
result = []
184+
""",
185+
%{}
186+
)
187+
rescue
188+
e in Pythonx.Error ->
189+
{:error, "Pythonx error occurred: #{inspect(e)}"}
190+
end
191+
192+
Pythonx.decode(globals["result"])
193+
|> Enum.map(fn device ->
194+
Map.put(device, "atomvm_installed", Enum.member?(device["build_info"], "atomvm-esp32"))
195+
end)
196+
end
197+
198+
def select_device do
199+
devices = connected_devices()
200+
201+
selected_device =
202+
case length(devices) do
203+
0 ->
204+
IO.puts(
205+
"Found no esp32 devices..\nYou may have to hold BOOT button down while plugging in the device"
206+
)
207+
208+
exit({:shutdown, 1})
209+
210+
1 ->
211+
hd(devices)
212+
213+
_ ->
214+
IO.puts("\nMultiple ESP32 devices found:")
215+
216+
devices
217+
|> Enum.with_index(1)
218+
|> Enum.each(fn {device, index} ->
219+
IO.puts(
220+
"#{index}. #{device["chip_family_name"]} - Port: #{device["port"]} MAC: #{device["mac_address"]}"
221+
)
222+
end)
223+
224+
selected =
225+
IO.gets("\nSelect device (1-#{length(devices)}): ")
226+
|> String.trim()
227+
|> Integer.parse()
228+
229+
case selected do
230+
{num, _} when num > 0 and num <= length(devices) ->
231+
Enum.at(devices, num - 1)
232+
233+
_ ->
234+
IO.puts("Invalid selection.")
235+
exit({:shutdown, 1})
236+
end
237+
end
238+
239+
selected_device
240+
end
241+
end

lib/mix/tasks/esp32.erase_flash.ex

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
defmodule Mix.Tasks.Atomvm.Esp32.EraseFlash do
2+
@moduledoc """
3+
Mix task to get erase the flash of an connected ESP32 devices.
4+
"""
5+
use Mix.Task
6+
7+
@shortdoc "Erase flash of ESP32"
8+
9+
@impl Mix.Task
10+
def run(_args) do
11+
with :ok <- ExAtomVM.EsptoolHelper.setup(),
12+
result <- ExAtomVM.EsptoolHelper.erase_flash() do
13+
case result do
14+
true -> exit({:shutdown, 0})
15+
false -> exit({:shutdown, 1})
16+
end
17+
else
18+
{:error, :pythonx_not_available, message} ->
19+
IO.puts("\nError: #{message}")
20+
exit({:shutdown, 1})
21+
22+
{:error, reason} ->
23+
IO.puts("Error: #{reason}")
24+
exit({:shutdown, 1})
25+
end
26+
end
27+
end

lib/mix/tasks/esp32.info.ex

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
defmodule Mix.Tasks.Atomvm.Esp32.Info do
2+
@moduledoc """
3+
Mix task to get information about connected ESP32 devices.
4+
"""
5+
use Mix.Task
6+
7+
@shortdoc "Get information about connected ESP32 devices"
8+
9+
@impl Mix.Task
10+
def run(_args) do
11+
with :ok <- ExAtomVM.EsptoolHelper.setup(),
12+
devices <- ExAtomVM.EsptoolHelper.connected_devices() do
13+
case length(devices) do
14+
0 ->
15+
IO.puts(
16+
"Found no esp32 devices..\nYou may have to hold BOOT button down while plugging in the device"
17+
)
18+
19+
count ->
20+
IO.puts("Found #{count} connected esp32:")
21+
end
22+
23+
if length(devices) > 1 do
24+
Enum.each(devices, fn device ->
25+
IO.puts(
26+
"#{format_atomvm_status(device["atomvm_installed"])}#{device["chip_family_name"]} - Port: #{device["port"]}"
27+
)
28+
end)
29+
end
30+
31+
Enum.each(devices, fn device ->
32+
IO.puts("\n━━━━━━━━━━━━━━━━━━━━━━")
33+
34+
IO.puts(
35+
"#{format_atomvm_status(device["atomvm_installed"])}#{device["chip_family_name"]} - Port: #{device["port"]}"
36+
)
37+
38+
IO.puts("USB_MODE: #{device["usb_mode"]}")
39+
IO.puts("MAC: #{device["mac_address"]}")
40+
IO.puts("AtomVM installed: #{device["atomvm_installed"]}")
41+
42+
IO.puts("\nBuild Information:")
43+
44+
Enum.each(format_build_info(device["build_info"]), fn build_info ->
45+
IO.puts(build_info)
46+
end)
47+
48+
IO.puts("\nFeatures:")
49+
50+
Enum.each(device["features"], fn feature ->
51+
IO.puts(" · #{feature}")
52+
end)
53+
end)
54+
55+
IO.puts("\n")
56+
else
57+
{:error, :pythonx_not_available, message} ->
58+
IO.puts("\nError: #{message}")
59+
exit({:shutdown, 1})
60+
61+
{:error, reason} ->
62+
IO.puts("\nError: Failed to get ESP32 device information")
63+
IO.puts("Reason: #{reason}")
64+
exit({:shutdown, 1})
65+
end
66+
end
67+
68+
defp format_build_info(build_info) when is_list(build_info) and length(build_info) == 5 do
69+
[version, target, time, date, sdk] =
70+
build_info
71+
|> Enum.map(&sanitize_string/1)
72+
73+
[
74+
" Version: #{version}",
75+
" Target: #{target}",
76+
" Built: #{time} #{date}",
77+
" SDK: #{sdk}"
78+
]
79+
end
80+
81+
defp format_build_info(build_info) when is_list(build_info) do
82+
build_info
83+
|> Enum.map(&sanitize_string/1)
84+
|> Enum.with_index(1)
85+
|> Enum.map(fn {info, index} -> " Info #{index}: #{info}" end)
86+
end
87+
88+
defp format_build_info(_) do
89+
[" Build info not available or corrupted"]
90+
end
91+
92+
defp sanitize_string(str) when is_binary(str) do
93+
str
94+
# Remove non-printable characters while preserving spaces
95+
|> String.replace(~r/[^\x20-\x7E\s]/u, "")
96+
|> case do
97+
"" -> "<unreadable>"
98+
sanitized -> sanitized
99+
end
100+
end
101+
102+
defp sanitize_string(_), do: "<invalid>"
103+
104+
defp format_atomvm_status(true), do: "✅"
105+
defp format_atomvm_status(_), do: "❌"
106+
end

0 commit comments

Comments
 (0)