Skip to content

Commit 0747a41

Browse files
authored
Adding architecture docs. (Azure#22371)
* Add architecture diagram for prefetch. * Adding a receive flow and architecture UML. * Adding documentation to reactor dispatcher. * Add documentation. * Update diagram * Update sdk/core/azure-core-amqp/docs/architecture.md
1 parent 9144494 commit 0747a41

File tree

4 files changed

+81
-6
lines changed

4 files changed

+81
-6
lines changed
549 KB
Loading
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# Architecture Docs
2+
3+
## Qpid Proton-J Integration
4+
5+
Qpid Proton-J publishes events and messages via its event-driven process called [`Reactor`][Reactor]. `azure-core-amqp`
6+
hooks into Qpid Proton-J Reactor via `Handlers`. [`BaseHandler`][BaseHandler] contains all of the events that can be
7+
listened to. `Handlers` can be associated with classes implementing the `Extendable` interface via
8+
`BaseHandler.setHandler(Extendable, Handler)`. Proton-J `Connection`, `Session`, and `Link` (`Sender` and `Receiver`
9+
are specialised instances of `Link`) are all `Extendable`. In `azure-core-amqp`, we create and own an instance of
10+
`Reactor`. When an AMQP connection, session, or link is created from that `Reactor`, we instantiate and correlate the
11+
corresponding [`Handler`][Handlers]. This way, we have granular control over what events are listened to for each
12+
handler, rather than listening to all events available on [`BaseHandler`][BaseHandler].
13+
14+
The UML diagram below shows this relationship; the interfaces shown in green are Proton-J classes. Each Proton-J
15+
instance (e.g., `Connection`, `Session`, `Sender`, `Receiver`) is associated with one corresponding `*Handler`. Each
16+
`azure-core-amqp` `AmqpConnection` is associated with one [`Reactor`][Reactor]. When that instance closes, the AMQP
17+
connection is also closed.
18+
19+
Each [ReactorConnection][ReactorConnection] has one Proton-J [`Reactor`][Reactor] instance. Each [`Reactor`][Reactor]
20+
has one [`ReactorDispatcher`][ReactorDispatcher] and one [`ReactorExecutor`][ReactorExecutor]. When a `Reactor` is
21+
created, a [`SelectableChannel`][SelectableChannel] is associated with this `Reactor`.
22+
[`ReactorDispatcher`][ReactorDispatcher] is responsible for queueing work on this channel and the work is run in FIFO
23+
basis. [`ReactorExecutor`][ReactorExecutor] holds onto the `Reactor` instance and continuously processes to work
24+
SelectableChannel.
25+
26+
![azure-core-amqp integration with Proton-J][AzureCoreAmpqArchitecture]
27+
28+
## Prefetch and AMQP Link Credits
29+
30+
In Project Reactor, prefetch is the initial number of items to request upstream. Afterwards, 75% of the initial prefetch
31+
is used for subsequent `request(long)`.
32+
33+
In Event Hubs, prefetch is the number of AMQP link credits to put on the link when it is first created. After those
34+
initial link credits have been consumed, we have different ways of calculating how many credits are added to the link.
35+
36+
The diagram below illustrates how it happens. Things to note:
37+
38+
* Large `EventData` use multiple AMQP link credits because
39+
* There is no backpressure for [`EventHubConsumerAsyncClient.receiveFromPartition()`][EventHubConsumerAsyncClient].
40+
* [`EventHubConsumerAsyncClient.receiveFromPartition()`] returns `EventData` on `Scheduler.single("<name>")`.
41+
* Since events are not published on another Scheduler, they flow downstream using the Scheduler that
42+
[`ReceiveLinkHandler.onDelivery`][ReceiveLinkHandler] executed on.
43+
* All Proton-J events run on the single scheduler because it is not thread-safe.
44+
* [`EventProcessorClient`][EventProcessorClient] uses back-pressure due to `concatMap` and `publishOn` within its
45+
[PartitionPumpManager.startPartitionPump][PartitionPumpManager].
46+
47+
![Flow of credits when receiving deliveries][ReceiveFlowDiagram]
48+
49+
<!-- Links -->
50+
[BaseHandler]: https://github.com/apache/qpid-proton-j/blob/main/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java
51+
[EventHubConsumerAsyncClient]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java#L334
52+
[EventProcessorClient]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java
53+
[AzureCoreAmpqArchitecture]: ./architecture-uml.jpeg
54+
[Handlers]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler
55+
[ReceiveFlowDiagram]: ./receive-flow.jpeg
56+
[PartitionPumpManager]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java#L228
57+
[Reactor]: https://github.com/apache/qpid-proton-j/blob/main/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
58+
[SelectableChannel]: https://qpid.apache.org/releases/qpid-proton-j-0.33.5/api/org/apache/qpid/proton/reactor/Selectable.html#setChannel-java.nio.channels.SelectableChannel-
59+
[ReceiveLinkHandler]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java#L97
60+
[ReactorConnection]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
61+
[ReactorDispatcher]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorDispatcher.java
62+
[ReactorExecutor]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java
232 KB
Loading

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorDispatcher.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,19 @@
2424
import java.util.concurrent.atomic.AtomicInteger;
2525

2626
/**
27+
* The following utility class is used to generate an event to hook into {@link Reactor}'s event delegation pattern. It
28+
* uses a {@link Pipe} as the IO on which Reactor listens to.
29+
*
30+
* <p>
2731
* {@link Reactor} is not thread-safe - all calls to {@link Proton} APIs should be on the Reactor Thread. {@link
2832
* Reactor} works out-of-box for all event driven API - ex: onReceive - which could raise upon onSocketRead. {@link
2933
* Reactor} doesn't support APIs like send() out-of-box - which could potentially run on different thread to that of the
3034
* Reactor thread.
31-
*
32-
* <p>
33-
* The following utility class is used to generate an Event to hook into {@link Reactor}'s event delegation pattern. It
34-
* uses a {@link Pipe} as the IO on which Reactor listens to.
3535
* </p>
3636
*
3737
* <p>
3838
* Cardinality: Multiple {@link ReactorDispatcher}'s could be attached to 1 {@link Reactor}. Each {@link
39-
* ReactorDispatcher} should be initialized synchronously - as it calls API in {@link Reactor} which is not
40-
* thread-safe.
39+
* ReactorDispatcher} should be initialized synchronously - as it calls API in {@link Reactor} which is not thread-safe.
4140
* </p>
4241
*/
4342
public final class ReactorDispatcher {
@@ -52,6 +51,13 @@ public final class ReactorDispatcher {
5251
private final AtomicBoolean isClosed = new AtomicBoolean();
5352
private final Sinks.One<AmqpShutdownSignal> shutdownSignal = Sinks.one();
5453

54+
/**
55+
* Creates an instance. The {@code ioSignal} is associated with {@code reactor} as a child {@link Selectable}.
56+
*
57+
* @param connectionId The connection id.
58+
* @param reactor The reactor instance.
59+
* @param ioSignal IO pipe to signal work on the {@code reactor}.
60+
*/
5561
public ReactorDispatcher(final String connectionId, final Reactor reactor, final Pipe ioSignal) {
5662
this.connectionId = connectionId;
5763
this.reactor = reactor;
@@ -60,6 +66,13 @@ public ReactorDispatcher(final String connectionId, final Reactor reactor, final
6066
this.onClose = new CloseHandler();
6167
this.workScheduler = new WorkScheduler();
6268

69+
// The Proton-J reactor goes quiescent when there is no work to do, and it only wakes up when a Selectable (by
70+
// default, the network connection) signals that data is available.
71+
//
72+
// That's a problem in the send-only scenario, which is a common scenario, or any scenario where activity is
73+
// sparse. If the reactor has gone quiescent, the SDK can put a pending send in the work queue, but it will just
74+
// sit there until a Selectable wakes the reactor up. The pipe gives the SDK code a guaranteed way to ensure the
75+
// reactor is awake.
6376
final Selectable schedulerSelectable = this.reactor.selectable();
6477

6578
schedulerSelectable.setChannel(this.ioSignal.source());

0 commit comments

Comments
 (0)