Skip to content

Commit

Permalink
Add ability to set columns for BulkCopy (#383)
Browse files Browse the repository at this point in the history
  • Loading branch information
DarkWanderer authored Nov 6, 2023
1 parent 940ee83 commit 096f515
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 30 deletions.
6 changes: 4 additions & 2 deletions ClickHouse.Client.Benchmark/BulkInsertColumn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public BulkInsertColumn()
{
var connectionString = Environment.GetEnvironmentVariable("CLICKHOUSE_CONNECTION");
connection = new ClickHouseConnection(connectionString);

var targetTable = $"test.benchmark_bulk_insert_int64";

// Create database and table for benchmark
Expand All @@ -42,8 +42,10 @@ public BulkInsertColumn()
{
DestinationTableName = targetTable,
BatchSize = 10000,
MaxDegreeOfParallelism = 1
MaxDegreeOfParallelism = 1,
ColumnNames = new[] { "col1" }
};
bulkCopy.InitAsync().Wait();
}

[Benchmark]
Expand Down
20 changes: 11 additions & 9 deletions ClickHouse.Client.Tests/BulkCopyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,15 @@ public async Task ShouldExecuteInsertWithLessColumns()
var targetTable = $"test.multiple_columns";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value1 Nullable(UInt8), value2 Nullable(Float32), value3 Nullable(Int8)) ENGINE TinyLog");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value1 Nullable(UInt8), value2 Nullable(Float32), value3 Nullable(Int8)) ENGINE Memory");

using var bulkCopy = new ClickHouseBulkCopy(connection)
{
DestinationTableName = targetTable,
ColumnNames = new[] { "value2" }
};

await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new object[] { 5 }, 5), new[] { "value2" }, CancellationToken.None);
await bulkCopy.InitAsync();
await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new object[] { 5 }, 5), CancellationToken.None);

using var reader = await connection.ExecuteReaderAsync($"SELECT * from {targetTable}");
}
Expand All @@ -118,14 +119,15 @@ public async Task ShouldExecuteInsertWithBacktickedColumns()
var targetTable = $"test.backticked_columns";

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`field.id` Nullable(UInt8), `@value` Nullable(UInt8)) ENGINE TinyLog");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`field.id` Nullable(UInt8), `@value` Nullable(UInt8)) ENGINE Memory");

using var bulkCopy = new ClickHouseBulkCopy(connection)
{
DestinationTableName = targetTable,
ColumnNames = new[] { "`field.id`, `@value`" }
};

await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new object[] { 5, 5 }, 5), new[] { "`field.id`, `@value`" });
await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new object[] { 5, 5 }, 5));

using var reader = await connection.ExecuteReaderAsync($"SELECT * FROM {targetTable}");
}
Expand All @@ -149,7 +151,7 @@ public async Task ShouldExecuteBulkInsertWithComplexColumnName(string columnName
var targetTable = "test." + SanitizeTableName($"bulk_complex_{columnName}");

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`{columnName.Replace("`", "\\`")}` Int32) ENGINE TinyLog");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`{columnName.Replace("`", "\\`")}` Int32) ENGINE Memory");

using var bulkCopy = new ClickHouseBulkCopy(connection)
{
Expand Down Expand Up @@ -216,7 +218,7 @@ public async Task ShouldExecuteBulkInsertIntoSimpleAggregatedFunctionColumn()
var targetTable = "test." + SanitizeTableName($"bulk_simple_aggregated_function");

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value SimpleAggregateFunction(anyLast,Nullable(Float64))) ENGINE TinyLog");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value SimpleAggregateFunction(anyLast,Nullable(Float64))) ENGINE Memory");

using var bulkCopy = new ClickHouseBulkCopy(connection)
{
Expand All @@ -234,12 +236,12 @@ public async Task ShouldExecuteBulkInsertIntoSimpleAggregatedFunctionColumn()


[Test]
public async Task ShouldNotLoseRowsOnMuptipleBatches()
public async Task ShouldNotLoseRowsOnMultipleBatches()
{
var targetTable = "test.bulk_multiple_batches"; ;

await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value Int32) ENGINE TinyLog");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value Int32) ENGINE Memory");

using var bulkCopy = new ClickHouseBulkCopy(connection)
{
Expand Down
2 changes: 1 addition & 1 deletion ClickHouse.Client.Tests/NestedTableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class NestedTableTests : AbstractConnectionTestFixture
public async Task Setup()
{
await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {Table}");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {Table}(id UInt32, params Nested (param_id UInt8, param_val String)) ENGINE TinyLog");
await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {Table}(id UInt32, params Nested (param_id UInt8, param_val String)) ENGINE Memory");
}

[Test]
Expand Down
2 changes: 1 addition & 1 deletion ClickHouse.Client.Tests/ORM/DapperContribTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public record class TestRecord(int Id, string Value, DateTime Timestamp);
public async Task SetUp()
{
await connection.ExecuteStatementAsync("TRUNCATE TABLE IF EXISTS test.dapper_contrib");
await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.dapper_contrib (Id Int32, Value String, Timestamp DateTime('UTC')) ENGINE TinyLog");
await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.dapper_contrib (Id Int32, Value String, Timestamp DateTime('UTC')) ENGINE Memory");
await connection.ExecuteStatementAsync("INSERT INTO test.dapper_contrib VALUES (1, 'value', toDateTime('2023/04/15 01:02:03', 'UTC'))");
}

Expand Down
31 changes: 29 additions & 2 deletions ClickHouse.Client.Tests/ORM/DapperTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ public class DapperTests : AbstractConnectionTestFixture

static DapperTests()
{
SqlMapper.AddTypeHandler(new DateTimeOffsetHandler());
SqlMapper.AddTypeHandler(new ClickHouseDecimalHandler());
SqlMapper.AddTypeHandler(new DateTimeOffsetHandler());
SqlMapper.AddTypeHandler(new ITupleHandler());
SqlMapper.AddTypeMap(typeof(DateTime), DbType.DateTime2);
SqlMapper.AddTypeMap(typeof(DateTimeOffset), DbType.DateTime2);
}

// "The member value of type <xxxxxxxx> cannot be used as a parameter value"
Expand Down Expand Up @@ -104,13 +106,30 @@ public async Task ShouldExecuteSimpleSelect()
[Test]
[Parallelizable]
[TestCaseSource(typeof(DapperTests), nameof(SimpleSelectQueries))]
public async Task ShouldExecuteSelectWithSingleParameterValue(string sql, object value)
public async Task ShouldExecuteSelectStringWithSingleParameterValue(string sql, object value)
{
var parameters = new Dictionary<string, object> { { "value", value } };
var results = await connection.QueryAsync<string>(sql, parameters);
Assert.AreEqual(Convert.ToString(value, CultureInfo.InvariantCulture), results.Single());
}

[Test]
[Parallelizable]
[TestCaseSource(typeof(DapperTests), nameof(SimpleSelectQueries))]
public async Task ShouldExecuteSelectWithSingleParameterValue(string sql, object expected)
{
var parameters = new Dictionary<string, object> { { "value", expected } };
var rows = await connection.QueryAsync(sql, parameters);
IDictionary<string, object> row = rows.Single();

// Workaround: Dapper does not specify type, so
// DateTime is always mapped as CH's 32-bit DateTime
if (expected is DateTime dt)
expected = dt.AddTicks(-dt.Ticks % TimeSpan.TicksPerSecond);

Assert.AreEqual(expected, row.Single().Value);
}

[Test]
public async Task ShouldExecuteSelectWithArrayParameter()
{
Expand All @@ -122,6 +141,14 @@ public async Task ShouldExecuteSelectWithArrayParameter()
CollectionAssert.AllItemsAreNotNull(functions);
}

[Test]
public async Task ShouldExecuteSelectReturningNullable()
{
string sql = "SELECT toNullable(5)";
var result = (await connection.QueryAsync<int?>(sql)).Single();
Assert.AreEqual(5, result);
}

[Test]
public async Task ShouldExecuteSelectReturningArray()
{
Expand Down
8 changes: 4 additions & 4 deletions ClickHouse.Client.Tests/SQL/ParameterizedInsertTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class ParameterizedInsertTests : AbstractConnectionTestFixture
public async Task ShouldInsertParameterizedFloat64Array()
{
await connection.ExecuteStatementAsync("TRUNCATE TABLE IF EXISTS test.float_array");
await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.float_array (arr Array(Float64)) ENGINE TinyLog");
await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.float_array (arr Array(Float64)) ENGINE Memory");

var command = connection.CreateCommand();
command.AddParameter("values", new[] { 1.0, 2.0, 3.0 });
Expand All @@ -28,7 +28,7 @@ public async Task ShouldInsertParameterizedFloat64Array()
public async Task ShouldInsertEnum8()
{
await connection.ExecuteStatementAsync("TRUNCATE TABLE IF EXISTS test.insert_enum8");
await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.insert_enum8 (enum Enum8('a' = -1, 'b' = 127)) ENGINE TinyLog");
await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.insert_enum8 (enum Enum8('a' = -1, 'b' = 127)) ENGINE Memory");

var command = connection.CreateCommand();
command.AddParameter("value", "a");
Expand All @@ -45,7 +45,7 @@ public async Task ShouldInsertParameterizedUUIDArray()
{
await connection.ExecuteStatementAsync("TRUNCATE TABLE IF EXISTS test.uuid_array");
await connection.ExecuteStatementAsync(
"CREATE TABLE IF NOT EXISTS test.uuid_array (arr Array(UUID)) ENGINE TinyLog");
"CREATE TABLE IF NOT EXISTS test.uuid_array (arr Array(UUID)) ENGINE Memory");

var command = connection.CreateCommand();
command.AddParameter("values", new[] { Guid.NewGuid(), Guid.NewGuid(), });
Expand All @@ -61,7 +61,7 @@ public async Task ShouldInsertStringWithNewline()
{
await connection.ExecuteStatementAsync("TRUNCATE TABLE IF EXISTS test.string_with_newline");
await connection.ExecuteStatementAsync(
"CREATE TABLE IF NOT EXISTS test.string_with_newline (str_value String) ENGINE TinyLog");
"CREATE TABLE IF NOT EXISTS test.string_with_newline (str_value String) ENGINE Memory");

var command = connection.CreateCommand();

Expand Down
59 changes: 48 additions & 11 deletions ClickHouse.Client/Copy/ClickHouseBulkCopy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class ClickHouseBulkCopy : IDisposable
private readonly ClickHouseConnection connection;
private bool ownsConnection;
private long rowsWritten;
private (string[] names, ClickHouseType[] types) columnNamesAndTypes;

public ClickHouseBulkCopy(ClickHouseConnection connection)
{
Expand All @@ -45,15 +46,43 @@ public ClickHouseBulkCopy(string connectionString)
public int MaxDegreeOfParallelism { get; set; } = 4;

/// <summary>
/// Gets or sets name of destination table to insert to. "SELECT ..columns.. LIMIT 0" query is performed before insertion.
/// Gets name of destination table to insert to
/// </summary>
public string DestinationTableName { get; set; }
public string DestinationTableName { get; init; }

/// <summary>
/// Gets columns
/// </summary>
public IReadOnlyCollection<string> ColumnNames { get; init; }

private async Task<(string[] names, ClickHouseType[] types)> LoadNamesAndTypesAsync(string destinationTableName, IReadOnlyCollection<string> columns = null)
{
using var reader = (ClickHouseDataReader)await connection.ExecuteReaderAsync($"SELECT {GetColumnsExpression(columns)} FROM {DestinationTableName} WHERE 1=0").ConfigureAwait(false);
var types = reader.GetClickHouseColumnTypes();
var names = reader.GetColumnNames().Select(c => c.EncloseColumnName()).ToArray();
return (names, types);
}

/// <summary>
/// Gets total number of rows written by this instance.
/// </summary>
public long RowsWritten => Interlocked.Read(ref rowsWritten);

/// <summary>
/// One-time init operation to load column types using provided names
/// Use to reduce number of service queries
/// </summary>
/// <param name="names">Names of columns which will be inserted</param>
/// <returns>Awaitable task</returns>
public async Task InitAsync()
{
if (ColumnNames is null)
throw new InvalidOperationException($"{nameof(ColumnNames)} is null");
if (DestinationTableName is null)
throw new InvalidOperationException($"{nameof(DestinationTableName)} is null");
columnNamesAndTypes = await LoadNamesAndTypesAsync(DestinationTableName, ColumnNames).ConfigureAwait(false);
}

public Task WriteToServerAsync(IDataReader reader) => WriteToServerAsync(reader, CancellationToken.None);

public Task WriteToServerAsync(IDataReader reader, CancellationToken token)
Expand All @@ -63,7 +92,9 @@ public Task WriteToServerAsync(IDataReader reader, CancellationToken token)
throw new ArgumentNullException(nameof(reader));
}

#pragma warning disable CS0618 // Type or member is obsolete
return WriteToServerAsync(reader.AsEnumerable(), reader.GetColumnNames(), token);
#pragma warning restore CS0618 // Type or member is obsolete
}

public Task WriteToServerAsync(DataTable table, CancellationToken token)
Expand All @@ -75,15 +106,19 @@ public Task WriteToServerAsync(DataTable table, CancellationToken token)

var rows = table.Rows.Cast<DataRow>().Select(r => r.ItemArray); // enumerable
var columns = table.Columns.Cast<DataColumn>().Select(c => c.ColumnName).ToArray();
#pragma warning disable CS0618 // Type or member is obsolete
return WriteToServerAsync(rows, columns, token);
#pragma warning restore CS0618 // Type or member is obsolete
}

public Task WriteToServerAsync(IEnumerable<object[]> rows) => WriteToServerAsync(rows, null, CancellationToken.None);

public Task WriteToServerAsync(IEnumerable<object[]> rows, IReadOnlyCollection<string> columns) => WriteToServerAsync(rows, columns, CancellationToken.None);

public Task WriteToServerAsync(IEnumerable<object[]> rows, CancellationToken token) => WriteToServerAsync(rows, null, token);

[Obsolete("Use InitColumnsAsync method instead to set columns")]
public Task WriteToServerAsync(IEnumerable<object[]> rows, IReadOnlyCollection<string> columns) => WriteToServerAsync(rows, columns, CancellationToken.None);

[Obsolete("Use InitColumnsAsync method instead to set columns")]
public async Task WriteToServerAsync(IEnumerable<object[]> rows, IReadOnlyCollection<string> columns, CancellationToken token)
{
if (rows is null)
Expand All @@ -96,16 +131,18 @@ public async Task WriteToServerAsync(IEnumerable<object[]> rows, IReadOnlyCollec
throw new InvalidOperationException("Destination table not set");
}

ClickHouseType[] columnTypes = null;
string[] columnNames = columns?.ToArray();
var (columnNames, columnTypes) = columnNamesAndTypes;

using (var reader = (ClickHouseDataReader)await connection.ExecuteReaderAsync($"SELECT {ClickHouseBulkCopy.GetColumnsExpression(columns)} FROM {DestinationTableName} WHERE 1=0").ConfigureAwait(false))
if (columns != null)
{
// Deprecated path
// If the list of columns was explicitly provided, avoid cache
(columnNames, columnTypes) = await LoadNamesAndTypesAsync(DestinationTableName, columns).ConfigureAwait(false);
}
else if (columnNames == null || columnNames.Length == 0)
{
columnTypes = reader.GetClickHouseColumnTypes();
columnNames ??= reader.GetColumnNames();
(columnNames, columnTypes) = await LoadNamesAndTypesAsync(DestinationTableName, null).ConfigureAwait(false);
}
for (int i = 0; i < columnNames.Length; i++)
columnNames[i] = columnNames[i].EncloseColumnName();

var tasks = new Task[MaxDegreeOfParallelism];
for (var i = 0; i < tasks.Length; i++)
Expand Down

0 comments on commit 096f515

Please sign in to comment.