Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions lib/muninn/index_writer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,46 @@ defmodule Muninn.IndexWriter do
error -> error
end
end

@doc """
Queues deletion of every document whose `field_name` contains `value`.

The delete is buffered by Tantivy and only becomes visible to searches
once `commit/1` runs. This enables the incremental-refresh pattern:
delete the documents keyed by `value`, re-add their replacements, and
commit once.

Text, u64, i64, and bool fields are supported. f64 fields are rejected,
since Tantivy has no stable term encoding for floats.

## Parameters

  * `index` - The index to delete from
  * `field_name` - Name of the indexed field to match against
  * `value` - The value to match (string / integer / boolean)

## Returns

  * `:ok` - Delete queued (visible after `commit/1`)
  * `{:error, reason}` - Failed (e.g. field not found, unsupported type)

## Examples

    :ok = Muninn.IndexWriter.add_document(index, %{"id" => "doc-1", "body" => "hello"})
    :ok = Muninn.IndexWriter.commit(index)

    :ok = Muninn.IndexWriter.delete_term(index, "id", "doc-1")
    :ok = Muninn.IndexWriter.commit(index)
    # "doc-1" no longer searchable

"""
@spec delete_term(reference(), String.t(), String.t() | integer() | boolean()) ::
        :ok | {:error, String.t()}
def delete_term(index, field_name, value) when is_binary(field_name) do
  # Normalize the NIF result: treat both bare :ok and {:ok, _} as
  # success, and pass any error tuple through untouched.
  result = Native.writer_delete_term(index, field_name, value)

  case result do
    :ok -> :ok
    {:ok, _ignored} -> :ok
    other -> other
  end
end
end
4 changes: 4 additions & 0 deletions lib/muninn/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ defmodule Muninn.Native do
@doc false
def writer_rollback(_index), do: :erlang.nif_error(:nif_not_loaded)

@doc false
# NIF stub: the real implementation is loaded from the native library at
# module load time; this body only runs if the NIF failed to load.
def writer_delete_term(_index, _field_name, _value),
do: :erlang.nif_error(:nif_not_loaded)

## Reader functions

@doc false
Expand Down
9 changes: 9 additions & 0 deletions native/muninn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ fn writer_rollback(index: rustler::ResourceArc<index::IndexResource>) -> Result<
writer::writer_rollback(index)
}

/// NIF entry point for `Muninn.Native.writer_delete_term/3`.
///
/// Thin wrapper: all decoding and deletion logic lives in
/// `writer::writer_delete_term`; errors come back as strings for the
/// Elixir side.
#[rustler::nif]
fn writer_delete_term(
index: rustler::ResourceArc<index::IndexResource>,
field_name: String,
value: rustler::Term,
) -> Result<(), String> {
writer::writer_delete_term(index, field_name, value)
}

#[rustler::nif]
fn reader_new(
index: rustler::ResourceArc<index::IndexResource>,
Expand Down
83 changes: 82 additions & 1 deletion native/muninn/src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use rustler::{Env, ResourceArc, Term};
use std::collections::HashMap;
use tantivy::schema::FieldType;
use tantivy::TantivyDocument;
use tantivy::{TantivyDocument, Term as TantivyTerm};

use crate::index::IndexResource;

Expand Down Expand Up @@ -101,6 +101,87 @@ pub fn writer_add_document(
Ok(())
}

/// Deletes all documents containing the given term (field+value pair).
///
/// Tantivy marks matching documents as deleted; the delete becomes visible
/// to readers after the next `commit`. Supports text, u64, i64, and bool
/// fields. f64 fields are not supported because Tantivy does not provide a
/// stable term encoding for floats.
///
/// Errors when the field is missing from the schema, the value's type does
/// not match the field type (including out-of-range integers), the field
/// type is unsupported, or a lock is poisoned.
pub fn writer_delete_term(
    index_res: ResourceArc<IndexResource>,
    field_name: String,
    value: Term,
) -> Result<(), String> {
    // Hold the index lock only long enough to resolve the field and build
    // the term; it is explicitly dropped before the writer lock is taken.
    let index = index_res
        .index
        .lock()
        .map_err(|_| "Failed to acquire index lock".to_string())?;

    let schema = index.schema();
    let field = schema
        .get_field(&field_name)
        .map_err(|_| format!("Field '{}' not found in schema", field_name))?;
    let field_entry = schema.get_field_entry(field);

    let term = match field_entry.field_type() {
        FieldType::Str(_) => {
            let s = value
                .decode::<String>()
                .map_err(|_| "Expected string value for text field".to_string())?;
            TantivyTerm::from_field_text(field, &s)
        }
        FieldType::U64(_) => {
            // Decode directly as u64 so negative integers are rejected.
            // (A previous `decode::<i64>() as u64` fallback silently
            // wrapped -1 into 18446744073709551615, building a term that
            // could never match the caller's intent.)
            let n = value.decode::<u64>().map_err(|_| {
                "Expected non-negative integer value for u64 field".to_string()
            })?;
            TantivyTerm::from_field_u64(field, n)
        }
        FieldType::I64(_) => {
            // Decode directly as i64 so values above i64::MAX are
            // rejected rather than wrapped negative by a `u64 as i64`
            // cast.
            let n = value.decode::<i64>().map_err(|_| {
                "Expected integer value in i64 range for i64 field".to_string()
            })?;
            TantivyTerm::from_field_i64(field, n)
        }
        FieldType::Bool(_) => {
            let b = value
                .decode::<bool>()
                .map_err(|_| "Expected boolean value for bool field".to_string())?;
            TantivyTerm::from_field_bool(field, b)
        }
        _ => {
            return Err(format!(
                "Field '{}' has unsupported type for delete_term (text, u64, i64, bool only)",
                field_name
            ));
        }
    };

    // Release the index lock before touching the writer lock so the two
    // are not held simultaneously on the common path.
    drop(index);

    let mut writer_lock = index_res
        .writer
        .lock()
        .map_err(|_| "Failed to acquire writer lock".to_string())?;

    // Lazily create the writer on first mutation.
    // NOTE(review): this re-acquires the index lock while holding the
    // writer lock — confirm no other code path locks writer-then-index in
    // the opposite order, or this could deadlock.
    if writer_lock.is_none() {
        let index = index_res
            .index
            .lock()
            .map_err(|_| "Failed to acquire index lock".to_string())?;
        let new_writer = index
            .writer(50_000_000) // 50 MB indexing heap — TODO confirm this matches the other writer_* entry points
            .map_err(|e| format!("Failed to create writer: {}", e))?;
        *writer_lock = Some(new_writer);
    }

    let writer = writer_lock.as_mut().unwrap();
    // Queued delete; becomes visible to readers only after the next commit.
    writer.delete_term(term);

    Ok(())
}

/// Commits all pending changes to the index
pub fn writer_commit(index_res: ResourceArc<IndexResource>) -> Result<(), String> {
let mut writer_lock = index_res
Expand Down
145 changes: 145 additions & 0 deletions test/muninn/index_writer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,151 @@ defmodule Muninn.IndexWriterTest do
end
end

# Integration tests for IndexWriter.delete_term/3: add → commit →
# delete → commit round-trips, verified through a fresh reader/searcher.
describe "delete_term/3" do
alias Muninn.{IndexReader, Searcher}
alias Muninn.Query.Term, as: TermQuery

# Happy path: a committed doc matched by a text term disappears after
# delete_term + commit, while a non-matching doc survives.
test "deletes documents matching a text term", %{test_path: test_path} do
schema =
Schema.new()
|> Schema.add_text_field("id", stored: true, indexed: true)
|> Schema.add_text_field("body", stored: true, indexed: true)

{:ok, index} = Index.create(test_path, schema)

:ok =
IndexWriter.add_documents(index, [
%{"id" => "doc1", "body" => "hello"},
%{"id" => "doc2", "body" => "world"}
])

:ok = IndexWriter.commit(index)

{:ok, reader} = IndexReader.new(index)
{:ok, searcher} = Searcher.new(reader)

# Sanity check before deleting: doc1 is searchable.
assert {:ok, %{"total_hits" => 1}} =
Searcher.search(searcher, %TermQuery{field: "id", value: "doc1"}, limit: 10)

:ok = IndexWriter.delete_term(index, "id", "doc1")
:ok = IndexWriter.commit(index)

# A new reader is needed to observe the post-commit state.
{:ok, reader} = IndexReader.new(index)
{:ok, searcher} = Searcher.new(reader)

assert {:ok, %{"total_hits" => 0}} =
Searcher.search(searcher, %TermQuery{field: "id", value: "doc1"}, limit: 10)

assert {:ok, %{"total_hits" => 1}} =
Searcher.search(searcher, %TermQuery{field: "id", value: "doc2"}, limit: 10)
end

# delete_term matches every document containing the term, not just one:
# both "draft" docs go, the "published" doc stays.
test "deletes all documents matching the term", %{test_path: test_path} do
schema =
Schema.new()
|> Schema.add_text_field("category", stored: true, indexed: true)
|> Schema.add_text_field("title", stored: true, indexed: true)

{:ok, index} = Index.create(test_path, schema)

:ok =
IndexWriter.add_documents(index, [
%{"category" => "draft", "title" => "A"},
%{"category" => "draft", "title" => "B"},
%{"category" => "published", "title" => "C"}
])

:ok = IndexWriter.commit(index)
:ok = IndexWriter.delete_term(index, "category", "draft")
:ok = IndexWriter.commit(index)

{:ok, reader} = IndexReader.new(index)
{:ok, searcher} = Searcher.new(reader)

assert {:ok, %{"total_hits" => 0}} =
Searcher.search(
searcher,
%TermQuery{field: "category", value: "draft"},
limit: 10
)

assert {:ok, %{"total_hits" => 1}} =
Searcher.search(
searcher,
%TermQuery{field: "category", value: "published"},
limit: 10
)
end

# The incremental-refresh pattern from the docs: delete + re-add within
# one commit, then verify only the new version is searchable.
test "delete then re-add same term (incremental refresh pattern)", %{test_path: test_path} do
schema =
Schema.new()
|> Schema.add_text_field("id", stored: true, indexed: true)
|> Schema.add_text_field("body", stored: true, indexed: true)

{:ok, index} = Index.create(test_path, schema)

:ok = IndexWriter.add_document(index, %{"id" => "doc1", "body" => "old content"})
:ok = IndexWriter.commit(index)

:ok = IndexWriter.delete_term(index, "id", "doc1")
:ok = IndexWriter.add_document(index, %{"id" => "doc1", "body" => "new content"})
:ok = IndexWriter.commit(index)

{:ok, reader} = IndexReader.new(index)
{:ok, searcher} = Searcher.new(reader)

assert {:ok, %{"total_hits" => 1, "hits" => [hit]}} =
Searcher.search(searcher, %TermQuery{field: "id", value: "doc1"}, limit: 10)

# Stored text fields may come back as a list of values or a bare
# string depending on the hit-serialization format — accept both.
body = hit["doc"]["body"]
assert body == ["new content"] or body == "new content"
end

# A delete issued before the pending add is committed still removes the
# doc: both operations land in the same commit.
test "delete on uncommitted writer is queued and applied at commit",
%{test_path: test_path} do
schema =
Schema.new()
|> Schema.add_text_field("id", stored: true, indexed: true)

{:ok, index} = Index.create(test_path, schema)

:ok = IndexWriter.add_document(index, %{"id" => "ghost"})
:ok = IndexWriter.delete_term(index, "id", "ghost")
:ok = IndexWriter.commit(index)

{:ok, reader} = IndexReader.new(index)
{:ok, searcher} = Searcher.new(reader)

assert {:ok, %{"total_hits" => 0}} =
Searcher.search(searcher, %TermQuery{field: "id", value: "ghost"}, limit: 10)
end

# Exercises the u64 decode path in the NIF; only asserts the call
# succeeds (no search back — the field is numeric).
test "delete by u64 term", %{test_path: test_path} do
schema =
Schema.new()
|> Schema.add_u64_field("id", stored: true, indexed: true)
|> Schema.add_text_field("body", stored: true)

{:ok, index} = Index.create(test_path, schema)

:ok = IndexWriter.add_document(index, %{"id" => 42, "body" => "answer"})
:ok = IndexWriter.add_document(index, %{"id" => 43, "body" => "next"})
:ok = IndexWriter.commit(index)

assert :ok = IndexWriter.delete_term(index, "id", 42)
:ok = IndexWriter.commit(index)
end

# Error path: deleting on a field absent from the schema surfaces the
# NIF's "not found" error instead of silently succeeding.
test "returns error for unknown field", %{test_path: test_path} do
schema = Schema.new() |> Schema.add_text_field("id", stored: true, indexed: true)
{:ok, index} = Index.create(test_path, schema)

assert {:error, reason} = IndexWriter.delete_term(index, "nope", "x")
assert reason =~ "not found"
end
end

describe "real-world scenarios" do
test "e-commerce product indexing", %{test_path: test_path} do
schema =
Expand Down
Loading