[FLINK] Support Apache Pulsar as a datasource #12310

Description

@zhanglistar

Gluten Flink currently supports several Velox-backed datasources, such as Kafka, but Apache Pulsar is not supported yet.

This issue tracks adding Apache Pulsar as a datasource for Gluten Flink so Flink SQL streaming jobs can read from Pulsar topics through the Gluten/Velox execution path.

Motivation

Apache Pulsar is widely used as a streaming messaging and storage system. Supporting Pulsar in Gluten Flink would allow users to run Flink SQL streaming workloads on Pulsar topics with Gluten/Velox acceleration.

Example

```sql
CREATE TABLE pulsar_src (
  id INT,
  price INT,
  name STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/gluten-pulsar-smoke',
  'service-url' = 'pulsar://127.0.0.1:6650',
  'source.subscription-name' = 'gluten-pulsar-smoke-sub',
  'source.subscription-type' = 'shared',
  'source.start.message-id' = 'earliest',
  'value.format' = 'json'
);

CREATE TABLE blackhole_sink (
  id INT,
  price INT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT id, price FROM pulsar_src WHERE price > 10;
```

Proposed Scope

- Detect Flink Pulsar source transformations in Gluten Flink.
- Convert Flink Pulsar table/source options into Velox Pulsar connector parameters.
- Support common Pulsar source options:
  - topic/topics
  - service URL
  - subscription name
  - subscription type, especially `shared` for parallel source readers
  - startup position, such as `earliest`/`latest`
  - value format, such as JSON
- Register the Pulsar source factory in Gluten Flink.
- Add unit tests for factory discovery and option mapping.
- Validate the implementation with a local Pulsar standalone smoke test.
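To illustrate the option mapping beyond the defaults in the example, a second source table might combine several topics with a `latest` startup position. This is a sketch under stated assumptions: the table name `pulsar_multi_src`, the topic `gluten-pulsar-smoke-2`, and the subscription name are hypothetical. It also assumes the Flink Pulsar SQL connector's semicolon-separated form for `topics`. This issue does not settle whether the Velox Pulsar connector accepts multiple topics.

```sql
-- Hypothetical table for exercising option mapping; all names are illustrative.
CREATE TABLE pulsar_multi_src (
  id INT,
  price INT,
  name STRING
) WITH (
  'connector' = 'pulsar',
  -- Assumed: several topics separated by ';' as in the Flink Pulsar SQL connector
  'topics' = 'persistent://public/default/gluten-pulsar-smoke;persistent://public/default/gluten-pulsar-smoke-2',
  'service-url' = 'pulsar://127.0.0.1:6650',
  'source.subscription-name' = 'gluten-pulsar-multi-sub',
  'source.subscription-type' = 'shared',
  -- Start from the latest position instead of 'earliest'
  'source.start.message-id' = 'latest',
  'value.format' = 'json'
);
```

An option-mapping unit test could then check that each of these values (topic list, service URL, subscription name and type, startup position, value format) reaches the corresponding Velox Pulsar connector parameter.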

Expected Behavior

A Flink SQL job reading from Pulsar should be able to run through Gluten/Velox.
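With a parallelism greater than 1, the source should support shared subscriptions so that multiple Pulsar consumers can read from the same topic without ConsumerBusy errors. An exclusive subscription admits only one consumer, and Pulsar rejects any additional consumer on it with ConsumerBusy.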
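The parallel case can be reproduced from the Flink SQL client by raising the default parallelism before submitting the example query. This is a minimal sketch, assuming the `pulsar_src` and `blackhole_sink` tables from the example above already exist in the session:

```sql
-- Run the job with two parallel source readers; with the 'shared'
-- subscription type they are expected to attach to the same subscription.
SET 'parallelism.default' = '2';

INSERT INTO blackhole_sink
SELECT id, price FROM pulsar_src WHERE price > 10;
```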

Test Plan

- Add unit tests for Pulsar source factory discovery and option mapping.
- Run Gluten Flink unit tests for the new Pulsar source factory.
- Manually verify with a local Pulsar standalone instance:
  - create a Pulsar-backed Flink SQL source table
  - submit an INSERT query into a blackhole or print sink
  - produce records to the Pulsar topic
  - confirm that Flink source metrics and Pulsar subscription stats show the records as consumed
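For the manual check, a print sink makes the consumed rows visible in the TaskManager's stdout output, which complements the source metrics and subscription stats. This is a sketch assuming `pulsar_src` from the example; the table name `print_sink` is illustrative:

```sql
-- Print sink: each row is written to the TaskManager's stdout (its .out file).
CREATE TABLE print_sink (
  id INT,
  price INT
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT id, price FROM pulsar_src WHERE price > 10;
```

On the Pulsar side, `bin/pulsar-admin topics stats persistent://public/default/gluten-pulsar-smoke` reports per-subscription statistics. The `gluten-pulsar-smoke-sub` entry there should list the connected consumers.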

Gluten version

None

Metadata

- Assignees: No one assigned
- Labels: enhancement (New feature or request)
- Type: No type
- Projects: No projects
- Milestone: No milestone
- Relationships: None yet
- Development: No branches or pull requests
