Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions chat/src/components/chat-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,9 @@ export function ChatProvider({ children }: PropsWithChildren) {
description: message,
});
} finally {
// Remove optimistic draft message if still present (may have been replaced by server response via SSE).
setMessages((prev) => prev.filter((m) => !isDraftMessage(m)));
if (type === "user") {
setMessages((prevMessages) =>
prevMessages.filter((m) => !isDraftMessage(m))
);
setLoading(false);
}
}
Expand Down
32 changes: 29 additions & 3 deletions cmd/attach/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,33 @@ func WriteRawInputOverHTTP(ctx context.Context, url string, msg string) error {
return nil
}

func runAttach(remoteUrl string) error {
func checkACPMode(remoteURL string) (bool, error) {
resp, err := http.Get(remoteURL + "/status")
if err != nil {
return false, xerrors.Errorf("failed to check server status: %w", err)
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusOK {
return false, xerrors.Errorf("unexpected %d response from server: %s", resp.StatusCode, resp.Status)
}

var status httpapi.StatusResponse
if err := json.NewDecoder(resp.Body).Decode(&status.Body); err != nil {
return false, xerrors.Errorf("failed to decode server status: %w", err)
}

return status.Body.Transport == httpapi.TransportACP, nil
}

func runAttach(remoteURL string) error {
// Check if server is running in ACP mode (attach not supported)
if isACP, err := checkACPMode(remoteURL); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "WARN: Unable to check server: %s", err.Error())
} else if isACP {
return xerrors.New("attach is not yet supported in ACP mode")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stdin := int(os.Stdin.Fd())
Expand All @@ -152,7 +178,7 @@ func runAttach(remoteUrl string) error {
readScreenErrCh := make(chan error, 1)
go func() {
defer close(readScreenErrCh)
if err := ReadScreenOverHTTP(ctx, remoteUrl+"/internal/screen", screenCh); err != nil {
if err := ReadScreenOverHTTP(ctx, remoteURL+"/internal/screen", screenCh); err != nil {
if errors.Is(err, context.Canceled) {
return
}
Expand All @@ -175,7 +201,7 @@ func runAttach(remoteUrl string) error {
if input == "\x03" {
continue
}
if err := WriteRawInputOverHTTP(ctx, remoteUrl+"/message", input); err != nil {
if err := WriteRawInputOverHTTP(ctx, remoteURL+"/message", input); err != nil {
writeRawInputErrCh <- xerrors.Errorf("failed to write raw input: %w", err)
return
}
Expand Down
94 changes: 69 additions & 25 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/coder/agentapi/lib/httpapi"
"github.com/coder/agentapi/lib/logctx"
"github.com/coder/agentapi/lib/msgfmt"
st "github.com/coder/agentapi/lib/screentracker"
"github.com/coder/agentapi/lib/termexec"
)

Expand Down Expand Up @@ -104,11 +105,33 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
}

printOpenAPI := viper.GetBool(FlagPrintOpenAPI)
experimentalACP := viper.GetBool(FlagExperimentalACP)

if printOpenAPI && experimentalACP {
return xerrors.Errorf("flags --%s and --%s are mutually exclusive", FlagPrintOpenAPI, FlagExperimentalACP)
}

var agentIO st.AgentIO
transport := "pty"
var process *termexec.Process
var acpResult *httpapi.SetupACPResult

if printOpenAPI {
process = nil
agentIO = nil
} else if experimentalACP {
var err error
acpResult, err = httpapi.SetupACP(ctx, httpapi.SetupACPConfig{
Program: agent,
ProgramArgs: argsToPass[1:],
})
if err != nil {
return xerrors.Errorf("failed to setup ACP: %w", err)
}
acpIO := acpResult.AgentIO
agentIO = acpIO
transport = "acp"
} else {
process, err = httpapi.SetupProcess(ctx, httpapi.SetupProcessConfig{
proc, err := httpapi.SetupProcess(ctx, httpapi.SetupProcessConfig{
Program: agent,
ProgramArgs: argsToPass[1:],
TerminalWidth: termWidth,
Expand All @@ -118,11 +141,14 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
if err != nil {
return xerrors.Errorf("failed to setup process: %w", err)
}
process = proc
agentIO = proc
}
port := viper.GetInt(FlagPort)
srv, err := httpapi.NewServer(ctx, httpapi.ServerConfig{
AgentType: agentType,
Process: process,
AgentIO: agentIO,
Transport: httpapi.Transport(transport),
Port: port,
ChatBasePath: viper.GetString(FlagChatBasePath),
AllowedHosts: viper.GetStringSlice(FlagAllowedHosts),
Expand All @@ -138,19 +164,35 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
}
logger.Info("Starting server on port", "port", port)
processExitCh := make(chan error, 1)
go func() {
defer close(processExitCh)
if err := process.Wait(); err != nil {
if errors.Is(err, termexec.ErrNonZeroExitCode) {
processExitCh <- xerrors.Errorf("========\n%s\n========\n: %w", strings.TrimSpace(process.ReadScreen()), err)
} else {
processExitCh <- xerrors.Errorf("failed to wait for process: %w", err)
// Wait for process exit in PTY mode
if process != nil {
go func() {
defer close(processExitCh)
if err := process.Wait(); err != nil {
if errors.Is(err, termexec.ErrNonZeroExitCode) {
processExitCh <- xerrors.Errorf("========\n%s\n========\n: %w", strings.TrimSpace(process.ReadScreen()), err)
} else {
processExitCh <- xerrors.Errorf("failed to wait for process: %w", err)
}
}
}
if err := srv.Stop(ctx); err != nil {
logger.Error("Failed to stop server", "error", err)
}
}()
if err := srv.Stop(ctx); err != nil {
logger.Error("Failed to stop server", "error", err)
}
}()
}
// Wait for process exit in ACP mode
if acpResult != nil {
go func() {
defer close(processExitCh)
defer close(acpResult.Done) // Signal cleanup goroutine to exit
if err := acpResult.Wait(); err != nil {
processExitCh <- xerrors.Errorf("ACP process exited: %w", err)
}
if err := srv.Stop(ctx); err != nil {
logger.Error("Failed to stop server", "error", err)
}
}()
}
if err := srv.Start(); err != nil && err != context.Canceled && err != http.ErrServerClosed {
return xerrors.Errorf("failed to start server: %w", err)
}
Expand Down Expand Up @@ -180,16 +222,17 @@ type flagSpec struct {
}

const (
FlagType = "type"
FlagPort = "port"
FlagPrintOpenAPI = "print-openapi"
FlagChatBasePath = "chat-base-path"
FlagTermWidth = "term-width"
FlagTermHeight = "term-height"
FlagAllowedHosts = "allowed-hosts"
FlagAllowedOrigins = "allowed-origins"
FlagExit = "exit"
FlagInitialPrompt = "initial-prompt"
FlagType = "type"
FlagPort = "port"
FlagPrintOpenAPI = "print-openapi"
FlagChatBasePath = "chat-base-path"
FlagTermWidth = "term-width"
FlagTermHeight = "term-height"
FlagAllowedHosts = "allowed-hosts"
FlagAllowedOrigins = "allowed-origins"
FlagExit = "exit"
FlagInitialPrompt = "initial-prompt"
FlagExperimentalACP = "experimental-acp"
)

func CreateServerCmd() *cobra.Command {
Expand Down Expand Up @@ -228,6 +271,7 @@ func CreateServerCmd() *cobra.Command {
// localhost:3284 is the default origin when you open the chat interface in your browser. localhost:3000 and 3001 are used during development.
{FlagAllowedOrigins, "o", []string{"http://localhost:3284", "http://localhost:3000", "http://localhost:3001"}, "HTTP allowed origins. Use '*' for all, comma-separated list via flag, space-separated list via AGENTAPI_ALLOWED_ORIGINS env var", "stringSlice"},
{FlagInitialPrompt, "I", "", "Initial prompt for the agent. Recommended only if the agent doesn't support initial prompt in interaction mode. Will be read from stdin if piped (e.g., echo 'prompt' | agentapi server -- my-agent)", "string"},
{FlagExperimentalACP, "", false, "Use experimental ACP transport instead of PTY", "bool"},
}

for _, spec := range flagSpecs {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/ActiveState/termtest/xpty v0.6.0
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/charmbracelet/bubbletea v1.3.4
github.com/coder/acp-go-sdk v0.6.3
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225
github.com/coder/quartz v0.1.2
github.com/danielgtaylor/huma/v2 v2.32.0
Expand All @@ -15,6 +16,7 @@ require (
github.com/spf13/viper v1.20.1
github.com/stretchr/testify v1.11.1
github.com/tmaxmax/go-sse v0.10.0
go.uber.org/goleak v1.3.0
golang.org/x/term v0.30.0
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ github.com/ckaznocha/intrange v0.3.1 h1:j1onQyXvHUsPWujDH6WIjhyH26gkRt/txNlV7Lsp
github.com/ckaznocha/intrange v0.3.1/go.mod h1:QVepyz1AkUoFQkpEqksSYpNpUo3c5W7nWh/s6SHIJJk=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coder/acp-go-sdk v0.6.3 h1:LsXQytehdjKIYJnoVWON/nf7mqbiarnyuyE3rrjBsXQ=
github.com/coder/acp-go-sdk v0.6.3/go.mod h1:yKzM/3R9uELp4+nBAwwtkS0aN1FOFjo11CNPy37yFko=
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225 h1:tRIViZ5JRmzdOEo5wUWngaGEFBG8OaE1o2GIHN5ujJ8=
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225/go.mod h1:rNLVpYgEVeu1Zk29K64z6Od8RBP9DwqCu9OfCzh8MR4=
github.com/coder/paralleltestctx v0.0.1 h1:eauyehej1XYTGwgzGWMTjeRIVgOpU6XLPNVb2oi6kDs=
Expand Down
17 changes: 17 additions & 0 deletions lib/httpapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ func (m MessageType) Schema(r huma.Registry) *huma.Schema {
return util.OpenAPISchema(r, "MessageType", MessageTypeValues)
}

type Transport string

const (
TransportPTY Transport = "pty"
TransportACP Transport = "acp"
)

var TransportValues = []Transport{
TransportPTY,
TransportACP,
}

func (tr Transport) Schema(r huma.Registry) *huma.Schema {
return util.OpenAPISchema(r, "Transport", TransportValues)
}

// Message represents a message
type Message struct {
Id int `json:"id" doc:"Unique identifier for the message. This identifier also represents the order of the message in the conversation history."`
Expand All @@ -38,6 +54,7 @@ type StatusResponse struct {
Body struct {
Status AgentStatus `json:"status" doc:"Current agent status. 'running' means that the agent is processing a message, 'stable' means that the agent is idle and waiting for input."`
AgentType mf.AgentType `json:"agent_type" doc:"Type of the agent being used by the server."`
Transport Transport `json:"transport" doc:"Backend transport being used ('acp' or 'pty')."`
}
}

Expand Down
63 changes: 41 additions & 22 deletions lib/httpapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
mf "github.com/coder/agentapi/lib/msgfmt"
st "github.com/coder/agentapi/lib/screentracker"
"github.com/coder/agentapi/lib/termexec"
"github.com/coder/agentapi/x/acpio"
"github.com/coder/quartz"
"github.com/danielgtaylor/huma/v2"
"github.com/danielgtaylor/huma/v2/adapters/humachi"
Expand All @@ -42,12 +43,13 @@ type Server struct {
mu sync.RWMutex
logger *slog.Logger
conversation st.Conversation
agentio *termexec.Process
agentio st.AgentIO
agentType mf.AgentType
emitter *EventEmitter
chatBasePath string
tempDir string
clock quartz.Clock
transport Transport
}

func (s *Server) NormalizeSchema(schema any) any {
Expand Down Expand Up @@ -98,7 +100,8 @@ const snapshotInterval = 25 * time.Millisecond

type ServerConfig struct {
AgentType mf.AgentType
Process *termexec.Process
AgentIO st.AgentIO
Transport Transport
Port int
ChatBasePath string
AllowedHosts []string
Expand Down Expand Up @@ -252,18 +255,32 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
initialPrompt = FormatMessage(config.AgentType, config.InitialPrompt)
}

conversation := st.NewPTY(ctx, st.PTYConversationConfig{
AgentType: config.AgentType,
AgentIO: config.Process,
Clock: config.Clock,
SnapshotInterval: snapshotInterval,
ScreenStabilityLength: 2 * time.Second,
FormatMessage: formatMessage,
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
FormatToolCall: formatToolCall,
InitialPrompt: initialPrompt,
Logger: logger,
}, emitter)
var conversation st.Conversation
if config.Transport == TransportACP {
// For ACP, cast AgentIO to *acpio.ACPAgentIO
acpIO, ok := config.AgentIO.(*acpio.ACPAgentIO)
if !ok {
return nil, fmt.Errorf("ACP transport requires ACPAgentIO")
}
conversation = acpio.NewACPConversation(ctx, acpIO, logger, initialPrompt, emitter, config.Clock)
} else {
proc, ok := config.AgentIO.(*termexec.Process)
if !ok && config.AgentIO != nil {
return nil, fmt.Errorf("PTY transport requires termexec.Process")
}
conversation = st.NewPTY(ctx, st.PTYConversationConfig{
AgentType: config.AgentType,
AgentIO: proc,
Clock: config.Clock,
SnapshotInterval: snapshotInterval,
ScreenStabilityLength: 2 * time.Second,
FormatMessage: formatMessage,
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
FormatToolCall: formatToolCall,
InitialPrompt: initialPrompt,
Logger: logger,
}, emitter)
}

// Create temporary directory for uploads
tempDir, err := os.MkdirTemp("", "agentapi-uploads-")
Expand All @@ -278,24 +295,25 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
port: config.Port,
conversation: conversation,
logger: logger,
agentio: config.Process,
agentio: config.AgentIO,
agentType: config.AgentType,
emitter: emitter,
chatBasePath: strings.TrimSuffix(config.ChatBasePath, "/"),
tempDir: tempDir,
clock: config.Clock,
transport: config.Transport,
}

// Register API routes
s.registerRoutes()

// Start the conversation polling loop if we have a process.
// Process is nil only when --print-openapi is used (no agent runs).
// The process is already running at this point - termexec.StartProcess()
// blocks until the PTY is created and the process is active. Agent
// readiness (waiting for the prompt) is handled asynchronously inside
// conversation.Start() via ReadyForInitialPrompt.
if config.Process != nil {
// Start the conversation polling loop if we have an agent IO.
// AgentIO is nil only when --print-openapi is used (no agent runs).
// For PTY transport, the process is already running at this point -
// termexec.StartProcess() blocks until the PTY is created and the process
// is active. Agent readiness (waiting for the prompt) is handled
// asynchronously inside conversation.Start() via ReadyForInitialPrompt.
if config.AgentIO != nil {
s.conversation.Start(ctx)
}

Expand Down Expand Up @@ -417,6 +435,7 @@ func (s *Server) getStatus(ctx context.Context, input *struct{}) (*StatusRespons
resp := &StatusResponse{}
resp.Body.Status = agentStatus
resp.Body.AgentType = s.agentType
resp.Body.Transport = s.transport

return resp, nil
}
Expand Down
Loading