From 096f5157ce59c974b221c5e84b6b58355cb6176a Mon Sep 17 00:00:00 2001 From: "Oleg V. Kozlyuk" Date: Mon, 6 Nov 2023 19:41:17 +0100 Subject: [PATCH] Add ability to set columns for BulkCopy (#383) --- .../BulkInsertColumn.cs | 6 +- ClickHouse.Client.Tests/BulkCopyTests.cs | 20 ++++--- ClickHouse.Client.Tests/NestedTableTests.cs | 2 +- .../ORM/DapperContribTests.cs | 2 +- ClickHouse.Client.Tests/ORM/DapperTests.cs | 31 +++++++++- .../SQL/ParameterizedInsertTests.cs | 8 +-- ClickHouse.Client/Copy/ClickHouseBulkCopy.cs | 59 +++++++++++++++---- 7 files changed, 98 insertions(+), 30 deletions(-) diff --git a/ClickHouse.Client.Benchmark/BulkInsertColumn.cs b/ClickHouse.Client.Benchmark/BulkInsertColumn.cs index 0acaf5ab..0f91120f 100644 --- a/ClickHouse.Client.Benchmark/BulkInsertColumn.cs +++ b/ClickHouse.Client.Benchmark/BulkInsertColumn.cs @@ -31,7 +31,7 @@ public BulkInsertColumn() { var connectionString = Environment.GetEnvironmentVariable("CLICKHOUSE_CONNECTION"); connection = new ClickHouseConnection(connectionString); - + var targetTable = $"test.benchmark_bulk_insert_int64"; // Create database and table for benchmark @@ -42,8 +42,10 @@ public BulkInsertColumn() { DestinationTableName = targetTable, BatchSize = 10000, - MaxDegreeOfParallelism = 1 + MaxDegreeOfParallelism = 1, + ColumnNames = new[] { "col1" } }; + bulkCopy.InitAsync().Wait(); } [Benchmark] diff --git a/ClickHouse.Client.Tests/BulkCopyTests.cs b/ClickHouse.Client.Tests/BulkCopyTests.cs index 3629c60d..fc4c1d0a 100644 --- a/ClickHouse.Client.Tests/BulkCopyTests.cs +++ b/ClickHouse.Client.Tests/BulkCopyTests.cs @@ -100,14 +100,15 @@ public async Task ShouldExecuteInsertWithLessColumns() var targetTable = $"test.multiple_columns"; await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}"); - await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value1 Nullable(UInt8), value2 Nullable(Float32), value3 Nullable(Int8)) ENGINE TinyLog"); + await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value1 Nullable(UInt8), value2 Nullable(Float32), value3 Nullable(Int8)) ENGINE Memory"); using var bulkCopy = new ClickHouseBulkCopy(connection) { DestinationTableName = targetTable, + ColumnNames = new[] { "value2" } }; - - await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new object[] { 5 }, 5), new[] { "value2" }, CancellationToken.None); + await bulkCopy.InitAsync(); + await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new object[] { 5 }, 5), CancellationToken.None); using var reader = await connection.ExecuteReaderAsync($"SELECT * from {targetTable}"); } @@ -118,14 +119,15 @@ public async Task ShouldExecuteInsertWithBacktickedColumns() var targetTable = $"test.backticked_columns"; await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}"); - await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`field.id` Nullable(UInt8), `@value` Nullable(UInt8)) ENGINE TinyLog"); + await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`field.id` Nullable(UInt8), `@value` Nullable(UInt8)) ENGINE Memory"); using var bulkCopy = new ClickHouseBulkCopy(connection) { DestinationTableName = targetTable, + ColumnNames = new[] { "`field.id`, `@value`" } }; - await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new object[] { 5, 5 }, 5), new[] { "`field.id`, `@value`" }); + await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new object[] { 5, 5 }, 5)); using var reader = await connection.ExecuteReaderAsync($"SELECT * FROM {targetTable}"); } @@ -149,7 +151,7 @@ public async Task ShouldExecuteBulkInsertWithComplexColumnName(string columnName var targetTable = "test." + SanitizeTableName($"bulk_complex_{columnName}"); await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}"); - await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`{columnName.Replace("`", "\\`")}` Int32) ENGINE TinyLog"); + await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (`{columnName.Replace("`", "\\`")}` Int32) ENGINE Memory"); using var bulkCopy = new ClickHouseBulkCopy(connection) { @@ -216,7 +218,7 @@ public async Task ShouldExecuteBulkInsertIntoSimpleAggregatedFunctionColumn() var targetTable = "test." + SanitizeTableName($"bulk_simple_aggregated_function"); await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}"); - await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value SimpleAggregateFunction(anyLast,Nullable(Float64))) ENGINE TinyLog"); + await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value SimpleAggregateFunction(anyLast,Nullable(Float64))) ENGINE Memory"); using var bulkCopy = new ClickHouseBulkCopy(connection) { @@ -234,12 +236,12 @@ public async Task ShouldExecuteBulkInsertIntoSimpleAggregatedFunctionColumn() [Test] - public async Task ShouldNotLoseRowsOnMuptipleBatches() + public async Task ShouldNotLoseRowsOnMultipleBatches() { var targetTable = "test.bulk_multiple_batches"; ; await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {targetTable}"); - await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value Int32) ENGINE TinyLog"); + await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (value Int32) ENGINE Memory"); using var bulkCopy = new ClickHouseBulkCopy(connection) { diff --git a/ClickHouse.Client.Tests/NestedTableTests.cs b/ClickHouse.Client.Tests/NestedTableTests.cs index 3fd10264..3600fc2b 100644 --- a/ClickHouse.Client.Tests/NestedTableTests.cs +++ b/ClickHouse.Client.Tests/NestedTableTests.cs @@ -15,7 +15,7 @@ public class NestedTableTests : AbstractConnectionTestFixture public async Task Setup() { await connection.ExecuteStatementAsync($"TRUNCATE TABLE IF EXISTS {Table}"); - await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {Table}(id UInt32, params Nested (param_id UInt8, param_val String)) ENGINE TinyLog"); + await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {Table}(id UInt32, params Nested (param_id UInt8, param_val String)) ENGINE Memory"); } [Test] diff --git a/ClickHouse.Client.Tests/ORM/DapperContribTests.cs b/ClickHouse.Client.Tests/ORM/DapperContribTests.cs index 336629ac..6fc2f69e 100644 --- a/ClickHouse.Client.Tests/ORM/DapperContribTests.cs +++ b/ClickHouse.Client.Tests/ORM/DapperContribTests.cs @@ -20,7 +20,7 @@ public record class TestRecord(int Id, string Value, DateTime Timestamp); public async Task SetUp() { await connection.ExecuteStatementAsync("TRUNCATE TABLE IF EXISTS test.dapper_contrib"); - await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.dapper_contrib (Id Int32, Value String, Timestamp DateTime('UTC')) ENGINE TinyLog"); + await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.dapper_contrib (Id Int32, Value String, Timestamp DateTime('UTC')) ENGINE Memory"); await connection.ExecuteStatementAsync("INSERT INTO test.dapper_contrib VALUES (1, 'value', toDateTime('2023/04/15 01:02:03', 'UTC'))"); } diff --git a/ClickHouse.Client.Tests/ORM/DapperTests.cs b/ClickHouse.Client.Tests/ORM/DapperTests.cs index d1f1c935..d3dd35b8 100644 --- a/ClickHouse.Client.Tests/ORM/DapperTests.cs +++ b/ClickHouse.Client.Tests/ORM/DapperTests.cs @@ -22,9 +22,11 @@ public class DapperTests : AbstractConnectionTestFixture static DapperTests() { - SqlMapper.AddTypeHandler(new DateTimeOffsetHandler()); SqlMapper.AddTypeHandler(new ClickHouseDecimalHandler()); + SqlMapper.AddTypeHandler(new DateTimeOffsetHandler()); SqlMapper.AddTypeHandler(new ITupleHandler()); + SqlMapper.AddTypeMap(typeof(DateTime), DbType.DateTime2); + SqlMapper.AddTypeMap(typeof(DateTimeOffset), DbType.DateTime2); } // "The member value of type cannot be used as a parameter value" @@ -104,13 +106,30 @@ public async Task ShouldExecuteSimpleSelect() [Test] [Parallelizable] [TestCaseSource(typeof(DapperTests), nameof(SimpleSelectQueries))] - public async Task ShouldExecuteSelectWithSingleParameterValue(string sql, object value) + public async Task ShouldExecuteSelectStringWithSingleParameterValue(string sql, object value) { var parameters = new Dictionary { { "value", value } }; var results = await connection.QueryAsync(sql, parameters); Assert.AreEqual(Convert.ToString(value, CultureInfo.InvariantCulture), results.Single()); } + [Test] + [Parallelizable] + [TestCaseSource(typeof(DapperTests), nameof(SimpleSelectQueries))] + public async Task ShouldExecuteSelectWithSingleParameterValue(string sql, object expected) + { + var parameters = new Dictionary { { "value", expected } }; + var rows = await connection.QueryAsync(sql, parameters); + IDictionary row = rows.Single(); + + // Workaround: Dapper does not specify type, so + // DateTime is always mapped as CH's 32-bit DateTime + if (expected is DateTime dt) + expected = dt.AddTicks(-dt.Ticks % TimeSpan.TicksPerSecond); + + Assert.AreEqual(expected, row.Single().Value); + } + [Test] public async Task ShouldExecuteSelectWithArrayParameter() { @@ -122,6 +141,14 @@ public async Task ShouldExecuteSelectWithArrayParameter() CollectionAssert.AllItemsAreNotNull(functions); } + [Test] + public async Task ShouldExecuteSelectReturningNullable() + { + string sql = "SELECT toNullable(5)"; + var result = (await connection.QueryAsync(sql)).Single(); + Assert.AreEqual(5, result); + } + [Test] public async Task ShouldExecuteSelectReturningArray() { diff --git a/ClickHouse.Client.Tests/SQL/ParameterizedInsertTests.cs b/ClickHouse.Client.Tests/SQL/ParameterizedInsertTests.cs index 2e7345e8..19d2bbb7 100644 --- a/ClickHouse.Client.Tests/SQL/ParameterizedInsertTests.cs +++ b/ClickHouse.Client.Tests/SQL/ParameterizedInsertTests.cs @@ -13,7 +13,7 @@ public class ParameterizedInsertTests : AbstractConnectionTestFixture public async Task ShouldInsertParameterizedFloat64Array() { await connection.ExecuteStatementAsync("TRUNCATE TABLE IF EXISTS test.float_array"); - await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.float_array (arr Array(Float64)) ENGINE TinyLog"); + await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.float_array (arr Array(Float64)) ENGINE Memory"); var command = connection.CreateCommand(); command.AddParameter("values", new[] { 1.0, 2.0, 3.0 }); @@ -28,7 +28,7 @@ public async Task ShouldInsertParameterizedFloat64Array() public async Task ShouldInsertEnum8() { await connection.ExecuteStatementAsync("TRUNCATE TABLE IF EXISTS test.insert_enum8"); - await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.insert_enum8 (enum Enum8('a' = -1, 'b' = 127)) ENGINE TinyLog"); + await connection.ExecuteStatementAsync("CREATE TABLE IF NOT EXISTS test.insert_enum8 (enum Enum8('a' = -1, 'b' = 127)) ENGINE Memory"); var command = connection.CreateCommand(); command.AddParameter("value", "a"); @@ -45,7 +45,7 @@ public async Task ShouldInsertParameterizedUUIDArray() { await connection.ExecuteStatementAsync("TRUNCATE TABLE IF EXISTS test.uuid_array"); await connection.ExecuteStatementAsync( - "CREATE TABLE IF NOT EXISTS test.uuid_array (arr Array(UUID)) ENGINE TinyLog"); + "CREATE TABLE IF NOT EXISTS test.uuid_array (arr Array(UUID)) ENGINE Memory"); var command = connection.CreateCommand(); command.AddParameter("values", new[] { Guid.NewGuid(), Guid.NewGuid(), }); @@ -61,7 +61,7 @@ public async Task ShouldInsertStringWithNewline() { await connection.ExecuteStatementAsync("TRUNCATE TABLE IF EXISTS test.string_with_newline"); await connection.ExecuteStatementAsync( - "CREATE TABLE IF NOT EXISTS test.string_with_newline (str_value String) ENGINE TinyLog"); + "CREATE TABLE IF NOT EXISTS test.string_with_newline (str_value String) ENGINE Memory"); var command = connection.CreateCommand(); diff --git a/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs b/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs index c642548d..453b55ab 100644 --- a/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs +++ b/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs @@ -20,6 +20,7 @@ public class ClickHouseBulkCopy : IDisposable private readonly ClickHouseConnection connection; private bool ownsConnection; private long rowsWritten; + private (string[] names, ClickHouseType[] types) columnNamesAndTypes; public ClickHouseBulkCopy(ClickHouseConnection connection) { @@ -45,15 +46,43 @@ public ClickHouseBulkCopy(string connectionString) public int MaxDegreeOfParallelism { get; set; } = 4; /// - /// Gets or sets name of destination table to insert to. "SELECT ..columns.. LIMIT 0" query is performed before insertion. + /// Gets name of destination table to insert to /// - public string DestinationTableName { get; set; } + public string DestinationTableName { get; init; } + + /// + /// Gets columns + /// + public IReadOnlyCollection ColumnNames { get; init; } + + private async Task<(string[] names, ClickHouseType[] types)> LoadNamesAndTypesAsync(string destinationTableName, IReadOnlyCollection columns = null) + { + using var reader = (ClickHouseDataReader)await connection.ExecuteReaderAsync($"SELECT {GetColumnsExpression(columns)} FROM {DestinationTableName} WHERE 1=0").ConfigureAwait(false); + var types = reader.GetClickHouseColumnTypes(); + var names = reader.GetColumnNames().Select(c => c.EncloseColumnName()).ToArray(); + return (names, types); + } /// /// Gets total number of rows written by this instance. /// public long RowsWritten => Interlocked.Read(ref rowsWritten); + /// + /// One-time init operation to load column types using provided names + /// Use to reduce number of service queries + /// + /// Names of columns which will be inserted + /// Awaitable task + public async Task InitAsync() + { + if (ColumnNames is null) + throw new InvalidOperationException($"{nameof(ColumnNames)} is null"); + if (DestinationTableName is null) + throw new InvalidOperationException($"{nameof(DestinationTableName)} is null"); + columnNamesAndTypes = await LoadNamesAndTypesAsync(DestinationTableName, ColumnNames).ConfigureAwait(false); + } + public Task WriteToServerAsync(IDataReader reader) => WriteToServerAsync(reader, CancellationToken.None); public Task WriteToServerAsync(IDataReader reader, CancellationToken token) @@ -63,7 +92,9 @@ public Task WriteToServerAsync(IDataReader reader, CancellationToken token) throw new ArgumentNullException(nameof(reader)); } +#pragma warning disable CS0618 // Type or member is obsolete return WriteToServerAsync(reader.AsEnumerable(), reader.GetColumnNames(), token); +#pragma warning restore CS0618 // Type or member is obsolete } public Task WriteToServerAsync(DataTable table, CancellationToken token) @@ -75,15 +106,19 @@ public Task WriteToServerAsync(DataTable table, CancellationToken token) var rows = table.Rows.Cast().Select(r => r.ItemArray); // enumerable var columns = table.Columns.Cast().Select(c => c.ColumnName).ToArray(); +#pragma warning disable CS0618 // Type or member is obsolete return WriteToServerAsync(rows, columns, token); +#pragma warning restore CS0618 // Type or member is obsolete } public Task WriteToServerAsync(IEnumerable rows) => WriteToServerAsync(rows, null, CancellationToken.None); - public Task WriteToServerAsync(IEnumerable rows, IReadOnlyCollection columns) => WriteToServerAsync(rows, columns, CancellationToken.None); - public Task WriteToServerAsync(IEnumerable rows, CancellationToken token) => WriteToServerAsync(rows, null, token); + [Obsolete("Use InitColumnsAsync method instead to set columns")] + public Task WriteToServerAsync(IEnumerable rows, IReadOnlyCollection columns) => WriteToServerAsync(rows, columns, CancellationToken.None); + + [Obsolete("Use InitColumnsAsync method instead to set columns")] public async Task WriteToServerAsync(IEnumerable rows, IReadOnlyCollection columns, CancellationToken token) { if (rows is null) @@ -96,16 +131,18 @@ public async Task WriteToServerAsync(IEnumerable rows, IReadOnlyCollec throw new InvalidOperationException("Destination table not set"); } - ClickHouseType[] columnTypes = null; - string[] columnNames = columns?.ToArray(); + var (columnNames, columnTypes) = columnNamesAndTypes; - using (var reader = (ClickHouseDataReader)await connection.ExecuteReaderAsync($"SELECT {ClickHouseBulkCopy.GetColumnsExpression(columns)} FROM {DestinationTableName} WHERE 1=0").ConfigureAwait(false)) + if (columns != null) + { + // Deprecated path + // If the list of columns was explicitly provided, avoid cache + (columnNames, columnTypes) = await LoadNamesAndTypesAsync(DestinationTableName, columns).ConfigureAwait(false); + } + else if (columnNames == null || columnNames.Length == 0) { - columnTypes = reader.GetClickHouseColumnTypes(); - columnNames ??= reader.GetColumnNames(); + (columnNames, columnTypes) = await LoadNamesAndTypesAsync(DestinationTableName, null).ConfigureAwait(false); } - for (int i = 0; i < columnNames.Length; i++) - columnNames[i] = columnNames[i].EncloseColumnName(); var tasks = new Task[MaxDegreeOfParallelism]; for (var i = 0; i < tasks.Length; i++)