Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions internal/query/duckdb_text.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ func textTypeFilter() string {
return "msg.message_type IN ('whatsapp','imessage','sms','mms','google_voice_text')"
}

// textSenderJoin resolves the sending participant (p_sender) for each text
// message. The sender lives on either messages.sender_id (iMessage/SMS,
// Messenger) or a message_recipients row of type 'from', so we COALESCE the
// two. The 'from' lookup uses an uncorrelated derived table joined on
// message_id rather than a correlated scalar subquery in the JOIN ON clause:
// DuckDB cannot push the message_type filter through a correlated join and
// instead evaluates it across the entire (email-dominated) messages dataset,
// exhausting memory. The derived table optimizes cleanly.
const textSenderJoin = `LEFT JOIN (
SELECT message_id, ANY_VALUE(participant_id) AS participant_id
FROM mr WHERE recipient_type = 'from' GROUP BY message_id
) fr ON fr.message_id = msg.id
JOIN p p_sender ON p_sender.id = COALESCE(msg.sender_id, fr.participant_id)`

// buildTextFilterConditions builds WHERE conditions from a TextFilter.
// All conditions use the msg. prefix and assume the standard parquetCTEs.
func (e *DuckDBEngine) buildTextFilterConditions(
Expand Down Expand Up @@ -196,24 +210,16 @@ func textAggViewDef(
case TextViewContacts:
keyExpr := "COALESCE(NULLIF(p_sender.phone_number, ''), " +
"p_sender.email_address)"
senderJoin := `JOIN p p_sender ON p_sender.id = COALESCE(msg.sender_id,
(SELECT mr_fb.participant_id FROM mr mr_fb
WHERE mr_fb.message_id = msg.id AND mr_fb.recipient_type = 'from'
LIMIT 1))`
return aggViewDef{
keyExpr: keyExpr,
joinClause: senderJoin,
joinClause: textSenderJoin,
nullGuard: keyExpr + " IS NOT NULL",
}, nil
case TextViewContactNames:
nameExpr := participantNameExpr("p_sender")
senderJoin := `JOIN p p_sender ON p_sender.id = COALESCE(msg.sender_id,
(SELECT mr_fb.participant_id FROM mr mr_fb
WHERE mr_fb.message_id = msg.id AND mr_fb.recipient_type = 'from'
LIMIT 1))`
return aggViewDef{
keyExpr: nameExpr,
joinClause: senderJoin,
joinClause: textSenderJoin,
nullGuard: nameExpr + " IS NOT NULL",
}, nil
case TextViewSources:
Expand Down
100 changes: 100 additions & 0 deletions internal/query/duckdb_text_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package query

import (
"context"
"testing"

_ "github.com/marcboeker/go-duckdb"
assertpkg "github.com/stretchr/testify/assert"
requirepkg "github.com/stretchr/testify/require"
)

// buildTextContactsEngine builds a DuckDB engine over a small Parquet dataset
// shaped like real text data: an iMessage source, phone/email participants, and
// messages whose sender is recorded on messages.sender_id (iMessage/SMS shape)
// or only on a message_recipients row of type 'from' (Messenger shape). It also
// includes an email message that must be excluded from text aggregates.
func buildTextContactsEngine(t *testing.T) *DuckDBEngine {
t.Helper()
b := NewTestDataBuilder(t)

smsSrc := b.AddSourceWithType("me@imessage.local", "imessage")
emailSrc := b.AddSourceWithType("me@gmail.com", "gmail")

alice := b.AddPhoneParticipant("+14155550001", "Alice")
bob := b.AddPhoneParticipant("+14155550002", "Bob")
emailSender := b.AddParticipant("carol@example.com", "example.com", "Carol")

// Two iMessages from Alice, sender on messages.sender_id (direct shape).
m1 := b.AddMessage(MessageOpt{MessageType: "imessage", SourceID: smsSrc, SenderID: &alice})
m2 := b.AddMessage(MessageOpt{MessageType: "imessage", SourceID: smsSrc, SenderID: &alice})
b.AddFrom(m1, alice, "Alice")
b.AddFrom(m2, alice, "Alice")

// One SMS from Bob, sender recorded ONLY via the 'from' recipient row
// (no messages.sender_id) — exercises the COALESCE fallback.
m3 := b.AddMessage(MessageOpt{MessageType: "sms", SourceID: smsSrc})
b.AddFrom(m3, bob, "Bob")

// One email — must NOT appear in the text contacts aggregate.
m4 := b.AddMessage(MessageOpt{MessageType: "email", SourceID: emailSrc, SenderID: &emailSender})
b.AddFrom(m4, emailSender, "Carol")

return b.BuildEngine()
}

// TestDuckDBTextAggregate_Contacts guards the DuckDB text Contacts aggregate.
// A prior implementation embedded a correlated scalar subquery in the JOIN ON
// clause, which DuckDB could not optimize over a large messages dataset; this
// asserts the view returns the expected non-email contacts.
func TestDuckDBTextAggregate_Contacts(t *testing.T) {
engine := buildTextContactsEngine(t)
ctx := context.Background()

tests := []struct {
name string
view TextViewType
wantKeys map[string]int64 // key -> message count
absent string // key that must not appear (the email sender)
}{
{
name: "contacts (phone/email key)",
view: TextViewContacts,
wantKeys: map[string]int64{
"+14155550001": 2, // Alice, via sender_id
"+14155550002": 1, // Bob, via 'from' recipient fallback
},
absent: "carol@example.com",
},
{
name: "contact names (display name key)",
view: TextViewContactNames,
wantKeys: map[string]int64{
"Alice": 2,
"Bob": 1,
},
absent: "Carol",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
require := requirepkg.New(t)
assert := assertpkg.New(t)

rows, err := engine.TextAggregate(ctx, tc.view, TextAggregateOptions{})
require.NoError(err)
require.NotEmpty(rows, "contacts aggregate must not be empty")

got := make(map[string]int64, len(rows))
for _, r := range rows {
got[r.Key] = r.Count
}
for key, count := range tc.wantKeys {
assert.Equal(count, got[key], "count for %q", key)
}
_, present := got[tc.absent]
assert.False(present, "email sender %q must be excluded from text aggregate", tc.absent)
})
}
}
8 changes: 6 additions & 2 deletions internal/query/testfixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,10 @@ type MessageOpt struct {
SizeEstimate int64
HasAttachments bool
DeletedAt *time.Time
SourceID int64 // defaults to 1
ConversationID int64 // 0 = auto-assign
SourceID int64 // defaults to 1
ConversationID int64 // 0 = auto-assign
MessageType string // defaults to "email"
SenderID *int64 // nil = NULL (direct sender for text/chat messages)
}

// AddMessage adds a message and returns its ID.
Expand Down Expand Up @@ -217,6 +219,8 @@ func (b *TestDataBuilder) AddMessage(opt MessageOpt) int64 {
SizeEstimate: opt.SizeEstimate,
HasAttachments: opt.HasAttachments,
DeletedAt: opt.DeletedAt,
SenderID: opt.SenderID,
MessageType: opt.MessageType,
Year: sentAt.Year(),
Month: int(sentAt.Month()),
})
Expand Down
Loading