Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 105 additions & 10 deletions core/src/main/java/com/google/adk/flows/llmflows/Contents.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import com.google.adk.agents.InvocationContext;
import com.google.adk.agents.LlmAgent;
import com.google.adk.events.Event;
import com.google.adk.events.EventCompaction;
import com.google.adk.models.LlmRequest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.genai.types.Content;
import com.google.genai.types.FunctionCall;
import com.google.genai.types.FunctionResponse;
Expand All @@ -36,6 +38,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -99,24 +102,25 @@ private ImmutableList<Content> getCurrentTurnContents(
private ImmutableList<Content> getContents(
Optional<String> currentBranch, List<Event> events, String agentName, String modelName) {
List<Event> filteredEvents = new ArrayList<>();
boolean hasCompactEvent = false;

// Filter the events, leaving the contents and the function calls and responses from the current
// agent.
for (Event event : events) {
// Skip events without content, or generated neither by user nor by model or has empty text.
// E.g. events purely for mutating session states.
if (event.content().isEmpty()) {
if (event.actions().compaction().isPresent()) {
// Always include the compaction event for the later processCompactionEvent call.
// The compaction event is used to filter out normal events that are covered by the
// compaction event.
hasCompactEvent = true;
filteredEvents.add(event);
continue;
}
var content = event.content().get();
if (content.role().isEmpty()
|| content.role().get().isEmpty()
|| content.parts().isEmpty()
|| content.parts().get().isEmpty()
|| content.parts().get().get(0).text().map(String::isEmpty).orElse(false)) {

// Skip events without content, or generated neither by user nor by model or has empty text.
// E.g. events purely for mutating session states.
if (isEmptyContent(event)) {
continue;
}

if (!isEventBelongsToBranch(currentBranch, event)) {
continue;
}
Expand All @@ -133,6 +137,10 @@ private ImmutableList<Content> getContents(
}
}

if (hasCompactEvent) {
filteredEvents = processCompactionEvent(filteredEvents);
}

List<Event> resultEvents = rearrangeEventsForLatestFunctionResponse(filteredEvents);
resultEvents = rearrangeEventsForAsyncFunctionResponsesInHistory(resultEvents, modelName);

Expand All @@ -142,6 +150,93 @@ private ImmutableList<Content> getContents(
.collect(toImmutableList());
}

/**
* Check if an event has missing or empty content.
*
* <p>This can happen to the events that only changed session state. When both content and
* transcriptions are empty, the event will be considered as empty. The content is considered
* empty if none of its parts contain text, inline data, file data, function call, or function
* response. Parts with only thoughts are also considered empty.
*
* @param event the event to check.
* @return {@code true} if the event is considered to have empty content, {@code false} otherwise.
*/
private boolean isEmptyContent(Event event) {
if (event.content().isEmpty()) {
return true;
}
var content = event.content().get();
return (content.role().isEmpty()
|| content.role().get().isEmpty()
|| content.parts().isEmpty()
|| content.parts().get().isEmpty()
|| content.parts().get().get(0).text().map(String::isEmpty).orElse(false));
}

/**
* Filters events that are covered by compaction events by identifying compacted ranges and
* filters out events that are covered by compaction summaries
*
* <p>Example of input
*
* <pre>
* [
* event_1(timestamp=1),
* event_2(timestamp=2),
* compaction_1(event_1, event_2, timestamp=3, content=summary_1_2, startTime=1, endTime=2),
* event_3(timestamp=4),
* compaction_2(event_2, event_3, timestamp=5, content=summary_2_3, startTime=2, endTime=3),
* event_4(timestamp=6)
* ]
* </pre>
*
* Will result in the following events output
*
* <pre>
* [
* compaction_1,
* compaction_2
* event_4
* ]
* </pre>
*
* Compaction events are always strictly in order based on event timestamp.
*
* @param events the list of event to filter.
* @return a new list with compaction applied.
*/
private List<Event> processCompactionEvent(List<Event> events) {
List<Event> result = new ArrayList<>();
ListIterator<Event> iter = events.listIterator(events.size());
Long lastCompactionStartTime = null;

while (iter.hasPrevious()) {
Event event = iter.previous();
EventCompaction compaction = event.actions().compaction().orElse(null);
if (compaction == null) {
if (lastCompactionStartTime == null || event.timestamp() < lastCompactionStartTime) {
result.add(event);
}
continue;
}
// Create a new event for the compaction event in the result.
result.add(
Event.builder()
.timestamp(compaction.endTimestamp())
.author("model")
.content(compaction.compactedContent())
.branch(event.branch())
.invocationId(event.invocationId())
.actions(event.actions())
.build());
lastCompactionStartTime =
lastCompactionStartTime == null
? compaction.startTimestamp()
: Long.min(lastCompactionStartTime, compaction.startTimestamp());
}
return Lists.reverse(result);
}

/** Whether the event is a reply from another agent. */
private static boolean isOtherAgentReply(String agentName, Event event) {
return !agentName.isEmpty()
Expand Down
79 changes: 77 additions & 2 deletions core/src/main/java/com/google/adk/runner/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@
import com.google.adk.events.EventActions;
import com.google.adk.flows.llmflows.ResumabilityConfig;
import com.google.adk.memory.BaseMemoryService;
import com.google.adk.models.Model;
import com.google.adk.plugins.BasePlugin;
import com.google.adk.plugins.PluginManager;
import com.google.adk.sessions.BaseSessionService;
import com.google.adk.sessions.InMemorySessionService;
import com.google.adk.sessions.Session;
import com.google.adk.summarizer.EventsCompactionConfig;
import com.google.adk.summarizer.LlmEventSummarizer;
import com.google.adk.summarizer.SlidingWindowEventCompactor;
import com.google.adk.tools.BaseTool;
import com.google.adk.tools.FunctionTool;
import com.google.adk.utils.CollectionUtils;
Expand Down Expand Up @@ -68,6 +72,7 @@ public class Runner {
@Nullable private final BaseMemoryService memoryService;
private final PluginManager pluginManager;
private final ResumabilityConfig resumabilityConfig;
@Nullable private final EventsCompactionConfig eventsCompactionConfig;

/** Builder for {@link Runner}. */
public static class Builder {
Expand All @@ -78,6 +83,7 @@ public static class Builder {
@Nullable private BaseMemoryService memoryService = null;
private List<BasePlugin> plugins = ImmutableList.of();
private ResumabilityConfig resumabilityConfig = new ResumabilityConfig();
@Nullable private EventsCompactionConfig eventsCompactionConfig;

@CanIgnoreReturnValue
public Builder agent(BaseAgent agent) {
Expand Down Expand Up @@ -121,6 +127,12 @@ public Builder resumabilityConfig(ResumabilityConfig resumabilityConfig) {
return this;
}

@CanIgnoreReturnValue
public Builder eventsCompactionConfig(EventsCompactionConfig eventsCompactionConfig) {
this.eventsCompactionConfig = eventsCompactionConfig;
return this;
}

public Runner build() {
if (agent == null) {
throw new IllegalStateException("Agent must be provided.");
Expand All @@ -141,7 +153,8 @@ public Runner build() {
sessionService,
memoryService,
plugins,
resumabilityConfig);
resumabilityConfig,
eventsCompactionConfig);
}
}

Expand Down Expand Up @@ -208,13 +221,43 @@ public Runner(
@Nullable BaseMemoryService memoryService,
List<BasePlugin> plugins,
ResumabilityConfig resumabilityConfig) {
this(
agent,
appName,
artifactService,
sessionService,
memoryService,
plugins,
resumabilityConfig,
null);
}

/**
* Creates a new {@code Runner} with a list of plugins and resumability config.
*
* @deprecated Use {@link Runner.Builder} instead.
*/
@Deprecated
protected Runner(
BaseAgent agent,
String appName,
BaseArtifactService artifactService,
BaseSessionService sessionService,
@Nullable BaseMemoryService memoryService,
List<BasePlugin> plugins,
ResumabilityConfig resumabilityConfig,
@Nullable EventsCompactionConfig eventsCompactionConfig) {
this.agent = agent;
this.appName = appName;
this.artifactService = artifactService;
this.sessionService = sessionService;
this.memoryService = memoryService;
this.pluginManager = new PluginManager(plugins);
this.resumabilityConfig = resumabilityConfig;
this.eventsCompactionConfig =
Optional.ofNullable(eventsCompactionConfig)
.map(c -> createEventsCompactionConfig(agent, c))
.orElse(null);
}

/**
Expand Down Expand Up @@ -493,7 +536,10 @@ public Flowable<Event> runAsync(
Completable.defer(
() ->
pluginManager.runAfterRunCallback(
contextWithUpdatedSession)));
contextWithUpdatedSession)))
.concatWith(
Completable.defer(
() -> compactEvents(updatedSession)));
});
}))
.doOnError(
Expand All @@ -509,6 +555,13 @@ public Flowable<Event> runAsync(
}
}

private Completable compactEvents(Session session) {
return Optional.ofNullable(eventsCompactionConfig)
.map(SlidingWindowEventCompactor::new)
.map(c -> c.compact(session, sessionService))
.orElse(Completable.complete());
}

private void copySessionStates(Session source, Session target) {
// TODO: remove this hack when deprecating all runAsync with Session.
for (var entry : source.state().entrySet()) {
Expand Down Expand Up @@ -740,5 +793,27 @@ private boolean hasLiveRequestQueueParameter(FunctionTool functionTool) {
.anyMatch(parameter -> parameter.getType().equals(LiveRequestQueue.class));
}

/**
* Creates a new {@link EventsCompactionConfig} based on the given configuration. If the {@link
* com.google.adk.summarizer.BaseEventSummarizer} is missing, it will be default to the {@link
* LlmEventSummarizer} using the same model as the LLM base agent.
*/
private static EventsCompactionConfig createEventsCompactionConfig(
BaseAgent agent, EventsCompactionConfig config) {
return new EventsCompactionConfig(
config.compactionInterval(),
config.overlapSize(),
config
.summarizer()
.or(
() ->
Optional.of(agent)
.filter(LlmAgent.class::isInstance)
.map(LlmAgent.class::cast)
.map(LlmAgent::resolvedModel)
.flatMap(Model::model)
.map(LlmEventSummarizer::new)));
}

// TODO: run statelessly
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,27 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class performs events compaction in a sliding window fashion based on the {@link
* EventsCompactionConfig}.
*/
public final class SlidingWindowEventCompactor implements EventCompactor {

private static final Logger logger = LoggerFactory.getLogger(SlidingWindowEventCompactor.class);

private final EventsCompactionConfig config;
private final BaseEventSummarizer summarizer;

public SlidingWindowEventCompactor(EventsCompactionConfig config) {
this.config = config;
// TODO default to LLM summarizer
this.summarizer = config.summarizer().orElseThrow();
this.summarizer =
config
.summarizer()
.orElseThrow(
() -> new IllegalArgumentException("Summarizer is required for event compaction."));
}

/**
Expand Down Expand Up @@ -80,6 +87,8 @@ public SlidingWindowEventCompactor(EventsCompactionConfig config) {
*/
@Override
public Completable compact(Session session, BaseSessionService sessionService) {
logger.debug("Running event compaction for session {}", session.id());

return Completable.fromMaybe(
getCompactionEvents(session)
.flatMap(summarizer::summarizeEvents)
Expand Down
Loading