|
| 1 | +# µPing (MicroPing) for MicroPython |
| 2 | +# copyright (c) 2018 Shawwwn <[email protected]> |
| 3 | +# License: MIT |
| 4 | + |
| 5 | +# Internet Checksum Algorithm |
| 6 | +# Author: Olav Morken |
| 7 | +# https://github.com/olavmrk/python-ping/blob/master/ping.py |
| 8 | +# Adjusted for ruff formatting to include in ManT1S MicroPython |
| 9 | + |
| 10 | +import utime |
| 11 | +import uselect |
| 12 | +import uctypes |
| 13 | +import usocket |
| 14 | +import ustruct |
| 15 | +import urandom |
| 16 | + |
| 17 | + |
| 18 | +# @data: bytes |
| 19 | +def checksum(data): |
| 20 | + if len(data) & 0x1: # Odd number of bytes |
| 21 | + data += b"\0" |
| 22 | + cs = 0 |
| 23 | + for pos in range(0, len(data), 2): |
| 24 | + b1 = data[pos] |
| 25 | + b2 = data[pos + 1] |
| 26 | + cs += (b1 << 8) + b2 |
| 27 | + while cs >= 0x10000: |
| 28 | + cs = (cs & 0xFFFF) + (cs >> 16) |
| 29 | + cs = ~cs & 0xFFFF |
| 30 | + return cs |
| 31 | + |
| 32 | + |
| 33 | +def ping(host, count=4, timeout=5000, interval=10, quiet=False, size=64): |
| 34 | + # prepare packet |
| 35 | + assert size >= 16, "pkt size too small" |
| 36 | + pkt = b"Q" * size |
| 37 | + pkt_desc = { |
| 38 | + "type": uctypes.UINT8 | 0, |
| 39 | + "code": uctypes.UINT8 | 1, |
| 40 | + "checksum": uctypes.UINT16 | 2, |
| 41 | + "id": uctypes.UINT16 | 4, |
| 42 | + "seq": uctypes.INT16 | 6, |
| 43 | + "timestamp": uctypes.UINT64 | 8, |
| 44 | + } # packet header descriptor |
| 45 | + h = uctypes.struct(uctypes.addressof(pkt), pkt_desc, uctypes.BIG_ENDIAN) |
| 46 | + h.type = 8 # ICMP_ECHO_REQUEST |
| 47 | + h.code = 0 |
| 48 | + h.checksum = 0 |
| 49 | + h.id = urandom.getrandbits(16) |
| 50 | + h.seq = 1 |
| 51 | + |
| 52 | + # init socket |
| 53 | + sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1) |
| 54 | + sock.setblocking(0) |
| 55 | + sock.settimeout(timeout / 1000) |
| 56 | + addr = usocket.getaddrinfo(host, 1)[0][-1][0] # ip address |
| 57 | + sock.connect((addr, 1)) |
| 58 | + not quiet and print("PING %s (%s): %u data bytes" % (host, addr, len(pkt))) |
| 59 | + |
| 60 | + seqs = list(range(1, count + 1)) # [1,2,...,count] |
| 61 | + c = 1 |
| 62 | + t = 0 |
| 63 | + n_trans = 0 |
| 64 | + n_recv = 0 |
| 65 | + finish = False |
| 66 | + while t < timeout: |
| 67 | + if t == interval and c <= count: |
| 68 | + # send packet |
| 69 | + h.checksum = 0 |
| 70 | + h.seq = c |
| 71 | + h.timestamp = utime.ticks_us() |
| 72 | + h.checksum = checksum(pkt) |
| 73 | + if sock.send(pkt) == size: |
| 74 | + n_trans += 1 |
| 75 | + t = 0 # reset timeout |
| 76 | + else: |
| 77 | + seqs.remove(c) |
| 78 | + c += 1 |
| 79 | + |
| 80 | + # recv packet |
| 81 | + while 1: |
| 82 | + socks, _, _ = uselect.select([sock], [], [], 0) |
| 83 | + if socks: |
| 84 | + resp = socks[0].recv(4096) |
| 85 | + resp_mv = memoryview(resp) |
| 86 | + h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), pkt_desc, uctypes.BIG_ENDIAN) |
| 87 | + # TODO: validate checksum (optional) |
| 88 | + seq = h2.seq |
| 89 | + if h2.type == 0 and h2.id == h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY |
| 90 | + t_elasped = (utime.ticks_us() - h2.timestamp) / 1000 |
| 91 | + ttl = ustruct.unpack("!B", resp_mv[8:9])[0] # time-to-live |
| 92 | + n_recv += 1 |
| 93 | + not quiet and print( |
| 94 | + "%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" |
| 95 | + % (len(resp), addr, seq, ttl, t_elasped) |
| 96 | + ) |
| 97 | + seqs.remove(seq) |
| 98 | + if len(seqs) == 0: |
| 99 | + finish = True |
| 100 | + break |
| 101 | + else: |
| 102 | + break |
| 103 | + |
| 104 | + if finish: |
| 105 | + break |
| 106 | + |
| 107 | + utime.sleep_ms(1) |
| 108 | + t += 1 |
| 109 | + |
| 110 | + # close |
| 111 | + sock.close() |
| 112 | + ret = (n_trans, n_recv) |
| 113 | + not quiet and print("%u packets transmitted, %u packets received" % ret) |
| 114 | + return ret |
0 commit comments