Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
daviesian committed Jul 5, 2022
0 parents commit 21a5cbe
Show file tree
Hide file tree
Showing 9 changed files with 633 additions and 0 deletions.
21 changes: 21 additions & 0 deletions LICENCE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2022 Ian Davies

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Anvil Pico libraries for Micropython

This repository contains the Anvil Pico modules that are added to MicroPython in the official Anvil firmware for the Pico W.

See https://anvil.works/pico for more information
41 changes: 41 additions & 0 deletions flash_filesystem/boot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#############################################
# Provide your Wifi connection details here #
#############################################

WIFI_SSID = "My Wireless Network Name"
WIFI_PASSWORD = "password_goes_here"

#############################################

import network
from time import sleep
from machine import Pin
import ntptime

sleep(1) # Without this, the USB handshake seems to break this script and then fail sometimes.

led = Pin("LED", Pin.OUT, value=1)

wlan = None
while not wlan or wlan.status() != 3:
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASSWORD)

# Blink LED slowly until the Wifi is connected.

while True:
print(wlan)
led.toggle()
sleep(0.2)
if wlan.status() in [-1, -2, 3]:
break

# Set the RTC to the current time

ntptime.settime()

# Solid LED means we're connected and ready to go
led.on()
print(wlan)

39 changes: 39 additions & 0 deletions flash_filesystem/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import anvil.pico
import uasyncio as a
from machine import Pin

# This is an example Anvil Uplink script for the Pico W.
# See https://anvil.works/pico for more information

UPLINK_KEY = "<uplink_key_goes_here>"

# We use the LED to indicate server calls and responses.
led = Pin("LED", Pin.OUT, value=1)


# Call this function from your Anvil app:
#
# anvil.server.call('pico_fn', 42)
#

@anvil.pico.callable(is_async=True)
async def pico_fn(n):
# Output will go to the Pico W serial port
print(f"Called local function with argument: {n}")

# Blink the LED and then double the argument and return it.
for i in range(10):
led.toggle()
await a.sleep_ms(50)
return n * 2

# Connect the Anvil Uplink. In MicroPython, this call will block forever.

anvil.pico.connect(UPLINK_KEY)


# There's lots more you can do with Anvil on your Pico W.
#
# See https://anvil.works/pico for more information


Empty file added modules/anvil/__init__.py
Empty file.
212 changes: 212 additions & 0 deletions modules/anvil/pico.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
from async_websocket_client import AsyncWebsocketClient
from ca_certs import LETSENCRYPT_ROOT
import uasyncio as a
import time
import json
import random
import sys
from machine import Pin

__all__ = ['connect', 'connect_async', 'call', 'callable', 'callable_async', 'get_user_email']
__version__ = "0.1.0"
__author__ = "Ian Davies"

# Update this with each release.
NOT_BEFORE=1657018369

ws = AsyncWebsocketClient()

async def _s(v):
#print("SEND:", v)
if isinstance(v, str):
await ws.send(v)
else:
await ws.send(json.dumps(v))

async def _r():
data = await ws.recv()
if data and isinstance(data, str):
#print("RECV:", data)
return json.loads(data)

async def _register_callables():
for fn_name, fn in fns.items():
await _s({
"type": "REGISTER",
"name": fn_name,
})

fns = {}
outstanding_calls = {}
call_stack_ids = {} # task -> call stack id

async def _incoming_call(data):
try:
f = fns[data.get('command')]
if not f:
raise Exception(f"No such function: {data.get('command')}")

call_stack_ids[id(a.current_task())] = data.get("call-stack-id")
#print("New task ID:", id(a.current_task()))

if f['require_user']:
email = await get_user_email()
#print("Got email", email)
if not email:
raise Exception("Unauthorised. You must be logged in to call this function.")
elif isinstance(f['require_user'], str) and email != f['require_user']:
raise Exception("Unauthorised. You are not authorised to call this function.")

res = f['fn'](*data.get('args'), **data.get('kwargs'))
result = await res if f['is_async'] else res

del call_stack_ids[id(a.current_task())]

await _s({
"id": data['id'],
"response": result,
})
except Exception as e:
sys.print_exception(e)
await _s({
"id": data['id'],
"error": {'message' : str(e)},
})

async def _anvil_listen():
while ws.open():
data = await _r()
if data:
if data.get("objects"):
await _s({
"id": data['id'],
"error": {'message' : f"Cannot send objects of type {[o['type'][0] for o in data['objects']]} to anvil.pico"},
})
elif "response" in data or "error" in data:
if outstanding_calls.get(data['id']):
outstanding_calls[data['id']] = data
else:
print(f"Received bogus response: {data}")
elif "output" in data and data['output'].strip():
print(f"Server: {data['output'].strip()}")
elif data.get('type') == "CALL":
#print("Call", data)
a.create_task(_incoming_call(data))

else:
a.sleep_ms(50)

RESPONSE_SENTINEL = object()

class FatalException(Exception):
pass

async def _connect(key, url):
if time.time() < NOT_BEFORE:
raise FatalException("System time invalid. Not connecting to Anvil.")

print("Connecting to Anvil...")
await ws.handshake(url, ca_certs=LETSENCRYPT_ROOT)
print("Connected")
await ws.open()
await _s({"key": key, "v": 7, "device": "PICO_W"})
result = await _r()
if not result.get("auth") == "OK":
raise Exception("Connection failed: " + str(result))
print(f"Authenticated to app {result['app-info']['id']}")

async def _launch_task(task, name):
try:
await task
except Exception as e:
print(f"Exception running uplink task: {name}")
sys.print_exception(e)

async def _blink_led(led, interval, n=None):
i = 0
while True:
if n is not None:
i += 1
if i > n:
break
led.toggle()
await a.sleep_ms(interval)
led.on()

async def _connect_async(key, on_first_connect, on_every_connect, url, no_led):
if not no_led:
led = Pin("LED", Pin.OUT, value=1)
while True:
try:
blink_task = None
if not no_led:
blink_task = a.create_task(_blink_led(led, 100))
await _connect(key, url)
await _register_callables()
if blink_task:
blink_task.cancel()
a.create_task(_blink_led(led, 50,10))
if on_first_connect:
a.create_task(_launch_task(on_first_connect, "on_first_connect"))
on_first_connect = None
if on_every_connect:
a.create_task(_launch_task(on_every_connect, "on_every_connect"))
await _anvil_listen()
except FatalException as e:
print("Fatal exception in uplink reconnection loop:")
sys.print_exception(e)
raise
except Exception as e:
print("Exception in uplink reconnection loop:")
sys.print_exception(e)
await a.sleep(1)

async def get_user_email(allow_remembered=True):
return await call("anvil.private.users.get_current_user_email", allow_remembered=allow_remembered)

def callable(name_or_fn=None, is_async=False, require_user=None):
name = None
def g(fn):
fns[name or fn.__name__] = {"fn": fn, "is_async": is_async, "require_user": require_user}
return fn

if not name_or_fn or isinstance(name_or_fn, str):
name = name_or_fn
return g
else:
return g(name_or_fn)

def callable_async(*args, **kwargs):
kwargs['is_async'] = True
return callable(*args, **kwargs)

async def call(fn_name, *args, **kwargs):
#print("Call in task", id(a.current_task()))
req_id = f"pico-call-{time.time()}-{random.randint(1,99999)}"
outstanding_calls[req_id] = RESPONSE_SENTINEL
req = {
"type": "CALL",
"command": fn_name,
"id": req_id,
"args": args,
"kwargs": kwargs,
}
call_stack_id = call_stack_ids.get(id(a.current_task()))
if call_stack_id:
req['call-stack-id'] = call_stack_id
await _s(req)
while outstanding_calls[req_id] is RESPONSE_SENTINEL:
await a.sleep_ms(5)
if 'response' in outstanding_calls[req_id]:
return outstanding_calls[req_id]['response']
else:
raise Exception(outstanding_calls[req_id]['error']['message'])


def connect_async(key, on_first_connect=None, on_every_connect=None, url="wss://anvil.works/uplink", no_led=False):
return _connect_async(key, on_first_connect, on_every_connect, url, no_led)

def connect(key, on_first_connect=None, on_every_connect=None, url="wss://anvil.works/uplink", no_led=False):
a.run(_connect_async(key, on_first_connect, on_every_connect, url, no_led))


Loading

0 comments on commit 21a5cbe

Please sign in to comment.