Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions sample/issue922_benchmark.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# frozen_string_literal: true

# rubocop:disable Style/OneClassPerFile
require 'duckdb'
require 'polars-df'
require 'tmpdir'

# Adapter exposing a Polars DataFrame to DuckDB as a table function,
# emitting exactly one row per execute callback (slowest baseline).
class PolarsDataFrameTableAdapter
  # Registers a DuckDB table function named +name+ backed by +data_frame+.
  # When +columns+ is nil, every column is exposed as VARCHAR.
  def call(data_frame, name, columns: nil)
    columns ||= infer_columns(data_frame)
    DuckDB::TableFunction.create(name:, columns:, &execute_block(data_frame))
  end

  private

  # Builds the execute proc. The row cursor lives in the closure; each call
  # writes one row and returns 1, or resets the cursor and returns 0 when
  # the frame is exhausted (signalling end-of-data to DuckDB).
  def execute_block(data_frame)
    cursor = 0
    total_rows = data_frame.height
    total_cols = data_frame.width
    proc do |_func_info, output|
      next cursor = 0 if cursor >= total_rows

      write_row(data_frame, output, cursor, total_cols)
      cursor += 1
      1
    end
  end

  # Copies a single DataFrame row into row 0 of the output chunk.
  def write_row(data_frame, output, counter, width)
    (0...width).each { |col| output.set_value(col, 0, data_frame[counter, col]) }
  end

  # Maps every column name of the frame to the VARCHAR logical type.
  def infer_columns(data_frame)
    data_frame.columns.to_h { |header| [header, DuckDB::LogicalType::VARCHAR] }
  end
end

# Batch approach: write BATCH_SIZE rows per execute call to reduce Ruby<->C crossings
# Batch approach: write BATCH_SIZE rows per execute call to reduce Ruby<->C crossings
class PolarsDataFrameBatchTableAdapter
  BATCH_SIZE = 2048

  # Registers a DuckDB table function named +name+ backed by +data_frame+.
  # When +columns+ is nil, every column is exposed as VARCHAR.
  def call(data_frame, name, columns: nil)
    columns ||= infer_columns(data_frame)
    DuckDB::TableFunction.create(name:, columns:, &execute_block(data_frame))
  end

  private

  # Builds the execute proc. The batch offset lives in the closure; each call
  # emits up to BATCH_SIZE rows and returns that count, or resets the offset
  # and returns 0 once every row has been produced.
  def execute_block(data_frame)
    position = 0
    total_rows = data_frame.height
    total_cols = data_frame.width
    proc do |_func_info, output|
      next position = 0 if position >= total_rows

      batch = [total_rows - position, BATCH_SIZE].min
      write_batch(data_frame, output, position, batch, total_cols)
      position += batch
      batch
    end
  end

  # Copies +rows+ consecutive DataFrame rows (starting at +offset+) into the
  # output chunk, cell by cell.
  def write_batch(data_frame, output, offset, rows, width)
    (0...rows).each do |row|
      (0...width).each { |col| output.set_value(col, row, data_frame[offset + row, col]) }
    end
  end

  # Maps every column name of the frame to the VARCHAR logical type.
  def infer_columns(data_frame)
    data_frame.columns.to_h { |header| [header, DuckDB::LogicalType::VARCHAR] }
  end
end

# Optimized batch approach: pre-extract columns as Ruby arrays to avoid
# repeated Polars FFI calls, and use assign_string_element to skip type dispatch
# Optimized batch approach: pre-extract columns as Ruby arrays to avoid
# repeated Polars FFI calls, and use assign_string_element to skip type dispatch
class PolarsDataFrameOptimizedTableAdapter
  BATCH_SIZE = 2048

  # Registers a DuckDB table function named +name+ backed by +data_frame+.
  # When +columns+ is nil, every column is exposed as VARCHAR.
  def call(data_frame, name, columns: nil)
    columns ||= infer_columns(data_frame)
    DuckDB::TableFunction.create(name:, columns:, &execute_block(data_frame))
  end

  private

  # rubocop:disable Metrics/MethodLength
  # Builds the execute proc. Column data is materialized once as plain Ruby
  # arrays before the proc is created, so each callback only touches Ruby
  # objects. Output vectors are fetched fresh per call and filled column-wise.
  def execute_block(data_frame)
    cached_columns = extract_columns(data_frame)
    position = 0
    total_rows = data_frame.height
    total_cols = data_frame.width
    proc do |_func_info, output|
      next position = 0 if position >= total_rows

      batch = [total_rows - position, BATCH_SIZE].min
      vectors = Array.new(total_cols) { |i| output.get_vector(i) }
      write_batch(cached_columns, vectors, position, batch)
      position += batch
      batch
    end
  end
  # rubocop:enable Metrics/MethodLength

  # Casts every column to Utf8 and materializes it as a Ruby Array, paying the
  # Polars FFI cost once up front instead of per cell.
  def extract_columns(data_frame)
    data_frame.columns.map { |col| data_frame[col].cast(Polars::Utf8).to_a }
  end

  # Fills each output vector directly via assign_string_element, bypassing
  # set_value's per-cell type dispatch.
  def write_batch(col_arrays, vectors, offset, rows)
    col_arrays.each_with_index do |column, col|
      vector = vectors[col]
      (0...rows).each { |row| vector.assign_string_element(row, column[offset + row].to_s) }
    end
  end

  # Maps every column name of the frame to the VARCHAR logical type.
  def infer_columns(data_frame)
    data_frame.columns.to_h { |header| [header, DuckDB::LogicalType::VARCHAR] }
  end
end

# Baseline approach: round-trip the DataFrame through a parquet file on disk,
# then load it into a DuckDB table and read it back.
#
# con          - an open DuckDB::Connection
# data_frame   - the Polars::DataFrame to persist
# name         - table name to create (quoted as an identifier below)
# parquet_path - filesystem path for the intermediate parquet file
#
# Returns the full table contents as an Array of rows.
def query_via_parquet(con, data_frame, name, parquet_path)
  # Quote the identifier ("" doubling) and the path literal ('' doubling) so
  # names/paths containing quotes cannot break or inject into the SQL text.
  quoted_name = %("#{name.to_s.gsub('"', '""')}")
  quoted_path = "'#{parquet_path.to_s.gsub("'", "''")}'"
  data_frame.write_parquet(parquet_path)
  con.query("CREATE OR REPLACE TABLE #{quoted_name} AS SELECT * FROM read_parquet(#{quoted_path})")
  con.query("SELECT * FROM #{quoted_name}").to_a
end

# Build a 100k-row sample frame to benchmark each exposure strategy against.
df = Polars::DataFrame.new(
  {
    id: 100_000.times.map { |i| i + 1 },
    name: 100_000.times.map { |i| "Name#{i + 1}" },
    age: 100_000.times.map { rand(0..100) }
  }
)

db = DuckDB::Database.open
con = db.connect
con.query('SET threads=1')

# Unique temp filename (pid + timestamp) avoids collisions between
# concurrent benchmark runs sharing the same tmpdir.
parquet_path = File.join(Dir.tmpdir, "issue922_benchmark_#{Process.pid}_#{Time.now.to_i}.parquet")

begin
  DuckDB::TableFunction.add_table_adapter(Polars::DataFrame, PolarsDataFrameTableAdapter.new)
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  con.expose_as_table(df, 'polars_tf')
  con.query('SELECT * FROM polars_tf()').to_a
  end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  DuckDB::TableFunction.add_table_adapter(Polars::DataFrame, PolarsDataFrameBatchTableAdapter.new)
  start_time3 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  con.expose_as_table(df, 'polars_tf_batch')
  con.query('SELECT * FROM polars_tf_batch()').to_a
  end_time3 = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  DuckDB::TableFunction.add_table_adapter(Polars::DataFrame, PolarsDataFrameOptimizedTableAdapter.new)
  start_time4 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  con.expose_as_table(df, 'polars_tf_opt')
  con.query('SELECT * FROM polars_tf_opt()').to_a
  end_time4 = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  start_time2 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  query_via_parquet(con, df, 'polars_pq', parquet_path)
  end_time2 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
ensure
  # Always release DB resources and remove the temp file, even if a query fails.
  con&.close
  db&.close
  File.delete(parquet_path) if File.exist?(parquet_path)
end

puts "Time taken for table function approach (1 row/call): #{end_time - start_time} seconds"
puts "Time taken for table function approach (batch/call): #{end_time3 - start_time3} seconds"
puts "Time taken for table function approach (batch + pre-extract): #{end_time4 - start_time4} seconds"
puts "Time taken for parquet file approach: #{end_time2 - start_time2} seconds"
# rubocop:enable Style/OneClassPerFile