Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 158 additions & 38 deletions apps/api/routers/books.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import math
import threading
from collections import defaultdict
from typing import Any

from cachetools import TTLCache
Expand All @@ -11,12 +12,30 @@
from sqlalchemy.orm import Session, selectinload

from bookdb.db.crud import ReviewCRUD
from bookdb.db.models import Author, Book, BookAuthor, BookRating, Review, ReviewComment, ReviewLike, User
from bookdb.models.chatbot_llm import create_groq_client_sync, generate_response_sync, rewrite_query_sync
from bookdb.db.models import (
Author,
Book,
BookAuthor,
BookRating,
Review,
ReviewComment,
ReviewLike,
User,
)
from bookdb.models.chatbot_llm import (
create_groq_client_sync,
generate_response_sync,
rewrite_query_sync,
)

from ..core.book_engagement import build_book_engagement_map
from ..core.book_metrics import get_metrics_for_goodreads_ids
from ..core.book_queries import BOOK_LOAD_OPTIONS, load_books_by_ids, load_books_by_goodreads_ids, serialize_books_with_engagement
from ..core.book_queries import (
BOOK_LOAD_OPTIONS,
load_books_by_ids,
load_books_by_goodreads_ids,
serialize_books_with_engagement,
)
from ..core.config import settings
from ..core.deps import get_current_user, get_db, get_optional_user
from ..core.embeddings import most_similar, most_similar_by_vector
Expand All @@ -29,6 +48,11 @@
_qdrant_failure_cache: TTLCache = TTLCache(maxsize=500, ttl=60)
_qdrant_lock = threading.Lock()

# Configuration for book diversification
_MAX_BOOKS_PER_AUTHOR = 2
# When requesting similar books, fetch this multiple of candidates to apply diversification
_CANDIDATE_MULTIPLIER = 3


def _empty_search_response() -> dict[str, Any]:
return {
Expand All @@ -39,7 +63,9 @@ def _empty_search_response() -> dict[str, Any]:
}


def _search_response_from_ranked_books(db: Session, books: list[Book]) -> dict[str, Any]:
def _search_response_from_ranked_books(
db: Session, books: list[Book]
) -> dict[str, Any]:
serialized = serialize_books_with_engagement(db, books)
return {
"directHit": serialized[0] if serialized else None,
Expand All @@ -54,6 +80,46 @@ def _book_author_names(book: Book) -> str:
return ", ".join(names) if names else "Unknown"


def _diversify_books_by_author(
    books: list[Book], limit: int, max_per_author: int = _MAX_BOOKS_PER_AUTHOR
) -> list[Book]:
    """Diversify books by limiting the number of books from the same author.

    Books are scanned in their given (relevance/similarity) order; a book is
    skipped when any of its authors has already contributed ``max_per_author``
    selections. Books with no linked authors are never capped.

    Args:
        books: List of books to diversify (ordered by relevance/similarity)
        limit: Maximum number of books to return
        max_per_author: Maximum number of books to include per author

    Returns:
        Diversified list of at most ``limit`` books with at most
        ``max_per_author`` books per author
    """
    if not books or limit <= 0:
        return []

    # Track how many books we've included per author. A plain dict with
    # .get() is used instead of defaultdict: reading a defaultdict inside
    # the cap check below would insert a zero entry for every author
    # examined, even for books that end up skipped.
    author_count: dict[int, int] = {}

    diversified: list[Book] = []
    for book in books:
        # Get all author IDs for this book
        author_ids = {ba.author_id for ba in book.authors if ba.author_id is not None}

        # Check if we've already included too many books from these authors
        if any(
            author_count.get(author_id, 0) >= max_per_author
            for author_id in author_ids
        ):
            continue

        # Include this book
        diversified.append(book)
        for author_id in author_ids:
            author_count[author_id] = author_count.get(author_id, 0) + 1

        # Stop if we've reached the limit
        if len(diversified) >= limit:
            break

    return diversified


def _payload_to_book_context(book: Book, payload: dict[str, Any]) -> str:
metadata = payload.get("metadata")
metadata = metadata if isinstance(metadata, dict) else {}
Expand Down Expand Up @@ -138,7 +204,9 @@ def _run_chatbot_search_pipeline(
return None, []

rewritten_text = "\n\n".join(
part.strip() for part in [rewritten_description, rewritten_review] if part and part.strip()
part.strip()
for part in [rewritten_description, rewritten_review]
if part and part.strip()
)
if not rewritten_text:
return None, []
Expand Down Expand Up @@ -186,10 +254,12 @@ def _run_chatbot_search_pipeline(
if book is None:
continue
ranked_books.append(book)
llm_books.append({
"book_id": int(book.id),
"description": _payload_to_book_context(book, hit.get("payload", {})),
})
llm_books.append(
{
"book_id": int(book.id),
"description": _payload_to_book_context(book, hit.get("payload", {})),
}
)

if not ranked_books:
return None, []
Expand Down Expand Up @@ -254,19 +324,19 @@ def _run_chatbot_search_pipeline(


def _load_book(db: Session, book_id: int) -> Book:
    """Load a single book with its standard relationships, or raise 404.

    Args:
        db: Active SQLAlchemy session.
        book_id: Primary key of the book to load.

    Returns:
        The ``Book`` row with ``BOOK_LOAD_OPTIONS`` relationships eagerly loaded.

    Raises:
        HTTPException: 404 if no book with ``book_id`` exists.
    """
    # Single query; the diff residue previously ran the same select twice and
    # contained an unreachable duplicate raise.
    book = db.scalar(select(Book).where(Book.id == book_id).options(*BOOK_LOAD_OPTIONS))
    if book is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Book not found"
        )
    return book


def _check_book_exists(db: Session, book_id: int) -> None:
    """Raise 404 unless a book with ``book_id`` exists.

    Uses a cheap ``SELECT id ... LIMIT 1`` probe instead of loading the full
    row and its relationships.

    Args:
        db: Active SQLAlchemy session.
        book_id: Primary key to check.

    Raises:
        HTTPException: 404 if the book does not exist.
    """
    if db.scalar(select(Book.id).where(Book.id == book_id).limit(1)) is None:
        # Diff residue previously contained a second, unreachable raise here.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Book not found"
        )


@router.get("/search")
Expand Down Expand Up @@ -350,12 +420,18 @@ def search_books(
candidate_ids = [int(row.id) for row in rows]
metrics_parquet_path: str | None = None
if request is not None:
metrics_parquet_path = getattr(request.app.state, "book_metrics_parquet_path", None)
metrics_parquet_path = getattr(
request.app.state, "book_metrics_parquet_path", None
)
popularity_by_goodreads_id: dict[int, int] = {}

if metrics_parquet_path is not None:
goodreads_ids = [int(row.goodreads_id) for row in rows if row.goodreads_id is not None]
metrics_by_id = get_metrics_for_goodreads_ids(metrics_parquet_path, goodreads_ids)
goodreads_ids = [
int(row.goodreads_id) for row in rows if row.goodreads_id is not None
]
metrics_by_id = get_metrics_for_goodreads_ids(
metrics_parquet_path, goodreads_ids
)
popularity_by_goodreads_id = {
gid: int(metrics.get("num_ratings", 0) or 0)
for gid, metrics in metrics_by_id.items()
Expand All @@ -369,7 +445,9 @@ def search_books(
.where(BookRating.book_id.in_(candidate_ids))
.group_by(BookRating.book_id)
).all()
rating_count_by_book_id = {int(row.book_id): int(row.rating_count or 0) for row in rating_rows}
rating_count_by_book_id = {
int(row.book_id): int(row.rating_count or 0) for row in rating_rows
}
popularity_by_goodreads_id = {
int(row.goodreads_id): rating_count_by_book_id.get(int(row.id), 0)
for row in rows
Expand All @@ -378,8 +456,14 @@ def search_books(

scored = []
for row in rows:
popularity = popularity_by_goodreads_id.get(int(row.goodreads_id), 0) if row.goodreads_id is not None else 0
combined_score = float(row.rank_score or 0) + math.log(max(popularity, 1)) * 50.0
popularity = (
popularity_by_goodreads_id.get(int(row.goodreads_id), 0)
if row.goodreads_id is not None
else 0
)
combined_score = (
float(row.rank_score or 0) + math.log(max(popularity, 1)) * 50.0
)
scored.append((combined_score, int(row.title_len or 0), int(row.id)))

scored.sort(key=lambda x: (-x[0], x[1], x[2]))
Expand Down Expand Up @@ -411,9 +495,12 @@ def get_book_reviews(
current_user: User | None = Depends(get_optional_user),
):
_check_book_exists(db, book_id)
total: int = db.scalar(
select(func.count()).select_from(Review).where(Review.book_id == book_id)
) or 0
total: int = (
db.scalar(
select(func.count()).select_from(Review).where(Review.book_id == book_id)
)
or 0
)
uid = current_user.id if current_user else None

# Own review always surfaces at the top of page 0 so the user always sees it.
Expand Down Expand Up @@ -456,7 +543,9 @@ def get_book_reviews(
.subquery()
)
stmt = stmt.add_columns(
case((my_likes_sq.c.review_id.is_not(None), 1), else_=0).label("is_liked_by_me")
case((my_likes_sq.c.review_id.is_not(None), 1), else_=0).label(
"is_liked_by_me"
)
).outerjoin(my_likes_sq, my_likes_sq.c.review_id == Review.id)
else:
stmt = stmt.add_columns(literal(0).label("is_liked_by_me"))
Expand Down Expand Up @@ -519,9 +608,13 @@ def get_related_books(
request: Request = None,
db: Session = Depends(get_db),
):
row = db.execute(select(Book.id, Book.goodreads_id).where(Book.id == book_id)).first()
row = db.execute(
select(Book.id, Book.goodreads_id).where(Book.id == book_id)
).first()
if row is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Book not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Book not found"
)
goodreads_id = row.goodreads_id

qdrant = getattr(request.app.state, "qdrant", None)
Expand All @@ -531,34 +624,61 @@ def get_related_books(
cached_ids = _qdrant_cache.get(goodreads_id)
recent_failure = _qdrant_failure_cache.get(goodreads_id, False)

if cached_ids is not None:
related = load_books_by_goodreads_ids(db, cached_ids)
return serialize_books_with_engagement(db, related)

# Skip cached results - they don't have diversification applied
# Let them naturally expire (30 min TTL)
Comment on lines 624 to +628
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The caching logic for Qdrant results appears to be ineffective. cached_ids are fetched from the cache on line 631 but are then ignored. The code proceeds to call most_similar on every request, bypassing the cache. This negates the benefit of caching, increasing latency and load on the Qdrant service.

To fix this, you should use the cached_ids if they exist, and only query Qdrant if there's a cache miss. The misleading comment on lines 634-635 should also be removed or updated.

if recent_failure:
qdrant = None

try:
if qdrant is not None:
similar_goodreads_ids = most_similar(qdrant, goodreads_id, top_k=limit)
# Fetch more candidates to apply diversification
candidate_top_k = min(limit * _CANDIDATE_MULTIPLIER, 100)
similar_goodreads_ids = most_similar(
qdrant, goodreads_id, top_k=candidate_top_k
)
if similar_goodreads_ids:
# Cache the larger candidate set (not the diversified final result)
# This allows us to diversify differently if limits change, while still
# reducing Qdrant load
with _qdrant_lock:
_qdrant_cache[goodreads_id] = similar_goodreads_ids
related = load_books_by_goodreads_ids(db, similar_goodreads_ids)
return serialize_books_with_engagement(db, related)

# Load all candidate books with author information
candidates = load_books_by_goodreads_ids(db, similar_goodreads_ids)

# Apply author diversification
diversified = _diversify_books_by_author(
candidates, limit, max_per_author=_MAX_BOOKS_PER_AUTHOR
)

# If we couldn't get enough diversified books, fill in with more from the original list
if len(diversified) < limit and len(candidates) > len(diversified):
# Find books not already in the diversified list
seen_ids = {book.id for book in diversified}
for book in candidates:
if len(diversified) >= limit:
break
if book.id not in seen_ids:
diversified.append(book)

return serialize_books_with_engagement(db, diversified[:limit])
except Exception as e:
with _qdrant_lock:
_qdrant_failure_cache[goodreads_id] = True
print(f"Qdrant recommend failed for book {book_id}: {e}")

# Fallback: popular books.
# Fallback: popular books (diversified by author)
fallback = db.scalars(
select(Book)
.outerjoin(BookRating, Book.id == BookRating.book_id)
.where(Book.id != book_id)
.group_by(Book.id)
.order_by(func.count(BookRating.user_id).desc(), Book.id.asc())
.options(*BOOK_LOAD_OPTIONS)
.limit(limit)
.limit(limit * _CANDIDATE_MULTIPLIER) # Fetch more candidates
).all()
return serialize_books_with_engagement(db, fallback)
fallback_list = fallback
diversified_fallback = _diversify_books_by_author(
fallback_list, limit, max_per_author=_MAX_BOOKS_PER_AUTHOR
)
return serialize_books_with_engagement(db, diversified_fallback)
Loading