mirror of
https://github.com/awizemann/scarf.git
synced 2026-05-10 18:44:45 +00:00
fix: Harden ACP session stability and recover messages on reconnection
Sessions were silently dying and losing chat history because:
- Pipe write errors (EPIPE) were completely undetected — broken pipe
writes via Task.detached { handle.write() } failed silently, leaving
the app unaware the subprocess had crashed
- Reconnection fell back to newSession() when loadSession() failed,
creating a blank session and permanently losing all conversation context
- No message reconciliation after reconnect — DB-persisted messages
were never re-fetched, so the UI stayed stale/incomplete
- Keepalive sent bare "\n" which caused json.loads("") parse errors
in the ACP library every 30 seconds, destabilizing the connection
- TERM=xterm-256color was set on a pipe-based subprocess, risking
terminal escape sequence pollution in the JSON-RPC stream
Fixes:
- Replace FileHandle.write() with POSIX Darwin.write() + SIGPIPE
suppression for immediate broken-pipe detection at all write sites
- Send valid JSON-RPC notification {"jsonrpc":"2.0","method":"$/ping"}
as keepalive instead of bare newlines
- Never fall back to newSession() during reconnection — try
resumeSession then loadSession, fail visibly if both fail
- Add reconcileWithDB() to merge DB-persisted messages with local
state after successful reconnection
- Finalize streaming messages immediately on disconnect so partial
content is preserved before reconnection begins
- Use SIGINT instead of SIGTERM for graceful Python subprocess shutdown
- Remove TERM env var from ACP subprocess environment
- Consolidate disconnect cleanup into single idempotent method
- Add isHandlingDisconnect guard against double-handling
- Increase reconnect attempts from 3 to 5 with capped backoff
- Add "Reconnect" button to toolbar error state
Also: bump version to 1.5.1, set deployment target to macOS 14.6
(Sonoma), and update README with rich chat/ACP features, process
controls, skill editing, and corrected system requirements.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -407,7 +407,7 @@
|
||||
CODE_SIGN_ENTITLEMENTS = scarf/scarf.entitlements;
|
||||
CODE_SIGN_STYLE = Automatic;
|
||||
COMBINE_HIDPI_IMAGES = YES;
|
||||
CURRENT_PROJECT_VERSION = 4;
|
||||
CURRENT_PROJECT_VERSION = 5;
|
||||
DEVELOPMENT_TEAM = 3Q6X2L86C4;
|
||||
ENABLE_APP_SANDBOX = NO;
|
||||
ENABLE_HARDENED_RUNTIME = YES;
|
||||
@@ -421,7 +421,8 @@
|
||||
"$(inherited)",
|
||||
"@executable_path/../Frameworks",
|
||||
);
|
||||
MARKETING_VERSION = 1.5.0;
|
||||
MACOSX_DEPLOYMENT_TARGET = 14.6;
|
||||
MARKETING_VERSION = 1.5.1;
|
||||
PRODUCT_BUNDLE_IDENTIFIER = com.scarf;
|
||||
PRODUCT_NAME = "$(TARGET_NAME)";
|
||||
REGISTER_APP_GROUPS = YES;
|
||||
@@ -443,7 +444,7 @@
|
||||
CODE_SIGN_ENTITLEMENTS = scarf/scarf.entitlements;
|
||||
CODE_SIGN_STYLE = Automatic;
|
||||
COMBINE_HIDPI_IMAGES = YES;
|
||||
CURRENT_PROJECT_VERSION = 4;
|
||||
CURRENT_PROJECT_VERSION = 5;
|
||||
DEVELOPMENT_TEAM = 3Q6X2L86C4;
|
||||
ENABLE_APP_SANDBOX = NO;
|
||||
ENABLE_HARDENED_RUNTIME = YES;
|
||||
@@ -457,7 +458,8 @@
|
||||
"$(inherited)",
|
||||
"@executable_path/../Frameworks",
|
||||
);
|
||||
MARKETING_VERSION = 1.5.0;
|
||||
MACOSX_DEPLOYMENT_TARGET = 14.6;
|
||||
MARKETING_VERSION = 1.5.1;
|
||||
PRODUCT_BUNDLE_IDENTIFIER = com.scarf;
|
||||
PRODUCT_NAME = "$(TARGET_NAME)";
|
||||
REGISTER_APP_GROUPS = YES;
|
||||
|
||||
@@ -10,6 +10,7 @@ actor ACPClient {
|
||||
private var stdinPipe: Pipe?
|
||||
private var stdoutPipe: Pipe?
|
||||
private var stderrPipe: Pipe?
|
||||
private var stdinFd: Int32 = -1
|
||||
|
||||
private var nextRequestId = 1
|
||||
private var pendingRequests: [Int: CheckedContinuation<AnyCodable?, Error>] = [:]
|
||||
@@ -45,6 +46,9 @@ actor ACPClient {
|
||||
func start() async throws {
|
||||
guard process == nil else { return }
|
||||
|
||||
// Ignore SIGPIPE so broken-pipe writes return EPIPE instead of crashing
|
||||
signal(SIGPIPE, SIG_IGN)
|
||||
|
||||
// Create the event stream BEFORE anything else so no events are lost
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: ACPEvent.self)
|
||||
self._eventStream = stream
|
||||
@@ -62,8 +66,9 @@ actor ACPClient {
|
||||
proc.standardOutput = stdout
|
||||
proc.standardError = stderr
|
||||
|
||||
// ACP uses JSON-RPC over pipes — do NOT set TERM to avoid terminal escape pollution
|
||||
var env = ProcessInfo.processInfo.environment
|
||||
env["TERM"] = "xterm-256color"
|
||||
env.removeValue(forKey: "TERM")
|
||||
proc.environment = env
|
||||
|
||||
proc.terminationHandler = { [weak self] proc in
|
||||
@@ -85,6 +90,7 @@ actor ACPClient {
|
||||
self.stdinPipe = stdin
|
||||
self.stdoutPipe = stdout
|
||||
self.stderrPipe = stderr
|
||||
self.stdinFd = stdin.fileHandleForWriting.fileDescriptor
|
||||
self.isConnected = true
|
||||
|
||||
// Start reading stdout BEFORE sending initialize (so we catch the response)
|
||||
@@ -123,10 +129,21 @@ actor ACPClient {
|
||||
}
|
||||
pendingRequests.removeAll()
|
||||
|
||||
if let process, process.isRunning {
|
||||
process.terminate()
|
||||
}
|
||||
// Close stdin first so the subprocess sees EOF and can shut down gracefully
|
||||
stdinPipe?.fileHandleForWriting.closeFile()
|
||||
|
||||
if let process, process.isRunning {
|
||||
// SIGINT for graceful Python shutdown (raises KeyboardInterrupt cleanly)
|
||||
process.interrupt()
|
||||
// Watchdog: force-kill if still running after 2 seconds
|
||||
let watchdogProcess = process
|
||||
Task.detached {
|
||||
try? await Task.sleep(nanoseconds: 2_000_000_000)
|
||||
if watchdogProcess.isRunning {
|
||||
watchdogProcess.terminate()
|
||||
}
|
||||
}
|
||||
}
|
||||
stdinPipe?.fileHandleForReading.closeFile()
|
||||
stdoutPipe?.fileHandleForReading.closeFile()
|
||||
stderrPipe?.fileHandleForReading.closeFile()
|
||||
@@ -135,6 +152,7 @@ actor ACPClient {
|
||||
stdinPipe = nil
|
||||
stdoutPipe = nil
|
||||
stderrPipe = nil
|
||||
stdinFd = -1
|
||||
isConnected = false
|
||||
currentSessionId = nil
|
||||
statusMessage = "Disconnected"
|
||||
@@ -153,12 +171,21 @@ actor ACPClient {
|
||||
}
|
||||
}
|
||||
|
||||
/// Valid JSON-RPC notification used as a keepalive probe.
|
||||
/// Sending bare newlines causes `json.loads("")` errors in the ACP library.
|
||||
private static let keepalivePayload: Data = {
|
||||
let json = #"{"jsonrpc":"2.0","method":"$/ping"}"# + "\n"
|
||||
return Data(json.utf8)
|
||||
}()
|
||||
|
||||
private func sendKeepalive() {
|
||||
guard let pipe = stdinPipe else { return }
|
||||
let handle = pipe.fileHandleForWriting
|
||||
Task.detached {
|
||||
// Empty newline — JSON-RPC parser skips it, but triggers EPIPE if process is dead
|
||||
handle.write(Data("\n".utf8))
|
||||
let fd = stdinFd
|
||||
guard fd >= 0 else { return }
|
||||
Task.detached { [weak self] in
|
||||
let ok = Self.safeWrite(fd: fd, data: Self.keepalivePayload)
|
||||
if !ok {
|
||||
await self?.handleWriteFailed()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -291,10 +318,11 @@ actor ACPClient {
|
||||
|
||||
defer { timeoutTask?.cancel() }
|
||||
|
||||
let fd = stdinFd
|
||||
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<AnyCodable?, Error>) in
|
||||
pendingRequests[requestId] = continuation
|
||||
|
||||
guard let pipe = stdinPipe else {
|
||||
guard fd >= 0 else {
|
||||
pendingRequests.removeValue(forKey: requestId)
|
||||
continuation.resume(throwing: ACPClientError.notConnected)
|
||||
return
|
||||
@@ -304,8 +332,12 @@ actor ACPClient {
|
||||
payload.append(contentsOf: "\n".utf8)
|
||||
// Write in a detached task to avoid blocking the actor's executor.
|
||||
// The continuation is already stored; the response arrives via the read loop.
|
||||
let handle = pipe.fileHandleForWriting
|
||||
Task.detached { handle.write(payload) }
|
||||
Task.detached { [weak self] in
|
||||
let ok = Self.safeWrite(fd: fd, data: payload)
|
||||
if !ok {
|
||||
await self?.handleWriteFailedForRequest(id: requestId)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -317,12 +349,17 @@ actor ACPClient {
|
||||
}
|
||||
|
||||
private func writeJSON(_ dict: [String: Any]) {
|
||||
guard let pipe = stdinPipe,
|
||||
let fd = stdinFd
|
||||
guard fd >= 0,
|
||||
let data = try? JSONSerialization.data(withJSONObject: dict) else { return }
|
||||
var payload = data
|
||||
payload.append(contentsOf: "\n".utf8)
|
||||
let handle = pipe.fileHandleForWriting
|
||||
Task.detached { handle.write(payload) }
|
||||
Task.detached { [weak self] in
|
||||
let ok = Self.safeWrite(fd: fd, data: payload)
|
||||
if !ok {
|
||||
await self?.handleWriteFailed()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Read Loop
|
||||
@@ -402,9 +439,12 @@ actor ACPClient {
|
||||
}
|
||||
}
|
||||
|
||||
private func handleReadLoopEnded() {
|
||||
guard isConnected else { return } // idempotent with handleTermination
|
||||
logger.warning("ACP read loop ended unexpectedly — cleaning up")
|
||||
// MARK: - Disconnect Cleanup
|
||||
|
||||
/// Single idempotent cleanup path for all disconnect scenarios.
|
||||
private func performDisconnectCleanup(reason: String) {
|
||||
guard isConnected else { return }
|
||||
logger.warning("ACP disconnecting: \(reason)")
|
||||
isConnected = false
|
||||
statusMessage = "Connection lost"
|
||||
for (_, continuation) in pendingRequests {
|
||||
@@ -415,16 +455,41 @@ actor ACPClient {
|
||||
eventContinuation = nil
|
||||
}
|
||||
|
||||
private func handleReadLoopEnded() {
|
||||
performDisconnectCleanup(reason: "read loop ended (EOF)")
|
||||
}
|
||||
|
||||
private func handleTermination(exitCode: Int32) {
|
||||
logger.info("hermes acp process terminated with code \(exitCode)")
|
||||
statusMessage = "Process exited (\(exitCode))"
|
||||
isConnected = false
|
||||
for (_, continuation) in pendingRequests {
|
||||
performDisconnectCleanup(reason: "process exited (\(exitCode))")
|
||||
}
|
||||
|
||||
private func handleWriteFailed() {
|
||||
performDisconnectCleanup(reason: "write failed (broken pipe)")
|
||||
}
|
||||
|
||||
private func handleWriteFailedForRequest(id: Int) {
|
||||
if let continuation = pendingRequests.removeValue(forKey: id) {
|
||||
continuation.resume(throwing: ACPClientError.processTerminated)
|
||||
}
|
||||
pendingRequests.removeAll()
|
||||
eventContinuation?.finish()
|
||||
eventContinuation = nil
|
||||
performDisconnectCleanup(reason: "write failed (broken pipe)")
|
||||
}
|
||||
|
||||
// MARK: - Safe POSIX Write
|
||||
|
||||
/// Write data to a file descriptor using POSIX write(), returning false on error.
|
||||
/// Handles partial writes and returns false on EPIPE or other errors.
|
||||
private static func safeWrite(fd: Int32, data: Data) -> Bool {
|
||||
data.withUnsafeBytes { buf in
|
||||
guard let base = buf.baseAddress else { return false }
|
||||
var written = 0
|
||||
let total = buf.count
|
||||
while written < total {
|
||||
let result = Darwin.write(fd, base.advanced(by: written), total - written)
|
||||
if result <= 0 { return false }
|
||||
written += result
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -26,12 +26,14 @@ final class ChatViewModel {
|
||||
private var acpPromptTask: Task<Void, Never>?
|
||||
private var healthMonitorTask: Task<Void, Never>?
|
||||
private var reconnectTask: Task<Void, Never>?
|
||||
private var isHandlingDisconnect = false
|
||||
var isACPConnected: Bool { acpClient != nil && hasActiveProcess }
|
||||
var acpStatus: String = ""
|
||||
var acpError: String?
|
||||
|
||||
private static let maxReconnectAttempts = 3
|
||||
private static let maxReconnectAttempts = 5
|
||||
private static let reconnectBaseDelay: UInt64 = 1_000_000_000 // 1 second
|
||||
private static let maxReconnectDelay: UInt64 = 16_000_000_000 // 16 seconds
|
||||
|
||||
var hermesBinaryExists: Bool {
|
||||
FileManager.default.fileExists(atPath: HermesPaths.hermesBinary)
|
||||
@@ -295,9 +297,13 @@ final class ChatViewModel {
|
||||
}
|
||||
|
||||
private func handleConnectionDied() {
|
||||
guard acpClient != nil else { return } // already handled
|
||||
guard acpClient != nil, !isHandlingDisconnect else { return }
|
||||
isHandlingDisconnect = true
|
||||
logger.warning("ACP connection died")
|
||||
|
||||
// Finalize any in-progress streaming message before reconnection
|
||||
richChatViewModel.finalizeOnDisconnect()
|
||||
|
||||
// Save session ID for reconnection before cleaning up
|
||||
let savedSessionId = richChatViewModel.sessionId
|
||||
|
||||
@@ -317,6 +323,7 @@ final class ChatViewModel {
|
||||
// Attempt auto-reconnect if we have a session to restore
|
||||
guard let savedSessionId else {
|
||||
showConnectionFailure()
|
||||
isHandlingDisconnect = false
|
||||
return
|
||||
}
|
||||
attemptReconnect(sessionId: savedSessionId)
|
||||
@@ -337,7 +344,10 @@ final class ChatViewModel {
|
||||
|
||||
// Backoff delay (skip on first attempt for fast recovery)
|
||||
if attempt > 1 {
|
||||
let delay = Self.reconnectBaseDelay * UInt64(1 << (attempt - 1))
|
||||
let delay = min(
|
||||
Self.reconnectBaseDelay * UInt64(1 << (attempt - 1)),
|
||||
Self.maxReconnectDelay
|
||||
)
|
||||
try? await Task.sleep(nanoseconds: delay)
|
||||
guard !Task.isCancelled else { return }
|
||||
}
|
||||
@@ -348,23 +358,31 @@ final class ChatViewModel {
|
||||
|
||||
let cwd = NSHomeDirectory()
|
||||
let resolvedSessionId: String
|
||||
|
||||
// Try resumeSession first (designed for reconnection), then loadSession.
|
||||
// NEVER fall back to newSession — that loses all conversation context.
|
||||
do {
|
||||
resolvedSessionId = try await client.loadSession(cwd: cwd, sessionId: sessionId)
|
||||
resolvedSessionId = try await client.resumeSession(cwd: cwd, sessionId: sessionId)
|
||||
} catch {
|
||||
logger.info("Session \(sessionId) not loadable, creating new: \(error.localizedDescription)")
|
||||
resolvedSessionId = try await client.newSession(cwd: cwd)
|
||||
logger.info("session/resume failed, trying session/load: \(error.localizedDescription)")
|
||||
resolvedSessionId = try await client.loadSession(cwd: cwd, sessionId: sessionId)
|
||||
}
|
||||
|
||||
// Success — wire up the new client
|
||||
self.acpClient = client
|
||||
self.hasActiveProcess = true
|
||||
richChatViewModel.setSessionId(resolvedSessionId)
|
||||
|
||||
// Reconcile in-memory messages with what Hermes persisted to DB
|
||||
await richChatViewModel.reconcileWithDB(sessionId: resolvedSessionId)
|
||||
|
||||
acpStatus = "Reconnected (\(resolvedSessionId.prefix(12)))"
|
||||
acpError = nil
|
||||
|
||||
startACPEventLoop(client: client)
|
||||
startHealthMonitor(client: client)
|
||||
|
||||
isHandlingDisconnect = false
|
||||
logger.info("Reconnected successfully on attempt \(attempt)")
|
||||
return
|
||||
} catch {
|
||||
@@ -377,6 +395,7 @@ final class ChatViewModel {
|
||||
// All attempts exhausted
|
||||
guard !Task.isCancelled else { return }
|
||||
showConnectionFailure()
|
||||
isHandlingDisconnect = false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -400,6 +419,7 @@ final class ChatViewModel {
|
||||
}
|
||||
acpClient = nil
|
||||
hasActiveProcess = false
|
||||
isHandlingDisconnect = false
|
||||
}
|
||||
|
||||
/// Respond to a permission request from the ACP agent.
|
||||
|
||||
@@ -281,6 +281,64 @@ final class RichChatViewModel {
|
||||
streamingToolCalls = []
|
||||
}
|
||||
|
||||
// MARK: - Disconnect Recovery
|
||||
|
||||
/// Finalize streaming state on disconnect, before reconnection attempts begin.
|
||||
/// Saves partial content as a permanent message without adding a system message.
|
||||
func finalizeOnDisconnect() {
|
||||
finalizeStreamingMessage()
|
||||
isAgentWorking = false
|
||||
pendingPermission = nil
|
||||
buildMessageGroups()
|
||||
}
|
||||
|
||||
/// Reconcile in-memory messages with DB state after a successful reconnection.
|
||||
/// Merges DB-persisted messages with any local-only messages (e.g., user messages
|
||||
/// that the ACP process may not have persisted before crashing).
|
||||
func reconcileWithDB(sessionId: String) async {
|
||||
let opened = await dataService.open()
|
||||
guard opened else { return }
|
||||
|
||||
var dbMessages = await dataService.fetchMessages(sessionId: sessionId)
|
||||
|
||||
// If we have an origin session (CLI session continued via ACP),
|
||||
// include those messages too
|
||||
if let origin = originSessionId, origin != sessionId {
|
||||
let originMessages = await dataService.fetchMessages(sessionId: origin)
|
||||
if !originMessages.isEmpty {
|
||||
dbMessages = originMessages + dbMessages
|
||||
dbMessages.sort { ($0.timestamp ?? .distantPast) < ($1.timestamp ?? .distantPast) }
|
||||
}
|
||||
}
|
||||
|
||||
let session = await dataService.fetchSession(id: sessionId)
|
||||
await dataService.close()
|
||||
|
||||
// Find local-only user messages not yet in DB.
|
||||
// Local messages have negative IDs; DB messages have positive IDs.
|
||||
let dbUserContents = Set(dbMessages.filter(\.isUser).map(\.content))
|
||||
let localOnlyMessages = messages.filter { msg in
|
||||
msg.id < 0 && msg.isUser && !dbUserContents.contains(msg.content)
|
||||
}
|
||||
|
||||
// Build reconciled list: DB messages + unmatched local user messages
|
||||
var reconciled = dbMessages
|
||||
for localMsg in localOnlyMessages {
|
||||
if let ts = localMsg.timestamp,
|
||||
let insertIdx = reconciled.firstIndex(where: { ($0.timestamp ?? .distantPast) > ts }) {
|
||||
reconciled.insert(localMsg, at: insertIdx)
|
||||
} else {
|
||||
reconciled.append(localMsg)
|
||||
}
|
||||
}
|
||||
|
||||
messages = reconciled
|
||||
currentSession = session
|
||||
let minId = reconciled.map(\.id).min() ?? 0
|
||||
nextLocalId = min(minId - 1, -1)
|
||||
buildMessageGroups()
|
||||
}
|
||||
|
||||
// MARK: - Load History from DB (for resumed sessions)
|
||||
|
||||
/// Load message history from the DB, optionally combining an origin session
|
||||
|
||||
@@ -40,6 +40,14 @@ struct ChatView: View {
|
||||
.foregroundStyle(.red)
|
||||
.lineLimit(1)
|
||||
.help(error)
|
||||
if let sid = viewModel.richChatViewModel.sessionId {
|
||||
Button("Reconnect") {
|
||||
viewModel.resumeSession(sid)
|
||||
}
|
||||
.font(.caption)
|
||||
.buttonStyle(.bordered)
|
||||
.controlSize(.small)
|
||||
}
|
||||
} else if !viewModel.acpStatus.isEmpty {
|
||||
Circle()
|
||||
.fill(.yellow)
|
||||
|
||||
Reference in New Issue
Block a user