diff --git a/README.md b/README.md index e5d7593..ea59c5f 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![Worfklow](https://github.com/andrewyates/bsparse/workflows/pytest/badge.svg)](https://github.com/andrewyates/bsparse/actions) [![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) # bsparse -bsparse is a toolkit for creating and searching learned sparse representations +bsparse is a toolkit for creating, indexing, and searching learned sparse representations ## Usage examples ``` @@ -59,4 +59,41 @@ java -cp anserini-1.0.0-fatjar-AY.jar io.anserini.index.IndexCollection \ # 3) search index # Create sparse query representations in `$QUERY_VECTORS` and create an index in `$INDEX`, then: python -m bsparse.cli search --index $INDEX --queries $QUERY_VECTORS --out test.run --topk 1000 + +``` + +### Seismic backend + +[Seismic](https://github.com/TusKANNy/seismic) is an alternative backend that indexes learned +sparse representations natively in Python (no Java/JAR required). The encoded JSONL files produced +by `encode` are already in the format Seismic expects, so the same doc/query files work for both +backends. + +``` +# install the Seismic Python bindings (optional dependency; only needed for this backend) +uv pip install pyseismic-lsr +# for best performance, build against your CPU instead: +# RUSTFLAGS="-C target-cpu=native" uv pip install --no-binary :all: pyseismic-lsr + +# 1) build a Seismic index from encoded docs +python -m bsparse.cli index --backend seismic --input nfcorpus-docs.jsonl --index $INDEX +# --input accepts multiple files, gzipped (.gz) input, and directories of .jsonl/.jsonl.gz files; +# if the in-memory API gives you trouble, --build-method file falls back to concatenating +# the inputs into a temporary uncompressed JSONL file and using Seismic's file-based build +# +# note: seismic appends ".index.seismic" to the path, so the on-disk file is $INDEX.index.seismic; +# search --index accepts either the build-time path or the full on-disk filename +# +# indexing hyperparameters are flags with defaults, e.g.: +# --n-postings 3000 --centroid-fraction 0.2 --summary-energy 0.5 --max-fraction 6 --min-cluster-size 2 --nknn 0 +# +# use --variant large_vocab for collections with more than 65k unique tokens + +# 2) search the index and evaluate +python -m bsparse.cli search --backend seismic --index $INDEX \ + --queries nfcorpus-queries.jsonl --out test.run --topk 1000 \ + --query-cut 10 --heap-factor 0.8 --qrels beir/nfcorpus/test + +# query-time thread count is index-independent and set via the environment: +# SEISMIC_THREADS=16 python -m bsparse.cli search --backend seismic ... ``` diff --git a/bsparse/__init__.py b/bsparse/__init__.py index 06040ac..7b35b87 100644 --- a/bsparse/__init__.py +++ b/bsparse/__init__.py @@ -10,4 +10,4 @@ from .utils import batch_encode, get_torch_device, token_ids_to_binary_vec -__version__ = "0.1.0" +__version__ = "0.2.0" diff --git a/bsparse/cli.py b/bsparse/cli.py index 8e0e568..aad14a3 100644 --- a/bsparse/cli.py +++ b/bsparse/cli.py @@ -8,6 +8,7 @@ COMMANDS = { "encode": bsparse.commands.Encode, "check": bsparse.commands.Check, + "index": bsparse.commands.Index, "search": bsparse.commands.Search, "memsearch": bsparse.commands.MemSearch, } diff --git a/bsparse/commands.py b/bsparse/commands.py index e77a053..02ab128 100644 --- a/bsparse/commands.py +++ b/bsparse/commands.py @@ -10,6 +10,7 @@ from bsparse import load_dict, save_dict from bsparse.anserini import Anserini +from bsparse.seismic import Seismic from bsparse.utils import psgid_to_docid @@ -152,27 +153,155 @@ def run(self): return run +class Index(Command): + @classmethod + def add_arguments(cls, parser): + parser.add_argument( + "--input", + type=Path, + nargs="+", + required=True, + help="One or more encoded JSONL doc files (plain or .gz), or directories of such files", + ) + parser.add_argument("--index", type=Path, required=True, help="Output index path") + parser.add_argument( + "--backend", type=str, default="seismic", choices=["seismic"], help="Indexing backend (default: %(default)s)" + ) + # Seismic index-building hyperparameters (these affect the index, so they are kwargs, not env vars). + # Defaults match the recommended config in the Seismic guidelines. + parser.add_argument( + "--n-postings", type=int, default=3000, help="[Seismic only] avg postings per list (default: %(default)s)" + ) + parser.add_argument( + "--centroid-fraction", + type=float, + default=0.2, + help="[Seismic only] centroids per list as a fraction (default: %(default)s)", + ) + parser.add_argument( + "--summary-energy", + type=float, + default=0.5, + help="[Seismic only] fraction of summary L1 norm to keep (default: %(default)s)", + ) + parser.add_argument( + "--min-cluster-size", type=int, default=2, help="[Seismic only] minimum cluster size (default: %(default)s)" + ) + parser.add_argument( + "--max-fraction", + type=float, + default=6, + help="[Seismic only] max summary block size as a fraction (default: %(default)s)", + ) + parser.add_argument( + "--nknn", type=int, default=0, help="[Seismic only] kNN graph size; 0 disables it (default: %(default)s)" + ) + parser.add_argument( + "--batched-indexing", + type=int, + default=100000, + help="[Seismic only] docs per indexing batch (default: %(default)s)", + ) + parser.add_argument( + "--variant", + type=str, + default="standard", + choices=["standard", "large_vocab"], + help="[Seismic only] index variant; use large_vocab for >65k tokens (default: %(default)s)", + ) + parser.add_argument( + "--build-method", + type=str, + default="dataset", + choices=["dataset", "file"], + help="[Seismic only] feed docs via the in-memory dataset API, or via a temporary " + "uncompressed JSONL file as a fallback (default: %(default)s)", + ) + + def __init__(self, config): + self.cfg = config + + def run(self): + if self.cfg.backend == "seismic": + Seismic.build( + self.cfg.input, + self.cfg.index, + n_postings=self.cfg.n_postings, + centroid_fraction=self.cfg.centroid_fraction, + summary_energy=self.cfg.summary_energy, + min_cluster_size=self.cfg.min_cluster_size, + max_fraction=self.cfg.max_fraction, + nknn=self.cfg.nknn, + batched_indexing=self.cfg.batched_indexing, + variant=self.cfg.variant, + method=self.cfg.build_method, + ) + else: + raise ValueError(f"unknown indexing backend: {self.cfg.backend}") + + class Search(Command): @classmethod def add_arguments(cls, parser): - parser.add_argument("--index", type=Path, required=True, help="Anserini index path") + parser.add_argument("--index", type=Path, required=True, help="Index path") parser.add_argument("--queries", type=Path, required=True, help="Query file path") parser.add_argument("--out", type=Path, required=True, help="Output file path") + parser.add_argument( + "--backend", + type=str, + default="anserini", + choices=["anserini", "seismic"], + help="Search backend (default: %(default)s)", + ) parser.add_argument("--topk", type=int, default=1000, help="Top K results to return (default: %(default)s)") parser.add_argument("--qrels", type=str, default=None, help="Relevance judgments dataset (default: %(default)s)") + # Backend-specific args default to None so we can tell whether the user set them: unset args fall + # back to the backend's own default, and passing an arg for a different backend is rejected (see run()). + # anserini-specific + parser.add_argument("--scale", type=int, default=None, help="[Anserini only] impact scaling factor (default: 50)") + # seismic-specific + parser.add_argument("--query-cut", type=int, default=None, help="[Seismic only] query_cut (default: 10)") + parser.add_argument("--heap-factor", type=float, default=None, help="[Seismic only] heap_factor (default: 0.8)") + + # maps each backend to the args that only apply to it + BACKEND_ARGS = {"anserini": ["scale"], "seismic": ["query_cut", "heap_factor"]} def __init__(self, config): self.cfg = config + def _check_backend_args(self): + """Reject args that only apply to a backend other than the one selected.""" + for backend, names in self.BACKEND_ARGS.items(): + if backend == self.cfg.backend: + continue + misused = [f"--{name.replace('_', '-')}" for name in names if getattr(self.cfg, name) is not None] + if misused: + raise ValueError(f"{', '.join(misused)} only valid with the {backend} backend, not '{self.cfg.backend}'") + def run(self): + self._check_backend_args() + if self.cfg.out.is_dir(): raise ValueError(f"--out is a directory: {self.cfg.out}") queries = load_dict(self.cfg.queries) queries.ids = [psgid_to_docid(qid) for qid in queries.ids] - - anserini = Anserini(self.cfg.index.as_posix()) - results = anserini.query_from_vectors([{"vector": rep} for rep in queries.weights], k=self.cfg.topk) + vectors = [{"vector": rep} for rep in queries.weights] + + if self.cfg.backend == "anserini": + retriever = Anserini(self.cfg.index.as_posix()) + kwargs = {} if self.cfg.scale is None else {"scale": self.cfg.scale} + results = retriever.query_from_vectors(vectors, k=self.cfg.topk, **kwargs) + elif self.cfg.backend == "seismic": + retriever = Seismic(self.cfg.index.as_posix()) + kwargs = {} + if self.cfg.query_cut is not None: + kwargs["query_cut"] = self.cfg.query_cut + if self.cfg.heap_factor is not None: + kwargs["heap_factor"] = self.cfg.heap_factor + results = retriever.query_from_vectors(vectors, k=self.cfg.topk, **kwargs) + else: + raise ValueError(f"unknown search backend: {self.cfg.backend}") run = TRECRun(dict(zip(queries.ids, results))).aggregate_docids(psgid_to_docid).topk(self.cfg.topk) print(f"saving run to: {self.cfg.out}") diff --git a/bsparse/seismic.py b/bsparse/seismic.py index c3df391..5e9be6c 100644 --- a/bsparse/seismic.py +++ b/bsparse/seismic.py @@ -1,15 +1,299 @@ -# TODO implement -from seismic import SeismicIndex +import gzip +import json +import os +import sys +import tempfile +import warnings +from contextlib import contextmanager +import numpy as np +from tqdm import tqdm -json_input_file = "combined.jsonl" +from bsparse.models import Model -# batched_indexing is important to speed seismic up a lot -index = SeismicIndex.build(json_input_file, batched_indexing=100000) -print("Number of documents:", index.len) -print("Avg number of non-zero components:", index.nnz / index.len) -print("Dimensionality of the vectors:", index.dim) -index.print_space_usage_byte() +THREADS = int(os.environ.get("SEISMIC_THREADS", os.cpu_count())) -index.save("SEISMICINDEX") +# Index-building hyperparameter defaults (override per-call via build() kwargs). +# These match the recommended configuration in the Seismic guidelines: +# https://github.com/TusKANNy/seismic/blob/main/docs/Guidelines.md +# build(n_postings=3000, centroid_fraction=0.2, min_cluster_size=2, summary_energy=0.5, max_fraction=6) +# (reasonable centroid_fraction in {0.05, 0.1, 0.2}; summary_energy in [0.4, 0.6]) +DEFAULT_N_POSTINGS = 3000 +DEFAULT_CENTROID_FRACTION = 0.2 +DEFAULT_SUMMARY_ENERGY = 0.5 +DEFAULT_MIN_CLUSTER_SIZE = 2 +DEFAULT_MAX_FRACTION = 6 +# nknn=0 builds no kNN graph (not needed for basic indexing); batched_indexing only affects +# build speed/memory, not the resulting index +DEFAULT_NKNN = 0 +DEFAULT_BATCHED_INDEXING = 100_000 + +# Query-time hyperparameter defaults. +# query_cut ~ 10 generally yields high effectiveness; heap_factor in [0.7, 1]. +DEFAULT_QUERY_CUT = 10 +DEFAULT_HEAP_FACTOR = 0.8 + +# seismic's save(path) writes to path + this suffix, but load() expects the full on-disk filename +INDEX_SUFFIX = ".index.seismic" + + +def _resolve_index_path(path: str) -> str: + """Resolve an index path to the on-disk filename, accepting either the exact saved file or the + prefix that was passed to build() (to which seismic's save() appends INDEX_SUFFIX).""" + if os.path.isfile(path): + return path + if os.path.isfile(path + INDEX_SUFFIX): + return path + INDEX_SUFFIX + raise FileNotFoundError(f"no Seismic index found at: {path} (also tried: {path + INDEX_SUFFIX})") + + +def _import_seismic(): + try: + import seismic + except ImportError: + print("ERROR: the 'seismic' package is required for Seismic indexing/search.", file=sys.stderr) + print(" Install it with: pip install pyseismic-lsr", file=sys.stderr) + print( + ' (for best performance: RUSTFLAGS="-C target-cpu=native" pip install --no-binary :all: pyseismic-lsr)', + file=sys.stderr, + ) + raise + + return seismic + + +# set once the truncation warning has fired, so we don't re-scan every document of a large build +_truncation_warned = False + + +def _to_seismic_strings(strings: list, string_type) -> np.ndarray: + """Convert tokens to seismic's fixed-width unicode dtype, warning if any would be truncated. + + numpy silently truncates strings longer than the dtype's width (e.g. 30 chars for "U30"). + Truncated tokens can collide with each other or fail to match tokens that were stored without + truncation (e.g. by the file-based build, which bypasses numpy), so this is worth a warning. + """ + global _truncation_warned + if not _truncation_warned: + width = np.dtype(string_type).itemsize // np.dtype("U1").itemsize + too_long = next((s for s in strings if len(s) > width), None) + if too_long is not None: + _truncation_warned = True + warnings.warn( + f"token longer than seismic's {width}-char string dtype will be truncated: {too_long!r} " + "(truncated tokens may collide or fail to match; further warnings suppressed)" + ) + return np.array(strings, dtype=string_type) + + +JSONL_SUFFIXES = (".jsonl", ".jsonl.gz") + + +def _expand_inputs(inputs) -> list[str]: + """Normalize build inputs into a concrete list of JSONL files, expanding directories. + + `inputs` may be a single path or a list of paths; each entry may be a .jsonl/.jsonl.gz file or a + directory (expanded to the .jsonl/.jsonl.gz files it directly contains, sorted). This is + experiment code, so we fail loudly on paths that don't exist or directories with no matches + rather than silently skipping them. + """ + if isinstance(inputs, (str, os.PathLike)): + inputs = [inputs] + inputs = [str(p) for p in inputs] + if not inputs: + raise ValueError("no input files provided to Seismic.build") + + expanded = [] + for path in inputs: + if os.path.isdir(path): + matches = sorted(os.path.join(path, name) for name in os.listdir(path) if name.endswith(JSONL_SUFFIXES)) + if not matches: + raise FileNotFoundError(f"no .jsonl/.jsonl.gz files found in directory: {path}") + expanded.extend(matches) + elif os.path.isfile(path): + expanded.append(path) + else: + raise FileNotFoundError(f"Seismic.build input is not a readable file or directory: {path}") + + return expanded + + +def _iter_docs(inputs: list[str]): + """Yield (doc_id, vector) pairs from encoded JSONL files (plain or .gz), failing loudly.""" + for fn in inputs: + opener = gzip.open if fn.endswith(".gz") else open + with opener(fn, "rt", encoding="utf-8") as f: + for line_no, line in enumerate(f, start=1): + try: + d = json.loads(line) + doc_id, vector = d["id"], d["vector"] + except (json.JSONDecodeError, KeyError) as e: + raise ValueError(f"invalid document on line {line_no} of {fn}: {e}") from e + + yield str(doc_id), vector + + +@contextmanager +def _seismic_input(inputs: list[str]): + """Yield a single uncompressed JSONL path that Seismic can build from. + + Seismic's build() only accepts one uncompressed JSONL file. A single plain-JSONL input is used + as-is; otherwise (multiple files and/or .gz) the inputs are streamed line-by-line into a + temporary uncompressed file (decompressing .gz on the way) that is deleted afterwards. + """ + if len(inputs) == 1 and not inputs[0].endswith(".gz"): + yield inputs[0] + return + + tmp_dir = os.environ.get("TMPDIR", "/tmp") + os.makedirs(tmp_dir, exist_ok=True) + with tempfile.TemporaryDirectory(prefix=tmp_dir + "/") as tmpdir: + combined = os.path.join(tmpdir, "combined.jsonl") + with open(combined, "wt", encoding="utf-8") as out: + for fn in inputs: + opener = gzip.open if fn.endswith(".gz") else open + with opener(fn, "rt", encoding="utf-8") as f: + for line in f: + # guarantee a newline between concatenated files so we never merge two + # JSON objects onto one line + out.write(line if line.endswith("\n") else line + "\n") + yield combined + + +class Seismic: + def __init__(self, index_path: str): + self.index_path = index_path + self._index = None + + @property + def index(self): + # the index is loaded lazily so that constructing a Seismic object (and importing this + # module) does not require the native 'seismic' package to be installed + if self._index is None: + self._index = _import_seismic().SeismicIndex.load(_resolve_index_path(self.index_path)) + return self._index + + def query_from_raw_text( + self, + queries: list[str], + model: Model, + k: int = 1000, + query_cut: int = DEFAULT_QUERY_CUT, + heap_factor: float = DEFAULT_HEAP_FACTOR, + ): + dataset = [(str(idx), query) for idx, query in enumerate(queries)] + ids, reps = model.encode(dataset) + vectors = [{"vector": rep} for rep in reps] + return self.query_from_vectors(vectors, k=k, query_cut=query_cut, heap_factor=heap_factor) + + def query_from_vectors( + self, + queries: list[dict], + k: int = 1000, + query_cut: int = DEFAULT_QUERY_CUT, + heap_factor: float = DEFAULT_HEAP_FACTOR, + ): + string_type = _import_seismic().get_seismic_string() + + query_ids = [str(i) for i, _ in enumerate(queries)] + query_components = [_to_seismic_strings(list(q["vector"].keys()), string_type) for q in queries] + query_values = [np.array(list(q["vector"].values()), dtype=np.float32) for q in queries] + + results = self.index.batch_search( + queries_ids=np.array(query_ids, dtype=string_type), + query_components=query_components, + query_values=query_values, + k=k, + query_cut=query_cut, + heap_factor=heap_factor, + num_threads=THREADS, + ) + + # batch_search returns one list per query, each a list of (query_id, score, doc_id) tuples. + # we map by the returned query_id (rather than position) and return results aligned with the + # input queries, matching the {docid: score} shape produced by Anserini.query_from_vectors + scores_by_qid = {qid: {} for qid in query_ids} + for query_results in results: + for query_id, score, doc_id in query_results: + scores_by_qid[query_id][doc_id] = score + + return [scores_by_qid[qid] for qid in query_ids] + + @staticmethod + def build( + inputs, + index_path, + n_postings: int = DEFAULT_N_POSTINGS, + centroid_fraction: float = DEFAULT_CENTROID_FRACTION, + summary_energy: float = DEFAULT_SUMMARY_ENERGY, + min_cluster_size: int = DEFAULT_MIN_CLUSTER_SIZE, + max_fraction: float = DEFAULT_MAX_FRACTION, + nknn: int = DEFAULT_NKNN, + batched_indexing: int = DEFAULT_BATCHED_INDEXING, + variant: str = "standard", + method: str = "dataset", + ): + """Build a Seismic index from one or more encoded JSONL files and save it to index_path. + + Inputs are in the same format produced by `bsparse encode`: one JSON object per line with an + "id" and a "vector" (a {token: weight} dict). Unlike Anserini, Seismic indexes the float + weights directly, so no integer scaling is applied. `inputs` may be a single path or a list + of paths; each may be a plain or gzipped (.gz) JSONL file, or a directory of such files. + + method="dataset" streams docs into a SeismicDataset and calls build_from_dataset (the API + used by sentence-transformers). method="file" instead concatenates/decompresses the inputs + into a temporary uncompressed JSONL file and calls the file-based build; use it as a + fallback if build_from_dataset rejects a hyperparameter. + """ + # resolve files/directories up front and fail loudly on unreadable input (this is experiment + # code: we want a hard error, not a silently-skipped shard) + inputs = _expand_inputs(inputs) + + seismic = _import_seismic() + + if variant == "standard": + index_cls = seismic.SeismicIndex + elif variant == "large_vocab": + # needed for collections with more than 65k unique tokens (e.g. some multilingual LSR vocabs) + index_cls = seismic.SeismicIndexLV + else: + raise ValueError(f"unknown Seismic index variant: {variant}") + + build_kwargs = { + "n_postings": n_postings, + "centroid_fraction": centroid_fraction, + "summary_energy": summary_energy, + "min_cluster_size": min_cluster_size, + "max_fraction": max_fraction, + "nknn": nknn, + "batched_indexing": batched_indexing, + "num_threads": THREADS, + } + + if method == "dataset": + string_type = seismic.get_seismic_string() + dataset = seismic.SeismicDataset() + for doc_id, vector in tqdm(_iter_docs(inputs), desc="seismic: adding documents", leave=False): + dataset.add_document( + doc_id, + _to_seismic_strings(list(vector.keys()), string_type), + np.array(list(vector.values()), dtype=np.float32), + ) + + index = index_cls.build_from_dataset(dataset, **build_kwargs) + elif method == "file": + with _seismic_input(inputs) as build_input: + index = index_cls.build(build_input, **build_kwargs) + else: + raise ValueError(f"unknown Seismic build method: {method}") + + print("Number of documents:", index.len) + print("Avg number of non-zero components:", index.nnz / index.len) + print("Dimensionality of the vectors:", index.dim) + index.print_space_usage_byte() + + index.save(str(index_path)) + # seismic's save() appends INDEX_SUFFIX, so the on-disk filename differs from index_path + print(f"saved index to: {index_path}{INDEX_SUFFIX}") + return index diff --git a/tests/test_commands.py b/tests/test_commands.py new file mode 100644 index 0000000..792e3f5 --- /dev/null +++ b/tests/test_commands.py @@ -0,0 +1,32 @@ +import argparse + +import pytest + + +def _make_search(**kwargs): + # importing bsparse.commands requires trecrun/ir_datasets, which are optional in the test env + pytest.importorskip("trecrun") + pytest.importorskip("ir_datasets") + from bsparse.commands import Search + + defaults = {"backend": "anserini", "scale": None, "query_cut": None, "heap_factor": None} + defaults.update(kwargs) + return Search(argparse.Namespace(**defaults)) + + +def test_search_accepts_args_for_selected_backend(): + # args belonging to the chosen backend are fine; so is passing nothing + _make_search(backend="anserini")._check_backend_args() + _make_search(backend="anserini", scale=80)._check_backend_args() + _make_search(backend="seismic", query_cut=12, heap_factor=0.9)._check_backend_args() + + +def test_search_rejects_args_for_other_backend(): + with pytest.raises(ValueError, match="seismic backend"): + _make_search(backend="anserini", heap_factor=0.9)._check_backend_args() + + with pytest.raises(ValueError, match="seismic backend"): + _make_search(backend="anserini", query_cut=5)._check_backend_args() + + with pytest.raises(ValueError, match="anserini backend"): + _make_search(backend="seismic", scale=80)._check_backend_args() diff --git a/tests/test_seismic.py b/tests/test_seismic.py new file mode 100644 index 0000000..d4516f3 --- /dev/null +++ b/tests/test_seismic.py @@ -0,0 +1,302 @@ +import os +import types + +import numpy as np +import pytest + + +def fake_seismic_module(**attrs): + """A stand-in for the native 'seismic' module, so tests run without it installed.""" + return types.SimpleNamespace(get_seismic_string=lambda: "U30", **attrs) + + +def test_query_from_vectors_builds_arrays_and_maps_results(monkeypatch): + """Seismic.query_from_vectors should convert each query dict into numpy component/value arrays + and map the (query_id, score, doc_id) tuples back into per-query {doc_id: score} dicts, aligned + with the input order. This exercises the conversion/mapping without the native 'seismic' lib.""" + import bsparse.seismic as seismic_mod + from bsparse.seismic import Seismic + + # avoid importing the native package: stub the string type and capture the batch_search args + monkeypatch.setattr(seismic_mod, "_import_seismic", fake_seismic_module) + + captured = {} + + class FakeIndex: + def batch_search(self, queries_ids, query_components, query_values, **kwargs): + captured["queries_ids"] = queries_ids + captured["query_components"] = query_components + captured["query_values"] = query_values + captured["kwargs"] = kwargs + # one result list per query; tuples are (query_id, score, doc_id) + return [ + [("0", 2.5, "docA"), ("0", 1.0, "docB")], + [("1", 3.0, "docC")], + ] + + retriever = Seismic("ignored") + retriever._index = FakeIndex() + + queries = [{"vector": {"dog": 1.5, "cat": 0.5}}, {"vector": {"fish": 2.0}}] + results = retriever.query_from_vectors(queries, k=5, query_cut=7, heap_factor=0.9) + + assert results == [{"docA": 2.5, "docB": 1.0}, {"docC": 3.0}] + + # the native batch_search requires an ndarray[str] (not a list) for the query ids + assert captured["queries_ids"].dtype == np.dtype("U30") + assert list(captured["queries_ids"]) == ["0", "1"] + assert captured["kwargs"]["k"] == 5 + assert captured["kwargs"]["query_cut"] == 7 + assert captured["kwargs"]["heap_factor"] == 0.9 + + assert captured["query_components"][0].dtype == np.dtype("U30") + assert list(captured["query_components"][0]) == ["dog", "cat"] + assert captured["query_values"][0].dtype == np.float32 + assert list(captured["query_values"][1]) == pytest.approx([2.0]) + + +def test_warns_when_tokens_exceed_string_dtype_width(monkeypatch): + """Tokens longer than seismic's fixed-width string dtype (30 chars for U30) are silently + truncated by numpy; query_from_vectors should warn (once) when that would happen, and stay + silent when all tokens fit.""" + import warnings + + import bsparse.seismic as seismic_mod + from bsparse.seismic import Seismic + + monkeypatch.setattr(seismic_mod, "_import_seismic", fake_seismic_module) + monkeypatch.setattr(seismic_mod, "_truncation_warned", False) + + class FakeIndex: + def batch_search(self, queries_ids, query_components, query_values, **kwargs): + return [[] for _ in queries_ids] + + retriever = Seismic("ignored") + retriever._index = FakeIndex() + + # tokens within the 30-char width: no warning + with warnings.catch_warnings(): + warnings.simplefilter("error") + retriever.query_from_vectors([{"vector": {"x" * 30: 1.0}}]) + + # a 31-char token would be truncated: warn, naming the offending token + long_token = "x" * 31 + with pytest.warns(UserWarning, match="truncated.*x{31}"): + retriever.query_from_vectors([{"vector": {long_token: 1.0}}]) + + # the warning fires only once per process, so big builds aren't flooded + with warnings.catch_warnings(): + warnings.simplefilter("error") + retriever.query_from_vectors([{"vector": {long_token: 1.0}}]) + + +def test_build_rejects_missing_input(): + from bsparse.seismic import Seismic + + # input validation happens before the native lib is imported, so this fails loudly even + # without 'seismic' installed + with pytest.raises(FileNotFoundError, match="readable file"): + Seismic.build("does-not-exist.jsonl", "out_index") + + +def test_build_dataset_method_streams_docs(tmp_path, monkeypatch): + """The default build method should stream docs (plain and .gz) into a SeismicDataset via + add_document and pass the hyperparameters to build_from_dataset.""" + import gzip + import json + + import bsparse.seismic as seismic_mod + from bsparse.seismic import Seismic + + plain = tmp_path / "docs-0.jsonl" + plain.write_text( + json.dumps({"id": "a", "vector": {"dog": 1.5, "pet": 0.5}}) + "\n" + json.dumps({"id": "b", "vector": {"cat": 2.0}}), + encoding="utf-8", + ) + gzipped = tmp_path / "docs-1.jsonl.gz" + with gzip.open(gzipped, "wt", encoding="utf-8") as f: + f.write(json.dumps({"id": 3, "vector": {"fish": 3.0}}) + "\n") + + added = [] + captured = {} + + class FakeDataset: + def add_document(self, doc_id, components, values): + added.append((doc_id, components, values)) + + class FakeBuiltIndex: + len = 3 + nnz = 4 + dim = 4 + + def print_space_usage_byte(self): + pass + + def save(self, path): + captured["saved_to"] = path + + class FakeSeismicIndex: + @staticmethod + def build_from_dataset(dataset, **kwargs): + captured["dataset"] = dataset + captured["kwargs"] = kwargs + return FakeBuiltIndex() + + monkeypatch.setattr( + seismic_mod, + "_import_seismic", + lambda: fake_seismic_module(SeismicDataset=FakeDataset, SeismicIndex=FakeSeismicIndex), + ) + + Seismic.build([plain, gzipped], tmp_path / "out_index", n_postings=123, nknn=7) + + # docs streamed in order, ids coerced to str, arrays use the seismic dtypes + assert [d[0] for d in added] == ["a", "b", "3"] + assert list(added[0][1]) == ["dog", "pet"] + assert added[0][1].dtype == np.dtype("U30") + assert added[0][2].dtype == np.float32 + assert list(added[2][2]) == pytest.approx([3.0]) + + assert isinstance(captured["dataset"], FakeDataset) + assert captured["kwargs"]["n_postings"] == 123 + assert captured["kwargs"]["nknn"] == 7 + assert captured["kwargs"]["centroid_fraction"] == seismic_mod.DEFAULT_CENTROID_FRACTION + assert captured["saved_to"] == str(tmp_path / "out_index") + + +def test_build_dataset_method_fails_loudly_on_bad_json(tmp_path, monkeypatch): + import bsparse.seismic as seismic_mod + from bsparse.seismic import Seismic + + fn = tmp_path / "docs.jsonl" + fn.write_text('{"id": "a", "vector": {"dog": 1.0}}\nnot json\n', encoding="utf-8") + + class FakeDataset: + def add_document(self, *args): + pass + + monkeypatch.setattr( + seismic_mod, + "_import_seismic", + lambda: fake_seismic_module(SeismicDataset=FakeDataset, SeismicIndex=None), + ) + + with pytest.raises(ValueError, match="line 2"): + Seismic.build(fn, tmp_path / "out_index") + + +def test_expand_inputs_handles_files_dirs_and_order(tmp_path): + from bsparse.seismic import _expand_inputs + + explicit = tmp_path / "explicit.jsonl" + explicit.write_text("{}\n", encoding="utf-8") + + docs_dir = tmp_path / "shards" + docs_dir.mkdir() + (docs_dir / "b.jsonl").write_text("{}\n", encoding="utf-8") + (docs_dir / "a.jsonl.gz").write_text("{}\n", encoding="utf-8") + (docs_dir / "ignore.txt").write_text("nope\n", encoding="utf-8") + + # a directory expands to its sorted .jsonl/.jsonl.gz files; non-matching files are ignored; + # explicit args keep their position + result = _expand_inputs([str(explicit), str(docs_dir)]) + assert result == [ + str(explicit), + str(docs_dir / "a.jsonl.gz"), + str(docs_dir / "b.jsonl"), + ] + + +def test_expand_inputs_fails_loudly(tmp_path): + from bsparse.seismic import _expand_inputs + + with pytest.raises(FileNotFoundError, match="file or directory"): + _expand_inputs(str(tmp_path / "does-not-exist")) + + empty_dir = tmp_path / "empty" + empty_dir.mkdir() + with pytest.raises(FileNotFoundError, match="no .jsonl"): + _expand_inputs(str(empty_dir)) + + +def test_resolve_index_path_accepts_prefix_or_filename(tmp_path): + """seismic's save(path) writes to path + '.index.seismic' but load() wants the real filename, so + searching with the same --index used at build time must resolve to the suffixed file.""" + from bsparse.seismic import _resolve_index_path + + prefix = tmp_path / "myindex" + on_disk = tmp_path / "myindex.index.seismic" + on_disk.write_bytes(b"") + + assert _resolve_index_path(str(prefix)) == str(on_disk) + assert _resolve_index_path(str(on_disk)) == str(on_disk) + + with pytest.raises(FileNotFoundError, match="index.seismic"): + _resolve_index_path(str(tmp_path / "missing")) + + +def test_seismic_input_single_plain_file_used_directly(tmp_path): + from bsparse.seismic import _seismic_input + + fn = tmp_path / "docs.jsonl" + fn.write_text('{"id": "a", "vector": {"dog": 1.0}}\n', encoding="utf-8") + + with _seismic_input([str(fn)]) as build_input: + # a single uncompressed file is passed through untouched (no temp copy) + assert build_input == str(fn) + + +def test_seismic_input_concatenates_and_decompresses(tmp_path): + import gzip + import json + + from bsparse.seismic import _seismic_input + + plain = tmp_path / "docs-0.jsonl" + # no trailing newline on the last line, to check files are joined safely + plain.write_text('{"id": "a", "vector": {"dog": 1.0}}\n{"id": "b", "vector": {"cat": 2.0}}', encoding="utf-8") + + gzipped = tmp_path / "docs-1.jsonl.gz" + with gzip.open(gzipped, "wt", encoding="utf-8") as f: + f.write('{"id": "c", "vector": {"fish": 3.0}}\n') + + with _seismic_input([str(plain), str(gzipped)]) as build_input: + assert build_input != str(plain) # a temp combined file + lines = [json.loads(line) for line in open(build_input, encoding="utf-8")] + + ids = [d["id"] for d in lines] + assert ids == ["a", "b", "c"] + + # the temp file is cleaned up on exit + assert not os.path.exists(build_input) + + +@pytest.mark.parametrize("method", ["dataset", "file"]) +def test_build_and_search_end_to_end(tmp_path, method): + """End-to-end build + search against the real Seismic library (skipped if not installed).""" + pytest.importorskip("seismic") + + import json + + from bsparse.seismic import Seismic + + docs = [ + {"id": "doc1", "vector": {"dog": 2.0, "pet": 1.0}}, + {"id": "doc2", "vector": {"cat": 2.0, "pet": 1.0}}, + {"id": "doc3", "vector": {"car": 2.0, "road": 1.0}}, + ] + input_fn = tmp_path / "docs.jsonl" + with open(input_fn, "wt", encoding="utf-8") as f: + for d in docs: + print(json.dumps(d), file=f) + + index_path = tmp_path / "index.seismic" + Seismic.build(input_fn, index_path, n_postings=100, centroid_fraction=0.2, min_cluster_size=1, method=method) + + retriever = Seismic(str(index_path)) + results = retriever.query_from_vectors([{"vector": {"dog": 1.0, "pet": 1.0}}], k=3) + + assert len(results) == 1 + assert "doc1" in results[0] + # the dog/pet query should rank doc1 (dog+pet) above doc3 (unrelated) + assert results[0].get("doc1", 0) >= results[0].get("doc3", 0)