-
Notifications
You must be signed in to change notification settings - Fork 8
refactor: Enhance sitemap extraction and summarization features #185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a98f37f
3acbfb5
94d7b8b
1fc4759
4cbed38
9316738
d9a89eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| apiVersion: v1 | ||
| kind: ConfigMap | ||
| metadata: | ||
| name: {{ template "configmap.extractorSitemapName" . }} | ||
| data: | ||
| {{- range $key, $value := .Values.extractor.envs.sitemap }} | ||
| {{ $key }}: {{ $value | quote }} | ||
| {{- end }} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,9 @@ | ||
| """Module for enhancing the summary of pages by grouping information by page and summarizing each page.""" | ||
|
|
||
| from asyncio import gather | ||
| import asyncio | ||
| from hashlib import sha256 | ||
| from typing import Optional | ||
| from typing import Any | ||
|
|
||
| from langchain_core.documents import Document | ||
| from langchain_core.runnables import RunnableConfig | ||
|
|
@@ -25,8 +26,36 @@ class PageSummaryEnhancer(SummaryEnhancer): | |
| """ | ||
|
|
||
| BASE64_IMAGE_KEY = "base64_image" | ||
| DOCUMENT_URL_KEY = "document_url" | ||
| DEFAULT_PAGE_NR = 1 | ||
|
|
||
| @staticmethod | ||
| def _parse_max_concurrency(config: Optional[RunnableConfig]) -> int: | ||
| if not config: | ||
| return 1 | ||
| raw = config.get("max_concurrency") | ||
| if raw is None: | ||
| return 1 | ||
| try: | ||
| return max(1, int(raw)) | ||
| except (TypeError, ValueError): | ||
| return 1 | ||
|
|
||
| def _group_key(self, piece: Document) -> tuple[Any, ...]: | ||
| document_url = piece.metadata.get(self.DOCUMENT_URL_KEY) | ||
| page = piece.metadata.get("page", self.DEFAULT_PAGE_NR) | ||
|
|
||
| # For paged documents (PDF/docling/etc.) keep per-page summaries even if a shared document URL exists. | ||
| if isinstance(page, int) or (isinstance(page, str) and page != "Unknown Title"): | ||
| return ("page_number", document_url, page) | ||
|
Comment on lines +49 to +50
|
||
|
|
||
| # For sources like sitemaps/confluence, `page` can be a non-unique title (or missing), | ||
| # so group by the page URL when available to ensure one summary per page. | ||
| if document_url: | ||
| return ("document_url", document_url) | ||
|
|
||
| return ("page", page) | ||
|
|
||
| async def _asummarize_page(self, page_pieces: list[Document], config: Optional[RunnableConfig]) -> Document: | ||
| full_page_content = " ".join([piece.page_content for piece in page_pieces]) | ||
| summary = await self._summarizer.ainvoke(full_page_content, config) | ||
|
|
@@ -39,24 +68,46 @@ async def _asummarize_page(self, page_pieces: list[Document], config: Optional[R | |
| return Document(metadata=meta, page_content=summary) | ||
|
|
||
| async def _acreate_summary(self, information: list[Document], config: Optional[RunnableConfig]) -> list[Document]: | ||
| distinct_pages = [] | ||
| grouped = self._group_information(information) | ||
| max_concurrency = self._parse_max_concurrency(config) | ||
| return await self._summarize_groups(grouped, config, max_concurrency=max_concurrency) | ||
|
|
||
| def _group_information(self, information: list[Document]) -> list[list[Document]]: | ||
| ordered_keys: list[tuple[Any, ...]] = [] | ||
| groups: dict[tuple[Any, ...], list[Document]] = {} | ||
| for info in information: | ||
| if info.metadata.get("page", self.DEFAULT_PAGE_NR) not in distinct_pages: | ||
| distinct_pages.append(info.metadata.get("page", self.DEFAULT_PAGE_NR)) | ||
|
|
||
| grouped = [] | ||
| for page in distinct_pages: | ||
| group = [] | ||
| for compare_info in information: | ||
| if compare_info.metadata.get("page", self.DEFAULT_PAGE_NR) == page: | ||
| group.append(compare_info) | ||
| if ( | ||
| self._chunker_settings | ||
| and len(" ".join([item.page_content for item in group])) < self._chunker_settings.max_size | ||
| ): | ||
| continue | ||
| grouped.append(group) | ||
|
|
||
| summary_tasks = [self._asummarize_page(info_group, config) for info_group in tqdm(grouped)] | ||
|
|
||
| return await gather(*summary_tasks) | ||
| key = self._group_key(info) | ||
| if key not in groups: | ||
| ordered_keys.append(key) | ||
| groups[key] = [] | ||
| groups[key].append(info) | ||
| return [groups[key] for key in ordered_keys] | ||
|
|
||
| async def _summarize_groups( | ||
| self, | ||
| grouped: list[list[Document]], | ||
| config: Optional[RunnableConfig], | ||
| *, | ||
| max_concurrency: int, | ||
| ) -> list[Document]: | ||
| if max_concurrency == 1: | ||
| summaries: list[Document] = [] | ||
| for info_group in tqdm(grouped): | ||
| summaries.append(await self._asummarize_page(info_group, config)) | ||
| return summaries | ||
|
|
||
| semaphore = asyncio.Semaphore(max_concurrency) | ||
| results: list[Document | None] = [None] * len(grouped) | ||
|
|
||
| async def _run(idx: int, info_group: list[Document]) -> tuple[int, Document]: | ||
| async with semaphore: | ||
| return idx, await self._asummarize_page(info_group, config) | ||
|
|
||
| tasks = [asyncio.create_task(_run(idx, info_group)) for idx, info_group in enumerate(grouped)] | ||
| with tqdm(total=len(tasks)) as pbar: | ||
| for task in asyncio.as_completed(tasks): | ||
| idx, summary = await task | ||
| results[idx] = summary | ||
| pbar.update(1) | ||
|
|
||
| return [summary for summary in results if summary is not None] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,6 +44,24 @@ def __init__( | |
| self._semaphore = semaphore | ||
| self._retry_decorator_settings = create_retry_decorator_settings(summarizer_settings, retry_decorator_settings) | ||
|
|
||
| @staticmethod | ||
| def _parse_max_concurrency(config: RunnableConfig) -> Optional[int]: | ||
| """Parse max concurrency from a RunnableConfig. | ||
|
|
||
| Returns | ||
| ------- | ||
| Optional[int] | ||
| An integer >= 1 if configured and valid, otherwise None. | ||
| """ | ||
| max_concurrency = config.get("max_concurrency") | ||
| if max_concurrency is None: | ||
| return None | ||
|
|
||
| try: | ||
| return max(1, int(max_concurrency)) | ||
| except (TypeError, ValueError): | ||
| return None | ||
|
Comment on lines +47 to +63
|
||
|
|
||
| async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] = None) -> SummarizerOutput: | ||
| """ | ||
| Asynchronously invokes the summarization process on the given query. | ||
|
|
@@ -77,9 +95,8 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] | |
| langchain_documents = self._chunker.split_documents([document]) | ||
| logger.debug("Summarizing %d chunk(s)...", len(langchain_documents)) | ||
|
|
||
| # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk | ||
| tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents] | ||
| outputs = await asyncio.gather(*tasks) | ||
| max_concurrency = self._parse_max_concurrency(config) | ||
| outputs = await self._summarize_documents(langchain_documents, config, max_concurrency=max_concurrency) | ||
|
|
||
| if len(outputs) == 1: | ||
| return outputs[0] | ||
|
|
@@ -93,6 +110,34 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] | |
| ) | ||
| return await self._summarize_chunk(merged, config) | ||
|
|
||
| async def _summarize_documents( | ||
| self, | ||
| documents: list[Document], | ||
| config: RunnableConfig, | ||
| *, | ||
| max_concurrency: Optional[int], | ||
| ) -> list[SummarizerOutput]: | ||
| """Summarize a set of already-chunked documents. | ||
|
|
||
| Notes | ||
| ----- | ||
| This optionally limits task fan-out using a per-call semaphore (max_concurrency). | ||
| The actual LLM call concurrency is always bounded by the instance semaphore held | ||
| inside `_summarize_chunk`. | ||
| """ | ||
|
Comment on lines +113 to +127
|
||
| if max_concurrency == 1: | ||
| return [await self._summarize_chunk(doc.page_content, config) for doc in documents] | ||
|
|
||
| limiter: asyncio.Semaphore | None = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None | ||
|
|
||
| async def _run(doc: Document) -> SummarizerOutput: | ||
| if limiter is None: | ||
| return await self._summarize_chunk(doc.page_content, config) | ||
| async with limiter: | ||
| return await self._summarize_chunk(doc.page_content, config) | ||
|
|
||
| return await asyncio.gather(*(_run(doc) for doc in documents)) | ||
|
|
||
| def _create_chain(self) -> Runnable: | ||
| return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm( | ||
| self.__class__.__name__ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| import asyncio | ||
|
|
||
| import pytest | ||
| from langchain_core.documents import Document | ||
|
|
||
| from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings | ||
| from admin_api_lib.impl.summarizer.langchain_summarizer import LangchainSummarizer | ||
| from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings | ||
| from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore | ||
|
|
||
|
|
||
| class _StaticChunker: | ||
| def __init__(self, docs: list[Document]): | ||
| self._docs = docs | ||
|
|
||
| def split_documents(self, _docs: list[Document]) -> list[Document]: | ||
| return self._docs | ||
|
|
||
|
|
||
| class _ConcurrencyTrackingSummarizer(LangchainSummarizer): | ||
| def __init__(self, docs: list[Document]): | ||
| super().__init__( | ||
| langfuse_manager=object(), # type: ignore[arg-type] | ||
| chunker=_StaticChunker(docs), # type: ignore[arg-type] | ||
| semaphore=AsyncThreadsafeSemaphore(100), | ||
| summarizer_settings=SummarizerSettings(), | ||
| retry_decorator_settings=RetryDecoratorSettings(), | ||
| ) | ||
| self.in_flight = 0 | ||
| self.max_in_flight = 0 | ||
|
|
||
| async def _summarize_chunk(self, text: str, config): # type: ignore[override] | ||
| self.in_flight += 1 | ||
| self.max_in_flight = max(self.max_in_flight, self.in_flight) | ||
| await asyncio.sleep(0.01) | ||
| self.in_flight -= 1 | ||
| return text | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_langchain_summarizer_respects_max_concurrency_one(): | ||
| docs = [Document(page_content=f"chunk-{idx}") for idx in range(5)] | ||
| summarizer = _ConcurrencyTrackingSummarizer(docs) | ||
|
|
||
| await summarizer.ainvoke("input", config={"max_concurrency": 1}) | ||
|
|
||
| assert summarizer.max_in_flight == 1 | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_langchain_summarizer_respects_max_concurrency_limit(): | ||
| docs = [Document(page_content=f"chunk-{idx}") for idx in range(8)] | ||
| summarizer = _ConcurrencyTrackingSummarizer(docs) | ||
|
|
||
| await summarizer.ainvoke("input", config={"max_concurrency": 2}) | ||
|
|
||
| assert summarizer.max_in_flight <= 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate import statement: `from typing import Any` appears twice in the import block (alongside `from typing import Optional` on the adjacent line). Remove the duplicate import.