Skip to content

Commit

Permalink
feature:
Browse files Browse the repository at this point in the history
添加第二种Pipeline的实现方式
  • Loading branch information
KawhiWei committed Jul 16, 2024
1 parent d164cb5 commit 9428178
Show file tree
Hide file tree
Showing 17 changed files with 259 additions and 44 deletions.
3 changes: 3 additions & 0 deletions src/framework/Luck.Framework/PipelineAbstract/DelegatePipe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Luck.Framework.PipelineAbstract;

public delegate ValueTask DelegatePipe<in TContext>(TContext context) where TContext : IContext;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Luck.Framework.PipelineAbstract;

public interface IDelegatePipe<TContext> where TContext : IContext
{
ValueTask InvokeAsync(TContext context, DelegatePipe<TContext> next);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Luck.Framework.PipelineAbstract;

public interface IDelegatePipelineBuilder<TContext> where TContext : IContext
{
// IDelegatePipelineBuilder<TContext> UseMiddleware<TMiddleware>(IDelegatePipe<TContext> delegatePipe) where TMiddleware;

IDelegatePipelineBuilder<TContext> UsePipe(IDelegatePipe<TContext> delegatePipe);
DelegatePipe<TContext> Build();
}
2 changes: 1 addition & 1 deletion src/framework/Luck.Framework/PipelineAbstract/IPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace Luck.Framework.PipelineAbstract;

public interface IPipe<TContext> where TContext : IContext
{
IPipe<TContext>? NextMiddleware { get; set; }
IPipe<TContext>? NextPipe { get; set; }

ValueTask ExecuteAsync(TContext context);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Luck.Framework.PipelineAbstract;

public interface IActuator<in TContext> where TContext : IContext
public interface IPipeActuator<in TContext> where TContext : IContext
{
ValueTask InvokeAsync(TContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ public interface IPipelineBuilder<TContext> where TContext : IContext
{
IPipelineBuilder<TContext> UseMiddleware<TMiddleware>() where TMiddleware : IPipe<TContext>;

IActuator<TContext> Build();
IPipeActuator<TContext> Build();

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ namespace Luck.Framework.PipelineAbstract;
public interface IPipelineFactory
{
IPipelineBuilder<TContext> CreatePipelineBuilder<TContext>() where TContext : IContext;


IDelegatePipelineBuilder<TContext> CreateDelegatePipelineBuilder<TContext>() where TContext : IContext;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ public enum Interruptible
/// 终止, 进入监控,不自动补偿.
/// </summary>
Observe = 2002,

/// <summary>
/// 跳过执行当前Pipe
/// </summary>
Skip = 3002,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace Luck.Framework.PipelineAbstract;

public delegate DelegatePipe<TContext> PipelineDelegate<TContext>(DelegatePipe<TContext> next)
where TContext : IContext;
52 changes: 52 additions & 0 deletions src/framework/Luck.Pipeline/DefaultDelegatePipe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using Luck.Framework.PipelineAbstract;

namespace Luck.Pipeline;

public abstract class DefaultDelegatePipe<TContext> : IDelegatePipe<TContext> where TContext : IContext
{
public async ValueTask InvokeAsync(TContext context, DelegatePipe<TContext> next)
{
try
{
var runThis = await BeforeInvokeAsync(context);
if (context.IsInterruptible)
{
await OnCancelled(context);
return;
}

if (runThis)
{
await Invoke(context);
}

if (context.IsInterruptible)
{
await OnCancelled(context);
return;
}

await next(context);
}
catch (Exception ex)
{
await OnCancelled(context, ex);
}
}

protected abstract ValueTask Invoke(TContext context);

protected virtual ValueTask<bool> BeforeInvokeAsync(TContext context) => ValueTask.FromResult(true);

protected virtual ValueTask AfterInvokeAsync(TContext context) => ValueTask.CompletedTask;

protected virtual ValueTask OnCancelled(TContext context, Exception ex = null!)
{
if (ex != null)
{
throw new Exception(this.GetType().Name + "->" + ex.Message, ex);
}

return ValueTask.CompletedTask;
}
}
58 changes: 55 additions & 3 deletions src/framework/Luck.Pipeline/DefaultPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Luck.Pipeline;

public abstract class DefaultPipe<TContext> : IPipe<TContext> where TContext : IContext
{
public IPipe<TContext>? NextMiddleware { get; set; } = null;
public IPipe<TContext>? NextPipe { get; set; } = null;

public async ValueTask ExecuteAsync(TContext context)
{
Expand All @@ -28,9 +28,9 @@ public async ValueTask ExecuteAsync(TContext context)
return;
}

if (NextMiddleware is not null)
if (NextPipe is not null)
{
await NextMiddleware.ExecuteAsync(context);
await NextPipe.ExecuteAsync(context);
}


Expand Down Expand Up @@ -71,4 +71,56 @@ protected virtual ValueTask OnCancelled(TContext context, Exception ex = null!)

return ValueTask.CompletedTask;
}

public async ValueTask ExecuteAsync(TContext context, IPipe<TContext> next)
{
try
{
var runThis = await BeforeInvokeAsync(context);
if (context.IsInterruptible)
{
await OnCancelled(context);
return;
}

if (runThis)
{
await Invoke(context);
}

if (context.IsInterruptible)
{
await OnCancelled(context);
return;
}

if (NextPipe is not null)
{
await NextPipe.ExecuteAsync(context);
}


if (context.IsInterruptible)
{
await OnCancelled(context);
return;
}

if (runThis)
{
await AfterInvokeAsync(context);
}

if (context.IsInterruptible)
{
await OnCancelled(context);
}
}
catch (Exception ex)
{
await OnCancelled(context, ex);
}

throw new NotImplementedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

namespace Luck.Pipeline;

public class DefaultActuator<TContext> : List<IPipe<TContext>>,
IActuator<TContext> where TContext : IContext
public class DefaultPipeActuator<TContext> : List<IPipe<TContext>>,
IPipeActuator<TContext> where TContext : IContext
{
public async ValueTask InvokeAsync(TContext context)
{
Expand Down
36 changes: 36 additions & 0 deletions src/framework/Luck.Pipeline/DelegatePipelineBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using Luck.Framework.PipelineAbstract;
using Microsoft.Extensions.DependencyInjection;

namespace Luck.Pipeline;

public class DelegatePipelineBuilder<TContext> : IDelegatePipelineBuilder<TContext> where TContext : IContext
{
private readonly IServiceProvider _serviceProvider;
private IList<PipelineDelegate<TContext>> _pipes = new List<PipelineDelegate<TContext>>();

public DelegatePipelineBuilder(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

public IDelegatePipelineBuilder<TContext> UsePipe(IDelegatePipe<TContext> delegatePipe)
{
DelegatePipe<TContext> PipelineDelegate(DelegatePipe<TContext> next) =>
context => delegatePipe.InvokeAsync(context, next);

_pipes.Add(PipelineDelegate);
return this;
}

public DelegatePipe<TContext> Build()
{
DelegatePipe<TContext> next = _ => ValueTask.CompletedTask;

for (var i = _pipes.Count - 1; i >= 0; i--)
{
next = _pipes[i].Invoke(next);
}

return next;
}
}
17 changes: 8 additions & 9 deletions src/framework/Luck.Pipeline/PipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,25 @@ public IPipelineBuilder<TContext> UseMiddleware<TMiddleware>() where TMiddleware
return this;
}

public IActuator<TContext> Build()
public IPipeActuator<TContext> Build()
{
var line = new DefaultActuator<TContext>();
var defaultActuator = new DefaultPipeActuator<TContext>();
foreach (var type in _types)
{
var middleware = _serviceProvider.GetService(type) as IPipe<TContext>;
if (middleware == null)
if (_serviceProvider.GetService(type) is not IPipe<TContext> middleware)
{
throw new NotImplementedException("中间件类" + type.Name + "未实现,请检查.");
throw new NotImplementedException("中间件类未注入到DI容器" + type.Name + "请检查.");
}

var lastMiddleware = line.LastOrDefault();
var lastMiddleware = defaultActuator.LastOrDefault();
if (lastMiddleware is not null)
{
lastMiddleware.NextMiddleware = middleware;
lastMiddleware.NextPipe = middleware;
}

line.Add(middleware);
defaultActuator.Add(middleware);
}

return line;
return defaultActuator;
}
}
4 changes: 2 additions & 2 deletions src/framework/Luck.Pipeline/PipelineFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ public IPipelineBuilder<TContext> CreatePipelineBuilder<TContext>() where TConte
return pipelineBuilder;
}

public IPipelineDelegateBuilder<TContext> CreatePipelineDelegateBuilder<TContext>() where TContext : IContext
public IDelegatePipelineBuilder<TContext> CreateDelegatePipelineBuilder<TContext>() where TContext : IContext
{
var pipelineBuilder = new PipelineDelegateBuilder<TContext>(_serviceProvider);
var pipelineBuilder = new DelegatePipelineBuilder<TContext>(_serviceProvider);
return pipelineBuilder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using System.Threading.Tasks;
using Luck.Framework.PipelineAbstract;
using Luck.Pipeline;
using Luck.UnitTest.Pipeline_Tests.Context;

namespace Luck.UnitTest.Pipeline_Tests;

public class CreateCustomerDelegatePipe : DefaultDelegatePipe<CustomerContext>
{
protected override ValueTask Invoke(CustomerContext context)
{
if (context.Properties.TryGetValue("测试数据", out var value))
{
Console.WriteLine($"测试输出信息:【{value}】");
}

;
return new ValueTask();
}
}

public class FetchOrderDetailDelegatePipePipe : DefaultDelegatePipe<CustomerContext>
{
protected override ValueTask Invoke(CustomerContext context)
{
Console.WriteLine($"添加测试数据");
context.Properties.Add("测试数据", "测试订单列表");
return new ValueTask();
}
}

public class CancelDelegatePipe : DefaultDelegatePipe<CustomerContext>
{
protected override ValueTask Invoke(CustomerContext context)
{
context.SetInterruptible(Interruptible.Cancel);
return new ValueTask();
}
}
Loading

0 comments on commit 9428178

Please sign in to comment.