Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
366f814
Add streams tab
VaguelySerious Dec 8, 2025
8f98f3d
Merge branch 'main' into peter/show-streams
VaguelySerious Dec 8, 2025
f8dac39
[world] Add listStream endpoint
VaguelySerious Dec 8, 2025
05d1471
List streams endpoint
VaguelySerious Dec 8, 2025
3cb3350
Merge branch 'peter/list-streams' into peter/list-streams-ui
VaguelySerious Dec 8, 2025
a082c35
changes
VaguelySerious Dec 8, 2025
03bd629
Merge branch 'peter/list-streams' into peter/list-streams-ui
VaguelySerious Dec 8, 2025
7856720
Changesset
VaguelySerious Dec 8, 2025
1afb199
UI
VaguelySerious Dec 8, 2025
c4b7feb
Various UI fixes
VaguelySerious Dec 8, 2025
ed86a01
Merge branch 'main' into peter/list-streams
VaguelySerious Dec 8, 2025
864e012
Merge branch 'peter/list-streams' into peter/list-streams-ui
VaguelySerious Dec 8, 2025
c11e1fb
Undo
VaguelySerious Dec 8, 2025
bfbfe88
Revert "Undo"
VaguelySerious Dec 8, 2025
333de4c
Merge branch 'peter/list-streams' into peter/list-streams-ui
VaguelySerious Dec 8, 2025
6ce1398
Fix stream deserialization
VaguelySerious Dec 8, 2025
767ac12
Add test
VaguelySerious Dec 8, 2025
2c03d38
Merge branch 'peter/list-streams' into peter/list-streams-ui
VaguelySerious Dec 8, 2025
7fb59fb
Fix
VaguelySerious Dec 8, 2025
b7dda1d
Pretty print streams
VaguelySerious Dec 8, 2025
11cf85e
Auto-refresh stream list
VaguelySerious Dec 8, 2025
be4aff3
Rename replay run
VaguelySerious Dec 8, 2025
dd69152
Add wake-up run button
VaguelySerious Dec 8, 2025
bbdc8c9
Wake up from sleep
VaguelySerious Dec 8, 2025
3a86630
Changeset
VaguelySerious Dec 8, 2025
eda2cbd
Merge branch 'main' into peter/list-streams
VaguelySerious Dec 8, 2025
54d0ba1
rename
VaguelySerious Dec 8, 2025
81e7011
Merge branch 'peter/list-streams' into peter/list-streams-ui
VaguelySerious Dec 8, 2025
401d706
Merge branch 'peter/list-streams-ui' into peter/run-wakeup
VaguelySerious Dec 8, 2025
df92c03
One or the other
VaguelySerious Dec 8, 2025
61a76ad
Remove unused import
VaguelySerious Dec 9, 2025
804755d
Unify actions
VaguelySerious Dec 9, 2025
af8216f
Fixes
VaguelySerious Dec 9, 2025
6e4daab
Remove unused import
VaguelySerious Dec 9, 2025
3d147b2
Fixes
VaguelySerious Dec 9, 2025
d624dde
Merge branch 'peter/list-streams-ui' into peter/run-wakeup
VaguelySerious Dec 9, 2025
9c16423
Allow individual wakeups
VaguelySerious Dec 9, 2025
5ab8d1e
Fix
VaguelySerious Dec 9, 2025
8dd10a2
Merge branch 'main' into peter/run-wakeup
VaguelySerious Dec 9, 2025
289e576
Clean up
VaguelySerious Dec 9, 2025
dd85ccd
Wording
VaguelySerious Dec 9, 2025
4b87602
UX improvements
VaguelySerious Dec 9, 2025
60099e8
Fix data staleness
VaguelySerious Dec 10, 2025
5795c5a
Merge branch 'main' into peter/run-wakeup
VaguelySerious Dec 10, 2025
65243ab
Merge branch 'main' into peter/run-wakeup
VaguelySerious Dec 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fine-moles-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@workflow/web-shared": patch
"@workflow/web": patch
---

Add buttons to wake up workflow from sleep or scheduling issues
33 changes: 33 additions & 0 deletions packages/web-shared/src/api/workflow-api-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import {
fetchStreams,
readStreamServerAction,
recreateRun as recreateRunServerAction,
wakeUpRun as wakeUpRunServerAction,
type StopSleepOptions,
type StopSleepResult,
reenqueueRun as reenqueueRunServerAction,
} from './workflow-server-actions';

const MAX_ITEMS = 1000;
Expand Down Expand Up @@ -1101,6 +1105,35 @@ export async function recreateRun(env: EnvMap, runId: string): Promise<string> {
return resultData;
}

/**
* Wake up a workflow run by re-enqueuing it
*/
export async function reenqueueRun(env: EnvMap, runId: string): Promise<void> {
const { error } = await unwrapServerActionResult(
reenqueueRunServerAction(env, runId)
);
if (error) {
throw error;
}
}

/**
* Wake up a workflow run by interrupting any pending sleep() calls
*/
export async function wakeUpRun(
env: EnvMap,
runId: string,
options?: StopSleepOptions
): Promise<StopSleepResult> {
const { error, result: resultData } = await unwrapServerActionResult(
wakeUpRunServerAction(env, runId, options)
);
if (error) {
throw error;
}
return resultData;
}

function isServerActionError(value: unknown): value is ServerActionError {
return (
typeof value === 'object' &&
Expand Down
126 changes: 126 additions & 0 deletions packages/web-shared/src/api/workflow-server-actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,132 @@ export async function recreateRun(
}
}

/**
* Re-enqueue a workflow run.
*
* This re-enqueues the workflow orchestration layer. It's a no-op unless the workflow
* got stuck due to an implementation issue in the World. Useful for debugging custom Worlds.
*/
export async function reenqueueRun(
worldEnv: EnvMap,
runId: string
): Promise<ServerActionResult<void>> {
try {
const world = getWorldFromEnv({ ...worldEnv });
const run = await world.runs.get(runId);
const deploymentId = run.deploymentId;

await world.queue(
`__wkf_workflow_${run.workflowName}`,
{
runId,
},
{
deploymentId,
}
);

return createResponse(undefined);
} catch (error) {
return createServerActionError<void>(error, 'reenqueueRun', { runId });
}
}

export interface StopSleepResult {
/** Number of pending sleeps that were stopped */
stoppedCount: number;
}

export interface StopSleepOptions {
/**
* Optional list of specific correlation IDs to target.
* If provided, only these sleep calls will be interrupted.
* If not provided, all pending sleep calls will be interrupted.
*/
correlationIds?: string[];
}

/**
* Wake up a workflow run by interrupting pending sleep() calls.
*
* This finds wait_created events without matching wait_completed events,
* creates wait_completed events for them, and then re-enqueues the run.
*
* @param worldEnv - Environment configuration for the World
* @param runId - The run ID to wake up
* @param options - Optional settings to narrow down targeting (specific correlation IDs)
*/
export async function wakeUpRun(
worldEnv: EnvMap,
runId: string,
options?: StopSleepOptions
): Promise<ServerActionResult<StopSleepResult>> {
try {
const world = getWorldFromEnv({ ...worldEnv });
const run = await world.runs.get(runId);
const deploymentId = run.deploymentId;

// Fetch all events for the run
const eventsResult = await world.events.list({
runId,
pagination: { limit: 1000 },
resolveData: 'none',
});

// Find wait_created events without matching wait_completed events
const waitCreatedEvents = eventsResult.data.filter(
(e) => e.eventType === 'wait_created'
);
const waitCompletedCorrelationIds = new Set(
eventsResult.data
.filter((e) => e.eventType === 'wait_completed')
.map((e) => e.correlationId)
);

let pendingWaits = waitCreatedEvents.filter(
(e) => !waitCompletedCorrelationIds.has(e.correlationId)
);

// If specific correlation IDs are provided, filter to only those
if (options?.correlationIds && options.correlationIds.length > 0) {
const targetCorrelationIds = new Set(options.correlationIds);
pendingWaits = pendingWaits.filter(
(e) => e.correlationId && targetCorrelationIds.has(e.correlationId)
);
}

// Create wait_completed events for each pending wait
for (const waitEvent of pendingWaits) {
if (waitEvent.correlationId) {
await world.events.create(runId, {
eventType: 'wait_completed',
correlationId: waitEvent.correlationId,
});
}
}

// Re-enqueue the run to wake it up
if (pendingWaits.length > 0) {
await world.queue(
`__wkf_workflow_${run.workflowName}`,
{
runId,
},
{
deploymentId,
}
);
}

return createResponse({ stoppedCount: pendingWaits.length });
} catch (error) {
return createServerActionError<StopSleepResult>(error, 'wakeUpRun', {
runId,
correlationIds: options?.correlationIds,
});
}
}

export async function readStreamServerAction(
env: EnvMap,
streamId: string,
Expand Down
13 changes: 12 additions & 1 deletion packages/web-shared/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,28 @@ export {
parseStepName,
parseWorkflowName,
} from '@workflow/core/parse-name';

export type { Event, Hook, Step, WorkflowRun } from '@workflow/world';

export * from './api/workflow-api-client';
export type { EnvMap } from './api/workflow-server-actions';

export type { EventAnalysis } from './lib/event-analysis';
export {
analyzeEvents,
hasPendingHooksFromEvents,
hasPendingSleepsFromEvents,
hasPendingStepsFromEvents,
isTerminalStatus,
shouldShowReenqueueButton,
} from './lib/event-analysis';
export type { StreamStep } from './lib/utils';
export {
extractConversation,
formatDuration,
identifyStreamSteps,
isDoStreamStep,
} from './lib/utils';

export { RunTraceView } from './run-trace-view';
export { ConversationView } from './sidebar/conversation-view';
export { StreamViewer } from './stream-viewer';
Expand Down
Loading
Loading