i want more API by Ix with TImeSpan #2287
Replies: 5 comments 5 replies
-
|
It's relatively easy to build these today by combining Rx and Ix. But when I did that as a quick experiment, it raised an important question: how would we make such things testable? Rx.NET's One of the big reasons for this is that it's hard to test if you've got no way of virtualizing time. Unlike Rx, Ix doesn't have a pervasive mechanism for managing time. It hasn't needed one because it has no timed operations. (Or perhaps it's the other way around: no timed operations have ever been added to Ix because there is no pervasive mechanism for managing timing.) I don't think we'd want to be introducing untestable operators, so a request to add timed operations to Ix is implicitly also a request to add some Ix equivalent of I just looked at In the mean time, if you want a quick fix using Rx and Ix to achieve this in combination, I've got this in my project file: <ItemGroup>
<PackageReference Include="System.Interactive.Async" Version="7.0.0-preview.13" />
<PackageReference Include="System.Reactive" Version="7.0.0-preview.1" />
</ItemGroup>(I'm using preview builds here because I'm testing Ix.NET v7 and Rx.NET v7 right now; this would work equally well with the latest current releases too.) And then I can write this: public static class Ex
{
public static IAsyncEnumerable<T> Take<T>(this IAsyncEnumerable<T> source, TimeSpan timeSpan)
{
return source.ToObservable().Take(timeSpan).ToAsyncEnumerable();
}
public static IAsyncEnumerable<IList<T>> Buffer<T>(this IAsyncEnumerable<T> source, TimeSpan timeSpan)
{
return source.ToObservable().Buffer(timeSpan).ToAsyncEnumerable();
}
}(I've not done I don't know what you mean by this:
|
Beta Was this translation helpful? Give feedback.
-
Schedulers are used for two things: first to set where the result is received/observerd, i.e., on which thread or context. second to introduce timing in a testable/fakeable way. With async/await, the first is configured by the caller and not by the callee with |
Beta Was this translation helpful? Give feedback.
-
|
Hello. I wrote a console method for testing and it runs well. #if NET9_0_OR_GREATER
[Fact]
public async Task Chunk1Async()
{
FakeTimeProvider fakeTime = new FakeTimeProvider();
var ts = Enumerable.Range(0, 11).Select(i => TimeSpan.FromSeconds(i)).ToArray();
var e = CreateTimedSequence(fakeTime, ts).Chunk(TimeSpan.FromSeconds(2.66), 999, fakeTime).GetAsyncEnumerator();
var move = e.MoveNextAsync();
Assert.False(move.IsCompleted);
fakeTime.Advance(TimeSpan.FromSeconds(3));
Assert.True(move.IsCompleted); // false
Assert.True(await move);
Assert.Equal(new[] { 0, 1, 2 }, e.Current);
}
#endifFakeTimeProvider fakeTime = new FakeTimeProvider();
var ts = Enumerable.Range(0, 11).Select(i => TimeSpan.FromSeconds(i)).ToArray();
var e = fakeTime.CreateTimedSequence(ts).Chunk(TimeSpan.FromSeconds(2.66), 999, fakeTime).GetAsyncEnumerator();
var move = e.MoveNextAsync();
CW(move);
fakeTime.Advance(TimeSpan.FromSeconds(3));
CW(move);
Console.WriteLine("e.Current:" + string.Join(", ", e.Current));
void CW(ValueTask<bool> task)
{
var s = "task.IsCompletedSuccessfully " + task.IsCompletedSuccessfully;
if (task.IsCompletedSuccessfully)
{
s += ",Result:" + task.Result;
}
Console.WriteLine(s);
}
public static partial class AsyncEnumerableEx2
{
public static async IAsyncEnumerable<int> CreateTimedSequence(this TimeProvider timeProvider, TimeSpan[] intervals)
{
int i = 0;
Queue<Task> queue = new();
foreach (var item in intervals)
{
queue.Enqueue(timeProvider.Delay(item));
}
foreach (var interval in queue)
{
await interval;
yield return i++;
}
}
}task.IsCompletedSuccessfully False
task.IsCompletedSuccessfully True,Result:True
e.Current:0, 1, 2 |
Beta Was this translation helpful? Give feedback.
-
xunit has, in the past, often caused us issues due to threading. I can no longer remember the precise details. (We stopped using it in Rx.NET because their support for UWP—which we need in Rx.NET—stopped working, so I don't use xunit regularly any more.) But I know at one point it was doing things with But this actually provides a hint as to why this whole feature area might be harder than it seems: there's nothing obviously wrong with the test you've written and yet it didn't work in xunit. Why would that be? I'm actually not sure (partly because I don't think you've told me what the failure was) but based on problems I've had in the past with xunit and Even if that's not the issue in this particular case, it has often been an issue for us with xunit in the past. And this brings us to one of the reasons I'm nervous about adding this whole feature: often it really matters how your timers actually invoke callbacks. The context from which those calls occur can matter. And that is exactly one of the concerns that schedulers in Rx deal with. So it's possible that this failure is evidence that we might end up wanting a similar abstraction, and not just (Possibly not though—without knowing how the test failed I can't really tell.) |
Beta Was this translation helpful? Give feedback.
-
|
I won't follow up until the new testing plan appears. Chunknamespace System.Linq
{
public static partial class AsyncEnumerableEx
{
public static IAsyncEnumerable<TSource[]> Chunk<TSource>(this IAsyncEnumerable<TSource> source, TimeSpan duration)
{
return Core(source, duration);
static async IAsyncEnumerable<TSource[]> Core(IAsyncEnumerable<TSource> source, TimeSpan duration, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
var moveNext = enumerator.MoveNextAsync();
if (moveNext.IsCompletedSuccessfully && !moveNext.Result)
{
yield break;
}
var timeoutTask = Task.Delay(duration, cancellationToken);
using ChunkBuffer<TSource> buffer = new ChunkBuffer<TSource>(-1);
while (true)
{
var next = false;
if (moveNext.IsCompleted)
{
next = moveNext.Result;
}
else
{
var nextTask = moveNext.AsTask();
var winner = await Task.WhenAny(timeoutTask, nextTask).ConfigureAwait(false);
if (winner == nextTask)
{
next = await nextTask;
}
else
{
timeoutTask = Task.Delay(duration, cancellationToken);
yield return buffer.CompleteClear();
continue;
}
}
if (next)
{
buffer.Add(enumerator.Current);
moveNext = enumerator.MoveNextAsync();
}
else
{
break;
}
}
yield return buffer.CompleteClear();
}
}
public static IAsyncEnumerable<TSource[]> Chunk<TSource>(this IAsyncEnumerable<TSource> source, TimeSpan duration, int size)
{
return Core(source, duration, (uint)size);
static async IAsyncEnumerable<TSource[]> Core(IAsyncEnumerable<TSource> source, TimeSpan duration, uint size, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
var moveNext = enumerator.MoveNextAsync();
if (moveNext.IsCompletedSuccessfully && !moveNext.Result)
{
yield break;
}
var timeoutTask = Task.Delay(duration, cancellationToken);
using ChunkBuffer<TSource> buffer = new ChunkBuffer<TSource>(-1);
while (true)
{
var next = false;
if (moveNext.IsCompleted)
{
next = moveNext.Result;
}
else
{
var nextTask = moveNext.AsTask();
var winner = await Task.WhenAny(timeoutTask, nextTask).ConfigureAwait(false);
if (winner == nextTask)
{
next = await nextTask;
}
else
{
timeoutTask = Task.Delay(duration, cancellationToken);
yield return buffer.CompleteClear();
continue;
}
}
if (next)
{
buffer.Add(enumerator.Current);
moveNext = enumerator.MoveNextAsync();
if (buffer.Size >= size)
{
timeoutTask = Task.Delay(duration, cancellationToken);
yield return buffer.CompleteClear();
}
}
else
{
break;
}
}
yield return buffer.CompleteClear();
}
}
#if NET9_0_OR_GREATER
public static IAsyncEnumerable<TSource[]> Chunk<TSource>(this IAsyncEnumerable<TSource> source, TimeSpan duration, int size, TimeProvider timeProvider)
{
return Core(source, duration, (uint)size, timeProvider);
static async IAsyncEnumerable<TSource[]> Core(IAsyncEnumerable<TSource> source, TimeSpan duration, uint size, TimeProvider timeProvider, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
var moveNext = enumerator.MoveNextAsync();
if (moveNext.IsCompletedSuccessfully && !moveNext.Result)
{
yield break;
}
var timeoutTask = Task.Delay(duration, timeProvider, cancellationToken);
using ChunkBuffer<TSource> buffer = new ChunkBuffer<TSource>(-1);
while (true)
{
var next = false;
if (moveNext.IsCompleted)
{
next = moveNext.Result;
}
else
{
var nextTask = moveNext.AsTask();
var winner = await Task.WhenAny(timeoutTask, nextTask).ConfigureAwait(false);
if (winner == nextTask)
{
next = await nextTask;
}
else
{
timeoutTask = Task.Delay(duration, timeProvider, cancellationToken);
yield return buffer.CompleteClear();
continue;
}
}
if (next)
{
buffer.Add(enumerator.Current);
moveNext = enumerator.MoveNextAsync();
if (buffer.Size >= size)
{
timeoutTask = Task.Delay(duration, timeProvider, cancellationToken);
yield return buffer.CompleteClear();
}
}
else
{
break;
}
}
yield return buffer.CompleteClear();
}
}
#endif
}
internal struct ChunkBuffer<T> : IDisposable
{
#pragma warning disable IDE0044
private T _item0, _item1, _item2, _item3, _item4, _item5, _item6, _item7;
private T[]? _arr0, _arr1, _arr2, _arr3, _arr4, _arr5, _arr6, _arr7;
private readonly uint _max;
private int _index;
private uint _size;
private T[]? _current;
#pragma warning restore IDE0044
public readonly uint Size => _size;
#pragma warning disable CS8618
public ChunkBuffer(int max)
{
_max = (uint)max;
}
#pragma warning restore CS8618
private static T[] GetArray(int length)
{
#if NETCOREAPP1_0_OR_GREATER || NETSTANDARD2_1_OR_GREATER
return ArrayPool<T>.Shared.Rent(length);
#else
return new T[length];
#endif
}
private void CopyToArray(T[] arr)
{
#if NETCOREAPP1_0_OR_GREATER || NETSTANDARD2_1_OR_GREATER
MemoryMarshal.CreateReadOnlySpan(ref _item0, (int)_size).CopyTo(arr);
#else
Unsafe.CopyBlock(ref Unsafe.As<T, byte>(ref arr[0]), ref Unsafe.As<T, byte>(ref _item0), (uint)Unsafe.SizeOf<T>() * _size);
#endif
}
public void Add(T item)
{
if (_size >= _max)
{
throw new InvalidOperationException($"The collection is full. Current size: {_size}, Maximum capacity: {_max}");
}
if (_size < 8)
{
ref var next = ref Unsafe.Add(ref _item0, _size)!;
next = item;
_size++;
if (_size == 8)
{
_arr0 = GetArray((int)Math.Min(_size * 2, _max));
_current = _arr0;
CopyToArray(_arr0);
}
return;
}
if (_size >= (uint)_current!.Length)
{
_index++;
if (_index < 8)
{
ref T[] next = ref Unsafe.Add(ref _arr0, _index)!;
if (next == null)
{
next = GetArray((int)Math.Min(_size * 2, _max));
}
_current.CopyTo(next, 0);
_current = next;
}
else
{
_current = new T[_max];
}
}
_current[_size++] = item;
}
public T[] CompleteClear()
{
if (_size == 0)
{
return Array.Empty<T>();
}
T[] local;
if (_size <= 8)
{
local = new T[_size];
CopyToArray(local);
}
else
{
local = _current!;
if (_size == _current!.Length)
{
ref var current = ref Unsafe.Add(ref _arr0, _index);
current = null;
}
else
{
Array.Resize(ref local, (int)_size);
}
}
_size = 0;
_index = 0;
_current = null!;
return local;
}
public void Dispose()
{
for (int i = 0; i < 8; i++)
{
ref T[]? target = ref Unsafe.Add(ref _arr0, i);
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP1_0_OR_GREATER
if (target != null)
{
ArrayPool<T>.Shared.Return(target);
}
#endif
target = null;
}
_current = null;
_size = 0;
_index = 0;
}
}
}Detailsnamespace System.Linq
{
public static partial class AsyncEnumerableEx
{
public static IAsyncEnumerable<TSource> Skip<TSource>(this IAsyncEnumerable<TSource> source, TimeSpan duration)
{
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (duration < TimeSpan.Zero)
throw Error.ArgumentOutOfRange(nameof(duration));
if (duration == TimeSpan.Zero)
{
return Core(source);
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
var moveNextTask = enumerator.MoveNextAsync();
while (moveNextTask.IsCompleted)
{
if (!moveNextTask.Result)
{
yield break;
}
moveNextTask = enumerator.MoveNextAsync();
}
while (await moveNextTask.ConfigureAwait(false))
{
yield return enumerator.Current;
}
}
}
else
{
return Core(source, duration);
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, TimeSpan duration, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var timeoutTask = Task.Delay(duration, cancellationToken);
await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
var moveNext = enumerator.MoveNextAsync();
while (true)
{
var next = false;
if (moveNext.IsCompleted)
{
next = moveNext.Result;
}
else
{
var nextTask = moveNext.AsTask();
var winner = await Task.WhenAny(timeoutTask, nextTask).ConfigureAwait(false);
if (winner == nextTask)
{
next = await nextTask;
}
else
{
break;
}
}
if (!next)
{
yield break;
}
}
while (await enumerator.MoveNextAsync().ConfigureAwait(false))
{
yield return enumerator.Current;
}
}
}
}
}
}Takenamespace System.Linq
{
public static partial class AsyncEnumerableEx
{
public static IAsyncEnumerable<TSource> Take<TSource>(this IAsyncEnumerable<TSource> source, TimeSpan duration)
{
if (source == null)
throw Error.ArgumentNull(nameof(source));
if (duration < TimeSpan.Zero)
throw Error.ArgumentOutOfRange(nameof(duration));
if (duration == TimeSpan.Zero)
{
return Core(source);
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
var moveNextTask = enumerator.MoveNextAsync();
while (moveNextTask.IsCompleted && moveNextTask.Result)
{
yield return enumerator.Current;
moveNextTask = enumerator.MoveNextAsync();
}
}
}
else
{
return Core(source, duration);
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, TimeSpan duration, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var timeoutTask = Task.Delay(duration, cancellationToken);
await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
var moveNext = enumerator.MoveNextAsync();
while (true)
{
var next = false;
if (moveNext.IsCompleted)
{
next = moveNext.Result;
}
else
{
var nextTask = moveNext.AsTask();
var winner = await Task.WhenAny(timeoutTask, nextTask).ConfigureAwait(false);
if (winner == nextTask)
{
next = await nextTask;
}
// else , next always is false , go to break
}
if (next)
{
yield return enumerator.Current;
}
else
{
yield break;
}
}
}
}
}
}
} |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
i want add
These APIs check
ValueTask.IsCompleted.If it remains consistently true, the items are considered simultaneous.
For example, if data appears at 2.99s, 2.99s, 2.99s, 2.99s, and 3.01s, the data at
3.01swill also be considered as part of the 2.99s group.The Buffermethod will start timing upon encountering the first item when no timer is active.
The Chunkmethod will perform periodic truncation starting from the call initiation. If no data is present, it returns an
arraywithLength = 0.Beta Was this translation helpful? Give feedback.
All reactions