Skip to content

Commit a46057b

Browse files
committedApr 26, 2010
Adding new binding for pipeline style actions against channels
1 parent 29bb844 commit a46057b

26 files changed

+1134
-533
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2007-2008 The Apache Software Foundation.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
4+
// this file except in compliance with the License. You may obtain a copy of the
5+
// License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software distributed
10+
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
// specific language governing permissions and limitations under the License.
13+
namespace Magnum.Specs.Channels
14+
{
15+
using System;
16+
using Fibers;
17+
using Magnum.Channels;
18+
using Magnum.Extensions;
19+
using NUnit.Framework;
20+
using TestFramework;
21+
22+
[TestFixture]
23+
public class Building_an_aggregator_network
24+
{
25+
private SynchronousFiber _fiber;
26+
private TimeSpan _timeout;
27+
28+
[SetUp]
29+
public void Setup()
30+
{
31+
_fiber = new SynchronousFiber();
32+
_timeout = 100.Milliseconds();
33+
}
34+
35+
36+
[Test]
37+
public void Should_send_to_a_adapter_consumer_chain()
38+
{
39+
Future<TestMessage> future = new Future<TestMessage>();
40+
41+
var consumer = new ConsumerChannel<TestMessage>(_fiber, future.Complete);
42+
var adapter = new ChannelAdapter<TestMessage>(consumer);
43+
44+
adapter.Send(new TestMessage());
45+
46+
future.IsCompleted.ShouldBeTrue();
47+
}
48+
49+
private class TestMessage
50+
{
51+
}
52+
53+
[Test]
54+
public void Should_add_a_consumer_to_an_empty_adapter_chain()
55+
{
56+
var adapter = new ChannelAdapter<TestMessage>(new ShuntChannel<TestMessage>());
57+
58+
var future = new Future<TestMessage>();
59+
60+
using (var scope = adapter.CreateBinderScope())
61+
{
62+
var consumer = new ConsumerChannel<TestMessage>(_fiber, future.Complete);
63+
64+
scope.Add(consumer);
65+
66+
adapter.Send(new TestMessage());
67+
}
68+
69+
future.IsCompleted.ShouldBeTrue();
70+
}
71+
72+
[Test]
73+
public void Should_add_a_consumer_to_an_existing_adapter_chain()
74+
{
75+
var firstFuture = new Future<TestMessage>();
76+
var secondFuture = new Future<TestMessage>();
77+
78+
var first = new ConsumerChannel<TestMessage>(_fiber, firstFuture.Complete);
79+
var subs = new PublishSubscribeChannel<TestMessage>(_fiber, new[] {first});
80+
var adapter = new ChannelAdapter<TestMessage>(subs);
81+
82+
using (var scope = adapter.CreateBinderScope())
83+
{
84+
var second = new ConsumerChannel<TestMessage>(_fiber, secondFuture.Complete);
85+
scope.Add(second);
86+
87+
adapter.Send(new TestMessage());
88+
}
89+
90+
firstFuture.IsCompleted.ShouldBeTrue();
91+
secondFuture.IsCompleted.ShouldBeTrue();
92+
}
93+
94+
[Test]
95+
public void Should_remove_a_consumer_from_an_adapter_chain()
96+
{
97+
var adapter = new ChannelAdapter<TestMessage>(new ShuntChannel<TestMessage>());
98+
99+
var future = new Future<TestMessage>();
100+
101+
using (var scope = adapter.CreateBinderScope())
102+
{
103+
var consumer = new ConsumerChannel<TestMessage>(_fiber, future.Complete);
104+
105+
scope.Add(consumer);
106+
}
107+
108+
adapter.Send(new TestMessage());
109+
110+
future.IsCompleted.ShouldBeFalse();
111+
}
112+
}
113+
}

‎src/Magnum.Specs/Channels/ChannelVisitor_Specs.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void Should_capture_the_async_result_channel_and_state()
6363
public void Should_capture_the_transform_channel_types()
6464
{
6565
var channel = new ConsumerChannel<string>(new SynchronousFiber(), x => { });
66-
var transform = new TranformChannel<int, string>(new SynchronousFiber(), channel, x => x.ToString());
66+
var transform = new ConvertChannel<int, string>(new SynchronousFiber(), channel, x => x.ToString());
6767

6868
new ChannelVisitor().Visit(transform);
6969
}

‎src/Magnum.Specs/Channels/ConditionalConsumer_Specs.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class Passing_a_message_to_a_conditional_consumer
2121
[Test]
2222
public void Should_return_null_if_not_interested()
2323
{
24-
ConditionalConsumer<int> subject = message =>
24+
SelectiveConsumer<int> subject = message =>
2525
{
2626
return null;
2727
};

‎src/Magnum.Specs/Magnum.Specs.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
<Compile Include="Calendar\IsHoliday_Specs.cs" />
6969
<Compile Include="Calendar\MemorialDayObservedCheck.cs" />
7070
<Compile Include="BehaviorTest.cs" />
71+
<Compile Include="Channels\Aggregator_Specs.cs" />
7172
<Compile Include="Channels\ChannelPerformance_Specs.cs" />
7273
<Compile Include="Channels\ChannelQueue_Specs.cs" />
7374
<Compile Include="Channels\ChannelVisitor_Specs.cs" />

‎src/Magnum.Web/Actors/Actor.cs

+1-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ namespace Magnum.Web.Actors
1515
using Channels;
1616

1717
public interface Actor :
18-
UntypedChannel
18+
RequestResponseChannel
1919
{
20-
void Send<T>(T message, Actor replyTo);
2120
}
2221
}
+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright 2007-2008 The Apache Software Foundation.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
4+
// this file except in compliance with the License. You may obtain a copy of the
5+
// License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software distributed
10+
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
// specific language governing permissions and limitations under the License.
13+
namespace Magnum.Channels
14+
{
15+
using System;
16+
using System.Collections.Generic;
17+
using Extensions;
18+
using Reflection;
19+
20+
public class AddChannelBinder<TChannel>
21+
{
22+
private readonly Channel<TChannel> _newChannel;
23+
private bool _added;
24+
25+
public AddChannelBinder(Channel<TChannel> newChannel)
26+
{
27+
_newChannel = newChannel;
28+
}
29+
30+
public void BindTo<T>(Channel<T> channel)
31+
{
32+
Channel<T> result = Visit(channel);
33+
34+
if (!_added)
35+
throw new InvalidOperationException("The binding operation failed: {0} to {1}".FormatWith(typeof (T).Name, typeof (TChannel).Name));
36+
}
37+
38+
protected virtual Channel<T> Visit<T>(Channel<T> channel)
39+
{
40+
Channel<T> result = this.FastInvoke<AddChannelBinder<TChannel>, Channel<T>>("Visitor", channel);
41+
42+
return result;
43+
}
44+
45+
protected virtual Channel<T> Visitor<T>(Channel<T> channel)
46+
{
47+
return channel;
48+
}
49+
50+
protected virtual Channel<T> Visitor<T>(ChannelAdapter<T> channel)
51+
{
52+
//Guard.IsTrue(x => x.IsAssignableFrom(typeof(TChannel)), typeof(T), "Type {0} is not assignable to {1}".FormatWith(typeof(T).Name, typeof(TChannel).Name));
53+
54+
Channel<T> originalOutput = channel.Output;
55+
56+
Channel<T> output = Visit(originalOutput);
57+
58+
if (originalOutput != output)
59+
{
60+
channel.ChangeOutputChannel(originalOutput, output);
61+
}
62+
63+
return channel;
64+
}
65+
66+
protected virtual Channel<T> Visitor<T>(ShuntChannel<T> channel)
67+
{
68+
if (typeof (T) == typeof (TChannel))
69+
{
70+
_added = true;
71+
return (Channel<T>) _newChannel;
72+
}
73+
74+
return channel;
75+
}
76+
77+
protected virtual Channel<T> Visitor<T>(PublishSubscribeChannel<T> channel)
78+
{
79+
if (typeof (T) == typeof (TChannel))
80+
{
81+
return new PublishSubscribeChannel<T>(channel.Fiber, VisitSubscribers(channel.Subscribers));
82+
}
83+
84+
return channel;
85+
}
86+
87+
private IEnumerable<Channel<T>> VisitSubscribers<T>(IEnumerable<Channel<T>> recipients)
88+
{
89+
foreach (Channel<T> recipient in recipients)
90+
{
91+
Channel<T> newRecipient = Visit(recipient);
92+
93+
if (newRecipient == (Channel<T>) _newChannel)
94+
throw new InvalidOperationException("The channel has already been added to the network");
95+
96+
if (newRecipient != null)
97+
yield return newRecipient;
98+
}
99+
100+
if (!_added)
101+
{
102+
_added = true;
103+
yield return (Channel<T>) _newChannel;
104+
}
105+
}
106+
}
107+
}

‎src/Magnum/Channels/BinderScope.cs

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright 2007-2008 The Apache Software Foundation.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
4+
// this file except in compliance with the License. You may obtain a copy of the
5+
// License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software distributed
10+
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
// specific language governing permissions and limitations under the License.
13+
namespace Magnum.Channels
14+
{
15+
using System;
16+
17+
public interface BinderScope :
18+
IDisposable
19+
{
20+
void Add<TChannel>(Channel<TChannel> channel);
21+
}
22+
}

‎src/Magnum/Channels/ChannelAdapter.cs

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2007-2008 The Apache Software Foundation.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
4+
// this file except in compliance with the License. You may obtain a copy of the
5+
// License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software distributed
10+
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
// specific language governing permissions and limitations under the License.
13+
namespace Magnum.Channels
14+
{
15+
using System;
16+
using System.Threading;
17+
18+
public class ChannelAdapter<T> :
19+
Channel<T>
20+
{
21+
private Channel<T> _output;
22+
23+
public ChannelAdapter(Channel<T> output)
24+
{
25+
_output = output;
26+
}
27+
28+
public Channel<T> Output
29+
{
30+
get { return _output; }
31+
}
32+
33+
public void Send(T message)
34+
{
35+
Output.Send(message);
36+
}
37+
38+
public void ChangeOutputChannel(Channel<T> original, Channel<T> replacement)
39+
{
40+
Interlocked.CompareExchange(ref _output, replacement, original);
41+
if (_output != replacement)
42+
throw new InvalidOperationException("The channel has been modified since it was last requested");
43+
}
44+
}
45+
}
+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright 2007-2008 The Apache Software Foundation.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
4+
// this file except in compliance with the License. You may obtain a copy of the
5+
// License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software distributed
10+
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
12+
// specific language governing permissions and limitations under the License.
13+
namespace Magnum.Channels
14+
{
15+
using System.Collections.Generic;
16+
17+
public class ChannelBinderScope<T> :
18+
BinderScope
19+
{
20+
private readonly HashSet<Channel> _boundChannels = new HashSet<Channel>();
21+
private readonly Channel<T> _channel;
22+
23+
public ChannelBinderScope(Channel<T> channel)
24+
{
25+
_channel = channel;
26+
}
27+
28+
public void Dispose()
29+
{
30+
new RemoveChannelBinder(_boundChannels).Unbind(_channel);
31+
32+
_boundChannels.Clear();
33+
}
34+
35+
public void Add<TChannel>(Channel<TChannel> channel)
36+
{
37+
new AddChannelBinder<TChannel>(channel).BindTo(_channel);
38+
39+
_boundChannels.Add(channel);
40+
}
41+
}
42+
}

‎src/Magnum/Channels/ChannelVisitor.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ protected virtual Channel<T> Visitor<T>(PublishSubscribeChannel<T> channel)
121121
return channel;
122122
}
123123

124-
protected virtual Channel<TInput> Visitor<TInput, TOutput>(TranformChannel<TInput, TOutput> channel)
124+
protected virtual Channel<TInput> Visitor<TInput, TOutput>(ConvertChannel<TInput, TOutput> channel)
125125
{
126126
Trace.WriteLine("TransformChannel<{0}>, Output: {1}".FormatWith(typeof (TInput).Name, typeof (TOutput).Name));
127127

0 commit comments

Comments
 (0)
Please sign in to comment.