Skip to content

Commit dfd58ec

Browse files
committed
Built-in esptool flash (via Pythonx)
Optional use built-in esptool install via Pythonx. Signed-off-by: Peter M <[email protected]>
1 parent cbd2fb6 commit dfd58ec

File tree

6 files changed

+535
-5
lines changed

6 files changed

+535
-5
lines changed

lib/esptool_helper.ex

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
defmodule ExAtomVM.EsptoolHelper do
2+
@moduledoc """
3+
Module for setting up and using esptool through Pythonx.
4+
"""
5+
6+
@doc """
7+
Initializes Python environment with project configuration.
8+
We use locked main branch esptool version, pending a stable 5.x release,
9+
as we need the read to memory (instead of only to file) features.
10+
"""
11+
def setup do
12+
case Code.ensure_loaded(Pythonx) do
13+
{:module, Pythonx} ->
14+
Application.ensure_all_started(:pythonx)
15+
16+
Pythonx.uv_init("""
17+
[project]
18+
name = "project"
19+
version = "0.0.0"
20+
requires-python = "==3.13.*"
21+
dependencies = [
22+
"esptool @ git+https://github.com/espressif/esptool.git@6f0d779"
23+
]
24+
""")
25+
26+
_ ->
27+
{:error, :pythonx_not_available,
28+
"The :pythonx dependency is not available. Please add it to your mix.exs dependencies.\n{:pythonx, \"~> 0.4.0\"}"}
29+
end
30+
end
31+
32+
def flash_pythonx(tool_args) do
33+
# https://github.com/espressif/esptool/blob/master/docs/en/esptool/scripting.rst
34+
35+
tool_args =
36+
if not Enum.member?(tool_args, "--port") do
37+
selected_device = select_device()
38+
39+
["--port", selected_device["port"]] ++ tool_args
40+
else
41+
tool_args
42+
end
43+
44+
{_result, globals} =
45+
try do
46+
Pythonx.eval(
47+
"""
48+
import esptool
49+
import sys
50+
51+
command = [arg.decode('utf-8') for arg in tool_args]
52+
53+
def flash_esp():
54+
esptool.main(command)
55+
56+
if __name__ == "__main__":
57+
try:
58+
result = flash_esp()
59+
result = True
60+
except SystemExit as e:
61+
exit_code = int(str(e))
62+
result = exit_code == 0
63+
except Exception as e:
64+
print(f"Warning: {e}")
65+
result = False
66+
67+
""",
68+
%{"tool_args" => tool_args}
69+
)
70+
rescue
71+
e in Pythonx.Error ->
72+
IO.inspect("Pythonx error occurred: #{inspect(e)}")
73+
exit({:shutdown, 1})
74+
end
75+
76+
Pythonx.decode(globals["result"])
77+
end
78+
79+
def erase_flash(tool_args \\ ["--chip", "auto"]) do
80+
tool_args =
81+
if not Enum.member?(tool_args, "--port") do
82+
selected_device = select_device()
83+
84+
confirmation =
85+
IO.gets(
86+
"\nAre you sure you want to erase the flash of\n#{selected_device["chip_family_name"]} - Port: #{selected_device["port"]} MAC: #{selected_device["mac_address"]} ? [N/y]: "
87+
)
88+
89+
case String.trim(confirmation) do
90+
input when input in ["Y", "y"] ->
91+
IO.puts("Erasing..")
92+
93+
_ ->
94+
IO.puts("Flash erase cancelled.")
95+
exit({:shutdown, 0})
96+
end
97+
98+
["--port", selected_device["port"]] ++ tool_args ++ ["erase-flash"]
99+
else
100+
tool_args ++ ["erase-flash"]
101+
end
102+
103+
{_result, globals} =
104+
try do
105+
Pythonx.eval(
106+
"""
107+
import esptool
108+
109+
command = [arg.decode('utf-8') for arg in tool_args]
110+
111+
def flash_esp():
112+
esptool.main(command)
113+
114+
if __name__ == "__main__":
115+
try:
116+
result = flash_esp()
117+
result = True
118+
except SystemExit as e:
119+
exit_code = int(str(e))
120+
result = exit_code == 0
121+
except Exception as e:
122+
print(f"Warning: {e}")
123+
result = False
124+
""",
125+
%{"tool_args" => tool_args}
126+
)
127+
rescue
128+
e in Pythonx.Error ->
129+
IO.inspect("Pythonx error occurred: #{inspect(e)}")
130+
exit({:shutdown, 1})
131+
end
132+
133+
Pythonx.decode(globals["result"])
134+
end
135+
136+
def connected_devices do
137+
{_result, globals} =
138+
try do
139+
Pythonx.eval(
140+
"""
141+
from esptool.cmds import (detect_chip, read_flash, attach_flash)
142+
import serial.tools.list_ports as list_ports
143+
import re
144+
145+
ports = []
146+
for port in list_ports.comports():
147+
if port.vid is None:
148+
continue
149+
ports.append(port.device)
150+
151+
result = []
152+
for port in ports:
153+
try:
154+
with detect_chip(port) as esp:
155+
description = esp.get_chip_description()
156+
features = esp.get_chip_features()
157+
mac_addr = ':'.join(['%02X' % b for b in esp.read_mac()])
158+
159+
# chips like esp32-s2 can have more specific names, so we call this chip family
160+
# https://github.com/espressif/esptool/blob/807d02b0c5eb07ba46f871a492c84395fb9f37be/esptool/targets/esp32s2.py#L167
161+
chip_family_name = esp.CHIP_NAME
162+
163+
# read 128 bytes at 0x10030
164+
attach_flash(esp)
165+
app_header = read_flash(esp, 0x10030, 128, None)
166+
app_header_strings = [s for s in re.split('\\x00', app_header.decode('utf-8', errors='replace')) if s]
167+
168+
result.append({"port": port, "chip_family_name": chip_family_name,
169+
"features": features, "build_info": app_header_strings, "mac_address": mac_addr
170+
})
171+
except Exception as e:
172+
print(f"Error: {e}")
173+
result = False
174+
""",
175+
%{}
176+
)
177+
rescue
178+
e in Pythonx.Error ->
179+
{:error, "Pythonx error occurred: #{inspect(e)}"}
180+
end
181+
182+
Pythonx.decode(globals["result"])
183+
|> Enum.map(fn device ->
184+
Map.put(device, "atomvm_installed", Enum.member?(device["build_info"], "atomvm-esp32"))
185+
end)
186+
end
187+
188+
def select_device do
189+
devices = connected_devices()
190+
191+
selected_device =
192+
case length(devices) do
193+
0 ->
194+
IO.puts("Found no esp32 devices..")
195+
exit({:shutdown, 1})
196+
197+
1 ->
198+
hd(devices)
199+
200+
_ ->
201+
IO.puts("\nMultiple ESP32 devices found:")
202+
203+
devices
204+
|> Enum.with_index(1)
205+
|> Enum.each(fn {device, index} ->
206+
IO.puts(
207+
"#{index}. #{device["chip_family_name"]} - Port: #{device["port"]} MAC: #{device["mac_address"]}"
208+
)
209+
end)
210+
211+
selected =
212+
IO.gets("\nSelect device (1-#{length(devices)}): ")
213+
|> String.trim()
214+
|> Integer.parse()
215+
216+
case selected do
217+
{num, _} when num > 0 and num <= length(devices) ->
218+
Enum.at(devices, num - 1)
219+
220+
_ ->
221+
IO.puts("Invalid selection.")
222+
exit({:shutdown, 1})
223+
end
224+
end
225+
226+
selected_device
227+
end
228+
end

lib/mix/tasks/esp32.erase_flash.ex

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
defmodule Mix.Tasks.Atomvm.Esp32.EraseFlash do
2+
@moduledoc """
3+
Mix task to get erase the flash of an connected ESP32 devices.
4+
"""
5+
use Mix.Task
6+
7+
@shortdoc "Erase flash of ESP32"
8+
9+
@impl Mix.Task
10+
def run(_args) do
11+
with :ok <- ExAtomVM.EsptoolHelper.setup(),
12+
result <- ExAtomVM.EsptoolHelper.erase_flash() do
13+
case result do
14+
true -> exit({:shutdown, 0})
15+
false -> exit({:shutdown, 1})
16+
end
17+
else
18+
{:error, :pythonx_not_available, message} ->
19+
IO.puts("\nError: #{message}")
20+
exit({:shutdown, 1})
21+
22+
{:error, reason} ->
23+
IO.puts("Error: #{reason}")
24+
exit({:shutdown, 1})
25+
end
26+
end
27+
end

lib/mix/tasks/esp32.info.ex

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
defmodule Mix.Tasks.Atomvm.Esp32.Info do
2+
@moduledoc """
3+
Mix task to get information about connected ESP32 devices.
4+
"""
5+
use Mix.Task
6+
7+
@shortdoc "Get information about connected ESP32 devices"
8+
9+
@impl Mix.Task
10+
def run(_args) do
11+
with :ok <- ExAtomVM.EsptoolHelper.setup(),
12+
devices <- ExAtomVM.EsptoolHelper.connected_devices() do
13+
case length(devices) do
14+
0 -> IO.puts("Found no esp32 devices..")
15+
devices -> IO.puts("Found #{devices} connected esp32:")
16+
end
17+
18+
if length(devices) > 1 do
19+
Enum.each(devices, fn device ->
20+
IO.puts(
21+
"#{format_atomvm_status(device["atomvm_installed"])}#{device["chip_family_name"]} - Port: #{device["port"]}"
22+
)
23+
end)
24+
end
25+
26+
Enum.each(devices, fn device ->
27+
IO.puts("\n━━━━━━━━━━━━━━━━━━━━━━")
28+
29+
IO.puts(
30+
"#{format_atomvm_status(device["atomvm_installed"])}#{device["chip_family_name"]} - Port: #{device["port"]}"
31+
)
32+
33+
IO.puts("MAC: #{device["mac_address"]}")
34+
IO.puts("AtomVM installed: #{device["atomvm_installed"]}")
35+
36+
IO.puts("\nBuild Information:")
37+
38+
Enum.each(format_build_info(device["build_info"]), fn build_info ->
39+
IO.puts(build_info)
40+
end)
41+
42+
IO.puts("\nFeatures:")
43+
44+
Enum.each(device["features"], fn feature ->
45+
IO.puts(" • #{feature}")
46+
end)
47+
end)
48+
49+
IO.puts("\n")
50+
else
51+
{:error, :pythonx_not_available, message} ->
52+
IO.puts("\nError: #{message}")
53+
exit({:shutdown, 1})
54+
55+
{:error, reason} ->
56+
IO.puts("\nError: Failed to get ESP32 device information")
57+
IO.puts("Reason: #{reason}")
58+
exit({:shutdown, 1})
59+
end
60+
end
61+
62+
defp format_build_info(build_info) when is_list(build_info) and length(build_info) == 5 do
63+
[version, target, time, date, sdk] =
64+
build_info
65+
|> Enum.map(&sanitize_string/1)
66+
67+
[
68+
" Version: #{version}",
69+
" Target: #{target}",
70+
" Built: #{time} #{date}",
71+
" SDK: #{sdk}"
72+
]
73+
end
74+
75+
defp format_build_info(build_info) when is_list(build_info) do
76+
build_info
77+
|> Enum.map(&sanitize_string/1)
78+
|> Enum.with_index(1)
79+
|> Enum.map(fn {info, index} -> " Info #{index}: #{info}" end)
80+
end
81+
82+
defp format_build_info(_) do
83+
[" Build info not available or corrupted"]
84+
end
85+
86+
defp sanitize_string(str) when is_binary(str) do
87+
str
88+
# Remove non-printable characters while preserving spaces
89+
|> String.replace(~r/[^\x20-\x7E\s]/u, "")
90+
|> case do
91+
"" -> "<unreadable>"
92+
sanitized -> sanitized
93+
end
94+
end
95+
96+
defp sanitize_string(_), do: "<invalid>"
97+
98+
defp format_atomvm_status(true), do: "✅"
99+
defp format_atomvm_status(_), do: "❌"
100+
end

0 commit comments

Comments
 (0)