diff --git a/internal/query/duckdb_text.go b/internal/query/duckdb_text.go index 61935840..904bf825 100644 --- a/internal/query/duckdb_text.go +++ b/internal/query/duckdb_text.go @@ -17,6 +17,20 @@ func textTypeFilter() string { return "msg.message_type IN ('whatsapp','imessage','sms','mms','google_voice_text')" } +// textSenderJoin resolves the sending participant (p_sender) for each text +// message. The sender lives on either messages.sender_id (iMessage/SMS, +// Messenger) or a message_recipients row of type 'from', so we COALESCE the +// two. The 'from' lookup uses an uncorrelated derived table joined on +// message_id rather than a correlated scalar subquery in the JOIN ON clause: +// DuckDB cannot push the message_type filter through a correlated join and +// instead evaluates it across the entire (email-dominated) messages dataset, +// exhausting memory. The derived table optimizes cleanly. +const textSenderJoin = `LEFT JOIN ( + SELECT message_id, ANY_VALUE(participant_id) AS participant_id + FROM mr WHERE recipient_type = 'from' GROUP BY message_id + ) fr ON fr.message_id = msg.id + JOIN p p_sender ON p_sender.id = COALESCE(msg.sender_id, fr.participant_id)` + // buildTextFilterConditions builds WHERE conditions from a TextFilter. // All conditions use the msg. prefix and assume the standard parquetCTEs. func (e *DuckDBEngine) buildTextFilterConditions( @@ -196,24 +210,16 @@ func textAggViewDef( case TextViewContacts: keyExpr := "COALESCE(NULLIF(p_sender.phone_number, ''), " + "p_sender.email_address)" - senderJoin := `JOIN p p_sender ON p_sender.id = COALESCE(msg.sender_id, - (SELECT mr_fb.participant_id FROM mr mr_fb - WHERE mr_fb.message_id = msg.id AND mr_fb.recipient_type = 'from' - LIMIT 1))` return aggViewDef{ keyExpr: keyExpr, - joinClause: senderJoin, + joinClause: textSenderJoin, nullGuard: keyExpr + " IS NOT NULL", }, nil case TextViewContactNames: nameExpr := participantNameExpr("p_sender") - senderJoin := `JOIN p p_sender ON p_sender.id = COALESCE(msg.sender_id, - (SELECT mr_fb.participant_id FROM mr mr_fb - WHERE mr_fb.message_id = msg.id AND mr_fb.recipient_type = 'from' - LIMIT 1))` return aggViewDef{ keyExpr: nameExpr, - joinClause: senderJoin, + joinClause: textSenderJoin, nullGuard: nameExpr + " IS NOT NULL", }, nil case TextViewSources: diff --git a/internal/query/duckdb_text_test.go b/internal/query/duckdb_text_test.go new file mode 100644 index 00000000..a7812da4 --- /dev/null +++ b/internal/query/duckdb_text_test.go @@ -0,0 +1,100 @@ +package query + +import ( + "context" + "testing" + + _ "github.com/marcboeker/go-duckdb" + assertpkg "github.com/stretchr/testify/assert" + requirepkg "github.com/stretchr/testify/require" +) + +// buildTextContactsEngine builds a DuckDB engine over a small Parquet dataset +// shaped like real text data: an iMessage source, phone/email participants, and +// messages whose sender is recorded on messages.sender_id (iMessage/SMS shape) +// or only on a message_recipients row of type 'from' (Messenger shape). It also +// includes an email message that must be excluded from text aggregates. +func buildTextContactsEngine(t *testing.T) *DuckDBEngine { + t.Helper() + b := NewTestDataBuilder(t) + + smsSrc := b.AddSourceWithType("me@imessage.local", "imessage") + emailSrc := b.AddSourceWithType("me@gmail.com", "gmail") + + alice := b.AddPhoneParticipant("+14155550001", "Alice") + bob := b.AddPhoneParticipant("+14155550002", "Bob") + emailSender := b.AddParticipant("carol@example.com", "example.com", "Carol") + + // Two iMessages from Alice, sender on messages.sender_id (direct shape). + m1 := b.AddMessage(MessageOpt{MessageType: "imessage", SourceID: smsSrc, SenderID: &alice}) + m2 := b.AddMessage(MessageOpt{MessageType: "imessage", SourceID: smsSrc, SenderID: &alice}) + b.AddFrom(m1, alice, "Alice") + b.AddFrom(m2, alice, "Alice") + + // One SMS from Bob, sender recorded ONLY via the 'from' recipient row + // (no messages.sender_id) — exercises the COALESCE fallback. + m3 := b.AddMessage(MessageOpt{MessageType: "sms", SourceID: smsSrc}) + b.AddFrom(m3, bob, "Bob") + + // One email — must NOT appear in the text contacts aggregate. + m4 := b.AddMessage(MessageOpt{MessageType: "email", SourceID: emailSrc, SenderID: &emailSender}) + b.AddFrom(m4, emailSender, "Carol") + + return b.BuildEngine() +} + +// TestDuckDBTextAggregate_Contacts guards the DuckDB text Contacts aggregate. +// A prior implementation embedded a correlated scalar subquery in the JOIN ON +// clause, which DuckDB could not optimize over a large messages dataset; this +// asserts the view returns the expected non-email contacts. +func TestDuckDBTextAggregate_Contacts(t *testing.T) { + engine := buildTextContactsEngine(t) + ctx := context.Background() + + tests := []struct { + name string + view TextViewType + wantKeys map[string]int64 // key -> message count + absent string // key that must not appear (the email sender) + }{ + { + name: "contacts (phone/email key)", + view: TextViewContacts, + wantKeys: map[string]int64{ + "+14155550001": 2, // Alice, via sender_id + "+14155550002": 1, // Bob, via 'from' recipient fallback + }, + absent: "carol@example.com", + }, + { + name: "contact names (display name key)", + view: TextViewContactNames, + wantKeys: map[string]int64{ + "Alice": 2, + "Bob": 1, + }, + absent: "Carol", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require := requirepkg.New(t) + assert := assertpkg.New(t) + + rows, err := engine.TextAggregate(ctx, tc.view, TextAggregateOptions{}) + require.NoError(err) + require.NotEmpty(rows, "contacts aggregate must not be empty") + + got := make(map[string]int64, len(rows)) + for _, r := range rows { + got[r.Key] = r.Count + } + for key, count := range tc.wantKeys { + assert.Equal(count, got[key], "count for %q", key) + } + _, present := got[tc.absent] + assert.False(present, "email sender %q must be excluded from text aggregate", tc.absent) + }) + } +} diff --git a/internal/query/testfixtures_test.go b/internal/query/testfixtures_test.go index c80e4265..369f800d 100644 --- a/internal/query/testfixtures_test.go +++ b/internal/query/testfixtures_test.go @@ -178,8 +178,10 @@ type MessageOpt struct { SizeEstimate int64 HasAttachments bool DeletedAt *time.Time - SourceID int64 // defaults to 1 - ConversationID int64 // 0 = auto-assign + SourceID int64 // defaults to 1 + ConversationID int64 // 0 = auto-assign + MessageType string // defaults to "email" + SenderID *int64 // nil = NULL (direct sender for text/chat messages) } // AddMessage adds a message and returns its ID. @@ -217,6 +219,8 @@ func (b *TestDataBuilder) AddMessage(opt MessageOpt) int64 { SizeEstimate: opt.SizeEstimate, HasAttachments: opt.HasAttachments, DeletedAt: opt.DeletedAt, + SenderID: opt.SenderID, + MessageType: opt.MessageType, Year: sentAt.Year(), Month: int(sentAt.Month()), })