2 changes: 2 additions & 0 deletions infrastructure/README.md
@@ -257,6 +257,8 @@ frontend:

The following values should be adjusted for the deployment:

> ⓘ INFO: If the backend pod gets `OOMKilled` (exit code `137`) on local k3d/Tilt setups, reduce `backend.workers` (each uvicorn worker is a separate Python process), disable reranking (`RERANKER_ENABLED: false`) or pin a smaller Flashrank model (e.g. `RERANKER_MODEL: ms-marco-TinyBERT-L-2-v2`), and/or increase the memory available to Docker/k3d.
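
For a local low-memory setup, the overrides might look like this (a sketch; the exact nesting of the `RERANKER_*` keys follows the reranker block further down in `values.yaml`):

```yaml
backend:
  # Fewer uvicorn workers means fewer Python processes and less resident memory.
  workers: 1
  envs:
    reranking:  # illustrative nesting; match the reranker block in this chart's values.yaml
      RERANKER_ENABLED: false
      # Or keep reranking but pin a smaller Flashrank model:
      # RERANKER_MODEL: "ms-marco-TinyBERT-L-2-v2"
```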

```yaml
backend:
secrets:
@@ -68,6 +68,10 @@
{{- printf "%s-source-uploader-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- end -}}

{{- define "configmap.extractorSitemapName" -}}
{{- printf "%s-extractor-sitemap-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- end -}}

# image
{{- define "adminBackend.fullImageName" -}}
{{- $tag := default .Chart.AppVersion .Values.adminBackend.image.tag -}}
8 changes: 8 additions & 0 deletions infrastructure/rag/templates/extractor/configmap.yaml
@@ -0,0 +1,8 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ template "configmap.extractorSitemapName" . }}
data:
{{- range $key, $value := .Values.extractor.envs.sitemap }}
{{ $key }}: {{ $value | quote }}
{{- end }}
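
With the default values, this template renders roughly to the following (a sketch, assuming a release named `rag`):

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: rag-extractor-sitemap-configmap
data:
  SITEMAP_PARSER: "docusaurus"
```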
2 changes: 2 additions & 0 deletions infrastructure/rag/templates/extractor/deployment.yaml
@@ -110,6 +110,8 @@ spec:
envFrom:
- configMapRef:
name: {{ template "configmap.s3Name" . }}
- configMapRef:
name: {{ template "configmap.extractorSitemapName" . }}
- secretRef:
name: {{ template "secret.s3Name" . }}
{{- $hfCacheDir := include "extractor.huggingfaceCacheDir" . }}
11 changes: 11 additions & 0 deletions infrastructure/rag/values.yaml
@@ -100,6 +100,8 @@ backend:
- "--loop"
- "asyncio"

# Note: Each uvicorn worker is a separate Python process and can significantly
# increase memory usage.
workers: 3
wsMaxQueue: 6

@@ -222,6 +224,7 @@ backend:
RERANKER_K_DOCUMENTS: 5
RERANKER_MIN_RELEVANCE_SCORE: 0.001
RERANKER_ENABLED: true
RERANKER_MODEL: "ms-marco-MultiBERT-L-12"
chatHistory:
CHAT_HISTORY_LIMIT: 4
CHAT_HISTORY_REVERSE: true
@@ -355,6 +358,7 @@ adminBackend:
USECASE_KEYVALUE_PORT: 6379
USECASE_KEYVALUE_HOST: "rag-keydb"
sourceUploader:
# Large sitemap ingestions (per-page summaries) can take > 1 hour.
SOURCE_UPLOADER_TIMEOUT: 3600

extractor:
@@ -408,6 +412,13 @@ extractor:
# Directory inside the container to use as writable cache for ModelScope / OCR models
modelscopeCacheDir: /var/modelscope

envs:
sitemap:
# Controls how HTML pages are parsed when loading from an XML sitemap.
# Options: "docusaurus" (default), "astro", "generic"
# Note: https://docs.stackit.cloud is built with Astro/Starlight -> use "astro".
SITEMAP_PARSER: docusaurus

adminFrontend:
name: admin-frontend
replicaCount: 1
1 change: 1 addition & 0 deletions libs/README.md
@@ -331,6 +331,7 @@ For sitemap sources, additional parameters can be provided, e.g.:

Technically, all parameters of the `SitemapLoader` from LangChain can be provided.

The HTML parsing logic can be tuned via the `SITEMAP_PARSER` environment variable (default: `docusaurus`; options: `docusaurus`, `astro`, `generic`). For Helm deployments, set `extractor.envs.sitemap.SITEMAP_PARSER` in `infrastructure/rag/values.yaml`, as shown below. The parser can also be overridden per upload by passing a `sitemap_parser` key/value pair (same options) in the `/upload_source` request; the admin frontend exposes this as a dropdown.
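
For example, to switch the Helm deployment to the Astro parser:

```yaml
# infrastructure/rag/values.yaml
extractor:
  envs:
    sitemap:
      SITEMAP_PARSER: astro  # e.g. for Astro/Starlight sites such as https://docs.stackit.cloud
```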

### 3.3 Replaceable parts

@@ -149,7 +149,11 @@ def _thread_worker(self, source_name, source_type, kwargs, timeout):
)
)
except asyncio.TimeoutError:
logger.error("Upload of %s timed out after %s seconds", source_name, timeout)
logger.error(
"Upload of %s timed out after %s seconds (increase SOURCE_UPLOADER_TIMEOUT to allow longer ingestions)",
source_name,
timeout,
)
self._key_value_store.upsert(source_name, Status.ERROR)
except Exception:
logger.exception("Error while uploading %s", source_name)
@@ -1,8 +1,9 @@
"""Module for enhancing the summary of pages by grouping information by page and summarizing each page."""

from asyncio import gather
import asyncio
from hashlib import sha256
from typing import Optional
from typing import Any

Comment on lines 5 to 7

Copilot AI Dec 16, 2025

Duplicate import statement. The `from typing import Any` appears on both line 5 and line 6. Remove the duplicate import.

Suggested change:

```diff
-from typing import Optional
-from typing import Any
+from typing import Optional, Any
```
from langchain_core.documents import Document
from langchain_core.runnables import RunnableConfig
@@ -25,8 +26,36 @@ class PageSummaryEnhancer(SummaryEnhancer):
"""

BASE64_IMAGE_KEY = "base64_image"
DOCUMENT_URL_KEY = "document_url"
DEFAULT_PAGE_NR = 1

@staticmethod
def _parse_max_concurrency(config: Optional[RunnableConfig]) -> int:
if not config:
return 1
raw = config.get("max_concurrency")
if raw is None:
return 1
try:
return max(1, int(raw))
except (TypeError, ValueError):
return 1

def _group_key(self, piece: Document) -> tuple[Any, ...]:
document_url = piece.metadata.get(self.DOCUMENT_URL_KEY)
page = piece.metadata.get("page", self.DEFAULT_PAGE_NR)

# For paged documents (PDF/docling/etc.) keep per-page summaries even if a shared document URL exists.
if isinstance(page, int) or (isinstance(page, str) and page != "Unknown Title"):
return ("page_number", document_url, page)
Comment on lines +49 to +50

Copilot AI Dec 16, 2025

Missing test coverage for the edge case where `page` is a non-"Unknown Title" string value without a `document_url`. The `_group_key` method on line 49 will return `("page_number", None, page_string)`, which could incorrectly group documents from different sources that happen to have the same page name. Add a test to verify this behavior or adjust the logic to handle this case.
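
A minimal sketch of such a test (hypothetical; it bypasses `__init__`, which is safe here because `_group_key` only reads the piece's metadata and class-level constants):

```python
from langchain_core.documents import Document

# PageSummaryEnhancer import path depends on the package layout.


def test_group_key_collides_for_same_page_string_without_document_url():
    enhancer = object.__new__(PageSummaryEnhancer)  # skip __init__ for a pure metadata check
    doc_a = Document(page_content="a", metadata={"page": "Getting Started"})
    doc_b = Document(page_content="b", metadata={"page": "Getting Started"})
    # Without a document_url, both keys collapse to ("page_number", None, "Getting Started"),
    # so pieces from different sources sharing a page name would be summarized together.
    assert enhancer._group_key(doc_a) == enhancer._group_key(doc_b)
```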

# For sources like sitemaps/confluence, `page` can be a non-unique title (or missing),
# so group by the page URL when available to ensure one summary per page.
if document_url:
return ("document_url", document_url)

return ("page", page)

async def _asummarize_page(self, page_pieces: list[Document], config: Optional[RunnableConfig]) -> Document:
full_page_content = " ".join([piece.page_content for piece in page_pieces])
summary = await self._summarizer.ainvoke(full_page_content, config)
@@ -39,24 +68,46 @@ async def _asummarize_page(self, page_pieces: list[Document], config: Optional[R
return Document(metadata=meta, page_content=summary)

async def _acreate_summary(self, information: list[Document], config: Optional[RunnableConfig]) -> list[Document]:
distinct_pages = []
grouped = self._group_information(information)
max_concurrency = self._parse_max_concurrency(config)
return await self._summarize_groups(grouped, config, max_concurrency=max_concurrency)

def _group_information(self, information: list[Document]) -> list[list[Document]]:
ordered_keys: list[tuple[Any, ...]] = []
groups: dict[tuple[Any, ...], list[Document]] = {}
for info in information:
if info.metadata.get("page", self.DEFAULT_PAGE_NR) not in distinct_pages:
distinct_pages.append(info.metadata.get("page", self.DEFAULT_PAGE_NR))

grouped = []
for page in distinct_pages:
group = []
for compare_info in information:
if compare_info.metadata.get("page", self.DEFAULT_PAGE_NR) == page:
group.append(compare_info)
if (
self._chunker_settings
and len(" ".join([item.page_content for item in group])) < self._chunker_settings.max_size
):
continue
grouped.append(group)

summary_tasks = [self._asummarize_page(info_group, config) for info_group in tqdm(grouped)]

return await gather(*summary_tasks)
key = self._group_key(info)
if key not in groups:
ordered_keys.append(key)
groups[key] = []
groups[key].append(info)
return [groups[key] for key in ordered_keys]

async def _summarize_groups(
self,
grouped: list[list[Document]],
config: Optional[RunnableConfig],
*,
max_concurrency: int,
) -> list[Document]:
if max_concurrency == 1:
summaries: list[Document] = []
for info_group in tqdm(grouped):
summaries.append(await self._asummarize_page(info_group, config))
return summaries

semaphore = asyncio.Semaphore(max_concurrency)
results: list[Document | None] = [None] * len(grouped)

async def _run(idx: int, info_group: list[Document]) -> tuple[int, Document]:
async with semaphore:
return idx, await self._asummarize_page(info_group, config)

tasks = [asyncio.create_task(_run(idx, info_group)) for idx, info_group in enumerate(grouped)]
with tqdm(total=len(tasks)) as pbar:
for task in asyncio.as_completed(tasks):
idx, summary = await task
results[idx] = summary
pbar.update(1)

return [summary for summary in results if summary is not None]
@@ -44,6 +44,24 @@ def __init__(
self._semaphore = semaphore
self._retry_decorator_settings = create_retry_decorator_settings(summarizer_settings, retry_decorator_settings)

@staticmethod
def _parse_max_concurrency(config: RunnableConfig) -> Optional[int]:
"""Parse max concurrency from a RunnableConfig.

Returns
-------
Optional[int]
An integer >= 1 if configured and valid, otherwise None.
"""
max_concurrency = config.get("max_concurrency")
if max_concurrency is None:
return None

try:
return max(1, int(max_concurrency))
except (TypeError, ValueError):
return None
Comment on lines +47 to +63

Copilot AI Dec 16, 2025

The `_parse_max_concurrency` method is missing the Parameters section in its docstring. Add a Parameters section to document the `config` parameter following NumPy style.

async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] = None) -> SummarizerOutput:
"""
Asynchronously invokes the summarization process on the given query.
@@ -77,9 +95,8 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig]
langchain_documents = self._chunker.split_documents([document])
logger.debug("Summarizing %d chunk(s)...", len(langchain_documents))

# Fan out with concurrency, bounded by your semaphore inside _summarize_chunk
tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents]
outputs = await asyncio.gather(*tasks)
max_concurrency = self._parse_max_concurrency(config)
outputs = await self._summarize_documents(langchain_documents, config, max_concurrency=max_concurrency)

if len(outputs) == 1:
return outputs[0]
Expand All @@ -93,6 +110,34 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig]
)
return await self._summarize_chunk(merged, config)

async def _summarize_documents(
self,
documents: list[Document],
config: RunnableConfig,
*,
max_concurrency: Optional[int],
) -> list[SummarizerOutput]:
"""Summarize a set of already-chunked documents.

Notes
-----
This optionally limits task fan-out using a per-call semaphore (max_concurrency).
The actual LLM call concurrency is always bounded by the instance semaphore held
inside `_summarize_chunk`.
"""
Comment on lines +113 to +127

Copilot AI Dec 16, 2025

The `_summarize_documents` method is missing the Parameters and Returns sections in its docstring. Add these sections following NumPy style to document the `documents`, `config`, and `max_concurrency` parameters, as well as the return type.
if max_concurrency == 1:
return [await self._summarize_chunk(doc.page_content, config) for doc in documents]

limiter: asyncio.Semaphore | None = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None

async def _run(doc: Document) -> SummarizerOutput:
if limiter is None:
return await self._summarize_chunk(doc.page_content, config)
async with limiter:
return await self._summarize_chunk(doc.page_content, config)

return await asyncio.gather(*(_run(doc) for doc in documents))

def _create_chain(self) -> Runnable:
return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm(
self.__class__.__name__
57 changes: 57 additions & 0 deletions libs/admin-api-lib/tests/langchain_summarizer_test.py
@@ -0,0 +1,57 @@
import asyncio

import pytest
from langchain_core.documents import Document

from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings
from admin_api_lib.impl.summarizer.langchain_summarizer import LangchainSummarizer
from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore


class _StaticChunker:
def __init__(self, docs: list[Document]):
self._docs = docs

def split_documents(self, _docs: list[Document]) -> list[Document]:
return self._docs


class _ConcurrencyTrackingSummarizer(LangchainSummarizer):
def __init__(self, docs: list[Document]):
super().__init__(
langfuse_manager=object(), # type: ignore[arg-type]
chunker=_StaticChunker(docs), # type: ignore[arg-type]
semaphore=AsyncThreadsafeSemaphore(100),
summarizer_settings=SummarizerSettings(),
retry_decorator_settings=RetryDecoratorSettings(),
)
self.in_flight = 0
self.max_in_flight = 0

async def _summarize_chunk(self, text: str, config): # type: ignore[override]
self.in_flight += 1
self.max_in_flight = max(self.max_in_flight, self.in_flight)
await asyncio.sleep(0.01)
self.in_flight -= 1
return text


@pytest.mark.asyncio
async def test_langchain_summarizer_respects_max_concurrency_one():
docs = [Document(page_content=f"chunk-{idx}") for idx in range(5)]
summarizer = _ConcurrencyTrackingSummarizer(docs)

await summarizer.ainvoke("input", config={"max_concurrency": 1})

assert summarizer.max_in_flight == 1


@pytest.mark.asyncio
async def test_langchain_summarizer_respects_max_concurrency_limit():
docs = [Document(page_content=f"chunk-{idx}") for idx in range(8)]
summarizer = _ConcurrencyTrackingSummarizer(docs)

await summarizer.ainvoke("input", config={"max_concurrency": 2})

assert summarizer.max_in_flight <= 2