Skip to content

Commit

Permalink
Rewritten BulkCopy functionality (#419)
Browse files Browse the repository at this point in the history
  • Loading branch information
DarkWanderer authored Dec 23, 2023
1 parent b5d7f03 commit 458e579
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 109 deletions.
2 changes: 1 addition & 1 deletion ClickHouse.Client.Benchmark/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"profiles": {
"ClickHouse.Client.Benchmark": {
"commandName": "Project",
"commandLineArgs": "--filter \"*SelectUint64*\" --inProcess --job Default --join"
"commandLineArgs": "--filter \"*\" --inProcess --job Default --join"
}
}
}
7 changes: 7 additions & 0 deletions ClickHouse.Client.Tests/BulkCopyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public async Task ShouldExecuteSingleValueInsertViaBulkCopy(string clickHouseTyp
BatchSize = 100
};

await bulkCopy.InitAsync();
await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new[] { insertedValue }, 1));

Assert.AreEqual(1, bulkCopy.RowsWritten);
Expand Down Expand Up @@ -179,6 +180,7 @@ public async Task ShouldExecuteBulkInsertWithComplexColumnName(string columnName
BatchSize = 100
};

await bulkCopy.InitAsync();
await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new[] { (object)1 }, 1), CancellationToken.None);

Assert.AreEqual(1, bulkCopy.RowsWritten);
Expand All @@ -204,6 +206,7 @@ public async Task ShouldInsertIntoTableWithLotsOfColumns()
var bulkCopy = new ClickHouseBulkCopy(connection) { DestinationTableName = tableName };

var rowToInsert = new[] { Enumerable.Range(1, columnCount).Select(x => (object)x).ToArray() };
await bulkCopy.InitAsync();
await bulkCopy.WriteToServerAsync(rowToInsert);
}

Expand All @@ -218,6 +221,7 @@ public async Task ShouldThrowSpecialExceptionOnSerializationFailure()
var rows = Enumerable.Range(250, 10).Select(n => new object[] { n }).ToArray();

var bulkCopy = new ClickHouseBulkCopy(connection) { DestinationTableName = targetTable };
await bulkCopy.InitAsync();
try
{
await bulkCopy.WriteToServerAsync(rows);
Expand Down Expand Up @@ -246,6 +250,7 @@ public async Task ShouldExecuteBulkInsertIntoSimpleAggregatedFunctionColumn()
BatchSize = 100
};

await bulkCopy.InitAsync();
await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new[] { (object)1 }, 1), CancellationToken.None);

Assert.AreEqual(1, bulkCopy.RowsWritten);
Expand All @@ -272,6 +277,7 @@ public async Task ShouldNotLoseRowsOnMultipleBatches()
const int Count = 1000;
var data = Enumerable.Repeat(new object[] { 1 }, Count);

await bulkCopy.InitAsync();
await bulkCopy.WriteToServerAsync(data, CancellationToken.None);

Assert.AreEqual(Count, bulkCopy.RowsWritten);
Expand All @@ -291,6 +297,7 @@ public async Task ShouldExecuteWithDBNullArrays()
DestinationTableName = targetTable,
};

await bulkCopy.InitAsync();
await bulkCopy.WriteToServerAsync(new List<object[]>
{
new object[] { DBNull.Value, new[] { 1, 2, 3 } },
Expand Down
4 changes: 2 additions & 2 deletions ClickHouse.Client.Tests/ClickHouse.Client.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net48;netstandard2.1;netcoreapp3.1;net5.0;net6.0</TargetFrameworks>
<TargetFrameworks>net6.0</TargetFrameworks>
<IsPackable>false</IsPackable>
<LangVersion>latest</LangVersion>
</PropertyGroup>
Expand Down
1 change: 1 addition & 0 deletions ClickHouse.Client.Tests/NestedTableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public async Task ShouldInsertIntoNestedTableViaBulk()
var row1 = new object[] { 1, new[] { 1, 2, 3 }, new[] { "v1", "v2", "v3" } };
var row2 = new object[] { 2, new[] { 4, 5, 6 }, new[] { "v4", "v5", "v6" } };

await bulkCopy.InitAsync();
await bulkCopy.WriteToServerAsync(new[] { row1, row2 }, CancellationToken.None);
using var reader = await connection.ExecuteReaderAsync("SELECT * FROM test.nested ORDER BY id ASC");

Expand Down
21 changes: 9 additions & 12 deletions ClickHouse.Client.Tests/TestUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,22 +147,19 @@ public static IEnumerable<DataTypeSample> GetDataTypeSamples()
yield return new DataTypeSample("DateTime64(7, 'UTC')", typeof(DateTime), "toDateTime64('2043-03-01 18:34:04.4444444', 9, 'UTC')", new DateTime(644444444444444444, DateTimeKind.Utc));
yield return new DataTypeSample("DateTime64(7, 'Pacific/Fiji')", typeof(DateTime), "toDateTime64('2043-03-01 18:34:04.4444444', 9, 'Pacific/Fiji')", new DateTime(644444444444444444, DateTimeKind.Unspecified));

if (SupportedFeatures.HasFlag(Feature.Decimals))
{
yield return new DataTypeSample("Decimal32(3)", typeof(ClickHouseDecimal), "toDecimal32(123.45, 3)", new ClickHouseDecimal(123.450m));
yield return new DataTypeSample("Decimal32(3)", typeof(ClickHouseDecimal), "toDecimal32(-123.45, 3)", new ClickHouseDecimal(-123.450m));
yield return new DataTypeSample("Decimal32(3)", typeof(ClickHouseDecimal), "toDecimal32(123.45, 3)", new ClickHouseDecimal(123.450m));
yield return new DataTypeSample("Decimal32(3)", typeof(ClickHouseDecimal), "toDecimal32(-123.45, 3)", new ClickHouseDecimal(-123.450m));

yield return new DataTypeSample("Decimal64(7)", typeof(ClickHouseDecimal), "toDecimal64(1.2345, 7)", new ClickHouseDecimal(1.2345000m));
yield return new DataTypeSample("Decimal64(7)", typeof(ClickHouseDecimal), "toDecimal64(-1.2345, 7)", new ClickHouseDecimal(-1.2345000m));
yield return new DataTypeSample("Decimal64(7)", typeof(ClickHouseDecimal), "toDecimal64(1.2345, 7)", new ClickHouseDecimal(1.2345000m));
yield return new DataTypeSample("Decimal64(7)", typeof(ClickHouseDecimal), "toDecimal64(-1.2345, 7)", new ClickHouseDecimal(-1.2345000m));

yield return new DataTypeSample("Decimal128(9)", typeof(ClickHouseDecimal), "toDecimal128(12.34, 9)", new ClickHouseDecimal(12.340000000m));
yield return new DataTypeSample("Decimal128(9)", typeof(ClickHouseDecimal), "toDecimal128(-12.34, 9)", new ClickHouseDecimal(-12.340000000m));
yield return new DataTypeSample("Decimal128(9)", typeof(ClickHouseDecimal), "toDecimal128(12.34, 9)", new ClickHouseDecimal(12.340000000m));
yield return new DataTypeSample("Decimal128(9)", typeof(ClickHouseDecimal), "toDecimal128(-12.34, 9)", new ClickHouseDecimal(-12.340000000m));

yield return new DataTypeSample("Decimal128(25)", typeof(ClickHouseDecimal), "toDecimal128(1e-24, 25)", new ClickHouseDecimal(10e-25m));
yield return new DataTypeSample("Decimal128(0)", typeof(ClickHouseDecimal), "toDecimal128(repeat('1', 30), 0)", ClickHouseDecimal.Parse(new string('1', 30)));
yield return new DataTypeSample("Decimal128(25)", typeof(ClickHouseDecimal), "toDecimal128(1e-24, 25)", new ClickHouseDecimal(10e-25m));
yield return new DataTypeSample("Decimal128(0)", typeof(ClickHouseDecimal), "toDecimal128(repeat('1', 30), 0)", ClickHouseDecimal.Parse(new string('1', 30)));

yield return new DataTypeSample("Decimal128(30)", typeof(ClickHouseDecimal), "toDecimal128(1, 30)", new ClickHouseDecimal(BigInteger.Pow(10, 30), 30));
}
yield return new DataTypeSample("Decimal128(30)", typeof(ClickHouseDecimal), "toDecimal128(1, 30)", new ClickHouseDecimal(BigInteger.Pow(10, 30), 30));

if (SupportedFeatures.HasFlag(Feature.WideTypes))
{
Expand Down
186 changes: 92 additions & 94 deletions ClickHouse.Client/Copy/ClickHouseBulkCopy.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Data;
using System.IO;
Expand All @@ -18,7 +19,7 @@ namespace ClickHouse.Client.Copy;
public class ClickHouseBulkCopy : IDisposable
{
private readonly ClickHouseConnection connection;
private bool ownsConnection;
private readonly bool ownsConnection;
private long rowsWritten;
private (string[] names, ClickHouseType[] types) columnNamesAndTypes;

Expand Down Expand Up @@ -70,9 +71,8 @@ public ClickHouseBulkCopy(string connectionString)

/// <summary>
/// One-time init operation to load column types using provided names
/// Use to reduce number of service queries
/// Required to call before WriteToServerAsync
/// </summary>
/// <param name="names">Names of columns which will be inserted</param>
/// <returns>Awaitable task</returns>
public async Task InitAsync()
{
Expand All @@ -86,129 +86,50 @@ public async Task InitAsync()
public Task WriteToServerAsync(IDataReader reader, CancellationToken token)
{
if (reader is null)
{
throw new ArgumentNullException(nameof(reader));
}

#pragma warning disable CS0618 // Type or member is obsolete
return WriteToServerAsync(reader.AsEnumerable(), reader.GetColumnNames(), token);
#pragma warning restore CS0618 // Type or member is obsolete
return WriteToServerAsync(reader.AsEnumerable(), token);
}

public Task WriteToServerAsync(DataTable table, CancellationToken token)
{
if (table is null)
{
throw new ArgumentNullException(nameof(table));
}

var rows = table.Rows.Cast<DataRow>().Select(r => r.ItemArray); // enumerable
var columns = table.Columns.Cast<DataColumn>().Select(c => c.ColumnName).ToArray();
#pragma warning disable CS0618 // Type or member is obsolete
return WriteToServerAsync(rows, columns, token);
#pragma warning restore CS0618 // Type or member is obsolete
var rows = table.Rows.Cast<DataRow>().Select(r => r.ItemArray);
return WriteToServerAsync(rows, token);
}

public Task WriteToServerAsync(IEnumerable<object[]> rows) => WriteToServerAsync(rows, null, CancellationToken.None);

public Task WriteToServerAsync(IEnumerable<object[]> rows, CancellationToken token) => WriteToServerAsync(rows, null, token);
public Task WriteToServerAsync(IEnumerable<object[]> rows) => WriteToServerAsync(rows, CancellationToken.None);

[Obsolete("Use InitColumnsAsync method instead to set columns")]
public Task WriteToServerAsync(IEnumerable<object[]> rows, IReadOnlyCollection<string> columns) => WriteToServerAsync(rows, columns, CancellationToken.None);

[Obsolete("Use InitColumnsAsync method instead to set columns")]
public async Task WriteToServerAsync(IEnumerable<object[]> rows, IReadOnlyCollection<string> columns, CancellationToken token)
public async Task WriteToServerAsync(IEnumerable<object[]> rows, CancellationToken token)
{
if (rows is null)
{
throw new ArgumentNullException(nameof(rows));
}

if (string.IsNullOrWhiteSpace(DestinationTableName))
{
throw new InvalidOperationException("Destination table not set");
}

var (columnNames, columnTypes) = columnNamesAndTypes;
if (columnNames == null || columnTypes == null)
throw new InvalidOperationException("Column names not initialized. Call InitAsync once to load column data");

if (columns != null)
{
// Deprecated path
// If the list of columns was explicitly provided, avoid cache
(columnNames, columnTypes) = await LoadNamesAndTypesAsync(DestinationTableName, columns).ConfigureAwait(false);
}
else if (columnNames == null || columnNames.Length == 0)
{
(columnNames, columnTypes) = await LoadNamesAndTypesAsync(DestinationTableName, null).ConfigureAwait(false);
}
var query = $"INSERT INTO {DestinationTableName} ({string.Join(", ", columnNames)}) FORMAT RowBinary";

var tasks = new Task[MaxDegreeOfParallelism];
for (var i = 0; i < tasks.Length; i++)
{
tasks[i] = Task.CompletedTask;
}

var query = $"INSERT INTO {DestinationTableName} ({string.Join(", ", columnNames)}) FORMAT RowBinary";
bool useInlineQuery = connection.SupportedFeatures.HasFlag(Feature.InlineQuery);

// Variables are outside the loop to capture context in case of exception
object[] row = null;
int col = 0;
var enumerator = rows.GetEnumerator();
bool hasMore = false;
do
foreach (var batch in IntoBatches(rows, query, columnTypes))
{
token.ThrowIfCancellationRequested();
var stream = new MemoryStream() { Capacity = 4 * 1024 };
int counter = 0;
using (var gzipStream = new BufferedStream(new GZipStream(stream, CompressionLevel.Fastest, true), 256 * 1024))
{
if (useInlineQuery)
{
using var textWriter = new StreamWriter(gzipStream, Encoding.UTF8, 4 * 1024, true);
textWriter.WriteLine(query);
}

using var writer = new ExtendedBinaryWriter(gzipStream);

try
{
while (hasMore = enumerator.MoveNext())
{
row = enumerator.Current;
for (col = 0; col < row.Length; col++)
{
columnTypes[col].Write(writer, row[col]);
}
counter++;

if (counter >= BatchSize)
break; // We've reached the batch size
}
}
catch (Exception e)
{
throw new ClickHouseBulkCopySerializationException(row, col, e);
}
}

token.ThrowIfCancellationRequested();
stream.Seek(0, SeekOrigin.Begin);

while (true)
{
var completedTaskIndex = Array.FindIndex(tasks, t => t.IsCompleted);
if (completedTaskIndex >= 0)
{
async Task SendBatch()
{
using (stream)
{
await connection.PostStreamAsync(useInlineQuery ? null : query, stream, true, token).ConfigureAwait(false);
Interlocked.Add(ref rowsWritten, counter);
}
}
tasks[completedTaskIndex] = SendBatch();
tasks[completedTaskIndex] = SendBatchAsync(batch, token);
break; // while (true); go to next batch
}
else
Expand All @@ -218,20 +139,97 @@ async Task SendBatch()
}
}
}
while (hasMore);

await Task.WhenAll(tasks).ConfigureAwait(false);
}

// Serializes one batch of rows into a gzip-compressed RowBinary payload.
// The INSERT query is written as the first plain-text line of the stream,
// followed by the binary-encoded rows; the returned stream is rewound and
// ready to be posted to the server.
// NOTE(review): always inlines the query into the body — assumes the server
// supports inline queries (the old Feature.InlineQuery check was removed); confirm.
private Stream SerializeBatch(Batch batch)
{
var stream = new MemoryStream() { Capacity = 8 * 1024 };

// leaveOpen: true on the GZipStream so the backing MemoryStream survives
// disposal of the compressing writers below.
using (var gzipStream = new BufferedStream(new GZipStream(stream, CompressionLevel.Fastest, true), 256 * 1024))
{
// Query goes first as UTF-8 text terminated by a newline (leaveOpen: true again).
using (var textWriter = new StreamWriter(gzipStream, Encoding.UTF8, 4 * 1024, true))
{
textWriter.WriteLine(batch.Query);
}

using var writer = new ExtendedBinaryWriter(gzipStream);

// row/col/counter are declared outside the loop so a serialization failure
// can report exactly which row and column were being written.
int col = 0;
object[] row = null;
int counter = 0;
var enumerator = batch.Rows.GetEnumerator();
try
{
while (enumerator.MoveNext())
{
row = (object[])enumerator.Current;
for (col = 0; col < row.Length; col++)
{
batch.Types[col].Write(writer, row[col]);
}
counter++;
// Rows is a rented (pooled) array that may be larger than the number of
// valid entries; Size bounds how many elements are actually serialized.
if (counter >= batch.Size)
break; // We've reached the batch size
}
}
catch (Exception e)
{
// Wrap with row/column context so callers can diagnose the offending value.
throw new ClickHouseBulkCopySerializationException(row, col, e);
}
}
// Rewind so the caller can stream the payload from the beginning.
stream.Seek(0, SeekOrigin.Begin);
return stream;
}

/// <summary>
/// Serializes a single batch on a worker thread, streams it to the server,
/// and then updates the written-rows counter.
/// </summary>
private async Task SendBatchAsync(Batch batch, CancellationToken token)
{
    try
    {
        // Serialization is CPU-bound; push it off the calling thread.
        var stream = await Task.Run(() => SerializeBatch(batch)).ConfigureAwait(false);
        using (stream)
        {
            await connection.PostStreamAsync(null, stream, true, token).ConfigureAwait(false);
        }
        // Count the rows only after the server accepted the payload.
        Interlocked.Add(ref rowsWritten, batch.Size);
    }
    finally
    {
        // Return the rented row buffer whether or not the send succeeded.
        batch.Dispose();
    }
}

/// <summary>
/// Disposes the underlying connection when this instance created (and thus owns) it.
/// </summary>
public void Dispose()
{
    if (ownsConnection)
    {
        // ownsConnection is declared readonly, so the previous
        // `ownsConnection = false;` assignment here could not compile (CS0191);
        // disposing the connection is sufficient, and re-disposal is harmless.
        connection?.Dispose();
    }
    GC.SuppressFinalize(this);
}

/// <summary>
/// Builds a comma-separated column list for a query; returns "*" when no
/// explicit columns were provided.
/// </summary>
private static string GetColumnsExpression(IReadOnlyCollection<string> columns)
{
    if (columns is null || columns.Count == 0)
    {
        return "*";
    }

    return string.Join(",", columns);
}

/// <summary>
/// Lazily slices the incoming rows into pooled batches, each carrying the
/// query text and per-column serializers needed to write it out.
/// </summary>
private IEnumerable<Batch> IntoBatches(IEnumerable<object[]> rows, string query, ClickHouseType[] types)
{
    foreach (var (rented, count) in rows.BatchRented(BatchSize))
    {
        yield return new Batch
        {
            Rows = rented,
            Size = count,
            Query = query,
            Types = types,
        };
    }
}

// Convenience argument collection
// Bundles everything needed to serialize and send one batch: the rented row
// buffer, the number of valid rows in it, the INSERT query text, and the
// per-column type serializers.
// NOTE(review): mutable struct whose Dispose nulls a field — each copy (e.g.
// the hidden copy made by a `using` statement) sees its own Rows reference,
// so the null-guard does not protect against returning the same array to the
// pool twice from different copies; verify call sites dispose only once.
private struct Batch : IDisposable
{
public object[] Rows; // rented from ArrayPool<object>.Shared; length may exceed Size
public int Size; // number of valid rows at the front of Rows
public string Query; // INSERT ... FORMAT RowBinary statement for this batch
public ClickHouseType[] Types; // column serializers, parallel to each row's cells

public void Dispose()
{
if (Rows != null)
{
// Hand the pooled buffer back; null the field so a second Dispose on
// this same copy becomes a no-op.
ArrayPool<object>.Shared.Return(Rows);
Rows = null;
}
}
}
}
Loading

0 comments on commit 458e579

Please sign in to comment.