Skip to content

feat: add HTTP polling DPU transport#206

Open
avikalpg wants to merge 2 commits into
mainfrom
nia/http-polling-transport
Open

feat: add HTTP polling DPU transport#206
avikalpg wants to merge 2 commits into
mainfrom
nia/http-polling-transport

Conversation

@avikalpg

@avikalpg avikalpg commented May 24, 2026

Copy link
Copy Markdown
Member

Summary\n- add HTTP polling transport for self-hosted DPUs via DPU_QUEUE_TRANSPORT=http\n- reuse the existing DPU message dispatcher for install_callback, webhook_callback, manual_trigger, and PATSetup\n- add GHCR Docker publishing for ghcr.io/vibinex/dpu: and latest\n- update Dockerfile/README for release binary and HTTP queue envs\n\n## Paired PRs\n- Server queue/API: https://github.com/vibinex/vibinex-server/pull/444\n- Parent submodule sync: https://github.com/vibinex/vibinex/pull/3\n\n## Verification\n- cargo check\n\n## Notes\n- cargo check passes with existing repo warnings.\n- cargo fmt --check is not a usable whole-repo gate yet because the existing codebase is broadly unformatted; I did not run rustfmt over unrelated files.\n- Docker runtime smoke is blocked locally because this user cannot access /var/run/docker.sock. The new GHCR workflow will validate image build in CI.

Summary by CodeRabbit

  • New Features

    • Added support for HTTP-based queue transport as an alternative to the existing Pub/Sub messaging system.
    • Enabled automated Docker image publishing to GitHub Container Registry (GHCR) on code pushes.
  • Documentation

    • Updated setup instructions with HTTP queue transport configuration options and examples.
  • Chores

    • Optimized Docker image builds to use release binaries for improved performance.

Review Change Stack

@coderabbitai

coderabbitai Bot commented May 24, 2026

Copy link
Copy Markdown
Contributor

Walkthrough

This PR introduces HTTP queue transport as an alternative to Pub/Sub for job polling, implements a configurable transport selection mechanism in the application startup, and establishes Docker image building and publishing infrastructure via GitHub Actions with updated container configuration documentation.

Changes

HTTP Queue Transport Support

Layer / File(s) Summary
HTTP listener types and module structure
vibi-dpu/src/http_queue/listener.rs, vibi-dpu/src/http_queue/mod.rs, vibi-dpu/src/pubsub/listener.rs
HTTP request/response payload structures (ClaimRequest, ClaimResponse, DpuJob, AckRequest, FailRequest), configuration constants for polling interval and lease duration, and public module exports. Pubsub process_message is made public to enable shared job payload processing across both HTTP and Pub/Sub transports.
HTTP listener polling and job operations
vibi-dpu/src/http_queue/listener.rs
Main poll_messages loop reads environment configuration with defaults, normalizes server URL, and repeatedly calls claim_job with configurable sleep between attempts. Helper functions perform HTTP POSTs with bearer auth to claim jobs, acknowledge processed jobs, and trigger job failure with retry semantics. Processing serializes job payloads to bytes, calls process_message, then acks on success or triggers best-effort job failure on serialization errors.
Application transport selection
vibi-dpu/src/main.rs
Application startup reads DPU_QUEUE_TRANSPORT environment variable (default pubsub) and INSTALL_ID as required variables. When transport is http, requires SERVER_URL and DPU_AUTH_TOKEN and invokes HTTP listener. Otherwise requires GCP_CREDENTIALS and invokes Pub/Sub listener. Stored-auth fallback path remains intact.

Docker and Deployment Infrastructure

Layer / File(s) Summary
Dockerfile configuration and documentation
Dockerfile, README.md
Dockerfile declares build arguments for DPU queue transport, auth token, poll interval, and job lease seconds, promotes them to runtime environment variables, and switches application binary from debug to release build. README documents Rust release build before Docker image build, provides Docker build command with DPU_QUEUE_TRANSPORT=http argument, and expands Docker run instructions to a multi-line command that passes all required environment variables (DPU_QUEUE_TRANSPORT, INSTALL_ID, SERVER_URL, DPU_AUTH_TOKEN).
GitHub Actions GHCR publishing workflow
.github/workflows/docker-ghcr.yml
New workflow triggered on main pushes and pull requests checks out code, installs stable Rust toolchain, builds release binary from vibi-dpu/, conditionally logs into GHCR only on push, and uses docker/build-push-action to build and conditionally push images tagged with commit SHA and latest to ghcr.io/vibinex/dpu.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Description check ⚠️ Warning The pull request description is incomplete; it lacks required checklist items from the template (Build running, Eventing functional, Tests added, Manual QA) and only mentions cargo check verification. Complete the provided checklist by checking off items that were verified or explicitly documenting which items were not applicable and why (e.g., if manual QA was deferred, state this clearly).
Docstring Coverage ⚠️ Warning Docstring coverage is 11.11% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The pull request title accurately and concisely summarizes the main change: adding HTTP polling as a new DPU transport option.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch nia/http-polling-transport

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (1)
README.md (1)

42-47: ⚡ Quick win

Prefer --env-file for DPU_AUTH_TOKEN instead of inline -e.

Inline secrets in the run command are easy to leak via shell history. Recommend documenting an env file for runtime secrets.

Suggested docs tweak
-    docker run \
-      -e DPU_QUEUE_TRANSPORT=http \
-      -e INSTALL_ID=your-install-id \
-      -e SERVER_URL=your-server-url \
-      -e DPU_AUTH_TOKEN=your-shared-dpu-token \
-      dpu
+    # .env.dpu
+    # DPU_AUTH_TOKEN=your-shared-dpu-token
+
+    docker run \
+      --env-file .env.dpu \
+      -e DPU_QUEUE_TRANSPORT=http \
+      -e INSTALL_ID=your-install-id \
+      -e SERVER_URL=your-server-url \
+      dpu
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@README.md` around lines 42 - 47, Update the README docker run instructions to
avoid inline secrets by recommending Docker's --env-file usage instead of
passing DPU_AUTH_TOKEN with -e; remove the inline -e DPU_AUTH_TOKEN from the
example and add guidance to create an env file (e.g., containing DPU_AUTH_TOKEN,
INSTALL_ID, SERVER_URL, DPU_QUEUE_TRANSPORT) and then pass that file to docker
run via --env-file so runtime secrets are not stored in shell history. Also
mention preserving other non-secret vars inline is fine and include a short note
about file permissions for the env file.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In @.github/workflows/docker-ghcr.yml:
- Around line 18-20: The workflow uses mutable action references; replace
"actions/checkout@v4" and "dtolnay/rust-toolchain@stable" with their
corresponding immutable commit SHAs (full @<sha> refs) and harden the checkout
step by adding persist-credentials: false to the actions/checkout step
configuration; update the workflow so the checkout step (identified by
actions/checkout) includes persist-credentials: false and the rust toolchain
step (identified by dtolnay/rust-toolchain) is pinned to its full commit SHA.

In `@Dockerfile`:
- Line 14: The Dockerfile currently declares the secret as an image build arg
(ARG DPU_AUTH_TOKEN) which bakes the secret into image metadata/history; remove
the ARG (and any corresponding ENV) lines referring to DPU_AUTH_TOKEN and stop
passing it during build. Instead supply the token at runtime (e.g., via docker
run -e / orchestrator secret mount) or, if needed during build, use a build-time
secret mechanism (Docker BuildKit --secret or similar) so the token never ends
up in the image metadata; update Dockerfile references to read the value from
the runtime environment or a mounted secret and remove all occurrences of
DPU_AUTH_TOKEN from build-time ARG/ENV declarations.

In `@vibi-dpu/src/http_queue/listener.rs`:
- Around line 138-141: The current flow ACKs jobs immediately after calling
process_message(&attributes, &msg_bytes) which can ack before async-spawned
background work finishes; change process_message to return a Result (or enum)
indicating final success/failure for ack-critical message types and update its
call site to await and inspect that result before calling ack_job(client,
server_url, installation_id, auth_token, &job.id); also remove or convert
detached tokio::spawn usage inside process_message for ack-critical handlers so
work is awaited (or propagate a JoinHandle/oneshot that you await) and only call
ack_job on Ok, calling the existing log/error path on Err.
- Line 52: Set an explicit request timeout when creating the HTTP polling client
instead of Client::new() (e.g., use
reqwest::Client::builder().timeout(Duration::from_secs(...)).build() and replace
uses like claim_job(...).send().await so the polling loop cannot hang
indefinitely). Also ensure ACK is sent only after all spawned background work
finishes: in process_job/process_message where you currently call
task::spawn(...) for webhook_callback/process_review and install_callback
handlers in pubsub::listener.rs, capture the JoinHandles and await them (or use
tokio::spawn and await the JoinHandle) before sending the ACK so background work
completes prior to acknowledging the message.

In `@vibi-dpu/src/main.rs`:
- Line 56: The debug line in main is logging sensitive GCP credential material
via gcp_credentials; remove gcp_credentials from the log and instead log only
non-secret context (e.g., installation_id) or a safe indicator (e.g.,
"gcp_credentials=REDACTED" or a boolean/length check). Update the log::debug!
call in main to avoid including the gcp_credentials variable (or replace it with
a redacted placeholder) and ensure any future logs referencing gcp_credentials
use the same redaction approach.
- Around line 50-55: The code currently treats any non-"http"
DPU_QUEUE_TRANSPORT as Pub/Sub which hides config typos; update the startup
logic that reads queue_transport to normalize (e.g., to_lowercase) and use a
match/if chain that explicitly handles "http" and "pubsub" values, calling
http_queue::listener::poll_messages(...) for "http" and the existing GCP/pubsub
branch (the branch that reads GCP_CREDENTIALS) for "pubsub", and for any other
value log a clear error including the invalid value and call
std::process::exit(1) (or return Err) to fail fast so typos don’t silently pick
the wrong transport.

---

Nitpick comments:
In `@README.md`:
- Around line 42-47: Update the README docker run instructions to avoid inline
secrets by recommending Docker's --env-file usage instead of passing
DPU_AUTH_TOKEN with -e; remove the inline -e DPU_AUTH_TOKEN from the example and
add guidance to create an env file (e.g., containing DPU_AUTH_TOKEN, INSTALL_ID,
SERVER_URL, DPU_QUEUE_TRANSPORT) and then pass that file to docker run via
--env-file so runtime secrets are not stored in shell history. Also mention
preserving other non-secret vars inline is fine and include a short note about
file permissions for the env file.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0657469b-17dd-4238-b79c-5b5f81215bcf

📥 Commits

Reviewing files that changed from the base of the PR and between 59fd2d0 and a53a52f.

📒 Files selected for processing (7)
  • .github/workflows/docker-ghcr.yml
  • Dockerfile
  • README.md
  • vibi-dpu/src/http_queue/listener.rs
  • vibi-dpu/src/http_queue/mod.rs
  • vibi-dpu/src/main.rs
  • vibi-dpu/src/pubsub/listener.rs

Comment on lines +18 to +20
- uses: actions/checkout@v4

- uses: dtolnay/rust-toolchain@stable

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify workflow actions are pinned to immutable SHAs
rg -nP '^\s*-\s*uses:\s*[^@\s]+@(?!(?:[a-f0-9]{40})$)' .github/workflows

# Verify checkout hardening includes persist-credentials: false
rg -nP 'uses:\s*actions/checkout@' .github/workflows -A5 -B1

Repository: vibinex/vibi-dpu

Length of output: 1469


Pin movable GitHub Actions in docker-ghcr.yml and harden checkout.

.github/workflows/docker-ghcr.yml uses actions/checkout@v4 and dtolnay/rust-toolchain@stable, which are not pinned to immutable commit SHAs; update them to full SHAs and set persist-credentials: false on the actions/checkout step.

Suggested hardening patch
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@<FULL_LENGTH_COMMIT_SHA>
+        with:
+          persist-credentials: false

-      - uses: dtolnay/rust-toolchain@stable
+      - uses: dtolnay/rust-toolchain@<FULL_LENGTH_COMMIT_SHA>
🧰 Tools
🪛 zizmor (1.25.2)

[warning] 18-18: credential persistence through GitHub Actions artifacts (artipacked): does not set persist-credentials: false

(artipacked)


[error] 18-18: unpinned action reference (unpinned-uses): action is not pinned to a hash (required by blanket policy)

(unpinned-uses)


[error] 20-20: unpinned action reference (unpinned-uses): action is not pinned to a hash (required by blanket policy)

(unpinned-uses)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.github/workflows/docker-ghcr.yml around lines 18 - 20, The workflow uses
mutable action references; replace "actions/checkout@v4" and
"dtolnay/rust-toolchain@stable" with their corresponding immutable commit SHAs
(full @<sha> refs) and harden the checkout step by adding persist-credentials:
false to the actions/checkout step configuration; update the workflow so the
checkout step (identified by actions/checkout) includes persist-credentials:
false and the rust toolchain step (identified by dtolnay/rust-toolchain) is
pinned to its full commit SHA.

Comment thread Dockerfile
ARG TOPIC_NAME
ARG SUBSCRIPTION_NAME
ARG DPU_QUEUE_TRANSPORT
ARG DPU_AUTH_TOKEN

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Do not bake DPU_AUTH_TOKEN into image build args/env.

DPU_AUTH_TOKEN is a secret; keeping it as ARG/ENV can expose it through image metadata/history. Keep it runtime-only.

Suggested fix
-ARG DPU_AUTH_TOKEN
@@
-ENV DPU_AUTH_TOKEN=$DPU_AUTH_TOKEN

Also applies to: 34-34

🧰 Tools
🪛 Trivy (0.69.3)

[error] 14-14: Secrets passed via build-args or envs or copied secret files

Possible exposure of secret env "DPU_AUTH_TOKEN" in ARG

Rule: DS-0031

Learn more

(IaC/Dockerfile)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Dockerfile` at line 14, The Dockerfile currently declares the secret as an
image build arg (ARG DPU_AUTH_TOKEN) which bakes the secret into image
metadata/history; remove the ARG (and any corresponding ENV) lines referring to
DPU_AUTH_TOKEN and stop passing it during build. Instead supply the token at
runtime (e.g., via docker run -e / orchestrator secret mount) or, if needed
during build, use a build-time secret mechanism (Docker BuildKit --secret or
similar) so the token never ends up in the image metadata; update Dockerfile
references to read the value from the runtime environment or a mounted secret
and remove all occurrences of DPU_AUTH_TOKEN from build-time ARG/ENV
declarations.

}

pub async fn poll_messages(server_url: &str, installation_id: &str, auth_token: &str) {
let client = Client::new();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate the file and print the relevant section with line numbers.
ls -la vibi-dpu/src/http_queue/listener.rs
nl -ba vibi-dpu/src/http_queue/listener.rs | sed -n '1,220p'

Repository: vibinex/vibi-dpu

Length of output: 206


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="vibi-dpu/src/http_queue/listener.rs"
echo "=== File: $FILE ==="
cat -n "$FILE" | sed -n '1,220p'

echo
echo "=== Search for Client::new and timeout-related usage ==="
rg -n "Client::new|Client::builder|connect_timeout|timeout\(" "$FILE" || true

echo
echo "=== Search for ACK call sites ==="
rg -n "ack|ACK|Ack" "$FILE" || true

echo
echo "=== Search for process_job / process_message wiring ==="
rg -n "process_job|process_message" "$FILE" || true

Repository: vibinex/vibi-dpu

Length of output: 6324


🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n "process_message" -S vibi-dpu/src | head -n 50

echo "----"
rg -n "async fn process_message" -S vibi-dpu/src || true

echo "----"
# Show the file(s) that define process_message
files=$(rg -l "process_message" vibi-dpu/src || true)
echo "$files" | sed -n '1,120p'

Repository: vibinex/vibi-dpu

Length of output: 1049


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="vibi-dpu/src/pubsub/listener.rs"
echo "=== $FILE (first 260 lines) ==="
cat -n "$FILE" | sed -n '1,260p'

echo
echo "=== Search for spawn/task usage near process_message ==="
rg -n "spawn|tokio::spawn|JoinHandle|task::spawn|async move" "$FILE" || true

echo
echo "=== Search for webhook callback spawn/async patterns ==="
rg -n "webhook_callback|callback" "$FILE" || true

Repository: vibinex/vibi-dpu

Length of output: 11471


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find reqwest version in Cargo.toml files
rg -n "reqwest\s*=" -S . || true
rg -n "\[dependencies\][^\[]*reqwest" -S . || true
rg -n "reqwest::" vibi-dpu/Cargo.toml . || true

# Show the crate's Cargo.toml section if present
ls -la vibi-dpu/Cargo.toml || true
[ -f vibi-dpu/Cargo.toml ] && sed -n '1,220p' vibi-dpu/Cargo.toml || true

Repository: vibinex/vibi-dpu

Length of output: 3494


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="vibi-dpu/src/utils/reqwest_client.rs"
ls -la "$FILE" || true
cat -n "$FILE" | sed -n '1,200p'

echo
echo "=== Search for connect_timeout/timeout usage across vibi-dpu/src ==="
rg -n "connect_timeout|\.timeout\(|request_timeout|read_timeout|write_timeout" vibi-dpu/src || true

Repository: vibinex/vibi-dpu

Length of output: 506


Set explicit timeouts on the polling client (and ensure ACK isn’t sent before spawned work finishes).

  • vibi-dpu/src/http_queue/listener.rs: let client = Client::new(); has no timeout, so claim_job(...).send().await/response parsing can hang and stall the polling loop.
  • vibi-dpu/src/http_queue/listener.rs: process_job ACKs after process_message returns, but vibi-dpu/src/pubsub/listener.rs spawns background tasks for webhook_callback (process_review) and install_callback (install handlers) via task::spawn(...) without awaiting—so ACK may happen before that work completes.
Proposed fix
-use reqwest::Client;
+use reqwest::Client;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use std::{collections::HashMap, time::Duration};
@@
 pub async fn poll_messages(server_url: &str, installation_id: &str, auth_token: &str) {
-	let client = Client::new();
+	let client = Client::builder()
+		.connect_timeout(Duration::from_secs(10))
+		.timeout(Duration::from_secs(30))
+		.build()
+		.expect("Failed to build HTTP client");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let client = Client::new();
let client = Client::builder()
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(30))
.build()
.expect("Failed to build HTTP client");
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@vibi-dpu/src/http_queue/listener.rs` at line 52, Set an explicit request
timeout when creating the HTTP polling client instead of Client::new() (e.g.,
use reqwest::Client::builder().timeout(Duration::from_secs(...)).build() and
replace uses like claim_job(...).send().await so the polling loop cannot hang
indefinitely). Also ensure ACK is sent only after all spawned background work
finishes: in process_job/process_message where you currently call
task::spawn(...) for webhook_callback/process_review and install_callback
handlers in pubsub::listener.rs, capture the JoinHandles and await them (or use
tokio::spawn and await the JoinHandle) before sending the ACK so background work
completes prior to acknowledging the message.

Comment on lines +138 to +141
process_message(&attributes, &msg_bytes).await;
if let Err(err) = ack_job(client, server_url, installation_id, auth_token, &job.id).await {
log::error!("[http_queue] Failed to ack job {}: {:?}", job.id, err);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Avoid ACK-before-completion for async-spawned handlers.

At Line 138 and Line 139, jobs are ACKed immediately after process_message(...). For message types that spawn background tasks, this ACK can happen before actual processing finishes, causing silent job loss on later task failure.

Refactor direction
-	process_message(&attributes, &msg_bytes).await;
-	if let Err(err) = ack_job(client, server_url, installation_id, auth_token, &job.id).await {
+	let processing_ok = process_message(&attributes, &msg_bytes).await;
+	if processing_ok {
+		if let Err(err) = ack_job(client, server_url, installation_id, auth_token, &job.id).await {
+			log::error!("[http_queue] Failed to ack job {}: {:?}", job.id, err);
+		}
+	} else if let Err(err) = fail_job(
+		client,
+		server_url,
+		installation_id,
+		auth_token,
+		&job.id,
+		"processing failed",
+		true,
+	).await {
+		log::error!("[http_queue] Failed to fail job {}: {:?}", job.id, err);
+	}
-		log::error!("[http_queue] Failed to ack job {}: {:?}", job.id, err);
-	}

This requires process_message to return a success/failure signal and avoiding detached processing for ack-critical paths.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@vibi-dpu/src/http_queue/listener.rs` around lines 138 - 141, The current flow
ACKs jobs immediately after calling process_message(&attributes, &msg_bytes)
which can ack before async-spawned background work finishes; change
process_message to return a Result (or enum) indicating final success/failure
for ack-critical message types and update its call site to await and inspect
that result before calling ack_job(client, server_url, installation_id,
auth_token, &job.id); also remove or convert detached tokio::spawn usage inside
process_message for ack-critical handlers so work is awaited (or propagate a
JoinHandle/oneshot that you await) and only call ack_job on Ok, calling the
existing log/error path on Err.

Comment thread vibi-dpu/src/main.rs
Comment on lines +50 to +55
if queue_transport.eq_ignore_ascii_case("http") {
let server_url = env::var("SERVER_URL").expect("SERVER_URL must be set for HTTP DPU queue transport");
let dpu_auth_token = env::var("DPU_AUTH_TOKEN").expect("DPU_AUTH_TOKEN must be set for HTTP DPU queue transport");
http_queue::listener::poll_messages(&server_url, &installation_id, &dpu_auth_token).await;
} else {
let gcp_credentials = env::var("GCP_CREDENTIALS").expect("GCP_CREDENTIALS must be set");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fail fast on invalid DPU_QUEUE_TRANSPORT values.

Current if/else treats any non-http value as pubsub. Typos in config silently route to the wrong transport.

Proposed fix
-	if queue_transport.eq_ignore_ascii_case("http") {
-		let server_url = env::var("SERVER_URL").expect("SERVER_URL must be set for HTTP DPU queue transport");
-		let dpu_auth_token = env::var("DPU_AUTH_TOKEN").expect("DPU_AUTH_TOKEN must be set for HTTP DPU queue transport");
-		http_queue::listener::poll_messages(&server_url, &installation_id, &dpu_auth_token).await;
-	} else {
-		let gcp_credentials = env::var("GCP_CREDENTIALS").expect("GCP_CREDENTIALS must be set");
+	match queue_transport.to_ascii_lowercase().as_str() {
+		"http" => {
+			let server_url = env::var("SERVER_URL").expect("SERVER_URL must be set for HTTP DPU queue transport");
+			let dpu_auth_token = env::var("DPU_AUTH_TOKEN").expect("DPU_AUTH_TOKEN must be set for HTTP DPU queue transport");
+			http_queue::listener::poll_messages(&server_url, &installation_id, &dpu_auth_token).await;
+		}
+		"pubsub" => {
+			let gcp_credentials = env::var("GCP_CREDENTIALS").expect("GCP_CREDENTIALS must be set");
 		log::debug!("[main] env vars = {}, {}", &gcp_credentials, &installation_id);
 		pubsub::listener::listen_messages(
 			&gcp_credentials,
 			&installation_id,
 		).await;
-	}
+		}
+		other => panic!("Unsupported DPU_QUEUE_TRANSPORT: {other}. Use 'pubsub' or 'http'."),
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if queue_transport.eq_ignore_ascii_case("http") {
let server_url = env::var("SERVER_URL").expect("SERVER_URL must be set for HTTP DPU queue transport");
let dpu_auth_token = env::var("DPU_AUTH_TOKEN").expect("DPU_AUTH_TOKEN must be set for HTTP DPU queue transport");
http_queue::listener::poll_messages(&server_url, &installation_id, &dpu_auth_token).await;
} else {
let gcp_credentials = env::var("GCP_CREDENTIALS").expect("GCP_CREDENTIALS must be set");
match queue_transport.to_ascii_lowercase().as_str() {
"http" => {
let server_url = env::var("SERVER_URL").expect("SERVER_URL must be set for HTTP DPU queue transport");
let dpu_auth_token = env::var("DPU_AUTH_TOKEN").expect("DPU_AUTH_TOKEN must be set for HTTP DPU queue transport");
http_queue::listener::poll_messages(&server_url, &installation_id, &dpu_auth_token).await;
}
"pubsub" => {
let gcp_credentials = env::var("GCP_CREDENTIALS").expect("GCP_CREDENTIALS must be set");
log::debug!("[main] env vars = {}, {}", &gcp_credentials, &installation_id);
pubsub::listener::listen_messages(
&gcp_credentials,
&installation_id,
).await;
}
other => panic!("Unsupported DPU_QUEUE_TRANSPORT: {other}. Use 'pubsub' or 'http'."),
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@vibi-dpu/src/main.rs` around lines 50 - 55, The code currently treats any
non-"http" DPU_QUEUE_TRANSPORT as Pub/Sub which hides config typos; update the
startup logic that reads queue_transport to normalize (e.g., to_lowercase) and
use a match/if chain that explicitly handles "http" and "pubsub" values, calling
http_queue::listener::poll_messages(...) for "http" and the existing GCP/pubsub
branch (the branch that reads GCP_CREDENTIALS) for "pubsub", and for any other
value log a clear error including the invalid value and call
std::process::exit(1) (or return Err) to fail fast so typos don’t silently pick
the wrong transport.

Comment thread vibi-dpu/src/main.rs
http_queue::listener::poll_messages(&server_url, &installation_id, &dpu_auth_token).await;
} else {
let gcp_credentials = env::var("GCP_CREDENTIALS").expect("GCP_CREDENTIALS must be set");
log::debug!("[main] env vars = {}, {}", &gcp_credentials, &installation_id);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Do not log credential material.

Line 56 logs gcp_credentials; if this env var carries raw JSON/key content, secrets are exposed in logs.

Proposed fix
-		log::debug!("[main] env vars = {}, {}", &gcp_credentials, &installation_id);
+		log::debug!("[main] PubSub transport selected for installation_id={}", &installation_id);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
log::debug!("[main] env vars = {}, {}", &gcp_credentials, &installation_id);
log::debug!("[main] PubSub transport selected for installation_id={}", &installation_id);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@vibi-dpu/src/main.rs` at line 56, The debug line in main is logging sensitive
GCP credential material via gcp_credentials; remove gcp_credentials from the log
and instead log only non-secret context (e.g., installation_id) or a safe
indicator (e.g., "gcp_credentials=REDACTED" or a boolean/length check). Update
the log::debug! call in main to avoid including the gcp_credentials variable (or
replace it with a redacted placeholder) and ensure any future logs referencing
gcp_credentials use the same redaction approach.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant