diff --git a/infrastructure/README.md b/infrastructure/README.md
index a6e5ba0f..fddffec4 100644
--- a/infrastructure/README.md
+++ b/infrastructure/README.md
@@ -257,6 +257,8 @@ frontend:
The following values should be adjusted for the deployment:
+> ⓘ INFO: If the backend pod gets `OOMKilled` (exit code `137`) on local k3d/Tilt setups, reduce `backend.workers` (each uvicorn worker is a separate Python process), disable reranking (`RERANKER_ENABLED: false`) or pin a smaller Flashrank model (e.g. `RERANKER_MODEL: ms-marco-TinyBERT-L-2-v2`), and/or increase the memory available to Docker/k3d.
+
```yaml
backend:
secrets:
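> For the OOM note in this hunk, a minimal local-dev override could look like the sketch below. The `RERANKER_*` keys exist in `infrastructure/rag/values.yaml` (see the hunk further down); the intermediate nesting shown here is illustrative only.

```yaml
backend:
  # Fewer uvicorn worker processes -> lower memory footprint on local k3d/Tilt.
  workers: 1
  envs:
    reranking:
      # Either disable reranking entirely ...
      RERANKER_ENABLED: false
      # ... or keep it enabled and pin a smaller Flashrank model instead:
      # RERANKER_MODEL: "ms-marco-TinyBERT-L-2-v2"
```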
diff --git a/infrastructure/rag/templates/_admin_backend_and_extractor_helpers.tpl b/infrastructure/rag/templates/_admin_backend_and_extractor_helpers.tpl
index 5f851477..0f860234 100644
--- a/infrastructure/rag/templates/_admin_backend_and_extractor_helpers.tpl
+++ b/infrastructure/rag/templates/_admin_backend_and_extractor_helpers.tpl
@@ -68,6 +68,10 @@
{{- printf "%s-source-uploader-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
+{{- define "configmap.extractorSitemapName" -}}
+{{- printf "%s-extractor-sitemap-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
+
# image
{{- define "adminBackend.fullImageName" -}}
{{- $tag := default .Chart.AppVersion .Values.adminBackend.image.tag -}}
diff --git a/infrastructure/rag/templates/extractor/configmap.yaml b/infrastructure/rag/templates/extractor/configmap.yaml
new file mode 100644
index 00000000..5f02f2c0
--- /dev/null
+++ b/infrastructure/rag/templates/extractor/configmap.yaml
@@ -0,0 +1,8 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: {{ template "configmap.extractorSitemapName" . }}
+data:
+ {{- range $key, $value := .Values.extractor.envs.sitemap }}
+ {{ $key }}: {{ $value | quote }}
+ {{- end }}
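For reference, with the default value added below (`extractor.envs.sitemap.SITEMAP_PARSER: docusaurus`) and an assumed release name of `rag`, this template would render roughly as:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: rag-extractor-sitemap-configmap
data:
  SITEMAP_PARSER: "docusaurus"
```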
diff --git a/infrastructure/rag/templates/extractor/deployment.yaml b/infrastructure/rag/templates/extractor/deployment.yaml
index adfd2e38..7b5e16de 100644
--- a/infrastructure/rag/templates/extractor/deployment.yaml
+++ b/infrastructure/rag/templates/extractor/deployment.yaml
@@ -110,6 +110,8 @@ spec:
envFrom:
- configMapRef:
name: {{ template "configmap.s3Name" . }}
+ - configMapRef:
+ name: {{ template "configmap.extractorSitemapName" . }}
- secretRef:
name: {{ template "secret.s3Name" . }}
{{- $hfCacheDir := include "extractor.huggingfaceCacheDir" . }}
diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml
index 78226195..636426e9 100644
--- a/infrastructure/rag/values.yaml
+++ b/infrastructure/rag/values.yaml
@@ -100,6 +100,8 @@ backend:
- "--loop"
- "asyncio"
+ # Note: Each uvicorn worker is a separate Python process and can significantly
+ # increase memory usage.
workers: 3
wsMaxQueue: 6
@@ -222,6 +224,7 @@ backend:
RERANKER_K_DOCUMENTS: 5
RERANKER_MIN_RELEVANCE_SCORE: 0.001
RERANKER_ENABLED: true
+ RERANKER_MODEL: "ms-marco-MultiBERT-L-12"
chatHistory:
CHAT_HISTORY_LIMIT: 4
CHAT_HISTORY_REVERSE: true
@@ -355,6 +358,7 @@ adminBackend:
USECASE_KEYVALUE_PORT: 6379
USECASE_KEYVALUE_HOST: "rag-keydb"
sourceUploader:
+ # Large sitemap ingestions (per-page summaries) can take > 1 hour.
SOURCE_UPLOADER_TIMEOUT: 3600
extractor:
@@ -408,6 +412,13 @@ extractor:
# Directory inside the container to use as writable cache for ModelScope / OCR models
modelscopeCacheDir: /var/modelscope
+ envs:
+ sitemap:
+ # Controls how HTML pages are parsed when loading from an XML sitemap.
+ # Options: "docusaurus" (default), "astro", "generic"
+ # Note: https://docs.stackit.cloud is built with Astro/Starlight -> use "astro".
+ SITEMAP_PARSER: docusaurus
+
adminFrontend:
name: admin-frontend
replicaCount: 1
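To switch the parser for a specific environment, an override values file only needs the new key, e.g. (illustrative; per the comment above, `astro` is the right choice for Astro/Starlight sites such as docs.stackit.cloud):

```yaml
extractor:
  envs:
    sitemap:
      # Ends up as the SITEMAP_PARSER env var in the extractor pod via the new ConfigMap.
      SITEMAP_PARSER: astro
```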
diff --git a/libs/README.md b/libs/README.md
index ec608ee0..bd6c5f71 100644
--- a/libs/README.md
+++ b/libs/README.md
@@ -331,6 +331,7 @@ For sitemap sources, additional parameters can be provided, e.g.:
Technically, all parameters of the `SitemapLoader` from LangChain can be provided.
+The HTML parsing logic can be tuned via the `SITEMAP_PARSER` environment variable (default: `docusaurus`; options: `docusaurus`, `astro`, `generic`). For Helm deployments, set `extractor.envs.sitemap.SITEMAP_PARSER` in `infrastructure/rag/values.yaml`. You can also override the parser per upload by passing a `sitemap_parser` key/value pair (same options) in the `/upload_source` request; the admin frontend exposes this as a dropdown.
### 3.3 Replaceable parts
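As a rough illustration of the per-upload override described above: apart from the `sitemap_parser` key/value pair and the LangChain `SitemapLoader` parameter `web_path`, the field names in this request body are assumptions about the `/upload_source` schema, not taken from this diff.

```json
{
  "name": "stackit-docs",
  "type": "sitemap",
  "kwargs": [
    { "key": "web_path", "value": "https://docs.stackit.cloud/sitemap.xml" },
    { "key": "sitemap_parser", "value": "astro" }
  ]
}
```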
diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_source_uploader.py b/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_source_uploader.py
index 6a05ba2f..f896df3d 100644
--- a/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_source_uploader.py
+++ b/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_source_uploader.py
@@ -149,7 +149,11 @@ def _thread_worker(self, source_name, source_type, kwargs, timeout):
)
)
except asyncio.TimeoutError:
- logger.error("Upload of %s timed out after %s seconds", source_name, timeout)
+ logger.error(
+ "Upload of %s timed out after %s seconds (increase SOURCE_UPLOADER_TIMEOUT to allow longer ingestions)",
+ source_name,
+ timeout,
+ )
self._key_value_store.upsert(source_name, Status.ERROR)
except Exception:
logger.exception("Error while uploading %s", source_name)
diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/information_enhancer/page_summary_enhancer.py b/libs/admin-api-lib/src/admin_api_lib/impl/information_enhancer/page_summary_enhancer.py
index 6ebadf34..83053324 100644
--- a/libs/admin-api-lib/src/admin_api_lib/impl/information_enhancer/page_summary_enhancer.py
+++ b/libs/admin-api-lib/src/admin_api_lib/impl/information_enhancer/page_summary_enhancer.py
@@ -1,8 +1,9 @@
"""Module for enhancing the summary of pages by grouping information by page and summarizing each page."""
-from asyncio import gather
+import asyncio
from hashlib import sha256
from typing import Optional
+from typing import Any
from langchain_core.documents import Document
from langchain_core.runnables import RunnableConfig
@@ -25,8 +26,36 @@ class PageSummaryEnhancer(SummaryEnhancer):
"""
BASE64_IMAGE_KEY = "base64_image"
+ DOCUMENT_URL_KEY = "document_url"
DEFAULT_PAGE_NR = 1
+ @staticmethod
+ def _parse_max_concurrency(config: Optional[RunnableConfig]) -> int:
+ if not config:
+ return 1
+ raw = config.get("max_concurrency")
+ if raw is None:
+ return 1
+ try:
+ return max(1, int(raw))
+ except (TypeError, ValueError):
+ return 1
+
+ def _group_key(self, piece: Document) -> tuple[Any, ...]:
+ document_url = piece.metadata.get(self.DOCUMENT_URL_KEY)
+ page = piece.metadata.get("page", self.DEFAULT_PAGE_NR)
+
+ # For paged documents (PDF/docling/etc.) keep per-page summaries even if a shared document URL exists.
+ if isinstance(page, int) or (isinstance(page, str) and page != "Unknown Title"):
+ return ("page_number", document_url, page)
+
+ # For sources like sitemaps/confluence, `page` can be a non-unique title (or missing),
+ # so group by the page URL when available to ensure one summary per page.
+ if document_url:
+ return ("document_url", document_url)
+
+ return ("page", page)
+
async def _asummarize_page(self, page_pieces: list[Document], config: Optional[RunnableConfig]) -> Document:
full_page_content = " ".join([piece.page_content for piece in page_pieces])
summary = await self._summarizer.ainvoke(full_page_content, config)
@@ -39,24 +68,46 @@ async def _asummarize_page(self, page_pieces: list[Document], config: Optional[R
return Document(metadata=meta, page_content=summary)
async def _acreate_summary(self, information: list[Document], config: Optional[RunnableConfig]) -> list[Document]:
- distinct_pages = []
+ grouped = self._group_information(information)
+ max_concurrency = self._parse_max_concurrency(config)
+ return await self._summarize_groups(grouped, config, max_concurrency=max_concurrency)
+
+ def _group_information(self, information: list[Document]) -> list[list[Document]]:
+ ordered_keys: list[tuple[Any, ...]] = []
+ groups: dict[tuple[Any, ...], list[Document]] = {}
for info in information:
- if info.metadata.get("page", self.DEFAULT_PAGE_NR) not in distinct_pages:
- distinct_pages.append(info.metadata.get("page", self.DEFAULT_PAGE_NR))
-
- grouped = []
- for page in distinct_pages:
- group = []
- for compare_info in information:
- if compare_info.metadata.get("page", self.DEFAULT_PAGE_NR) == page:
- group.append(compare_info)
- if (
- self._chunker_settings
- and len(" ".join([item.page_content for item in group])) < self._chunker_settings.max_size
- ):
- continue
- grouped.append(group)
-
- summary_tasks = [self._asummarize_page(info_group, config) for info_group in tqdm(grouped)]
-
- return await gather(*summary_tasks)
+ key = self._group_key(info)
+ if key not in groups:
+ ordered_keys.append(key)
+ groups[key] = []
+ groups[key].append(info)
+ return [groups[key] for key in ordered_keys]
+
+ async def _summarize_groups(
+ self,
+ grouped: list[list[Document]],
+ config: Optional[RunnableConfig],
+ *,
+ max_concurrency: int,
+ ) -> list[Document]:
+ if max_concurrency == 1:
+ summaries: list[Document] = []
+ for info_group in tqdm(grouped):
+ summaries.append(await self._asummarize_page(info_group, config))
+ return summaries
+
+ semaphore = asyncio.Semaphore(max_concurrency)
+ results: list[Document | None] = [None] * len(grouped)
+
+ async def _run(idx: int, info_group: list[Document]) -> tuple[int, Document]:
+ async with semaphore:
+ return idx, await self._asummarize_page(info_group, config)
+
+ tasks = [asyncio.create_task(_run(idx, info_group)) for idx, info_group in enumerate(grouped)]
+ with tqdm(total=len(tasks)) as pbar:
+ for task in asyncio.as_completed(tasks):
+ idx, summary = await task
+ results[idx] = summary
+ pbar.update(1)
+
+ return [summary for summary in results if summary is not None]
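A small usage sketch for the new grouping and concurrency handling (mirroring the metadata shape used in the new tests; the `AsyncMock` stands in for the real LLM-backed summarizer):

```python
import asyncio
from unittest.mock import AsyncMock

from langchain_core.documents import Document

from admin_api_lib.impl.information_enhancer.page_summary_enhancer import PageSummaryEnhancer
from rag_core_lib.impl.data_types.content_type import ContentType


async def main() -> None:
    enhancer = PageSummaryEnhancer(AsyncMock(ainvoke=AsyncMock(return_value="summary")))
    docs = [
        Document(
            page_content=f"chunk-{i}",
            metadata={
                "id": str(i),
                "related": [],
                "type": ContentType.TEXT.value,
                "page": "Unknown Title",  # non-numeric page -> grouped by document_url
                "document_url": f"https://example.com/{i % 2}",
            },
        )
        for i in range(4)
    ]
    # max_concurrency is read from the standard RunnableConfig dict;
    # 1 (or unset) falls back to fully sequential summarization.
    summaries = await enhancer.ainvoke(docs, config={"max_concurrency": 2})
    print(len(summaries))  # two document URLs -> two page summaries


asyncio.run(main())
```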
diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py
index d0b1e061..c4c4c2ee 100644
--- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py
+++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py
@@ -44,6 +44,24 @@ def __init__(
self._semaphore = semaphore
self._retry_decorator_settings = create_retry_decorator_settings(summarizer_settings, retry_decorator_settings)
+ @staticmethod
+ def _parse_max_concurrency(config: RunnableConfig) -> Optional[int]:
+ """Parse max concurrency from a RunnableConfig.
+
+ Returns
+ -------
+ Optional[int]
+ An integer >= 1 if configured and valid, otherwise None.
+ """
+ max_concurrency = config.get("max_concurrency")
+ if max_concurrency is None:
+ return None
+
+ try:
+ return max(1, int(max_concurrency))
+ except (TypeError, ValueError):
+ return None
+
async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] = None) -> SummarizerOutput:
"""
Asynchronously invokes the summarization process on the given query.
@@ -77,9 +95,8 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig]
langchain_documents = self._chunker.split_documents([document])
logger.debug("Summarizing %d chunk(s)...", len(langchain_documents))
- # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk
- tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents]
- outputs = await asyncio.gather(*tasks)
+ max_concurrency = self._parse_max_concurrency(config)
+ outputs = await self._summarize_documents(langchain_documents, config, max_concurrency=max_concurrency)
if len(outputs) == 1:
return outputs[0]
@@ -93,6 +110,34 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig]
)
return await self._summarize_chunk(merged, config)
+ async def _summarize_documents(
+ self,
+ documents: list[Document],
+ config: RunnableConfig,
+ *,
+ max_concurrency: Optional[int],
+ ) -> list[SummarizerOutput]:
+ """Summarize a set of already-chunked documents.
+
+ Notes
+ -----
+ This optionally limits task fan-out using a per-call semaphore (max_concurrency).
+ The actual LLM call concurrency is always bounded by the instance semaphore held
+ inside `_summarize_chunk`.
+ """
+ if max_concurrency == 1:
+ return [await self._summarize_chunk(doc.page_content, config) for doc in documents]
+
+ limiter: asyncio.Semaphore | None = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None
+
+ async def _run(doc: Document) -> SummarizerOutput:
+ if limiter is None:
+ return await self._summarize_chunk(doc.page_content, config)
+ async with limiter:
+ return await self._summarize_chunk(doc.page_content, config)
+
+ return await asyncio.gather(*(_run(doc) for doc in documents))
+
def _create_chain(self) -> Runnable:
return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm(
self.__class__.__name__
diff --git a/libs/admin-api-lib/tests/langchain_summarizer_test.py b/libs/admin-api-lib/tests/langchain_summarizer_test.py
new file mode 100644
index 00000000..b51cf98c
--- /dev/null
+++ b/libs/admin-api-lib/tests/langchain_summarizer_test.py
@@ -0,0 +1,57 @@
+import asyncio
+
+import pytest
+from langchain_core.documents import Document
+
+from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings
+from admin_api_lib.impl.summarizer.langchain_summarizer import LangchainSummarizer
+from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
+from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
+
+
+class _StaticChunker:
+ def __init__(self, docs: list[Document]):
+ self._docs = docs
+
+ def split_documents(self, _docs: list[Document]) -> list[Document]:
+ return self._docs
+
+
+class _ConcurrencyTrackingSummarizer(LangchainSummarizer):
+ def __init__(self, docs: list[Document]):
+ super().__init__(
+ langfuse_manager=object(), # type: ignore[arg-type]
+ chunker=_StaticChunker(docs), # type: ignore[arg-type]
+ semaphore=AsyncThreadsafeSemaphore(100),
+ summarizer_settings=SummarizerSettings(),
+ retry_decorator_settings=RetryDecoratorSettings(),
+ )
+ self.in_flight = 0
+ self.max_in_flight = 0
+
+ async def _summarize_chunk(self, text: str, config): # type: ignore[override]
+ self.in_flight += 1
+ self.max_in_flight = max(self.max_in_flight, self.in_flight)
+ await asyncio.sleep(0.01)
+ self.in_flight -= 1
+ return text
+
+
+@pytest.mark.asyncio
+async def test_langchain_summarizer_respects_max_concurrency_one():
+ docs = [Document(page_content=f"chunk-{idx}") for idx in range(5)]
+ summarizer = _ConcurrencyTrackingSummarizer(docs)
+
+ await summarizer.ainvoke("input", config={"max_concurrency": 1})
+
+ assert summarizer.max_in_flight == 1
+
+
+@pytest.mark.asyncio
+async def test_langchain_summarizer_respects_max_concurrency_limit():
+ docs = [Document(page_content=f"chunk-{idx}") for idx in range(8)]
+ summarizer = _ConcurrencyTrackingSummarizer(docs)
+
+ await summarizer.ainvoke("input", config={"max_concurrency": 2})
+
+ assert summarizer.max_in_flight <= 2
diff --git a/libs/admin-api-lib/tests/page_summary_enhancer_test.py b/libs/admin-api-lib/tests/page_summary_enhancer_test.py
new file mode 100644
index 00000000..6ebdc533
--- /dev/null
+++ b/libs/admin-api-lib/tests/page_summary_enhancer_test.py
@@ -0,0 +1,154 @@
+import asyncio
+from unittest.mock import AsyncMock
+
+import pytest
+from langchain_core.documents import Document
+
+from admin_api_lib.impl.information_enhancer.page_summary_enhancer import PageSummaryEnhancer
+from rag_core_lib.impl.data_types.content_type import ContentType
+
+
+@pytest.mark.asyncio
+async def test_page_summary_enhancer_groups_by_document_url_for_non_numeric_pages():
+ summarizer = AsyncMock()
+ summarizer.ainvoke = AsyncMock(return_value="summary")
+ enhancer = PageSummaryEnhancer(summarizer)
+
+ docs = [
+ Document(
+ page_content="page-a chunk-1",
+ metadata={
+ "id": "a1",
+ "related": [],
+ "type": ContentType.TEXT.value,
+ "page": "Unknown Title",
+ "document_url": "https://example.com/a",
+ },
+ ),
+ Document(
+ page_content="page-a chunk-2",
+ metadata={
+ "id": "a2",
+ "related": [],
+ "type": ContentType.TEXT.value,
+ "page": "Unknown Title",
+ "document_url": "https://example.com/a",
+ },
+ ),
+ Document(
+ page_content="page-b chunk-1",
+ metadata={
+ "id": "b1",
+ "related": [],
+ "type": ContentType.TEXT.value,
+ "page": "Unknown Title",
+ "document_url": "https://example.com/b",
+ },
+ ),
+ ]
+
+ summaries = await enhancer.ainvoke(docs)
+
+ assert summarizer.ainvoke.call_count == 2
+ assert len(summaries) == 2
+
+ assert summaries[0].metadata["document_url"] == "https://example.com/a"
+ assert set(summaries[0].metadata["related"]) == {"a1", "a2"}
+ assert summaries[0].metadata["type"] == ContentType.SUMMARY.value
+
+ assert summaries[1].metadata["document_url"] == "https://example.com/b"
+ assert set(summaries[1].metadata["related"]) == {"b1"}
+ assert summaries[1].metadata["type"] == ContentType.SUMMARY.value
+
+
+@pytest.mark.asyncio
+async def test_page_summary_enhancer_keeps_page_number_separation_for_paged_documents():
+ summarizer = AsyncMock()
+ summarizer.ainvoke = AsyncMock(return_value="summary")
+ enhancer = PageSummaryEnhancer(summarizer)
+
+ docs = [
+ Document(
+ page_content="page-1 chunk",
+ metadata={
+ "id": "p1",
+ "related": [],
+ "type": ContentType.TEXT.value,
+ "page": 1,
+ "document_url": "http://file.local/doc.pdf",
+ },
+ ),
+ Document(
+ page_content="page-2 chunk",
+ metadata={
+ "id": "p2",
+ "related": [],
+ "type": ContentType.TEXT.value,
+ "page": 2,
+ "document_url": "http://file.local/doc.pdf",
+ },
+ ),
+ ]
+
+ summaries = await enhancer.ainvoke(docs)
+
+ assert summarizer.ainvoke.call_count == 2
+ assert len(summaries) == 2
+ assert set(summaries[0].metadata["related"]) == {"p1"}
+ assert set(summaries[1].metadata["related"]) == {"p2"}
+
+
+class _ConcurrencyTrackingSummarizer:
+ def __init__(self) -> None:
+ self.in_flight = 0
+ self.max_in_flight = 0
+
+ async def ainvoke(self, _query: str, _config=None) -> str: # noqa: ANN001
+ self.in_flight += 1
+ self.max_in_flight = max(self.max_in_flight, self.in_flight)
+ await asyncio.sleep(0.01)
+ self.in_flight -= 1
+ return "summary"
+
+
+@pytest.mark.asyncio
+async def test_page_summary_enhancer_respects_max_concurrency_one():
+ summarizer = _ConcurrencyTrackingSummarizer()
+ enhancer = PageSummaryEnhancer(summarizer) # type: ignore[arg-type]
+
+ docs = [
+ Document(
+ page_content="page-a chunk",
+ metadata={
+ "id": "a1",
+ "related": [],
+ "type": ContentType.TEXT.value,
+ "page": "A",
+ "document_url": "https://example.com/a",
+ },
+ ),
+ Document(
+ page_content="page-b chunk",
+ metadata={
+ "id": "b1",
+ "related": [],
+ "type": ContentType.TEXT.value,
+ "page": "B",
+ "document_url": "https://example.com/b",
+ },
+ ),
+ Document(
+ page_content="page-c chunk",
+ metadata={
+ "id": "c1",
+ "related": [],
+ "type": ContentType.TEXT.value,
+ "page": "C",
+ "document_url": "https://example.com/c",
+ },
+ ),
+ ]
+
+ await enhancer.ainvoke(docs, config={"max_concurrency": 1})
+
+ assert summarizer.max_in_flight == 1
diff --git a/libs/extractor-api-lib/src/extractor_api_lib/dependency_container.py b/libs/extractor-api-lib/src/extractor_api_lib/dependency_container.py
index 16101921..cafb003b 100644
--- a/libs/extractor-api-lib/src/extractor_api_lib/dependency_container.py
+++ b/libs/extractor-api-lib/src/extractor_api_lib/dependency_container.py
@@ -1,7 +1,13 @@
"""Module for dependency injection container for managing application dependencies."""
from dependency_injector.containers import DeclarativeContainer
-from dependency_injector.providers import Factory, List, Singleton # noqa: WOT001
+from dependency_injector.providers import ( # noqa: WOT001
+ Configuration,
+ List,
+ Object,
+ Selector,
+ Singleton,
+)
from extractor_api_lib.impl.api_endpoints.general_file_extractor import (
GeneralFileExtractor,
@@ -41,11 +47,16 @@
from extractor_api_lib.impl.mapper.sitemap_document2information_piece import (
SitemapLangchainDocument2InformationPiece,
)
+from extractor_api_lib.impl.settings.sitemap_settings import SitemapSettings
from extractor_api_lib.impl.settings.s3_settings import S3Settings
from extractor_api_lib.impl.table_converter.dataframe2markdown import DataFrame2Markdown
from extractor_api_lib.impl.utils.sitemap_extractor_utils import (
- custom_sitemap_metadata_parser_function,
- custom_sitemap_parser_function,
+ astro_sitemap_metadata_parser_function,
+ astro_sitemap_parser_function,
+ docusaurus_sitemap_metadata_parser_function,
+ docusaurus_sitemap_parser_function,
+ generic_sitemap_metadata_parser_function,
+ generic_sitemap_parser_function,
)
@@ -55,8 +66,20 @@ class DependencyContainer(DeclarativeContainer):
# Settings
settings_s3 = S3Settings()
- sitemap_parsing_function = Factory(lambda: custom_sitemap_parser_function)
- sitemap_meta_function = Factory(lambda: custom_sitemap_metadata_parser_function)
+ sitemap_selector_config = Configuration(pydantic_settings=[SitemapSettings()])
+
+ sitemap_parsing_function = Selector(
+ sitemap_selector_config.parser,
+ docusaurus=Object(docusaurus_sitemap_parser_function),
+ astro=Object(astro_sitemap_parser_function),
+ generic=Object(generic_sitemap_parser_function),
+ )
+ sitemap_meta_function = Selector(
+ sitemap_selector_config.parser,
+ docusaurus=Object(docusaurus_sitemap_metadata_parser_function),
+ astro=Object(astro_sitemap_metadata_parser_function),
+ generic=Object(generic_sitemap_metadata_parser_function),
+ )
database_converter = Singleton(DataFrame2Markdown)
file_service = Singleton(S3Service, settings_s3)
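How the selection resolves at runtime, as a sketch: since `SitemapSettings()` is instantiated when the container class body executes, `SITEMAP_PARSER` has to be set before the module is imported (assumed fresh process below).

```python
import os

# Must be set before extractor_api_lib.dependency_container is imported.
os.environ["SITEMAP_PARSER"] = "astro"

from extractor_api_lib.dependency_container import DependencyContainer

container = DependencyContainer()
# The Selector calls the provider registered for the configured key, so these
# return astro_sitemap_parser_function / astro_sitemap_metadata_parser_function here.
parsing_fn = container.sitemap_parsing_function()
meta_fn = container.sitemap_meta_function()
print(parsing_fn.__name__, meta_fn.__name__)
```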
diff --git a/libs/extractor-api-lib/src/extractor_api_lib/impl/extractors/sitemap_extractor.py b/libs/extractor-api-lib/src/extractor_api_lib/impl/extractors/sitemap_extractor.py
index 8710585d..94c72dbb 100644
--- a/libs/extractor-api-lib/src/extractor_api_lib/impl/extractors/sitemap_extractor.py
+++ b/libs/extractor-api-lib/src/extractor_api_lib/impl/extractors/sitemap_extractor.py
@@ -4,6 +4,7 @@
from langchain_community.document_loaders import SitemapLoader
import asyncio
import json
+import logging
from extractor_api_lib.impl.types.extractor_types import ExtractorTypes
from extractor_api_lib.models.dataclasses.internal_information_piece import InternalInformationPiece
@@ -12,6 +13,16 @@
from extractor_api_lib.impl.mapper.sitemap_document2information_piece import (
SitemapLangchainDocument2InformationPiece,
)
+from extractor_api_lib.impl.utils.sitemap_extractor_utils import (
+ astro_sitemap_metadata_parser_function,
+ astro_sitemap_parser_function,
+ docusaurus_sitemap_metadata_parser_function,
+ docusaurus_sitemap_parser_function,
+ generic_sitemap_metadata_parser_function,
+ generic_sitemap_parser_function,
+)
+
+logger = logging.getLogger(__name__)
class SitemapExtractor(InformationExtractor):
@@ -50,6 +61,27 @@ def mapper(self) -> SitemapLangchainDocument2InformationPiece:
"""Get the mapper instance."""
return self._mapper
+ @staticmethod
+ def _select_parser_functions(
+ parser_override: Optional[str],
+ ) -> tuple[Optional[callable], Optional[callable]]:
+ mapping = {
+ "docusaurus": (docusaurus_sitemap_parser_function, docusaurus_sitemap_metadata_parser_function),
+ "astro": (astro_sitemap_parser_function, astro_sitemap_metadata_parser_function),
+ "generic": (generic_sitemap_parser_function, generic_sitemap_metadata_parser_function),
+ }
+
+ if not parser_override:
+ return None, None
+
+ normalized = str(parser_override).strip().lower()
+
+ if normalized not in mapping:
+ logger.warning("Unknown sitemap_parser '%s'. Falling back to generic.", parser_override)
+ normalized = "generic"
+
+ return mapping[normalized]
+
async def aextract_content(
self,
extraction_parameters: ExtractionParameters,
@@ -67,18 +99,24 @@ async def aextract_content(
list[InternalInformationPiece]
A list of information pieces extracted from Sitemap.
"""
- sitemap_loader_parameters = self._parse_sitemap_loader_parameters(extraction_parameters)
+ sitemap_loader_parameters, parser_override = self._parse_sitemap_loader_parameters(extraction_parameters)
if "document_name" in sitemap_loader_parameters:
sitemap_loader_parameters.pop("document_name", None)
- # Only pass custom functions if they are provided
- if self._parsing_function is not None:
- # Get the actual function from the provider
- sitemap_loader_parameters["parsing_function"] = self._parsing_function
- if self._meta_function is not None:
- # Get the actual function from the provider
- sitemap_loader_parameters["meta_function"] = self._meta_function
+ parsing_function = self._parsing_function
+ meta_function = self._meta_function
+
+ override_parsing_function, override_meta_function = self._select_parser_functions(parser_override)
+ if override_parsing_function is not None:
+ parsing_function = override_parsing_function
+ if override_meta_function is not None:
+ meta_function = override_meta_function
+
+ if parsing_function is not None:
+ sitemap_loader_parameters["parsing_function"] = parsing_function
+ if meta_function is not None:
+ sitemap_loader_parameters["meta_function"] = meta_function
document_loader = SitemapLoader(**sitemap_loader_parameters)
documents = []
@@ -92,7 +130,9 @@ def load_documents():
raise ValueError(f"Failed to load documents from Sitemap: {e}")
return [self._mapper.map_document2informationpiece(x, extraction_parameters.document_name) for x in documents]
- def _parse_sitemap_loader_parameters(self, extraction_parameters: ExtractionParameters) -> dict:
+ def _parse_sitemap_loader_parameters(
+ self, extraction_parameters: ExtractionParameters
+ ) -> tuple[dict, Optional[str]]:
"""
Parse the extraction parameters to extract sitemap loader parameters.
@@ -107,7 +147,11 @@ def _parse_sitemap_loader_parameters(self, extraction_parameters: ExtractionPara
A dictionary containing the parsed sitemap loader parameters.
"""
sitemap_loader_parameters = {}
- for x in extraction_parameters.kwargs:
+ parser_override: Optional[str] = None
+ for x in extraction_parameters.kwargs or []:
+ if x.key in ("sitemap_parser", "parser"):
+ parser_override = str(x.value) if x.value is not None else None
+ continue
if x.key == "header_template" or x.key == "requests_kwargs":
try:
sitemap_loader_parameters[x.key] = json.loads(x.value)
@@ -120,4 +164,4 @@ def _parse_sitemap_loader_parameters(self, extraction_parameters: ExtractionPara
sitemap_loader_parameters[x.key] = x.value
else:
sitemap_loader_parameters[x.key] = int(x.value) if x.value.isdigit() else x.value
- return sitemap_loader_parameters
+ return sitemap_loader_parameters, parser_override
diff --git a/libs/extractor-api-lib/src/extractor_api_lib/impl/settings/sitemap_settings.py b/libs/extractor-api-lib/src/extractor_api_lib/impl/settings/sitemap_settings.py
new file mode 100644
index 00000000..9375493f
--- /dev/null
+++ b/libs/extractor-api-lib/src/extractor_api_lib/impl/settings/sitemap_settings.py
@@ -0,0 +1,18 @@
+"""Settings for sitemap extraction."""
+
+from typing import Literal
+
+from pydantic import Field
+from pydantic_settings import BaseSettings
+
+
+class SitemapSettings(BaseSettings):
+ """Controls sitemap HTML parsing defaults."""
+
+ class Config:
+ """Config class for reading Fields from env."""
+
+ env_prefix = "SITEMAP_"
+ case_sensitive = False
+
+ parser: Literal["docusaurus", "astro", "generic"] = Field(default="docusaurus")
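A quick check of the settings behavior (the `SITEMAP_` env prefix maps `SITEMAP_PARSER` onto the `parser` field; values outside the `Literal` raise a pydantic validation error):

```python
import os

from extractor_api_lib.impl.settings.sitemap_settings import SitemapSettings

os.environ["SITEMAP_PARSER"] = "generic"
print(SitemapSettings().parser)  # -> "generic"
```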
diff --git a/libs/extractor-api-lib/src/extractor_api_lib/impl/utils/sitemap_extractor_utils.py b/libs/extractor-api-lib/src/extractor_api_lib/impl/utils/sitemap_extractor_utils.py
index fa738ad5..0be9fcf6 100644
--- a/libs/extractor-api-lib/src/extractor_api_lib/impl/utils/sitemap_extractor_utils.py
+++ b/libs/extractor-api-lib/src/extractor_api_lib/impl/utils/sitemap_extractor_utils.py
@@ -1,52 +1,184 @@
"""Module containing utility functions for sitemap extraction."""
-from bs4 import BeautifulSoup
from typing import Any, Union
+from urllib.parse import unquote, urlparse
+from bs4 import BeautifulSoup, Tag
-def custom_sitemap_parser_function(content: Union[str, BeautifulSoup]) -> str:
+
+def _as_soup(content: Union[str, BeautifulSoup]) -> BeautifulSoup:
+ if isinstance(content, BeautifulSoup):
+ return content
+ return BeautifulSoup(content, "html.parser")
+
+
+def _remove_non_content_elements(root: Tag) -> None:
+ for selector in ("script", "style", "noscript", "nav", "aside", "footer", "form"):
+ for element in root.find_all(selector):
+ element.decompose()
+
+
+def _extract_text(root: Tag) -> str:
+ _remove_non_content_elements(root)
+ return root.get_text(separator=" ", strip=True)
+
+
+def _select_docusaurus_root(soup: BeautifulSoup) -> Tag:
+    # Docusaurus v2 pages typically render the Markdown content inside an
+    # <article> wrapper (div.theme-doc-markdown); fall back to the whole page.
+    return soup.select_one("article div.theme-doc-markdown") or soup.find("article") or soup.body or soup