MQTT source with wildcard topic may lose data. #19707

Open
KeXiangWang opened this issue Dec 7, 2024 · 1 comment
Labels
type/bug Something isn't working

KeXiangWang commented Dec 7, 2024

Describe the bug

An MQTT source that subscribes to a wildcard topic (for example tele/+/SENSOR) can silently lose data.
Found it while investigating #19641.

Error message/log

No error message.

To Reproduce

set sink_decouple = false;

CREATE TABLE
  personnel (id integer, name varchar);

CREATE TABLE mqtt_source_table
(
  id integer,
  name varchar
)
WITH (
    connector='mqtt',
    url='tcp://localhost:1883',
    topic= 'tele/+/SENSOR',
    qos = 'at_least_once',
    max_packet_size =  200000
) FORMAT PLAIN ENCODE JSON;

CREATE SINK mqtt_sink_1
AS
SELECT 
  id,
  name
FROM
  personnel
WITH
(
    connector='mqtt',
    url='tcp://localhost:1883',
    topic= 'tele/d1/SENSOR',
    type = 'append-only',
    retain = 'true',
    qos = 'at_least_once'
) FORMAT PLAIN ENCODE JSON (
    force_append_only='true'
);

CREATE SINK mqtt_sink_2
AS 
SELECT 
  id + 10 as id,
  name
FROM
  personnel
WITH
(
    connector='mqtt',
    url='tcp://localhost:1883',
    topic= 'tele/d2/SENSOR',
    type = 'append-only',
    retain = 'true',
    qos = 'at_least_once'
) FORMAT PLAIN ENCODE JSON (
    force_append_only='true'
);


INSERT INTO
  personnel
VALUES
  (1, 'Alice'),
  (2, 'Bob'),
  (3, 'Tom'),
  (4, 'Jerry'),
  (5, 'Araminta'),
  (6, 'Clover'),
  (7, 'Posey'),
  (8, 'Waverly');

FLUSH;

Expected behavior

select * from mqtt_source_table should return 16 rows: the 8 rows in personnel, published once by each of the two sinks.
However, only 2 rows are returned.
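For reference, the 16-row expectation follows from MQTT topic-filter matching: the single-level wildcard `+` in the source topic `tele/+/SENSOR` matches both sink topics, `tele/d1/SENSOR` and `tele/d2/SENSOR`, and each sink publishes the 8 rows of `personnel`. The Python sketch below is a simplified matcher written only for illustration. The `topic_matches` helper is hypothetical (it is not RisingWave or MQTT client-library code), and it ignores the MQTT rule for `$`-prefixed topics.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT topic-filter matching (hypothetical helper).

    '+' matches exactly one level; '#' (last level only) matches the parent
    level and everything below it. The special rule for '$'-prefixed topics
    is not handled.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # Multi-level wildcard: the rest of the topic matches.
        if i >= len(topic_levels):
            return False  # The filter has more levels than the topic.
        if level != "+" and level != topic_levels[i]:
            return False  # A literal level differs.
    return len(filter_levels) == len(topic_levels)


SOURCE_FILTER = "tele/+/SENSOR"
SINK_TOPICS = ["tele/d1/SENSOR", "tele/d2/SENSOR"]
ROWS_PER_SINK = 8  # rows inserted into personnel

expected_rows = sum(ROWS_PER_SINK for t in SINK_TOPICS if topic_matches(SOURCE_FILTER, t))
print(expected_rows)  # 16
```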

How did you deploy RisingWave?

Any deployment.

The version of RisingWave

Probably any version; at least 2.1.0 has this problem.

Additional context

No response

KeXiangWang added the type/bug (Something isn't working) label on Dec 7, 2024.
github-actions bot added this to the release-2.2 milestone on Dec 7, 2024.
KeXiangWang (Contributor, Author) commented:

Root Cause:

Typically, our source operates as follows:

  - Meta: periodically lists splits and assigns them to actors, enabling dynamic discovery of splits.
  - CN: scans data from the assigned splits.
However, MQTT is a relatively primitive protocol:

  1. It has no API for listing topics, so the concrete topics behind a wildcard filter cannot be enumerated up front.
  2. MQTT brokers do not keep message history: a client receives only messages published after it subscribes. For instance, if messages 1 to 10 are published and the client subscribes just before message 5 arrives, it receives only messages 5 to 10. Enabling the retain flag helps only slightly: the broker keeps the last retained message per topic and delivers it on subscription, so the client receives messages 4 to 10.
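The no-history and retain behavior described in point 2 can be modeled with a toy in-memory broker. This is a minimal Python sketch under simplifying assumptions: the `ToyBroker` class and the `receive` function are hypothetical, only exact topic names are supported, QoS and sessions are ignored, and the publisher sets the retain flag on every message (as the sinks in the reproduction do with `retain = 'true'`).

```python
from collections import defaultdict


class ToyBroker:
    """Hypothetical broker model: no history, one retained message per topic.

    Exact topic names only; QoS, sessions and wildcards are ignored.
    """

    def __init__(self) -> None:
        self.inboxes: dict[str, list[list[int]]] = defaultdict(list)
        self.retained: dict[str, int] = {}

    def subscribe(self, topic: str) -> list[int]:
        inbox: list[int] = []
        # A new subscriber immediately gets the last retained message, if any.
        if topic in self.retained:
            inbox.append(self.retained[topic])
        self.inboxes[topic].append(inbox)
        return inbox

    def publish(self, topic: str, payload: int, retain: bool = False) -> None:
        if retain:
            # The broker keeps only the most recent retained message per topic.
            self.retained[topic] = payload
        # Delivery goes only to clients that are already subscribed.
        for inbox in self.inboxes[topic]:
            inbox.append(payload)


def receive(retain: bool, subscribe_before: int = 5) -> list[int]:
    """Publish messages 1..10; the client subscribes just before `subscribe_before`."""
    broker = ToyBroker()
    inbox: list[int] = []
    for msg in range(1, 11):
        if msg == subscribe_before:
            inbox = broker.subscribe("tele/d1/SENSOR")
        broker.publish("tele/d1/SENSOR", msg, retain=retain)
    return inbox


print(receive(retain=False))  # [5, 6, 7, 8, 9, 10]
print(receive(retain=True))   # [4, 5, 6, 7, 8, 9, 10]
```

Only a subscription that exists before publishing starts sees every message, which is why the timing of the subscription matters in the design discussed next.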

In our current MQTT source implementation, Meta subscribes to the topic and receives data in order to detect the list of splits (the concrete topics matching the wildcard), which are then assigned to actors. Consequently, by the time the CN clients subscribe to those split topics, many messages have already been published, and the initial messages are lost.

This approach also means that the entire message stream is consumed twice: once by Meta and once by CN.

Proposed improvement: to minimize or prevent message loss, the best option is to eliminate the dynamic split discovery process and use a single actor that continuously consumes the data, treating the wildcard topic as a single regular topic.
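The difference between the current design and the proposed one can be sketched with a small simulation. It is illustrative only: the event ordering, the `discovery_delay` parameter (the number of messages that pass between Meta first seeing a topic and the assigned actor subscribing to it), and the function names are all hypothetical; only the `+` wildcard is handled, and retained messages are ignored.

```python
# Illustrative simulation only; names and timing are hypothetical.
FILTER = "tele/+/SENSOR"

# Hypothetical publish order: each sink publishes 8 rows, interleaved.
EVENTS = [(f"tele/d{d}/SENSOR", i) for i in range(1, 9) for d in (1, 2)]


def matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT matching that supports only the '+' wildcard."""
    f, t = topic_filter.split("/"), topic.split("/")
    return len(f) == len(t) and all(a in ("+", b) for a, b in zip(f, t))


def current_design(discovery_delay: int) -> tuple[list[tuple[str, int]], int]:
    """Meta discovers topics from its wildcard stream; an actor subscribes later.

    Returns the messages the CN actors receive and the total number of
    deliveries processed by Meta and the CN together.
    """
    subscribed_at: dict[str, int] = {}
    received: list[tuple[str, int]] = []
    deliveries = 0
    for idx, (topic, payload) in enumerate(EVENTS):
        if not matches(FILTER, topic):
            continue
        deliveries += 1  # Meta's wildcard subscription sees every message.
        if topic not in subscribed_at:
            # New split discovered; its actor subscribes only after the delay.
            subscribed_at[topic] = idx + discovery_delay
        if idx >= subscribed_at[topic]:
            deliveries += 1  # The CN actor receives the same message again.
            received.append((topic, payload))
    return received, deliveries


def single_actor() -> tuple[list[tuple[str, int]], int]:
    """One actor subscribes to the wildcard filter before anything is published."""
    received = [(t, p) for t, p in EVENTS if matches(FILTER, t)]
    return received, len(received)


received, deliveries = current_design(discovery_delay=4)
print(len(received), deliveries)  # 12 28
received, deliveries = single_actor()
print(len(received), deliveries)  # 16 16
```

With `discovery_delay=4`, the two-phase design loses the first two messages of each topic and processes 28 deliveries (16 on Meta, 12 on the CN), while the single wildcard subscriber receives all 16 messages in one pass. Actual losses depend on real timing; the delay value here is arbitrary.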
