Skip to content

Commit

Permalink
Implement unsubscribe (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
floitsch authored May 23, 2022
1 parent 11f3d5f commit a9fe2f7
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 8 deletions.
19 changes: 15 additions & 4 deletions examples/subscribe_many.toit
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,23 @@ main:
mqtt.TopicFilter "c/d" --qos=1,
]

client.publish "a/b" "a/b".to_byte_array
client.publish "b/c" "b/c".to_byte_array
client.publish "c/d" "c/d".to_byte_array
client.subscribe "d/e" --qos=1

task::
5.repeat:
client.publish "a/b" "a/b $it".to_byte_array
client.publish "b/c" "b/c $it".to_byte_array
client.publish "c/d" "c/d $it".to_byte_array
client.publish "d/e" "d/e $it".to_byte_array
sleep --ms=200
sleep --ms=1000
client.close

received_count := 0
client.handle: | topic payload |
print "$topic: $payload.to_string_non_throwing"
received_count++
if received_count == 3: client.close
if received_count == 10:
client.unsubscribe "a/b"
if received_count == 15:
client.unsubscribe_all [ "b/c", "c/d" ]
16 changes: 12 additions & 4 deletions src/client.toit
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,19 @@ class Client:
else:
throw CLIENT_CLOSED_EXCEPTION

/**
Unsubscribe to a single topic $filter.
*/
/** Unsubscribes from a single topic $filter. */
unsubscribe filter/string -> none:
// Not implemented yet.
unsubscribe_all [filter]

/** Unsubscribes from the list of topic $filters. */
unsubscribe_all filters/List -> none:
if is_closed: throw CLIENT_CLOSED_EXCEPTION
packet_id := next_packet_id_++
packet := UnsubscribePacket filters --packet_id=packet_id
wait_for_ack_ packet_id: | latch/monitor.Latch |
send_ packet
ack := latch.get
if not ack: throw CLIENT_CLOSED_EXCEPTION

/**
Handle incoming messages. The $block is called with two arguments,
Expand Down
42 changes: 42 additions & 0 deletions src/packets.toit
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ abstract class Packet:
return PubAckPacket.deserialize reader
if kind == SubAckPacket.TYPE:
return SubAckPacket.deserialize reader size
if kind == UnsubAckPacket.TYPE:
return UnsubAckPacket.deserialize reader
if kind == PingRespPacket.TYPE:
return PingRespPacket.deserialize reader

Expand Down Expand Up @@ -252,6 +254,46 @@ class SubAckPacket extends Packet implements PacketIDAck:

payload -> ByteArray: return #[]

class UnsubscribePacket extends Packet:
static TYPE ::= 10

topic_filters/List/*<string>*/
packet_id/int

constructor .topic_filters --.packet_id:
super TYPE --flags=0b0010

variable_header -> ByteArray:
data := ByteArray 2
binary.BIG_ENDIAN.put_uint16 data 0 packet_id
return data

payload -> ByteArray:
buffer := bytes.Buffer
topic_filters.do: | topic_filter/string |
Packet.encode_string buffer topic_filter
return buffer.bytes

class UnsubAckPacket extends Packet implements PacketIDAck:
static TYPE ::= 11

packet_id /int

constructor .packet_id:
super TYPE

constructor.deserialize reader/reader.BufferedReader:
data := reader.read_bytes 2
packet_id = binary.BIG_ENDIAN.uint16 data 0
super TYPE

variable_header -> ByteArray:
data := ByteArray 2
binary.BIG_ENDIAN.put_uint16 data 0 packet_id
return data

payload -> ByteArray: return #[]

class PingReqPacket extends Packet:
static TYPE ::= 12

Expand Down

0 comments on commit a9fe2f7

Please sign in to comment.