Conversation

@SongChujun (Member)

Description

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`issuenumber`)

cla-bot added the cla-signed label on Sep 3, 2025
SongChujun force-pushed the null-serialization-optimization branch from 3fd15ce to 595c265 on Sep 3, 2025, 22:35
@wendigo (Contributor) commented Sep 4, 2025

Can you elaborate why?

@losipiuk (Member) commented Sep 4, 2025

IIUC now you are serializing nulls twice: first just the null positions, and then all the values including the nulls.
So this is a CPU vs. output size tradeoff. Did you do any micro-benchmarks to see what impact we should expect depending on the data shape?

@SongChujun (Member, Author) commented Sep 4, 2025

> Can you elaborate why?

Block serialization currently null-suppresses rows during serialization (i.e., null values are removed from the serialized output and reconstructed during deserialization by packing/unpacking according to the null masks). This design assumes that spending CPU to suppress nulls, and thereby sending less data over the network, is faster than skipping the suppression and sending more data. However, that assumption does not necessarily hold given the improvements in modern high-throughput, low-latency networks; in particular, when nulls are rare, the CPU cost is almost certainly not time well spent.
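
For concreteness, here is a minimal sketch of the two wire formats being compared, for a long column with a boolean null mask. The class and method names are hypothetical; this is not Trino's actual block encoding, just an illustration. The unsuppressed variant is exactly what losipiuk describes above: the null positions are written first, and then every value including the nulls.

```java
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical illustration of the two encodings; not Trino's actual code.
class NullSuppressionSketch
{
    // Suppressed (current behavior): write the null mask, then pack only the
    // non-null values. Saves 8 bytes per null, at the cost of a branchy loop.
    static void writeSuppressed(DataOutputStream out, long[] values, boolean[] isNull)
            throws IOException
    {
        for (boolean n : isNull) {
            out.writeBoolean(n);
        }
        for (int i = 0; i < values.length; i++) {
            if (!isNull[i]) {
                out.writeLong(values[i]);
            }
        }
    }

    // Unsuppressed (this PR): write the null mask, then every slot, nulls
    // included. Larger output, but the value copy is branch-free.
    static void writeUnsuppressed(DataOutputStream out, long[] values, boolean[] isNull)
            throws IOException
    {
        for (boolean n : isNull) {
            out.writeBoolean(n);
        }
        for (long v : values) {
            out.writeLong(v);
        }
    }
}
```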

I am currently running micro-benchmarks and e2e tests for this change, and will update this PR when I have more experiment results.

I submitted this PR just to run the test framework and help catch bugs. I should have put [Not For Merge] in the title of this PR, sorry about that...

@SongChujun (Member, Author)

> IIUC now you are serializing nulls twice: first just the null positions, and then all the values including the nulls. So this is a CPU vs. output size tradeoff. Did you do any micro-benchmarks to see what impact we should expect depending on the data shape?

replied here: #26550 (comment)

SongChujun changed the title from "Remove null-suppression in block serialization" to "[Not For Merge] Remove null-suppression in block serialization" on Sep 4, 2025
SongChujun force-pushed the null-serialization-optimization branch from 595c265 to def9856 on Sep 24, 2025, 14:50
@SongChujun (Member, Author) commented Oct 13, 2025

I ran some experiments using shuffle-heavy queries to test whether the change provides better performance, but we see some regression (see #26550 (comment)). So the assumption from #26550 (comment), that letting the CPU suppress null values and thereby sending less data over the network is faster than skipping suppression and sending more data, is still valid, and we don't want to ship this change. Instead, we chose to use SIMD to optimize null suppression (#26919). So closing this PR.
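
For reference, the kind of SIMD compaction that can speed up the null-suppressing copy can be sketched with the JDK's incubating Vector API (JDK 19+, run with --add-modules jdk.incubator.vector). This is only an illustration of the idea and makes no claim about what #26919 actually implements.

```java
import jdk.incubator.vector.LongVector;
import jdk.incubator.vector.VectorMask;
import jdk.incubator.vector.VectorSpecies;

// Illustration only; not the code in #26919.
class SimdCompactSketch
{
    private static final VectorSpecies<Long> SPECIES = LongVector.SPECIES_PREFERRED;

    // Copies values[i] where isNonNull[i] is true to the front of out and
    // returns the count written. out must be at least values.length long,
    // because each vector store writes a full register.
    static int compactNonNulls(long[] values, boolean[] isNonNull, long[] out)
    {
        int outPos = 0;
        int i = 0;
        for (; i <= values.length - SPECIES.length(); i += SPECIES.length()) {
            VectorMask<Long> mask = VectorMask.fromArray(SPECIES, isNonNull, i);
            // compress() packs the selected lanes to the front of the vector
            LongVector.fromArray(SPECIES, values, i)
                    .compress(mask)
                    .intoArray(out, outPos);
            outPos += mask.trueCount();
        }
        for (; i < values.length; i++) { // scalar tail
            if (isNonNull[i]) {
                out[outPos++] = values[i];
            }
        }
        return outPos;
    }
}
```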

SongChujun closed this on Oct 13, 2025

@SongChujun (Member, Author)

Attachment:

Test query table DDL

```sql
CREATE TABLE iceberg.shuffle_test."wide_a_0.5" (
  -- ids / join keys
  id_a                BIGINT,
  k_bigint_a          BIGINT,
  k_varchar_a         VARCHAR,

  -- fixed-width & numerics
  qty_double_a        DOUBLE,
  amount_dec_a        DECIMAL(18,3),
  flag_bool_a         BOOLEAN,

  -- dates/times
  d_date_a            DATE,
  ts_a                TIMESTAMP(3),
  tsz_a               TIMESTAMP(3) WITH TIME ZONE,

  -- var-width & binary
  short_varchar_a     VARCHAR,
  long_varchar_a      VARCHAR,
  bin_a               VARBINARY,

  -- nested
  arr_bigint_a        ARRAY(BIGINT),
  map_kv_a            MAP(BIGINT, VARCHAR),
  row_a               ROW(id BIGINT, note VARCHAR)
)
WITH (
    format = 'PARQUET',
    location = 's3://athena-lakehouse-testing/iceberg/wide_a_0.5'
)
```

Table population DML

```sql
INSERT INTO iceberg.shuffle_test."wide_a_0.4"
SELECT
  l.l_orderkey AS id_a,
  CASE WHEN rand() < p THEN NULL ELSE l.l_partkey END                                     AS k_bigint_a,
  CASE WHEN rand() < p THEN NULL ELSE CAST(l.l_returnflag AS VARCHAR) END                 AS k_varchar_a,
  CASE WHEN rand() < p THEN NULL ELSE CAST(l.l_quantity AS DOUBLE) END                    AS qty_double_a,
  CASE WHEN rand() < p THEN NULL
       ELSE CAST(l.l_extendedprice * (1 - l.l_discount) AS DECIMAL(18,3))
  END                                                                                     AS amount_dec_a,
  CASE WHEN rand() < p THEN NULL ELSE (l.l_returnflag = 'R') END                          AS flag_bool_a,
  CASE WHEN rand() < p THEN NULL ELSE l.l_shipdate END                                    AS d_date_a,
  CASE WHEN rand() < p THEN NULL
       ELSE (CAST(l.l_shipdate AS TIMESTAMP) + (l.l_orderkey % 86400) * INTERVAL '1' SECOND)
  END                                                                                     AS ts_a,
  CASE WHEN rand() < p THEN NULL
       ELSE (current_timestamp AT TIME ZONE 'America/New_York')
  END                                                                                     AS tsz_a,
  CASE WHEN rand() < p THEN NULL ELSE l.l_linestatus END                                  AS short_varchar_a,
  CASE WHEN rand() < p THEN NULL ELSE substr(l.l_comment, 1, 200) END                     AS long_varchar_a,
  CASE WHEN rand() < p THEN NULL ELSE to_utf8(l.l_comment) END                            AS bin_a,
  CASE WHEN rand() < p THEN NULL ELSE ARRAY[l.l_orderkey, l.l_partkey, l.l_suppkey] END   AS arr_bigint_a,
  CASE WHEN rand() < p THEN NULL ELSE MAP(ARRAY[l.l_partkey], ARRAY[substr(l.l_comment,1,40)]) END
                                                                                          AS map_kv_a,
  CASE WHEN rand() < p THEN NULL ELSE ROW(l.l_orderkey, substr(l.l_comment,1,60)) END     AS row_a
FROM tpch.sf10.lineitem l,
    (SELECT 0.4 AS p) AS params;
```

Test query

```sql
EXPLAIN ANALYZE
WITH src AS (
  SELECT *
  FROM iceberg.shuffle_test."wide_a_0.01"
),

-- 1) Window #1 → repartition on (k_bigint_a, k_varchar_a)
w1 AS (
  SELECT
    -- keys & ids
    id_a, k_bigint_a, k_varchar_a,

    -- original wide payload
    long_varchar_a, bin_a, arr_bigint_a, map_kv_a, row_a,

    -- (kept from your script) duplicate sets to inflate row *width*
    long_varchar_a AS long_varchar_a_dup1,
    bin_a          AS bin_a_dup1,
    arr_bigint_a   AS arr_bigint_a_dup1,
    map_kv_a       AS map_kv_a_dup1,
    row_a          AS row_a_dup1,

    long_varchar_a AS long_varchar_a_dup2,
    bin_a          AS bin_a_dup2,
    arr_bigint_a   AS arr_bigint_a_dup2,
    map_kv_a       AS map_kv_a_dup2,
    row_a          AS row_a_dup2,

    long_varchar_a AS long_varchar_a_dup3,
    bin_a          AS bin_a_dup3,
    arr_bigint_a   AS arr_bigint_a_dup3,
    map_kv_a       AS map_kv_a_dup3,
    row_a          AS row_a_dup3,

    long_varchar_a AS long_varchar_a_dup4,
    bin_a          AS bin_a_dup4,
    arr_bigint_a   AS arr_bigint_a_dup4,
    map_kv_a       AS map_kv_a_dup4,
    row_a          AS row_a_dup4,

    qty_double_a, amount_dec_a, ts_a,

    row_number() OVER (PARTITION BY k_bigint_a, k_varchar_a) AS rn1
  FROM src
),

-- 1.5) **Row duplicator** → doubles the number of rows after w1
--      (no scan increase; just duplicates each w1 row once)
w1d AS (
  SELECT w1.*, d.dupe
  FROM w1
  CROSS JOIN (VALUES 1, 2, 3) AS d(dupe)
),

-- 2) Window #2 → repartition on (bucket256, k_varchar_a)
w2 AS (
  SELECT
    w1d.*,
    (id_a % BIGINT '256') AS bucket256,
    row_number() OVER (
      PARTITION BY (id_a % BIGINT '256'), k_varchar_a
    ) AS rn2
  FROM w1d
)

-- Final projection: keep everything so planner can't drop columns
SELECT
  *
FROM w2;
```

Not Suppressed Result

| p    | e2e latency (s) |
|------|-----------------|
| 0.01 | 47.88           |
| 0.1  | 44.61           |
| 0.3  | 38.69           |

Suppressed Result

| p    | e2e latency (s) |
|------|-----------------|
| 0.01 | 46.53           |
| 0.1  | 43.72           |
| 0.3  | 37.02           |

The suppressed result still shows better performance at every null fraction tested: about 1.35 s faster at p = 0.01, 0.89 s at p = 0.1, and 1.67 s at p = 0.3 (roughly 2-4%).
