mirror of
https://github.com/awizemann/scarf.git
synced 2026-05-08 02:14:37 +00:00
fix: Harden ACP session stability and recover messages on reconnection
Sessions were silently dying and losing chat history because:
- Pipe write errors (EPIPE) were completely undetected — broken pipe
writes via Task.detached { handle.write() } failed silently, leaving
the app unaware the subprocess had crashed
- Reconnection fell back to newSession() when loadSession() failed,
creating a blank session and permanently losing all conversation context
- No message reconciliation after reconnect — DB-persisted messages
were never re-fetched, so the UI stayed stale/incomplete
- Keepalive sent bare "\n" which caused json.loads("") parse errors
in the ACP library every 30 seconds, destabilizing the connection
- TERM=xterm-256color was set on a pipe-based subprocess, risking
terminal escape sequence pollution in the JSON-RPC stream
Fixes:
- Replace FileHandle.write() with POSIX Darwin.write() + SIGPIPE
suppression for immediate broken-pipe detection at all write sites
- Send valid JSON-RPC notification {"jsonrpc":"2.0","method":"$/ping"}
as keepalive instead of bare newlines
- Never fall back to newSession() during reconnection — try
resumeSession then loadSession, fail visibly if both fail
- Add reconcileWithDB() to merge DB-persisted messages with local
state after successful reconnection
- Finalize streaming messages immediately on disconnect so partial
content is preserved before reconnection begins
- Use SIGINT instead of SIGTERM for graceful Python subprocess shutdown
- Remove TERM env var from ACP subprocess environment
- Consolidate disconnect cleanup into single idempotent method
- Add isHandlingDisconnect guard against double-handling
- Increase reconnect attempts from 3 to 5 with capped backoff
- Add "Reconnect" button to toolbar error state
Also: bump version to 1.5.1, set deployment target to macOS 14.6
(Sonoma), and update README with rich chat/ACP features, process
controls, skill editing, and corrected system requirements.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -10,7 +10,7 @@
|
|||||||
</p>
|
</p>
|
||||||
|
|
||||||
<p align="center">
|
<p align="center">
|
||||||
<img src="https://img.shields.io/badge/macOS-26.2+-blue" alt="macOS">
|
<img src="https://img.shields.io/badge/macOS-14.6+%20Sonoma-blue" alt="macOS">
|
||||||
<img src="https://img.shields.io/badge/Swift-6-orange" alt="Swift">
|
<img src="https://img.shields.io/badge/Swift-6-orange" alt="Swift">
|
||||||
<img src="https://img.shields.io/badge/license-MIT-green" alt="License">
|
<img src="https://img.shields.io/badge/license-MIT-green" alt="License">
|
||||||
<br><br>
|
<br><br>
|
||||||
@@ -22,22 +22,23 @@
|
|||||||
- **Dashboard** — System health, token usage, cost tracking, recent sessions with live refresh
|
- **Dashboard** — System health, token usage, cost tracking, recent sessions with live refresh
|
||||||
- **Insights** — Usage analytics with token breakdown (including reasoning tokens), cost tracking, model/platform stats, top tools bar chart, activity heatmaps, notable sessions, and time period filtering (7/30/90 days or all time)
|
- **Insights** — Usage analytics with token breakdown (including reasoning tokens), cost tracking, model/platform stats, top tools bar chart, activity heatmaps, notable sessions, and time period filtering (7/30/90 days or all time)
|
||||||
- **Sessions Browser** — Full conversation history with message rendering, model reasoning/thinking display, tool call inspection, full-text search, rename, delete, and JSONL export. Subagent sessions are filtered from the main list and accessible via parent session drill-down
|
- **Sessions Browser** — Full conversation history with message rendering, model reasoning/thinking display, tool call inspection, full-text search, rename, delete, and JSONL export. Subagent sessions are filtered from the main list and accessible via parent session drill-down
|
||||||
- **Activity Feed** — Recent tool execution log with filtering by kind and session, detail inspector with pretty-printed arguments
|
- **Activity Feed** — Recent tool execution log with filtering by kind and session, detail inspector with pretty-printed arguments and tool output display
|
||||||
- **Live Chat** — Embedded terminal running `hermes chat` with full ANSI color and Rich formatting via [SwiftTerm](https://github.com/migueldeicaza/SwiftTerm), session persistence across navigation, resume/continue previous sessions, and voice mode controls
|
- **Live Chat** — Two modes: **Rich Chat** streams responses in real-time via the Agent Client Protocol (ACP) with iMessage-style bubbles, markdown rendering, tool call visualization, thinking/reasoning display, and permission request dialogs; **Terminal** runs `hermes chat` with full ANSI color and Rich formatting via [SwiftTerm](https://github.com/migueldeicaza/SwiftTerm). Both modes support session persistence, resume/continue previous sessions, auto-reconnection with session recovery, and voice mode controls
|
||||||
- **Memory Viewer/Editor** — View and edit Hermes's MEMORY.md and USER.md with live file-watcher refresh, external memory provider awareness (Honcho, Supermemory, etc.), and profile-scoped memory support with profile picker
|
- **Memory Viewer/Editor** — View and edit Hermes's MEMORY.md and USER.md with live file-watcher refresh, external memory provider awareness (Honcho, Supermemory, etc.), and profile-scoped memory support with profile picker
|
||||||
- **Skills Browser** — Browse all installed skills by category with file content viewer, file switcher, and required config warnings for skills that need specific settings
|
- **Skills Browser** — Browse and edit installed skills by category with file content viewer, file switcher, and required config warnings for skills that need specific settings
|
||||||
- **Tools Manager** — Enable/disable toolsets per platform (CLI, Telegram, Discord, Slack, WhatsApp, Signal, Email, Home Assistant, Webhook, Matrix, Feishu, Mattermost) with toggle switches and segmented platform picker, MCP server status
|
- **Tools Manager** — Enable/disable toolsets per platform (CLI, Telegram, Discord, Slack, WhatsApp, Signal, Email, Home Assistant, Webhook, Matrix, Feishu, Mattermost) with toggle switches and segmented platform picker, MCP server status
|
||||||
- **Gateway Control** — Start/stop/restart the messaging gateway, view platform connection status, manage user pairing (approve/revoke)
|
- **Gateway Control** — Start/stop/restart the messaging gateway, view platform connection status, manage user pairing (approve/revoke)
|
||||||
- **Cron Manager** — View scheduled jobs with pre-run scripts, delivery failure tracking, timeout info, and `[SILENT]` job indicators
|
- **Cron Manager** — View scheduled jobs with pre-run scripts, delivery failure tracking, timeout info, and `[SILENT]` job indicators
|
||||||
- **Log Viewer** — Real-time log tailing for agent.log, errors.log, and gateway.log with level filtering and text search
|
- **Log Viewer** — Real-time log tailing for agent.log, errors.log, and gateway.log with level filtering and text search
|
||||||
- **Project Dashboards** — Custom, agent-generated dashboards for any project. Define stat boxes, charts, tables, progress bars, checklists, rich text, and embedded web views in a simple JSON file — Scarf renders them with live refresh. Let your Hermes agent build and maintain project-specific visualizations automatically
|
- **Project Dashboards** — Custom, agent-generated dashboards for any project. Define stat boxes, charts, tables, progress bars, checklists, rich text, and embedded web views in a simple JSON file — Scarf renders them with live refresh. Let your Hermes agent build and maintain project-specific visualizations automatically
|
||||||
- **Settings** — Structured config editor for all Hermes settings including model/provider selection, browser backend, reasoning effort, approval mode, cost display, Docker environment, command allowlist, credential management, and more
|
- **Settings** — Structured config editor for all Hermes settings including model/provider selection, browser backend, reasoning effort, approval mode, cost display, Docker environment, command allowlist, credential management, and more
|
||||||
|
- **Hermes Process Control** — Start, stop, and restart the Hermes agent directly from Scarf
|
||||||
- **Menu Bar** — Status icon showing Hermes running state with quick actions
|
- **Menu Bar** — Status icon showing Hermes running state with quick actions
|
||||||
|
|
||||||
## Requirements
|
## Requirements
|
||||||
|
|
||||||
- macOS 26.2+
|
- macOS 14.6+ (Sonoma)
|
||||||
- Xcode 26.3+
|
- Xcode 16.0+
|
||||||
- [Hermes agent](https://github.com/hermes-ai/hermes-agent) v0.6.0+ installed at `~/.hermes/` (v0.8.0 recommended for full feature support)
|
- [Hermes agent](https://github.com/hermes-ai/hermes-agent) v0.6.0+ installed at `~/.hermes/` (v0.8.0 recommended for full feature support)
|
||||||
|
|
||||||
### Compatibility
|
### Compatibility
|
||||||
@@ -91,7 +92,7 @@ scarf/
|
|||||||
Sessions/ Conversation browser with rename, delete, export
|
Sessions/ Conversation browser with rename, delete, export
|
||||||
Activity/ Tool execution feed with inspector
|
Activity/ Tool execution feed with inspector
|
||||||
Projects/ Agent-generated project dashboards with widget rendering
|
Projects/ Agent-generated project dashboards with widget rendering
|
||||||
Chat/ Embedded terminal via SwiftTerm with voice controls
|
Chat/ Rich ACP chat and embedded terminal with voice controls
|
||||||
Memory/ Memory viewer and editor
|
Memory/ Memory viewer and editor
|
||||||
Skills/ Skill browser by category
|
Skills/ Skill browser by category
|
||||||
Tools/ Toolset management per platform
|
Tools/ Toolset management per platform
|
||||||
@@ -115,6 +116,7 @@ Scarf reads Hermes data directly from `~/.hermes/`:
|
|||||||
| `logs/*.log` | Text | Read-only |
|
| `logs/*.log` | Text | Read-only |
|
||||||
| `gateway_state.json` | JSON | Read-only |
|
| `gateway_state.json` | JSON | Read-only |
|
||||||
| `skills/` | Directory tree | Read-only |
|
| `skills/` | Directory tree | Read-only |
|
||||||
|
| `hermes acp` | ACP subprocess (JSON-RPC stdio) | Real-time chat |
|
||||||
| `hermes chat` | Terminal subprocess | Interactive |
|
| `hermes chat` | Terminal subprocess | Interactive |
|
||||||
| `hermes tools` | CLI commands | Enable/Disable |
|
| `hermes tools` | CLI commands | Enable/Disable |
|
||||||
| `hermes sessions` | CLI commands | Rename/Delete/Export |
|
| `hermes sessions` | CLI commands | Rename/Delete/Export |
|
||||||
@@ -137,7 +139,7 @@ Everything else uses system frameworks: SQLite3 C API, Foundation JSON, Attribut
|
|||||||
|
|
||||||
Scarf watches `~/.hermes/` for file changes and queries the SQLite database for sessions, messages, and analytics. Views refresh automatically when Hermes writes new data.
|
Scarf watches `~/.hermes/` for file changes and queries the SQLite database for sessions, messages, and analytics. Views refresh automatically when Hermes writes new data.
|
||||||
|
|
||||||
The Chat tab spawns `hermes chat` as a subprocess in a pseudo-terminal, giving you the full interactive CLI experience with proper ANSI rendering. Sessions persist across navigation — switch tabs and come back without losing your conversation.
|
The Chat tab has two modes. **Rich Chat** communicates with Hermes via the Agent Client Protocol (ACP) — a JSON-RPC connection over stdio — streaming responses in real-time with automatic reconnection and session recovery on connection loss. **Terminal** mode spawns `hermes chat` in a pseudo-terminal for the full interactive CLI experience with proper ANSI rendering. Sessions persist across navigation in both modes — switch tabs and come back without losing your conversation.
|
||||||
|
|
||||||
Management actions (renaming sessions, toggling tools, editing memory) call the Hermes CLI or write directly to the appropriate files, keeping Scarf and Hermes in sync.
|
Management actions (renaming sessions, toggling tools, editing memory) call the Hermes CLI or write directly to the appropriate files, keeping Scarf and Hermes in sync.
|
||||||
|
|
||||||
|
|||||||
@@ -407,7 +407,7 @@
|
|||||||
CODE_SIGN_ENTITLEMENTS = scarf/scarf.entitlements;
|
CODE_SIGN_ENTITLEMENTS = scarf/scarf.entitlements;
|
||||||
CODE_SIGN_STYLE = Automatic;
|
CODE_SIGN_STYLE = Automatic;
|
||||||
COMBINE_HIDPI_IMAGES = YES;
|
COMBINE_HIDPI_IMAGES = YES;
|
||||||
CURRENT_PROJECT_VERSION = 4;
|
CURRENT_PROJECT_VERSION = 5;
|
||||||
DEVELOPMENT_TEAM = 3Q6X2L86C4;
|
DEVELOPMENT_TEAM = 3Q6X2L86C4;
|
||||||
ENABLE_APP_SANDBOX = NO;
|
ENABLE_APP_SANDBOX = NO;
|
||||||
ENABLE_HARDENED_RUNTIME = YES;
|
ENABLE_HARDENED_RUNTIME = YES;
|
||||||
@@ -421,7 +421,8 @@
|
|||||||
"$(inherited)",
|
"$(inherited)",
|
||||||
"@executable_path/../Frameworks",
|
"@executable_path/../Frameworks",
|
||||||
);
|
);
|
||||||
MARKETING_VERSION = 1.5.0;
|
MACOSX_DEPLOYMENT_TARGET = 14.6;
|
||||||
|
MARKETING_VERSION = 1.5.1;
|
||||||
PRODUCT_BUNDLE_IDENTIFIER = com.scarf;
|
PRODUCT_BUNDLE_IDENTIFIER = com.scarf;
|
||||||
PRODUCT_NAME = "$(TARGET_NAME)";
|
PRODUCT_NAME = "$(TARGET_NAME)";
|
||||||
REGISTER_APP_GROUPS = YES;
|
REGISTER_APP_GROUPS = YES;
|
||||||
@@ -443,7 +444,7 @@
|
|||||||
CODE_SIGN_ENTITLEMENTS = scarf/scarf.entitlements;
|
CODE_SIGN_ENTITLEMENTS = scarf/scarf.entitlements;
|
||||||
CODE_SIGN_STYLE = Automatic;
|
CODE_SIGN_STYLE = Automatic;
|
||||||
COMBINE_HIDPI_IMAGES = YES;
|
COMBINE_HIDPI_IMAGES = YES;
|
||||||
CURRENT_PROJECT_VERSION = 4;
|
CURRENT_PROJECT_VERSION = 5;
|
||||||
DEVELOPMENT_TEAM = 3Q6X2L86C4;
|
DEVELOPMENT_TEAM = 3Q6X2L86C4;
|
||||||
ENABLE_APP_SANDBOX = NO;
|
ENABLE_APP_SANDBOX = NO;
|
||||||
ENABLE_HARDENED_RUNTIME = YES;
|
ENABLE_HARDENED_RUNTIME = YES;
|
||||||
@@ -457,7 +458,8 @@
|
|||||||
"$(inherited)",
|
"$(inherited)",
|
||||||
"@executable_path/../Frameworks",
|
"@executable_path/../Frameworks",
|
||||||
);
|
);
|
||||||
MARKETING_VERSION = 1.5.0;
|
MACOSX_DEPLOYMENT_TARGET = 14.6;
|
||||||
|
MARKETING_VERSION = 1.5.1;
|
||||||
PRODUCT_BUNDLE_IDENTIFIER = com.scarf;
|
PRODUCT_BUNDLE_IDENTIFIER = com.scarf;
|
||||||
PRODUCT_NAME = "$(TARGET_NAME)";
|
PRODUCT_NAME = "$(TARGET_NAME)";
|
||||||
REGISTER_APP_GROUPS = YES;
|
REGISTER_APP_GROUPS = YES;
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ actor ACPClient {
|
|||||||
private var stdinPipe: Pipe?
|
private var stdinPipe: Pipe?
|
||||||
private var stdoutPipe: Pipe?
|
private var stdoutPipe: Pipe?
|
||||||
private var stderrPipe: Pipe?
|
private var stderrPipe: Pipe?
|
||||||
|
private var stdinFd: Int32 = -1
|
||||||
|
|
||||||
private var nextRequestId = 1
|
private var nextRequestId = 1
|
||||||
private var pendingRequests: [Int: CheckedContinuation<AnyCodable?, Error>] = [:]
|
private var pendingRequests: [Int: CheckedContinuation<AnyCodable?, Error>] = [:]
|
||||||
@@ -45,6 +46,9 @@ actor ACPClient {
|
|||||||
func start() async throws {
|
func start() async throws {
|
||||||
guard process == nil else { return }
|
guard process == nil else { return }
|
||||||
|
|
||||||
|
// Ignore SIGPIPE so broken-pipe writes return EPIPE instead of crashing
|
||||||
|
signal(SIGPIPE, SIG_IGN)
|
||||||
|
|
||||||
// Create the event stream BEFORE anything else so no events are lost
|
// Create the event stream BEFORE anything else so no events are lost
|
||||||
let (stream, continuation) = AsyncStream.makeStream(of: ACPEvent.self)
|
let (stream, continuation) = AsyncStream.makeStream(of: ACPEvent.self)
|
||||||
self._eventStream = stream
|
self._eventStream = stream
|
||||||
@@ -62,8 +66,9 @@ actor ACPClient {
|
|||||||
proc.standardOutput = stdout
|
proc.standardOutput = stdout
|
||||||
proc.standardError = stderr
|
proc.standardError = stderr
|
||||||
|
|
||||||
|
// ACP uses JSON-RPC over pipes — do NOT set TERM to avoid terminal escape pollution
|
||||||
var env = ProcessInfo.processInfo.environment
|
var env = ProcessInfo.processInfo.environment
|
||||||
env["TERM"] = "xterm-256color"
|
env.removeValue(forKey: "TERM")
|
||||||
proc.environment = env
|
proc.environment = env
|
||||||
|
|
||||||
proc.terminationHandler = { [weak self] proc in
|
proc.terminationHandler = { [weak self] proc in
|
||||||
@@ -85,6 +90,7 @@ actor ACPClient {
|
|||||||
self.stdinPipe = stdin
|
self.stdinPipe = stdin
|
||||||
self.stdoutPipe = stdout
|
self.stdoutPipe = stdout
|
||||||
self.stderrPipe = stderr
|
self.stderrPipe = stderr
|
||||||
|
self.stdinFd = stdin.fileHandleForWriting.fileDescriptor
|
||||||
self.isConnected = true
|
self.isConnected = true
|
||||||
|
|
||||||
// Start reading stdout BEFORE sending initialize (so we catch the response)
|
// Start reading stdout BEFORE sending initialize (so we catch the response)
|
||||||
@@ -123,10 +129,21 @@ actor ACPClient {
|
|||||||
}
|
}
|
||||||
pendingRequests.removeAll()
|
pendingRequests.removeAll()
|
||||||
|
|
||||||
if let process, process.isRunning {
|
// Close stdin first so the subprocess sees EOF and can shut down gracefully
|
||||||
process.terminate()
|
|
||||||
}
|
|
||||||
stdinPipe?.fileHandleForWriting.closeFile()
|
stdinPipe?.fileHandleForWriting.closeFile()
|
||||||
|
|
||||||
|
if let process, process.isRunning {
|
||||||
|
// SIGINT for graceful Python shutdown (raises KeyboardInterrupt cleanly)
|
||||||
|
process.interrupt()
|
||||||
|
// Watchdog: force-kill if still running after 2 seconds
|
||||||
|
let watchdogProcess = process
|
||||||
|
Task.detached {
|
||||||
|
try? await Task.sleep(nanoseconds: 2_000_000_000)
|
||||||
|
if watchdogProcess.isRunning {
|
||||||
|
watchdogProcess.terminate()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
stdinPipe?.fileHandleForReading.closeFile()
|
stdinPipe?.fileHandleForReading.closeFile()
|
||||||
stdoutPipe?.fileHandleForReading.closeFile()
|
stdoutPipe?.fileHandleForReading.closeFile()
|
||||||
stderrPipe?.fileHandleForReading.closeFile()
|
stderrPipe?.fileHandleForReading.closeFile()
|
||||||
@@ -135,6 +152,7 @@ actor ACPClient {
|
|||||||
stdinPipe = nil
|
stdinPipe = nil
|
||||||
stdoutPipe = nil
|
stdoutPipe = nil
|
||||||
stderrPipe = nil
|
stderrPipe = nil
|
||||||
|
stdinFd = -1
|
||||||
isConnected = false
|
isConnected = false
|
||||||
currentSessionId = nil
|
currentSessionId = nil
|
||||||
statusMessage = "Disconnected"
|
statusMessage = "Disconnected"
|
||||||
@@ -153,12 +171,21 @@ actor ACPClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Valid JSON-RPC notification used as a keepalive probe.
|
||||||
|
/// Sending bare newlines causes `json.loads("")` errors in the ACP library.
|
||||||
|
private static let keepalivePayload: Data = {
|
||||||
|
let json = #"{"jsonrpc":"2.0","method":"$/ping"}"# + "\n"
|
||||||
|
return Data(json.utf8)
|
||||||
|
}()
|
||||||
|
|
||||||
private func sendKeepalive() {
|
private func sendKeepalive() {
|
||||||
guard let pipe = stdinPipe else { return }
|
let fd = stdinFd
|
||||||
let handle = pipe.fileHandleForWriting
|
guard fd >= 0 else { return }
|
||||||
Task.detached {
|
Task.detached { [weak self] in
|
||||||
// Empty newline — JSON-RPC parser skips it, but triggers EPIPE if process is dead
|
let ok = Self.safeWrite(fd: fd, data: Self.keepalivePayload)
|
||||||
handle.write(Data("\n".utf8))
|
if !ok {
|
||||||
|
await self?.handleWriteFailed()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -291,10 +318,11 @@ actor ACPClient {
|
|||||||
|
|
||||||
defer { timeoutTask?.cancel() }
|
defer { timeoutTask?.cancel() }
|
||||||
|
|
||||||
|
let fd = stdinFd
|
||||||
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<AnyCodable?, Error>) in
|
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<AnyCodable?, Error>) in
|
||||||
pendingRequests[requestId] = continuation
|
pendingRequests[requestId] = continuation
|
||||||
|
|
||||||
guard let pipe = stdinPipe else {
|
guard fd >= 0 else {
|
||||||
pendingRequests.removeValue(forKey: requestId)
|
pendingRequests.removeValue(forKey: requestId)
|
||||||
continuation.resume(throwing: ACPClientError.notConnected)
|
continuation.resume(throwing: ACPClientError.notConnected)
|
||||||
return
|
return
|
||||||
@@ -304,8 +332,12 @@ actor ACPClient {
|
|||||||
payload.append(contentsOf: "\n".utf8)
|
payload.append(contentsOf: "\n".utf8)
|
||||||
// Write in a detached task to avoid blocking the actor's executor.
|
// Write in a detached task to avoid blocking the actor's executor.
|
||||||
// The continuation is already stored; the response arrives via the read loop.
|
// The continuation is already stored; the response arrives via the read loop.
|
||||||
let handle = pipe.fileHandleForWriting
|
Task.detached { [weak self] in
|
||||||
Task.detached { handle.write(payload) }
|
let ok = Self.safeWrite(fd: fd, data: payload)
|
||||||
|
if !ok {
|
||||||
|
await self?.handleWriteFailedForRequest(id: requestId)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -317,12 +349,17 @@ actor ACPClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private func writeJSON(_ dict: [String: Any]) {
|
private func writeJSON(_ dict: [String: Any]) {
|
||||||
guard let pipe = stdinPipe,
|
let fd = stdinFd
|
||||||
|
guard fd >= 0,
|
||||||
let data = try? JSONSerialization.data(withJSONObject: dict) else { return }
|
let data = try? JSONSerialization.data(withJSONObject: dict) else { return }
|
||||||
var payload = data
|
var payload = data
|
||||||
payload.append(contentsOf: "\n".utf8)
|
payload.append(contentsOf: "\n".utf8)
|
||||||
let handle = pipe.fileHandleForWriting
|
Task.detached { [weak self] in
|
||||||
Task.detached { handle.write(payload) }
|
let ok = Self.safeWrite(fd: fd, data: payload)
|
||||||
|
if !ok {
|
||||||
|
await self?.handleWriteFailed()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: - Read Loop
|
// MARK: - Read Loop
|
||||||
@@ -402,9 +439,12 @@ actor ACPClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func handleReadLoopEnded() {
|
// MARK: - Disconnect Cleanup
|
||||||
guard isConnected else { return } // idempotent with handleTermination
|
|
||||||
logger.warning("ACP read loop ended unexpectedly — cleaning up")
|
/// Single idempotent cleanup path for all disconnect scenarios.
|
||||||
|
private func performDisconnectCleanup(reason: String) {
|
||||||
|
guard isConnected else { return }
|
||||||
|
logger.warning("ACP disconnecting: \(reason)")
|
||||||
isConnected = false
|
isConnected = false
|
||||||
statusMessage = "Connection lost"
|
statusMessage = "Connection lost"
|
||||||
for (_, continuation) in pendingRequests {
|
for (_, continuation) in pendingRequests {
|
||||||
@@ -415,16 +455,41 @@ actor ACPClient {
|
|||||||
eventContinuation = nil
|
eventContinuation = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func handleReadLoopEnded() {
|
||||||
|
performDisconnectCleanup(reason: "read loop ended (EOF)")
|
||||||
|
}
|
||||||
|
|
||||||
private func handleTermination(exitCode: Int32) {
|
private func handleTermination(exitCode: Int32) {
|
||||||
logger.info("hermes acp process terminated with code \(exitCode)")
|
performDisconnectCleanup(reason: "process exited (\(exitCode))")
|
||||||
statusMessage = "Process exited (\(exitCode))"
|
}
|
||||||
isConnected = false
|
|
||||||
for (_, continuation) in pendingRequests {
|
private func handleWriteFailed() {
|
||||||
|
performDisconnectCleanup(reason: "write failed (broken pipe)")
|
||||||
|
}
|
||||||
|
|
||||||
|
private func handleWriteFailedForRequest(id: Int) {
|
||||||
|
if let continuation = pendingRequests.removeValue(forKey: id) {
|
||||||
continuation.resume(throwing: ACPClientError.processTerminated)
|
continuation.resume(throwing: ACPClientError.processTerminated)
|
||||||
}
|
}
|
||||||
pendingRequests.removeAll()
|
performDisconnectCleanup(reason: "write failed (broken pipe)")
|
||||||
eventContinuation?.finish()
|
}
|
||||||
eventContinuation = nil
|
|
||||||
|
// MARK: - Safe POSIX Write
|
||||||
|
|
||||||
|
/// Write data to a file descriptor using POSIX write(), returning false on error.
|
||||||
|
/// Handles partial writes and returns false on EPIPE or other errors.
|
||||||
|
private static func safeWrite(fd: Int32, data: Data) -> Bool {
|
||||||
|
data.withUnsafeBytes { buf in
|
||||||
|
guard let base = buf.baseAddress else { return false }
|
||||||
|
var written = 0
|
||||||
|
let total = buf.count
|
||||||
|
while written < total {
|
||||||
|
let result = Darwin.write(fd, base.advanced(by: written), total - written)
|
||||||
|
if result <= 0 { return false }
|
||||||
|
written += result
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -26,12 +26,14 @@ final class ChatViewModel {
|
|||||||
private var acpPromptTask: Task<Void, Never>?
|
private var acpPromptTask: Task<Void, Never>?
|
||||||
private var healthMonitorTask: Task<Void, Never>?
|
private var healthMonitorTask: Task<Void, Never>?
|
||||||
private var reconnectTask: Task<Void, Never>?
|
private var reconnectTask: Task<Void, Never>?
|
||||||
|
private var isHandlingDisconnect = false
|
||||||
var isACPConnected: Bool { acpClient != nil && hasActiveProcess }
|
var isACPConnected: Bool { acpClient != nil && hasActiveProcess }
|
||||||
var acpStatus: String = ""
|
var acpStatus: String = ""
|
||||||
var acpError: String?
|
var acpError: String?
|
||||||
|
|
||||||
private static let maxReconnectAttempts = 3
|
private static let maxReconnectAttempts = 5
|
||||||
private static let reconnectBaseDelay: UInt64 = 1_000_000_000 // 1 second
|
private static let reconnectBaseDelay: UInt64 = 1_000_000_000 // 1 second
|
||||||
|
private static let maxReconnectDelay: UInt64 = 16_000_000_000 // 16 seconds
|
||||||
|
|
||||||
var hermesBinaryExists: Bool {
|
var hermesBinaryExists: Bool {
|
||||||
FileManager.default.fileExists(atPath: HermesPaths.hermesBinary)
|
FileManager.default.fileExists(atPath: HermesPaths.hermesBinary)
|
||||||
@@ -295,9 +297,13 @@ final class ChatViewModel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private func handleConnectionDied() {
|
private func handleConnectionDied() {
|
||||||
guard acpClient != nil else { return } // already handled
|
guard acpClient != nil, !isHandlingDisconnect else { return }
|
||||||
|
isHandlingDisconnect = true
|
||||||
logger.warning("ACP connection died")
|
logger.warning("ACP connection died")
|
||||||
|
|
||||||
|
// Finalize any in-progress streaming message before reconnection
|
||||||
|
richChatViewModel.finalizeOnDisconnect()
|
||||||
|
|
||||||
// Save session ID for reconnection before cleaning up
|
// Save session ID for reconnection before cleaning up
|
||||||
let savedSessionId = richChatViewModel.sessionId
|
let savedSessionId = richChatViewModel.sessionId
|
||||||
|
|
||||||
@@ -317,6 +323,7 @@ final class ChatViewModel {
|
|||||||
// Attempt auto-reconnect if we have a session to restore
|
// Attempt auto-reconnect if we have a session to restore
|
||||||
guard let savedSessionId else {
|
guard let savedSessionId else {
|
||||||
showConnectionFailure()
|
showConnectionFailure()
|
||||||
|
isHandlingDisconnect = false
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
attemptReconnect(sessionId: savedSessionId)
|
attemptReconnect(sessionId: savedSessionId)
|
||||||
@@ -337,7 +344,10 @@ final class ChatViewModel {
|
|||||||
|
|
||||||
// Backoff delay (skip on first attempt for fast recovery)
|
// Backoff delay (skip on first attempt for fast recovery)
|
||||||
if attempt > 1 {
|
if attempt > 1 {
|
||||||
let delay = Self.reconnectBaseDelay * UInt64(1 << (attempt - 1))
|
let delay = min(
|
||||||
|
Self.reconnectBaseDelay * UInt64(1 << (attempt - 1)),
|
||||||
|
Self.maxReconnectDelay
|
||||||
|
)
|
||||||
try? await Task.sleep(nanoseconds: delay)
|
try? await Task.sleep(nanoseconds: delay)
|
||||||
guard !Task.isCancelled else { return }
|
guard !Task.isCancelled else { return }
|
||||||
}
|
}
|
||||||
@@ -348,23 +358,31 @@ final class ChatViewModel {
|
|||||||
|
|
||||||
let cwd = NSHomeDirectory()
|
let cwd = NSHomeDirectory()
|
||||||
let resolvedSessionId: String
|
let resolvedSessionId: String
|
||||||
|
|
||||||
|
// Try resumeSession first (designed for reconnection), then loadSession.
|
||||||
|
// NEVER fall back to newSession — that loses all conversation context.
|
||||||
do {
|
do {
|
||||||
resolvedSessionId = try await client.loadSession(cwd: cwd, sessionId: sessionId)
|
resolvedSessionId = try await client.resumeSession(cwd: cwd, sessionId: sessionId)
|
||||||
} catch {
|
} catch {
|
||||||
logger.info("Session \(sessionId) not loadable, creating new: \(error.localizedDescription)")
|
logger.info("session/resume failed, trying session/load: \(error.localizedDescription)")
|
||||||
resolvedSessionId = try await client.newSession(cwd: cwd)
|
resolvedSessionId = try await client.loadSession(cwd: cwd, sessionId: sessionId)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Success — wire up the new client
|
// Success — wire up the new client
|
||||||
self.acpClient = client
|
self.acpClient = client
|
||||||
self.hasActiveProcess = true
|
self.hasActiveProcess = true
|
||||||
richChatViewModel.setSessionId(resolvedSessionId)
|
richChatViewModel.setSessionId(resolvedSessionId)
|
||||||
|
|
||||||
|
// Reconcile in-memory messages with what Hermes persisted to DB
|
||||||
|
await richChatViewModel.reconcileWithDB(sessionId: resolvedSessionId)
|
||||||
|
|
||||||
acpStatus = "Reconnected (\(resolvedSessionId.prefix(12)))"
|
acpStatus = "Reconnected (\(resolvedSessionId.prefix(12)))"
|
||||||
acpError = nil
|
acpError = nil
|
||||||
|
|
||||||
startACPEventLoop(client: client)
|
startACPEventLoop(client: client)
|
||||||
startHealthMonitor(client: client)
|
startHealthMonitor(client: client)
|
||||||
|
|
||||||
|
isHandlingDisconnect = false
|
||||||
logger.info("Reconnected successfully on attempt \(attempt)")
|
logger.info("Reconnected successfully on attempt \(attempt)")
|
||||||
return
|
return
|
||||||
} catch {
|
} catch {
|
||||||
@@ -377,6 +395,7 @@ final class ChatViewModel {
|
|||||||
// All attempts exhausted
|
// All attempts exhausted
|
||||||
guard !Task.isCancelled else { return }
|
guard !Task.isCancelled else { return }
|
||||||
showConnectionFailure()
|
showConnectionFailure()
|
||||||
|
isHandlingDisconnect = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -400,6 +419,7 @@ final class ChatViewModel {
|
|||||||
}
|
}
|
||||||
acpClient = nil
|
acpClient = nil
|
||||||
hasActiveProcess = false
|
hasActiveProcess = false
|
||||||
|
isHandlingDisconnect = false
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Respond to a permission request from the ACP agent.
|
/// Respond to a permission request from the ACP agent.
|
||||||
|
|||||||
@@ -281,6 +281,64 @@ final class RichChatViewModel {
|
|||||||
streamingToolCalls = []
|
streamingToolCalls = []
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MARK: - Disconnect Recovery
|
||||||
|
|
||||||
|
/// Finalize streaming state on disconnect, before reconnection attempts begin.
|
||||||
|
/// Saves partial content as a permanent message without adding a system message.
|
||||||
|
func finalizeOnDisconnect() {
|
||||||
|
finalizeStreamingMessage()
|
||||||
|
isAgentWorking = false
|
||||||
|
pendingPermission = nil
|
||||||
|
buildMessageGroups()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reconcile in-memory messages with DB state after a successful reconnection.
|
||||||
|
/// Merges DB-persisted messages with any local-only messages (e.g., user messages
|
||||||
|
/// that the ACP process may not have persisted before crashing).
|
||||||
|
func reconcileWithDB(sessionId: String) async {
|
||||||
|
let opened = await dataService.open()
|
||||||
|
guard opened else { return }
|
||||||
|
|
||||||
|
var dbMessages = await dataService.fetchMessages(sessionId: sessionId)
|
||||||
|
|
||||||
|
// If we have an origin session (CLI session continued via ACP),
|
||||||
|
// include those messages too
|
||||||
|
if let origin = originSessionId, origin != sessionId {
|
||||||
|
let originMessages = await dataService.fetchMessages(sessionId: origin)
|
||||||
|
if !originMessages.isEmpty {
|
||||||
|
dbMessages = originMessages + dbMessages
|
||||||
|
dbMessages.sort { ($0.timestamp ?? .distantPast) < ($1.timestamp ?? .distantPast) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let session = await dataService.fetchSession(id: sessionId)
|
||||||
|
await dataService.close()
|
||||||
|
|
||||||
|
// Find local-only user messages not yet in DB.
|
||||||
|
// Local messages have negative IDs; DB messages have positive IDs.
|
||||||
|
let dbUserContents = Set(dbMessages.filter(\.isUser).map(\.content))
|
||||||
|
let localOnlyMessages = messages.filter { msg in
|
||||||
|
msg.id < 0 && msg.isUser && !dbUserContents.contains(msg.content)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build reconciled list: DB messages + unmatched local user messages
|
||||||
|
var reconciled = dbMessages
|
||||||
|
for localMsg in localOnlyMessages {
|
||||||
|
if let ts = localMsg.timestamp,
|
||||||
|
let insertIdx = reconciled.firstIndex(where: { ($0.timestamp ?? .distantPast) > ts }) {
|
||||||
|
reconciled.insert(localMsg, at: insertIdx)
|
||||||
|
} else {
|
||||||
|
reconciled.append(localMsg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
messages = reconciled
|
||||||
|
currentSession = session
|
||||||
|
let minId = reconciled.map(\.id).min() ?? 0
|
||||||
|
nextLocalId = min(minId - 1, -1)
|
||||||
|
buildMessageGroups()
|
||||||
|
}
|
||||||
|
|
||||||
// MARK: - Load History from DB (for resumed sessions)
|
// MARK: - Load History from DB (for resumed sessions)
|
||||||
|
|
||||||
/// Load message history from the DB, optionally combining an origin session
|
/// Load message history from the DB, optionally combining an origin session
|
||||||
|
|||||||
@@ -40,6 +40,14 @@ struct ChatView: View {
|
|||||||
.foregroundStyle(.red)
|
.foregroundStyle(.red)
|
||||||
.lineLimit(1)
|
.lineLimit(1)
|
||||||
.help(error)
|
.help(error)
|
||||||
|
if let sid = viewModel.richChatViewModel.sessionId {
|
||||||
|
Button("Reconnect") {
|
||||||
|
viewModel.resumeSession(sid)
|
||||||
|
}
|
||||||
|
.font(.caption)
|
||||||
|
.buttonStyle(.bordered)
|
||||||
|
.controlSize(.small)
|
||||||
|
}
|
||||||
} else if !viewModel.acpStatus.isEmpty {
|
} else if !viewModel.acpStatus.isEmpty {
|
||||||
Circle()
|
Circle()
|
||||||
.fill(.yellow)
|
.fill(.yellow)
|
||||||
|
|||||||
Reference in New Issue
Block a user