Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/ov_cli/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,15 @@ impl HttpClient {

self.post("/api/v1/resources", &body).await
} else if path_obj.is_file() {
let source_name = path_obj
.file_name()
.and_then(|n| n.to_str())
.map(|s| s.to_string());
let temp_file_id = self.upload_temp_file(path_obj).await?;

let body = serde_json::json!({
"temp_file_id": temp_file_id,
"source_name": source_name,
"to": to,
"parent": parent,
"reason": reason,
Expand Down
25 changes: 21 additions & 4 deletions openviking/parse/parsers/markdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,17 @@ async def parse_content(
await viking_fs.mkdir(temp_uri)
logger.debug(f"[MarkdownParser] Created temp directory: {temp_uri}")

# Get document title
explicit_name = kwargs.get("resource_name") or kwargs.get("source_name")

# Preserve the original uploaded filename when available instead of
# the temp upload name (e.g. upload_<uuid>.txt).
doc_title = meta.get("frontmatter", {}).get(
"title", Path(source_path).stem if source_path else "Document"
"title",
Path(explicit_name).stem
if explicit_name
else Path(source_path).stem
if source_path
else "Document",
)

# Create root directory
Expand All @@ -187,7 +195,13 @@ async def parse_content(
logger.info(f"[MarkdownParser] Found {len(headings)} headings")

# Parse and create directory structure
await self._parse_and_create_structure(content, headings, root_dir, source_path)
await self._parse_and_create_structure(
content,
headings,
root_dir,
source_path,
doc_name=self._sanitize_for_path(Path(doc_title).stem),
)

parse_time = time.time() - start_time
logger.info(f"[MarkdownParser] Parse completed in {parse_time:.2f}s")
Expand Down Expand Up @@ -365,6 +379,7 @@ async def _parse_and_create_structure(
headings: List[Tuple[int, int, str, int]],
root_dir: str,
source_path: Optional[str] = None,
doc_name: Optional[str] = None,
) -> None:
"""
Parse markdown and create directory structure directly in VikingFS.
Expand Down Expand Up @@ -395,7 +410,9 @@ async def _parse_and_create_structure(
await viking_fs.mkdir(root_dir)

# Get document name
doc_name = self._sanitize_for_path(Path(source_path).stem if source_path else "content")
doc_name = doc_name or self._sanitize_for_path(
Path(source_path).stem if source_path else "content"
)

# Small document: save as single file (check both token and char limits)
if estimated_tokens <= max_size and len(content) <= max_chars:
Expand Down
27 changes: 20 additions & 7 deletions openviking/parse/tree_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ async def finalize_from_temp(
viking_fs = get_viking_fs()
temp_uri = temp_dir_path

def is_resources_root(uri: Optional[str]) -> bool:
    """Return True when ``uri`` names the ``viking://resources`` root.

    Trailing slashes are ignored; ``None`` and the empty string never match.
    """
    if not uri:
        return False
    return uri.rstrip("/") == "viking://resources"

# 1. Find document root directory
entries = await viking_fs.ls(temp_uri, ctx=ctx)
doc_dirs = [e for e in entries if e.get("isDir") and e["name"] not in [".", ".."]]
Expand Down Expand Up @@ -153,21 +156,31 @@ async def finalize_from_temp(
# 2. Determine base_uri and final document name with org/repo for GitHub/GitLab
auto_base_uri = self._get_base_uri(scope, source_path, source_format)
base_uri = parent_uri or auto_base_uri
use_to_as_parent = is_resources_root(to_uri)
# 3. Determine candidate_uri
if to_uri:
if to_uri and not use_to_as_parent:
candidate_uri = to_uri
else:
if parent_uri:
effective_parent_uri = parent_uri or to_uri if use_to_as_parent else parent_uri
if effective_parent_uri:
# Parent URI must exist and be a directory
try:
stat_result = await viking_fs.stat(parent_uri, ctx=ctx)
stat_result = await viking_fs.stat(effective_parent_uri, ctx=ctx)
except Exception as e:
raise FileNotFoundError(f"Parent URI does not exist: {parent_uri}") from e
raise FileNotFoundError(
f"Parent URI does not exist: {effective_parent_uri}"
) from e
if not stat_result.get("isDir"):
raise ValueError(f"Parent URI is not a directory: {parent_uri}")
raise ValueError(f"Parent URI is not a directory: {effective_parent_uri}")
base_uri = effective_parent_uri
candidate_uri = VikingURI(base_uri).join(final_doc_name).uri

if to_uri:
if to_uri and not use_to_as_parent:
final_uri = candidate_uri
elif use_to_as_parent:
# Treat an explicit resources root target as "import under this
# directory" while preserving the child URI so downstream logic can
# incrementally update viking://resources/<child> when it exists.
final_uri = candidate_uri
else:
final_uri = await self._resolve_unique_uri(candidate_uri)
Expand All @@ -177,7 +190,7 @@ async def finalize_from_temp(
source_format=source_format,
)
tree._root_uri = final_uri
if not to_uri:
if not to_uri or use_to_as_parent:
tree._candidate_uri = candidate_uri

# Create a minimal Context object for the root so that tree.root is not None
Expand Down
18 changes: 17 additions & 1 deletion openviking/utils/media_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,29 @@ async def _process_file(
try:
with zipfile.ZipFile(file_path, "r") as zipf:
safe_extract_zip(zipf, temp_dir)

extracted_entries = [p for p in temp_dir.iterdir() if p.name not in {".", ".."}]
if len(extracted_entries) == 1 and extracted_entries[0].is_dir():
dir_kwargs = dict(kwargs)
dir_kwargs.pop("source_name", None)
return await self._process_directory(
extracted_entries[0], instruction, **dir_kwargs
)

return await self._process_directory(temp_dir, instruction, **kwargs)
finally:
pass # Don't delete temp_dir yet, it will be used by TreeBuilder
source_name = kwargs.get("source_name")
if source_name:
kwargs["resource_name"] = Path(source_name).stem
kwargs.setdefault("source_name", source_name)
else:
kwargs.setdefault("resource_name", file_path.stem)

return await parse(
str(file_path),
instruction=instruction,
vlm_processor=self._get_vlm_processor(),
storage=self.storage,
resource_name=file_path.stem,
**kwargs,
)
75 changes: 54 additions & 21 deletions openviking/utils/summarizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
Handles summarization and key information extraction.
"""

from typing import TYPE_CHECKING, Any, Dict, List
from typing import TYPE_CHECKING, Any, Dict, List, Tuple

from openviking.core.directories import get_context_type_for_uri
from openviking.storage.queuefs import SemanticMsg, get_queue_manager
from openviking.storage.viking_fs import get_viking_fs
from openviking.telemetry import get_current_telemetry
from openviking.telemetry.request_wait_tracker import get_request_wait_tracker
from openviking_cli.utils import get_logger
from openviking_cli.utils.uri import VikingURI

if TYPE_CHECKING:
from openviking.parse.vlm import VLMProcessor
Expand Down Expand Up @@ -57,29 +59,60 @@ async def summarize(
enqueued_count = 0

telemetry = get_current_telemetry()

def is_resources_root(uri: str) -> bool:
    """Tell whether ``uri`` is the ``viking://resources`` root directory.

    Trailing slashes are stripped before comparing; a falsy ``uri`` is
    treated as an empty string and therefore never matches.
    """
    normalized = uri.rstrip("/") if uri else ""
    return normalized == "viking://resources"

async def list_top_children(temp_uri: str) -> List[Tuple[str, str]]:
    """List the direct entries under ``temp_uri`` as ``(name, child_uri)`` pairs.

    Entries with a missing/empty name and the ``.`` / ``..`` pseudo-entries
    are skipped. Each child URI is built by joining the entry name onto
    ``temp_uri``.
    """
    listing = await get_viking_fs().ls(temp_uri, show_all_hidden=True, ctx=ctx)
    entry_names = (item.get("name", "") for item in listing)
    return [
        (entry_name, VikingURI(temp_uri).join(entry_name).uri)
        for entry_name in entry_names
        if entry_name and entry_name not in {".", ".."}
    ]

for uri, temp_uri in zip(resource_uris, temp_uris, strict=True):
# Determine context_type based on URI
context_type = get_context_type_for_uri(uri)

msg = SemanticMsg(
uri=temp_uri,
context_type=context_type,
account_id=ctx.account_id,
user_id=ctx.user.user_id,
agent_id=ctx.user.agent_id,
role=ctx.role.value,
skip_vectorization=skip_vectorization,
telemetry_id=telemetry.telemetry_id,
target_uri=uri if uri != temp_uri else None,
lifecycle_lock_handle_id=lifecycle_lock_handle_id,
is_code_repo=kwargs.get("is_code_repo", False),
)
await semantic_queue.enqueue(msg)
if msg.telemetry_id:
get_request_wait_tracker().register_semantic_root(msg.telemetry_id, msg.id)
enqueued_count += 1
logger.info(
f"Enqueued semantic generation for: {uri} (skip_vectorization={skip_vectorization})"
)
enqueue_units: List[Tuple[str, str]] = []
if is_resources_root(uri) and uri != temp_uri:
children = await list_top_children(temp_uri)
if not children:
return {
"status": "error",
"message": f"no top-level import items found under temp uri: {temp_uri}",
}
for name, child_temp_uri in children:
child_target_uri = VikingURI("viking://resources").join(name).uri
enqueue_units.append((child_target_uri, child_temp_uri))
else:
enqueue_units.append((uri, temp_uri))

for target_uri, source_uri in enqueue_units:
msg = SemanticMsg(
uri=source_uri,
context_type=context_type,
account_id=ctx.account_id,
user_id=ctx.user.user_id,
agent_id=ctx.user.agent_id,
role=ctx.role.value,
skip_vectorization=skip_vectorization,
telemetry_id=telemetry.telemetry_id,
target_uri=target_uri if target_uri != source_uri else None,
lifecycle_lock_handle_id=lifecycle_lock_handle_id,
is_code_repo=kwargs.get("is_code_repo", False),
)
await semantic_queue.enqueue(msg)
if msg.telemetry_id:
get_request_wait_tracker().register_semantic_root(msg.telemetry_id, msg.id)
enqueued_count += 1
logger.info(
f"Enqueued semantic generation for: {target_uri} (skip_vectorization={skip_vectorization})"
)

return {"status": "success", "enqueued_count": enqueued_count}
1 change: 1 addition & 0 deletions openviking_cli/client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ async def add_resource(
finally:
Path(zip_path).unlink(missing_ok=True)
elif path_obj.is_file():
request_data["source_name"] = path_obj.name
temp_file_id = await self._upload_temp_file(path)
request_data["temp_file_id"] = temp_file_id
else:
Expand Down
91 changes: 91 additions & 0 deletions tests/misc/test_media_processor_zip_root.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/usr/bin/env python3
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0

import zipfile
from pathlib import Path
from unittest.mock import AsyncMock

import pytest

from openviking.utils.media_processor import UnifiedResourceProcessor


@pytest.mark.asyncio
async def test_zip_single_top_level_dir_uses_real_root(tmp_path: Path):
    """A zip holding exactly one top-level directory is processed from that directory."""
    archive_path = tmp_path / "tt_b.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("tt_b/bb/readme.md", "# hello\n")

    processor = UnifiedResourceProcessor()
    directory_handler = AsyncMock(return_value="ok")
    processor._process_directory = directory_handler

    outcome = await processor._process_file(archive_path, instruction="")

    assert outcome == "ok"
    # The first positional argument is the directory handed to _process_directory.
    handed_dir = directory_handler.await_args.args[0]
    assert isinstance(handed_dir, Path)
    assert handed_dir.name == "tt_b"


@pytest.mark.asyncio
async def test_zip_single_top_level_dir_ignores_zip_source_name(tmp_path: Path):
    """The zip's own ``source_name`` is not forwarded when its single root dir is used."""
    archive_path = tmp_path / "tt_b.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("tt_b/bb/readme.md", "# hello\n")

    processor = UnifiedResourceProcessor()
    directory_handler = AsyncMock(return_value="ok")
    processor._process_directory = directory_handler

    outcome = await processor._process_file(
        archive_path, instruction="", source_name="tt_b.zip"
    )

    assert outcome == "ok"
    recorded_call = directory_handler.await_args
    handed_dir = recorded_call.args[0]
    assert isinstance(handed_dir, Path)
    assert handed_dir.name == "tt_b"
    # The archive filename must not leak into the directory-processing kwargs.
    assert "source_name" not in recorded_call.kwargs


@pytest.mark.asyncio
async def test_zip_multiple_top_level_entries_keeps_extract_root(tmp_path: Path):
    """With several top-level entries, the directory processed is neither ``a`` nor ``b``."""
    archive_path = tmp_path / "mixed.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        for member in ("a/readme.md", "b/readme.md"):
            # Each readme's heading is the name of its top-level directory.
            archive.writestr(member, f"# {member[0]}\n")

    processor = UnifiedResourceProcessor()
    directory_handler = AsyncMock(return_value="ok")
    processor._process_directory = directory_handler

    outcome = await processor._process_file(archive_path, instruction="")

    assert outcome == "ok"
    handed_dir = directory_handler.await_args.args[0]
    assert isinstance(handed_dir, Path)
    assert handed_dir.name not in ("a", "b")


@pytest.mark.asyncio
async def test_single_file_uses_source_name_for_resource_name(tmp_path: Path):
    """For a plain file, ``source_name`` drives the ``resource_name`` passed to ``parse``."""
    uploaded = tmp_path / "upload_123.txt"
    uploaded.write_text("hello\n")

    processor = UnifiedResourceProcessor()
    parse_mock = AsyncMock(return_value="ok")

    with pytest.MonkeyPatch.context() as patcher:
        # Replace the module-level parse so the call's kwargs can be inspected.
        patcher.setattr("openviking.utils.media_processor.parse", parse_mock)
        outcome = await processor._process_file(
            uploaded, instruction="", source_name="aa.txt"
        )

    assert outcome == "ok"
    forwarded = parse_mock.await_args.kwargs
    assert forwarded["resource_name"] == "aa"
    assert forwarded["source_name"] == "aa.txt"
Loading
Loading