-
Notifications
You must be signed in to change notification settings - Fork 592
/
Copy pathBlockingConcurrentBag.cs
122 lines (111 loc) · 3.81 KB
/
BlockingConcurrentBag.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Iot.Device.Arduino
{
/// <summary>
/// A thread-safe collection whose consumers remove elements by predicate and can block
/// until a matching element has been added by another thread.
/// </summary>
/// <typeparam name="T">The type of the elements stored in the collection</typeparam>
internal class BlockingConcurrentBag<T>
{
    /// <summary>
    /// Upper bound, in milliseconds, for a single wait on the lock. A waiting consumer wakes up
    /// at least this often to re-scan the container and to re-evaluate its deadline.
    /// </summary>
    private const int MaxWaitSliceMilliseconds = 500;

    private readonly object _lock = new object();
    private readonly List<T?> _container = new List<T?>();

    /// <summary>
    /// Gets the number of elements currently stored in the collection.
    /// </summary>
    public int Count
    {
        get
        {
            lock (_lock)
            {
                return _container.Count;
            }
        }
    }

    /// <summary>
    /// Adds an element and wakes up all consumers blocked in <see cref="TryRemoveElement"/>,
    /// so that each of them can test the new element against its predicate.
    /// </summary>
    /// <param name="elem">The element to add. May be null/default.</param>
    public void Add(T? elem)
    {
        lock (_lock)
        {
            _container.Add(elem);
            Monitor.PulseAll(_lock);
        }
    }

    /// <summary>
    /// Removes all elements from the collection.
    /// </summary>
    public void Clear()
    {
        lock (_lock)
        {
            // No pulse here: the only waiters on _lock are consumers looking for a matching
            // element, and emptying the container can never satisfy one of them.
            _container.Clear();
        }
    }

    /// <summary>
    /// Waits until an element is in the collection that matches the given predicate, then removes
    /// and returns the first such element.
    /// The predicate is invoked while the internal lock is held, so checking it should be fast.
    /// </summary>
    /// <param name="predicate">The predicate to test</param>
    /// <param name="timeout">The overall timeout, covering both the acquisition of the lock and the
    /// wait for a matching element. <see cref="Timeout.InfiniteTimeSpan"/> waits indefinitely.</param>
    /// <param name="element">Returns the element found, if any. When the method returns false, this is
    /// the default value of <typeparamref name="T"/>. Note that a found element may itself be null/default.</param>
    /// <returns>True if an element was found within the timeout, false otherwise</returns>
    /// <exception cref="ArgumentNullException"><paramref name="predicate"/> is null</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is negative (and not
    /// <see cref="Timeout.InfiniteTimeSpan"/>) or exceeds <see cref="int.MaxValue"/> milliseconds</exception>
    public bool TryRemoveElement(Func<T?, bool> predicate, TimeSpan timeout, out T? element)
    {
        if (predicate == null)
        {
            throw new ArgumentNullException(nameof(predicate));
        }

        element = default;
        bool waitForever = timeout == Timeout.InfiniteTimeSpan;
        Stopwatch sw = Stopwatch.StartNew();
        bool lockTaken = false;
        try
        {
            // Monitor.TryEnter validates the timeout and treats InfiniteTimeSpan as "no limit".
            Monitor.TryEnter(_lock, timeout, ref lockTaken);
            if (!lockTaken)
            {
                return false;
            }

            while (true)
            {
                // Cannot use FirstOrDefault here, because we need to be able to distinguish between
                // finding nothing and finding an empty (null, default) element
                for (int index = 0; index < _container.Count; index++)
                {
                    T? candidate = _container[index];
                    if (predicate(candidate))
                    {
                        _container.RemoveAt(index);
                        element = candidate;
                        return true;
                    }
                }

                TimeSpan slice = TimeSpan.FromMilliseconds(MaxWaitSliceMilliseconds);
                if (!waitForever)
                {
                    TimeSpan remaining = timeout - sw.Elapsed;
                    if (remaining <= TimeSpan.Zero)
                    {
                        return false;
                    }

                    if (remaining < slice)
                    {
                        slice = remaining;
                    }
                }

                // The return value is deliberately ignored. Monitor.Wait only returns once the lock has
                // been reacquired, and a producer may have added an element between the expiry of the
                // wait and that reacquisition. Therefore the container is always scanned once more
                // before the deadline check above is allowed to give up.
                Monitor.Wait(_lock, slice);
            }
        }
        finally
        {
            // Ensure that the lock is released.
            if (lockTaken)
            {
                Monitor.Exit(_lock);
            }
        }
    }
}
}