Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions apps/web/app/api/automation/ai/ask/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import { NextResponse } from 'next/server';
import { z } from 'zod';
import { tool, stepCountIs } from 'ai';
import { ErrorCodes } from '@/lib/errors';
import { ResponseUtil } from '@/lib/result';
import { generateText } from '@/lib/ai/gateway';
import { getEffectiveModelBundle } from '@/lib/ai/model';
import { buildSchemaContext, getDefaultSchemaSampleLimits } from '@/lib/ai/prompts/contexts/schema';
import { SYSTEM_PROMPT } from '@/lib/ai/prompts/system/core';
import { ensureConnectionPoolForUser } from '@/app/api/connection/utils';
import { isMissingAiEnvError } from '@/lib/ai/errors';
import { withAutomationHandler } from '../../with-automation-handler';
import { isReadOnlyQuery, AI_ROW_LIMIT } from '../../utils';

/**
* POST /api/automation/ai/ask
*
* Ask a natural language question about your data.
* AI will generate SQL, execute it, and return the answer.
*
* Body:
* {
* "connectionId": "xxx",
* "question": "Top 10 slow queries yesterday",
* "database": "mydb" // optional
* }
*/
export const POST = withAutomationHandler(async ({ req, userId, organizationId }) => {
const body = await req.json();
const { connectionId, question, database } = body;

if (!connectionId) {
return NextResponse.json(
ResponseUtil.error({
code: ErrorCodes.INVALID_PARAMS,
message: 'Missing required field: connectionId',
}),
{ status: 400 },
);
}

if (!question || typeof question !== 'string' || !question.trim()) {
return NextResponse.json(
ResponseUtil.error({
code: ErrorCodes.INVALID_PARAMS,
message: 'Missing required field: question',
}),
{ status: 400 },
);
}

// Verify connection exists and is accessible
try {
await ensureConnectionPoolForUser(userId, organizationId, connectionId, null);
} catch {
return NextResponse.json(
ResponseUtil.error({
code: ErrorCodes.NOT_FOUND,
message: 'Connection not found or could not be established.',
}),
{ status: 404 },
);
}

// Build schema context for AI
const defaults = getDefaultSchemaSampleLimits();
const schemaContext = await buildSchemaContext({
userId,
organizationId,
datasourceId: connectionId,
database,
tableSampleLimit: defaults.table,
columnSampleLimit: defaults.column,
});

// Build SQL runner tool for AI to use
const sqlResults: Array<{ sql: string; rows: unknown[]; columns: unknown; rowCount: number; error?: string }> = [];

const sqlRunner = tool({
description: 'Execute a read-only SQL query against the database and return the results.',
inputSchema: z.object({
sql: z.string().min(1, 'SQL query is required'),
database: z.string().optional(),
}),
execute: async ({ sql, database: db }) => {
const trimmed = sql.trim();
if (!isReadOnlyQuery(trimmed)) {
const errorResult = {
ok: false,
error: 'Only read-only queries (SELECT, SHOW, DESCRIBE, EXPLAIN) are allowed.',
sql: trimmed,
};
sqlResults.push({ sql: trimmed, rows: [], columns: null, rowCount: 0, error: errorResult.error });
return errorResult;
}

try {
const { entry } = await ensureConnectionPoolForUser(userId, organizationId, connectionId, null);
const result = await entry.instance.queryWithContext(trimmed, {
database: db ?? database,
});

const rows = Array.isArray(result.rows) ? result.rows.slice(0, AI_ROW_LIMIT) : [];
const columns = result.columns ?? null;
const rowCount = result.rowCount ?? rows.length;

sqlResults.push({ sql: trimmed, rows, columns, rowCount });

return {
ok: true,
columns,
rows,
rowCount,
limited: rows.length >= AI_ROW_LIMIT,
};
} catch (err: any) {
const errorMsg = String(err?.message || err);
sqlResults.push({ sql: trimmed, rows: [], columns: null, rowCount: 0, error: errorMsg });
return {
ok: false,
error: errorMsg,
sql: trimmed,
};
}
},
});

// Build system prompt
const schemaSection = schemaContext ? `\nSchema Context:\n${schemaContext}` : '';
const systemPrompt = [
SYSTEM_PROMPT.trim(),
'You have access to a sqlRunner tool to execute read-only SQL queries.',
'When the user asks a data question, generate appropriate SQL and execute it using the tool.',
'After getting results, provide a clear, concise answer based on the data.',
schemaSection,
]
.filter(Boolean)
.join('\n\n');

try {
const { model, modelName } = getEffectiveModelBundle('chat');

const result = await generateText({
model,
system: systemPrompt,
messages: [{ role: 'user' as const, content: question.trim() }],
tools: { sqlRunner },
toolChoice: 'auto',
stopWhen: stepCountIs(4),
context: {
organizationId,
userId,
feature: 'automation_ai_ask',
model: modelName,
},
});

return NextResponse.json(
ResponseUtil.success({
answer: result.text,
sqlResults,
usage: result.usage
? {
inputTokens: result.usage.inputTokens,
outputTokens: result.usage.outputTokens,
totalTokens: result.usage.totalTokens,
}
: null,
}),
);
} catch (error: any) {
if (isMissingAiEnvError(error)) {
return NextResponse.json(
ResponseUtil.error({
code: ErrorCodes.ERROR,
message: 'AI service is not configured. Set DORY_AI_PROVIDER and related environment variables.',
}),
{ status: 503 },
);
}

console.error('[automation/ai/ask] failed', error);
return NextResponse.json(
ResponseUtil.error({
code: ErrorCodes.ERROR,
message: error?.message ?? 'AI request failed',
}),
{ status: 500 },
);
}
});
25 changes: 25 additions & 0 deletions apps/web/app/api/automation/connections/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { NextResponse } from 'next/server';
import { ResponseUtil } from '@/lib/result';
import { withAutomationHandler } from '../with-automation-handler';
import { ensureDemoConnection } from '@/lib/demo/ensure-demo-connection';

/**
* GET /api/automation/connections
*
* List all database connections for the organization.
* Returns metadata only (no passwords or secrets).
*/
export const GET = withAutomationHandler(async ({ db, userId, organizationId }) => {
let data = await db.connections.list(organizationId);

if (data.length === 0) {
try {
await ensureDemoConnection(db, userId, organizationId);
data = await db.connections.list(organizationId);
} catch (err) {
console.warn('[automation/connections] failed to create demo connection:', err);
}
}

return NextResponse.json(ResponseUtil.success(data));
});
138 changes: 138 additions & 0 deletions apps/web/app/api/automation/query/execute/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { NextResponse } from 'next/server';
import { ErrorCodes } from '@/lib/errors';
import { ResponseUtil } from '@/lib/result';
import { getOrCreateConnectionPool } from '@/lib/connection/connection-service';
import { splitMultiSQL } from '@/lib/utils/split-multi-sql';
import { BaseConnection } from '@/lib/connection/base/base-connection';
import { withAutomationHandler } from '../../with-automation-handler';
import { parseSqlOp, MAX_STATEMENTS } from '../../utils';

async function executeStatement(
connection: BaseConnection,
statement: string,
context: { database?: string },
) {
const perfStart = performance.now();
try {
const result = await connection.queryWithContext(statement, {
database: context.database,
});
const rows = result.rows ?? [];
const durationMs = Math.round(performance.now() - perfStart);
const isArrayRows = Array.isArray(rows);

return {
ok: true as const,
sql: statement,
operation: parseSqlOp(statement),
columns: result.columns ?? null,
rows: isArrayRows ? rows : [],
rowCount: result.rowCount ?? (isArrayRows ? rows.length : 0),
durationMs,
};
} catch (err: any) {
const durationMs = Math.round(performance.now() - perfStart);
return {
ok: false as const,
sql: statement,
operation: parseSqlOp(statement),
columns: null,
rows: [],
rowCount: 0,
durationMs,
error: {
message: String(err?.message || err),
code: err?.code ?? null,
},
};
}
}

/**
* POST /api/automation/query/execute
*
* Execute SQL against a database connection.
*
* Body:
* {
* "connectionId": "xxx",
* "sql": "SELECT * FROM users LIMIT 100",
* "database": "mydb" // optional
* "stopOnError": true // optional, default true
* }
*/
export const POST = withAutomationHandler(async ({ req, organizationId }) => {
const body = await req.json();
const { connectionId, sql, database, stopOnError = true } = body;

if (!connectionId) {
return NextResponse.json(
ResponseUtil.error({
code: ErrorCodes.INVALID_PARAMS,
message: 'Missing required field: connectionId',
}),
{ status: 400 },
);
}

if (!sql || typeof sql !== 'string' || !sql.trim()) {
return NextResponse.json(
ResponseUtil.error({
code: ErrorCodes.INVALID_PARAMS,
message: 'Missing required field: sql',
}),
{ status: 400 },
);
}

const poolEntry = await getOrCreateConnectionPool(organizationId, connectionId);
if (!poolEntry) {
return NextResponse.json(
ResponseUtil.error({
code: ErrorCodes.NOT_FOUND,
message: 'Connection not found or could not be established.',
}),
{ status: 404 },
);
}

const connection = poolEntry.instance;
const statements = splitMultiSQL(sql).filter(s => !!s.trim());

if (!statements.length) {
return NextResponse.json(ResponseUtil.success({ results: [], durationMs: 0 }));
}

if (statements.length > MAX_STATEMENTS) {
return NextResponse.json(
ResponseUtil.error({
code: ErrorCodes.VALIDATION_ERROR,
message: `Too many statements (${statements.length}). Maximum is ${MAX_STATEMENTS}.`,
}),
{ status: 400 },
);
}

const perfStart = performance.now();
const results: Array<Awaited<ReturnType<typeof executeStatement>>> = [];

for (const statement of statements) {
const result = await executeStatement(connection, statement, { database });
results.push(result);
if (!result.ok && stopOnError) break;
}

const totalDurationMs = Math.round(performance.now() - perfStart);

return NextResponse.json(
ResponseUtil.success({
results,
summary: {
totalStatements: results.length,
successful: results.filter(r => r.ok).length,
failed: results.filter(r => !r.ok).length,
durationMs: totalDurationMs,
},
}),
);
});
Loading
Loading