Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 234 additions & 0 deletions app/api/inbox/[address]/recount/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// CACHE_INVARIANTS:POSTURE=no-cache
// This endpoint performs a write (stats recount) and must never be cached.

import { NextRequest, NextResponse } from "next/server";
import { getCloudflareContext } from "@opennextjs/cloudflare";
import { createLogger, createConsoleLogger, isLogsRPC } from "@/lib/logging";
import { lookupAgent } from "@/lib/agent-lookup";
import { verifyBitcoinSignature } from "@/lib/bitcoin-verify";
import { shouldFailClosed } from "@/lib/env";
import { rebuildAddressStats } from "@/lib/inbox/stats";

const RATE_LIMIT_RETRY_AFTER = 60;

/**
* Build the canonical message an agent must sign to authorize a recount.
*
* Deterministic on the BTC address — no timestamp — so agents can sign
* offline without coordinating a nonce. Abusing the endpoint is harmless:
* it is idempotent and only corrects the caller's own counter.
*
* Note: because there is no timestamp or nonce, a signature produced for this
* message is valid forever. If a recount signature is leaked (e.g., pasted
* into a public log), it remains replayable indefinitely. This is acceptable
* today because the operation has no financial side-effects and cannot cause
* data loss — replay only re-runs an idempotent repair. If the endpoint ever
* gains destructive scope or billing consequences, introduce a nonce.
*/
export function buildRecountMessage(btcAddress: string): string {
return `Inbox Recount | ${btcAddress}`;
}

/**
* POST /api/inbox/{address}/recount
*
* Self-heal endpoint for agents whose unreadCount has drifted from the
* actual number of unread messages (issue #995).
*
* Auth: Bitcoin signature (BIP-137 or BIP-322) over the message
* "Inbox Recount | {btcAddress}"
* where {btcAddress} is the canonical BTC address for the inbox being recounted.
*
* The signer must be the owner of the inbox. The endpoint recomputes received,
* unread, and sent counters from live inbox_messages rows and overwrites the
* agent_inbox_stats row atomically.
*
* Request body: { "signature": "<BIP-137 or BIP-322 base64/hex>" }
*
* Response 200: { fixed: boolean, address: string, before: {...}, after: {...} }
*
* This endpoint does NOT touch the hot-path maintained counters — it is a
* repair tool, not a replacement for the P3 O(1) counter reads on GET.
*
* See: https://github.com/aibtcdev/landing-page/issues/995
*/
export async function POST(
request: NextRequest,
{ params }: { params: Promise<{ address: string }> }
) {
const { address } = await params;
const { env, ctx } = await getCloudflareContext();
const kv = env.VERIFIED_AGENTS as KVNamespace;
const db = env.DB as D1Database | undefined;

const rayId = request.headers.get("cf-ray") || crypto.randomUUID();
const logger = isLogsRPC(env.LOGS)
? createLogger(env.LOGS, ctx, { rayId, path: request.nextUrl.pathname })
: createConsoleLogger({ rayId, path: request.nextUrl.pathname });

// IP rate-limit — runs before lookupAgent + verifyBitcoinSignature (the
// expensive paths) so a leaked replayable sig can't drive D1 costs from one IP.
// Mirrors the inbox-mark-read rate-limit ordering in [messageId]/route.ts.
const ip = request.headers.get("cf-connecting-ip") || request.headers.get("x-forwarded-for");
if (ip) {
let ipLimited = false;
try {
const result = await (env.RATE_LIMIT_MUTATING as RateLimit).limit({ key: `inbox-recount:${ip}` });
ipLimited = !result.success;
} catch (err) {
const failClosed = shouldFailClosed(env);
logger.warn("inbox.recount.rate_limit_binding_error", { error: String(err), failClosed });
if (failClosed) ipLimited = true;
}
if (ipLimited) {
logger.warn("inbox.recount.rate_limited", { ip });
return NextResponse.json(
{ error: "Too many requests from this IP. Slow down.", retryAfter: RATE_LIMIT_RETRY_AFTER },
{ status: 429, headers: { "Retry-After": String(RATE_LIMIT_RETRY_AFTER) } }
);
}
}

// D1 required — recount is a write operation
if (!db) {
return NextResponse.json(
{
error: "transient_d1_unavailable",
message: "Inbox database temporarily unavailable. Please retry shortly.",
retry_after: 5,
},
{ status: 503, headers: { "Retry-After": "5" } }
);
}

// Resolve address (BTC or STX) to canonical agent record
const agent = await lookupAgent(kv, address, db);
if (!agent) {
return NextResponse.json(
{
error: "Agent not found",
address,
hint: "Check the agent directory at https://aibtc.com/agents",
},
{ status: 404 }
);
}

// Parse request body
let body: unknown;
try {
body = await request.json();
} catch {
return NextResponse.json(
{ error: "Malformed JSON body" },
{ status: 400 }
);
}

if (!body || typeof body !== "object") {
return NextResponse.json(
{
error: "validation_failed",
errors: [
{
field: "body",
message: "Request body must be a JSON object",
hint: "Send { \"signature\": \"<BIP-137 or BIP-322 signature>\" }",
},
],
},
{ status: 400 }
);
}

const b = body as Record<string, unknown>;
if (typeof b.signature !== "string" || b.signature.trim().length === 0) {
return NextResponse.json(
{
error: "validation_failed",
errors: [
{
field: "signature",
message: "signature must be a non-empty string",
hint: `Sign the message "${buildRecountMessage(agent.btcAddress)}" with your Bitcoin private key.`,
format: "BIP-137 (base64, 88 chars) or BIP-322 (hex, 130 chars)",
},
],
},
{ status: 400 }
);
}

const expectedMessage = buildRecountMessage(agent.btcAddress);

// Verify Bitcoin signature — signer must own the inbox being recounted
let sigResult;
try {
sigResult = verifyBitcoinSignature(b.signature, expectedMessage, agent.btcAddress);
} catch (e) {
logger.warn("inbox.recount.sig_verify_failed", {
address: agent.btcAddress,
error: String(e),
});
return NextResponse.json(
{
error: `Invalid Bitcoin signature: ${(e as Error).message}`,
expectedMessage,
hint: "Sign the exact expectedMessage string with the Bitcoin key for this inbox address.",
},
{ status: 400 }
);
}

if (!sigResult.valid || sigResult.address !== agent.btcAddress) {
logger.warn("inbox.recount.sig_mismatch", {
address: agent.btcAddress,
signerAddress: sigResult.address,
});
return NextResponse.json(
{
error: "Signature verification failed: signer is not the inbox owner",
expectedSigner: agent.btcAddress,
actualSigner: sigResult.address,
expectedMessage,
},
{ status: 403 }
);
}

// Auth passed — recompute stats from live inbox_messages
let result;
try {
result = await rebuildAddressStats(db, agent.btcAddress);
} catch (e) {
logger.error("inbox.recount.rebuild_failed", {
address: agent.btcAddress,
error: String(e),
});
return NextResponse.json(
{
error: "transient_d1_unavailable",
message: "Stats recount failed. Please retry shortly.",
retry_after: 5,
},
{ status: 503, headers: { "Retry-After": "5" } }
);
}

logger.info("inbox.recount.complete", {
address: agent.btcAddress,
before: result.before,
after: result.after,
repaired: result.repaired,
});

return NextResponse.json({
fixed: result.repaired,
address: agent.btcAddress,
before: result.before,
after: result.after,
message: result.repaired
? "Stats refreshed — counters now match current message state."
: "Stats already consistent — no correction needed.",
});
}
131 changes: 131 additions & 0 deletions lib/inbox/stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,134 @@ export async function reconcileStats(
samples,
};
}

// ---------------------------------------------------------------------------
// Single-address repair
// ---------------------------------------------------------------------------

/** Snapshot of stats before/after a recount operation. */
export interface AddressStatsSnapshot {
receivedCount: number;
unreadCount: number;
sentCount: number;
}

/** Result of rebuildAddressStats() — before/after values for the caller. */
export interface RebuildAddressResult {
before: AddressStatsSnapshot;
after: AddressStatsSnapshot;
repaired: boolean;
}

/**
* Recompute stats for a single address from live inbox_messages rows and
* overwrite the agent_inbox_stats counter.
*
* Designed for the self-heal endpoint — callers have already authenticated
* ownership of the address before invoking this.
*
* Returns before/after snapshots so the caller can report the delta and
* determine whether any counters changed.
*
* Idempotent: safe to call repeatedly. If counters are already correct,
* `repaired` is false and before === after.
*
* ## SQL filter alignment with maintained counters
*
* The filters here mirror rebuildAllStats exactly — they are the single-address
* projection of the same aggregate queries:
* - received: `is_reply = 0 AND to_btc_address = ?` (matches bumpInboundStats call-site)
* - sent: `is_reply = 1 AND from_btc_address = ?` (matches bumpSentStats call-site)
*
* All three counters (received, unread, sent) are recomputed defensively —
* drift can affect any of them, not just unread. In the common case (issue #995)
* only unread drifts, but recount-all-3 is the right repair shape.
*
* ## Race window note
*
* The `before` snapshot is captured from agent_inbox_stats immediately before
* the live COUNT(*) queries run. A message arriving in that gap is counted in
* `after` but not in `before` — `repaired` may fire for what is actually new
* normal delivery activity. The repair itself is still correct; only the
* `repaired=true` diagnostic can be a false positive in that window.
*/
export async function rebuildAddressStats(
db: D1Database,
btcAddress: string
): Promise<RebuildAddressResult> {
const now = new Date().toISOString();

// Read current stored values before repair
const before = await getAgentInboxStats(db, btcAddress);
const beforeSnapshot: AddressStatsSnapshot = {
receivedCount: before.receivedCount,
unreadCount: before.unreadCount,
sentCount: before.sentCount,
};

// Aggregate actual inbound + sent counts in parallel — queries are independent
const [inboundRow, sentRow] = await Promise.all([
db
.prepare(
`SELECT
COUNT(*) AS received_count,
COUNT(CASE WHEN read_at IS NULL THEN 1 END) AS unread_count,
MAX(sent_at) AS last_message_at
FROM inbox_messages
WHERE is_reply = 0 AND to_btc_address = ?`
)
.bind(btcAddress)
.first<{
received_count: number;
unread_count: number;
last_message_at: string | null;
}>(),
db
.prepare(
`SELECT
COUNT(*) AS sent_count,
MAX(sent_at) AS last_sent_at
FROM inbox_messages
WHERE is_reply = 1 AND from_btc_address = ?`
)
.bind(btcAddress)
.first<{ sent_count: number; last_sent_at: string | null }>(),
]);

const newReceived = inboundRow?.received_count ?? 0;
const newUnread = inboundRow?.unread_count ?? 0;
const newSent = sentRow?.sent_count ?? 0;
const lastMessageAt = inboundRow?.last_message_at ?? null;
const lastSentAt = sentRow?.last_sent_at ?? null;

// Upsert the corrected row
await db
.prepare(
`INSERT INTO agent_inbox_stats
(btc_address, received_count, unread_count, sent_count,
last_message_at, last_sent_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(btc_address) DO UPDATE SET
received_count = excluded.received_count,
unread_count = excluded.unread_count,
sent_count = excluded.sent_count,
last_message_at = excluded.last_message_at,
last_sent_at = excluded.last_sent_at,
updated_at = excluded.updated_at`
)
.bind(btcAddress, newReceived, newUnread, newSent, lastMessageAt, lastSentAt, now)
.run();

const afterSnapshot: AddressStatsSnapshot = {
receivedCount: newReceived,
unreadCount: newUnread,
sentCount: newSent,
};

const repaired =
beforeSnapshot.unreadCount !== afterSnapshot.unreadCount ||
beforeSnapshot.receivedCount !== afterSnapshot.receivedCount ||
beforeSnapshot.sentCount !== afterSnapshot.sentCount;

return { before: beforeSnapshot, after: afterSnapshot, repaired };
}