mirror of
https://github.com/awizemann/scarf.git
synced 2026-05-08 02:14:37 +00:00
feat: Add ACP real-time chat with stable connection management
Implement a rich chat interface powered by the Hermes ACP (Agent Communication Protocol) over JSON-RPC stdio pipes, with comprehensive connection stability: - ACPClient actor: manages hermes acp subprocess lifecycle, JSON-RPC transport, event streaming via AsyncStream, and session management - ACPMessages: full event parsing for message chunks, thought chunks, tool calls, permission requests, and prompt completion - RichChatViewModel: streaming message display with live updates, tool result rendering, and message grouping - ChatViewModel: ACP session orchestration, auto-start on first message, and terminal mode fallback Connection stability fixes: - Non-blocking pipe writes via Task.detached to prevent actor deadlock - Read loop cleanup (handleReadLoopEnded) finishes event stream and fails pending requests on EOF instead of hanging silently - 30s request timeouts on control messages via watchdog Task pattern - Keepalive: writes \n to stdin every 30s to detect dead processes via EPIPE before the next user action - Health monitor: polls process.isRunning every 5s as belt-and-suspenders - Auto-reconnect: retries up to 3 times with exponential backoff (1s/2s/4s), restores session, only shows error after all retries fail - connectionLost event displays system message in chat on failure - Proper stderr pipe management: stored task reference, closed in stop() - Idempotent cleanup across handleReadLoopEnded, handleTermination, and handleConnectionDied via actor serialization and nil guards Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,246 @@
|
||||
import Foundation
|
||||
|
||||
// MARK: - JSON-RPC Transport
|
||||
|
||||
struct ACPRequest: Encodable {
|
||||
let jsonrpc = "2.0"
|
||||
let id: Int
|
||||
let method: String
|
||||
let params: [String: AnyCodable]
|
||||
}
|
||||
|
||||
struct ACPRawMessage: Decodable {
|
||||
let jsonrpc: String?
|
||||
let id: Int?
|
||||
let method: String?
|
||||
let result: AnyCodable?
|
||||
let error: ACPError?
|
||||
let params: AnyCodable?
|
||||
|
||||
var isResponse: Bool { id != nil && method == nil }
|
||||
var isNotification: Bool { method != nil && id == nil }
|
||||
var isRequest: Bool { method != nil && id != nil }
|
||||
}
|
||||
|
||||
struct ACPError: Decodable, Sendable {
|
||||
let code: Int
|
||||
let message: String
|
||||
}
|
||||
|
||||
// MARK: - AnyCodable (for dynamic JSON)
|
||||
|
||||
struct AnyCodable: Codable, Sendable {
|
||||
let value: Any
|
||||
|
||||
init(_ value: Any) { self.value = value }
|
||||
|
||||
init(from decoder: Decoder) throws {
|
||||
let container = try decoder.singleValueContainer()
|
||||
if container.decodeNil() {
|
||||
value = NSNull()
|
||||
} else if let bool = try? container.decode(Bool.self) {
|
||||
value = bool
|
||||
} else if let int = try? container.decode(Int.self) {
|
||||
value = int
|
||||
} else if let double = try? container.decode(Double.self) {
|
||||
value = double
|
||||
} else if let string = try? container.decode(String.self) {
|
||||
value = string
|
||||
} else if let array = try? container.decode([AnyCodable].self) {
|
||||
value = array.map(\.value)
|
||||
} else if let dict = try? container.decode([String: AnyCodable].self) {
|
||||
value = dict.mapValues(\.value)
|
||||
} else {
|
||||
value = NSNull()
|
||||
}
|
||||
}
|
||||
|
||||
func encode(to encoder: Encoder) throws {
|
||||
var container = encoder.singleValueContainer()
|
||||
switch value {
|
||||
case is NSNull:
|
||||
try container.encodeNil()
|
||||
case let bool as Bool:
|
||||
try container.encode(bool)
|
||||
case let int as Int:
|
||||
try container.encode(int)
|
||||
case let double as Double:
|
||||
try container.encode(double)
|
||||
case let string as String:
|
||||
try container.encode(string)
|
||||
case let array as [Any]:
|
||||
try container.encode(array.map { AnyCodable($0) })
|
||||
case let dict as [String: Any]:
|
||||
try container.encode(dict.mapValues { AnyCodable($0) })
|
||||
default:
|
||||
try container.encodeNil()
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Accessors
|
||||
|
||||
var stringValue: String? { value as? String }
|
||||
var intValue: Int? { value as? Int }
|
||||
var dictValue: [String: Any]? { value as? [String: Any] }
|
||||
var arrayValue: [Any]? { value as? [Any] }
|
||||
}
|
||||
|
||||
// MARK: - ACP Events (parsed from session/update notifications)
|
||||
|
||||
enum ACPEvent: Sendable {
|
||||
case messageChunk(sessionId: String, text: String)
|
||||
case thoughtChunk(sessionId: String, text: String)
|
||||
case toolCallStart(sessionId: String, call: ACPToolCallEvent)
|
||||
case toolCallUpdate(sessionId: String, update: ACPToolCallUpdateEvent)
|
||||
case permissionRequest(sessionId: String, requestId: Int, request: ACPPermissionRequestEvent)
|
||||
case promptComplete(sessionId: String, response: ACPPromptResult)
|
||||
case availableCommands(sessionId: String, commands: [[String: Any]])
|
||||
case connectionLost(reason: String)
|
||||
case unknown(sessionId: String, type: String)
|
||||
}
|
||||
|
||||
struct ACPToolCallEvent: Sendable {
|
||||
let toolCallId: String
|
||||
let title: String
|
||||
let kind: String
|
||||
let status: String
|
||||
let content: String
|
||||
let rawInput: [String: Any]?
|
||||
|
||||
var functionName: String {
|
||||
// title format is "functionName: summary" or just "functionName"
|
||||
let parts = title.split(separator: ":", maxSplits: 1)
|
||||
return String(parts.first ?? Substring(title)).trimmingCharacters(in: .whitespaces)
|
||||
}
|
||||
|
||||
var argumentsSummary: String {
|
||||
let parts = title.split(separator: ":", maxSplits: 1)
|
||||
if parts.count > 1 {
|
||||
return String(parts[1]).trimmingCharacters(in: .whitespaces)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
var argumentsJSON: String {
|
||||
guard let input = rawInput,
|
||||
let data = try? JSONSerialization.data(withJSONObject: input),
|
||||
let str = String(data: data, encoding: .utf8) else { return "{}" }
|
||||
return str
|
||||
}
|
||||
}
|
||||
|
||||
struct ACPToolCallUpdateEvent: Sendable {
|
||||
let toolCallId: String
|
||||
let kind: String
|
||||
let status: String
|
||||
let content: String
|
||||
let rawOutput: String?
|
||||
}
|
||||
|
||||
struct ACPPermissionRequestEvent: Sendable {
|
||||
let toolCallTitle: String
|
||||
let toolCallKind: String
|
||||
let options: [(optionId: String, name: String)]
|
||||
}
|
||||
|
||||
struct ACPPromptResult: Sendable {
|
||||
let stopReason: String
|
||||
let inputTokens: Int
|
||||
let outputTokens: Int
|
||||
let thoughtTokens: Int
|
||||
let cachedReadTokens: Int
|
||||
}
|
||||
|
||||
// MARK: - Event Parsing
|
||||
|
||||
enum ACPEventParser {
|
||||
static func parse(notification: ACPRawMessage) -> ACPEvent? {
|
||||
guard notification.method == "session/update",
|
||||
let params = notification.params?.dictValue,
|
||||
let sessionId = params["sessionId"] as? String,
|
||||
let update = params["update"] as? [String: Any],
|
||||
let updateType = update["sessionUpdate"] as? String else {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch updateType {
|
||||
case "agent_message_chunk":
|
||||
let text = extractContentText(from: update)
|
||||
return .messageChunk(sessionId: sessionId, text: text)
|
||||
|
||||
case "agent_thought_chunk":
|
||||
let text = extractContentText(from: update)
|
||||
return .thoughtChunk(sessionId: sessionId, text: text)
|
||||
|
||||
case "tool_call":
|
||||
let event = ACPToolCallEvent(
|
||||
toolCallId: update["toolCallId"] as? String ?? "",
|
||||
title: update["title"] as? String ?? "",
|
||||
kind: update["kind"] as? String ?? "other",
|
||||
status: update["status"] as? String ?? "pending",
|
||||
content: extractContentArrayText(from: update),
|
||||
rawInput: update["rawInput"] as? [String: Any]
|
||||
)
|
||||
return .toolCallStart(sessionId: sessionId, call: event)
|
||||
|
||||
case "tool_call_update":
|
||||
let event = ACPToolCallUpdateEvent(
|
||||
toolCallId: update["toolCallId"] as? String ?? "",
|
||||
kind: update["kind"] as? String ?? "other",
|
||||
status: update["status"] as? String ?? "completed",
|
||||
content: extractContentArrayText(from: update),
|
||||
rawOutput: update["rawOutput"] as? String
|
||||
)
|
||||
return .toolCallUpdate(sessionId: sessionId, update: event)
|
||||
|
||||
case "available_commands_update":
|
||||
let commands = update["availableCommands"] as? [[String: Any]] ?? []
|
||||
return .availableCommands(sessionId: sessionId, commands: commands)
|
||||
|
||||
default:
|
||||
return .unknown(sessionId: sessionId, type: updateType)
|
||||
}
|
||||
}
|
||||
|
||||
static func parsePermissionRequest(_ message: ACPRawMessage) -> ACPEvent? {
|
||||
guard message.method == "session/request_permission",
|
||||
let params = message.params?.dictValue,
|
||||
let sessionId = params["sessionId"] as? String,
|
||||
let requestId = message.id else { return nil }
|
||||
|
||||
let toolCall = params["toolCall"] as? [String: Any] ?? [:]
|
||||
let optionsRaw = params["options"] as? [[String: Any]] ?? []
|
||||
let options = optionsRaw.compactMap { opt -> (optionId: String, name: String)? in
|
||||
guard let id = opt["optionId"] as? String,
|
||||
let name = opt["name"] as? String else { return nil }
|
||||
return (optionId: id, name: name)
|
||||
}
|
||||
|
||||
let event = ACPPermissionRequestEvent(
|
||||
toolCallTitle: toolCall["title"] as? String ?? "",
|
||||
toolCallKind: toolCall["kind"] as? String ?? "other",
|
||||
options: options
|
||||
)
|
||||
return .permissionRequest(sessionId: sessionId, requestId: requestId, request: event)
|
||||
}
|
||||
|
||||
// MARK: - Content Extraction
|
||||
|
||||
private static func extractContentText(from update: [String: Any]) -> String {
|
||||
if let content = update["content"] as? [String: Any],
|
||||
let text = content["text"] as? String {
|
||||
return text
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
private static func extractContentArrayText(from update: [String: Any]) -> String {
|
||||
if let contentArray = update["content"] as? [[String: Any]] {
|
||||
return contentArray.compactMap { item -> String? in
|
||||
guard let inner = item["content"] as? [String: Any] else { return nil }
|
||||
return inner["text"] as? String
|
||||
}.joined(separator: "\n")
|
||||
}
|
||||
return ""
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,451 @@
|
||||
import Foundation
|
||||
import os
|
||||
|
||||
/// Manages a `hermes acp` subprocess and communicates via JSON-RPC over stdio.
|
||||
/// Provides an async event stream for real-time session updates.
|
||||
actor ACPClient {
|
||||
private let logger = Logger(subsystem: "com.scarf", category: "ACPClient")
|
||||
|
||||
private var process: Process?
|
||||
private var stdinPipe: Pipe?
|
||||
private var stdoutPipe: Pipe?
|
||||
private var stderrPipe: Pipe?
|
||||
|
||||
private var nextRequestId = 1
|
||||
private var pendingRequests: [Int: CheckedContinuation<AnyCodable?, Error>] = [:]
|
||||
private var readTask: Task<Void, Never>?
|
||||
private var stderrTask: Task<Void, Never>?
|
||||
private var keepaliveTask: Task<Void, Never>?
|
||||
private var eventContinuation: AsyncStream<ACPEvent>.Continuation?
|
||||
private var _eventStream: AsyncStream<ACPEvent>?
|
||||
|
||||
private(set) var isConnected = false
|
||||
private(set) var currentSessionId: String?
|
||||
private(set) var statusMessage = ""
|
||||
|
||||
/// Check if the underlying process is still alive and connected.
|
||||
var isHealthy: Bool {
|
||||
guard isConnected, let process else { return false }
|
||||
return process.isRunning
|
||||
}
|
||||
|
||||
// MARK: - Event Stream
|
||||
|
||||
/// Access the event stream. Must call `start()` first.
|
||||
var events: AsyncStream<ACPEvent> {
|
||||
guard let stream = _eventStream else {
|
||||
// Return an empty stream if not started
|
||||
return AsyncStream { $0.finish() }
|
||||
}
|
||||
return stream
|
||||
}
|
||||
|
||||
// MARK: - Lifecycle
|
||||
|
||||
func start() async throws {
|
||||
guard process == nil else { return }
|
||||
|
||||
// Create the event stream BEFORE anything else so no events are lost
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: ACPEvent.self)
|
||||
self._eventStream = stream
|
||||
self.eventContinuation = continuation
|
||||
|
||||
let proc = Process()
|
||||
proc.executableURL = URL(fileURLWithPath: HermesPaths.hermesBinary)
|
||||
proc.arguments = ["acp"]
|
||||
|
||||
let stdin = Pipe()
|
||||
let stdout = Pipe()
|
||||
let stderr = Pipe()
|
||||
|
||||
proc.standardInput = stdin
|
||||
proc.standardOutput = stdout
|
||||
proc.standardError = stderr
|
||||
|
||||
var env = ProcessInfo.processInfo.environment
|
||||
env["TERM"] = "xterm-256color"
|
||||
proc.environment = env
|
||||
|
||||
proc.terminationHandler = { [weak self] proc in
|
||||
Task { await self?.handleTermination(exitCode: proc.terminationStatus) }
|
||||
}
|
||||
|
||||
statusMessage = "Starting hermes acp..."
|
||||
|
||||
do {
|
||||
try proc.run()
|
||||
} catch {
|
||||
statusMessage = "Failed to start: \(error.localizedDescription)"
|
||||
logger.error("Failed to start hermes acp: \(error.localizedDescription)")
|
||||
continuation.finish()
|
||||
throw error
|
||||
}
|
||||
|
||||
self.process = proc
|
||||
self.stdinPipe = stdin
|
||||
self.stdoutPipe = stdout
|
||||
self.stderrPipe = stderr
|
||||
self.isConnected = true
|
||||
|
||||
// Start reading stdout BEFORE sending initialize (so we catch the response)
|
||||
startReadLoop(stdout: stdout, stderr: stderr)
|
||||
logger.info("hermes acp process started (pid: \(proc.processIdentifier))")
|
||||
statusMessage = "Initializing..."
|
||||
|
||||
// Initialize the ACP connection
|
||||
let initParams: [String: AnyCodable] = [
|
||||
"protocolVersion": AnyCodable(1),
|
||||
"clientCapabilities": AnyCodable([String: Any]()),
|
||||
"clientInfo": AnyCodable([
|
||||
"name": "Scarf",
|
||||
"version": "1.0"
|
||||
] as [String: Any])
|
||||
]
|
||||
_ = try await sendRequest(method: "initialize", params: initParams)
|
||||
statusMessage = "Connected"
|
||||
logger.info("ACP connection initialized")
|
||||
startKeepalive()
|
||||
}
|
||||
|
||||
func stop() async {
|
||||
readTask?.cancel()
|
||||
readTask = nil
|
||||
stderrTask?.cancel()
|
||||
stderrTask = nil
|
||||
keepaliveTask?.cancel()
|
||||
keepaliveTask = nil
|
||||
eventContinuation?.finish()
|
||||
eventContinuation = nil
|
||||
_eventStream = nil
|
||||
|
||||
for (_, continuation) in pendingRequests {
|
||||
continuation.resume(throwing: CancellationError())
|
||||
}
|
||||
pendingRequests.removeAll()
|
||||
|
||||
if let process, process.isRunning {
|
||||
process.terminate()
|
||||
}
|
||||
stdinPipe?.fileHandleForWriting.closeFile()
|
||||
stdinPipe?.fileHandleForReading.closeFile()
|
||||
stdoutPipe?.fileHandleForReading.closeFile()
|
||||
stderrPipe?.fileHandleForReading.closeFile()
|
||||
|
||||
process = nil
|
||||
stdinPipe = nil
|
||||
stdoutPipe = nil
|
||||
stderrPipe = nil
|
||||
isConnected = false
|
||||
currentSessionId = nil
|
||||
statusMessage = "Disconnected"
|
||||
logger.info("ACP client stopped")
|
||||
}
|
||||
|
||||
// MARK: - Keepalive
|
||||
|
||||
private func startKeepalive() {
|
||||
keepaliveTask = Task { [weak self] in
|
||||
while !Task.isCancelled {
|
||||
try? await Task.sleep(nanoseconds: 30_000_000_000) // 30 seconds
|
||||
guard !Task.isCancelled else { break }
|
||||
await self?.sendKeepalive()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func sendKeepalive() {
|
||||
guard let pipe = stdinPipe else { return }
|
||||
let handle = pipe.fileHandleForWriting
|
||||
Task.detached {
|
||||
// Empty newline — JSON-RPC parser skips it, but triggers EPIPE if process is dead
|
||||
handle.write(Data("\n".utf8))
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Session Management
|
||||
|
||||
func newSession(cwd: String) async throws -> String {
|
||||
statusMessage = "Creating session..."
|
||||
let params: [String: AnyCodable] = [
|
||||
"cwd": AnyCodable(cwd),
|
||||
"mcpServers": AnyCodable([Any]())
|
||||
]
|
||||
let result = try await sendRequest(method: "session/new", params: params)
|
||||
guard let dict = result?.dictValue,
|
||||
let sessionId = dict["sessionId"] as? String else {
|
||||
throw ACPClientError.invalidResponse("Missing sessionId in session/new response")
|
||||
}
|
||||
currentSessionId = sessionId
|
||||
statusMessage = "Session ready"
|
||||
logger.info("Created new ACP session: \(sessionId)")
|
||||
return sessionId
|
||||
}
|
||||
|
||||
func loadSession(cwd: String, sessionId: String) async throws -> String {
|
||||
statusMessage = "Loading session \(sessionId.prefix(12))..."
|
||||
let params: [String: AnyCodable] = [
|
||||
"cwd": AnyCodable(cwd),
|
||||
"sessionId": AnyCodable(sessionId),
|
||||
"mcpServers": AnyCodable([Any]())
|
||||
]
|
||||
let result = try await sendRequest(method: "session/load", params: params)
|
||||
// ACP returns {} on success (no sessionId echoed), or an error if not found.
|
||||
// If we got here without throwing, the session was loaded. Use the ID we sent.
|
||||
let loadedId = (result?.dictValue?["sessionId"] as? String) ?? sessionId
|
||||
currentSessionId = loadedId
|
||||
statusMessage = "Session loaded"
|
||||
logger.info("Loaded ACP session: \(loadedId)")
|
||||
return loadedId
|
||||
}
|
||||
|
||||
func resumeSession(cwd: String, sessionId: String) async throws -> String {
|
||||
statusMessage = "Resuming session..."
|
||||
let params: [String: AnyCodable] = [
|
||||
"cwd": AnyCodable(cwd),
|
||||
"sessionId": AnyCodable(sessionId),
|
||||
"mcpServers": AnyCodable([Any]())
|
||||
]
|
||||
let result = try await sendRequest(method: "session/resume", params: params)
|
||||
guard let dict = result?.dictValue,
|
||||
let resumedId = dict["sessionId"] as? String else {
|
||||
throw ACPClientError.invalidResponse("Missing sessionId in session/resume response")
|
||||
}
|
||||
currentSessionId = resumedId
|
||||
statusMessage = "Session resumed"
|
||||
logger.info("Resumed ACP session: \(resumedId)")
|
||||
return resumedId
|
||||
}
|
||||
|
||||
// MARK: - Messaging
|
||||
|
||||
func sendPrompt(sessionId: String, text: String) async throws -> ACPPromptResult {
|
||||
statusMessage = "Sending prompt..."
|
||||
let messageId = UUID().uuidString
|
||||
let params: [String: AnyCodable] = [
|
||||
"sessionId": AnyCodable(sessionId),
|
||||
"messageId": AnyCodable(messageId),
|
||||
"prompt": AnyCodable([
|
||||
["type": "text", "text": text] as [String: Any]
|
||||
] as [Any])
|
||||
]
|
||||
let result = try await sendRequest(method: "session/prompt", params: params)
|
||||
let dict = result?.dictValue ?? [:]
|
||||
let usage = dict["usage"] as? [String: Any] ?? [:]
|
||||
|
||||
statusMessage = "Ready"
|
||||
return ACPPromptResult(
|
||||
stopReason: dict["stopReason"] as? String ?? "end_turn",
|
||||
inputTokens: usage["inputTokens"] as? Int ?? 0,
|
||||
outputTokens: usage["outputTokens"] as? Int ?? 0,
|
||||
thoughtTokens: usage["thoughtTokens"] as? Int ?? 0,
|
||||
cachedReadTokens: usage["cachedReadTokens"] as? Int ?? 0
|
||||
)
|
||||
}
|
||||
|
||||
func cancel(sessionId: String) async throws {
|
||||
let params: [String: AnyCodable] = [
|
||||
"sessionId": AnyCodable(sessionId)
|
||||
]
|
||||
_ = try await sendRequest(method: "session/cancel", params: params)
|
||||
statusMessage = "Cancelled"
|
||||
}
|
||||
|
||||
func respondToPermission(requestId: Int, optionId: String) {
|
||||
let response: [String: Any] = [
|
||||
"jsonrpc": "2.0",
|
||||
"id": requestId,
|
||||
"result": [
|
||||
"outcome": [
|
||||
"kind": optionId == "deny" ? "rejected" : "allowed",
|
||||
"optionId": optionId
|
||||
] as [String: Any]
|
||||
] as [String: Any]
|
||||
]
|
||||
writeJSON(response)
|
||||
}
|
||||
|
||||
// MARK: - JSON-RPC Transport
|
||||
|
||||
private func sendRequest(method: String, params: [String: AnyCodable]) async throws -> AnyCodable? {
|
||||
let requestId = nextRequestId
|
||||
nextRequestId += 1
|
||||
|
||||
let request = ACPRequest(id: requestId, method: method, params: params)
|
||||
|
||||
guard let data = try? JSONEncoder().encode(request) else {
|
||||
throw ACPClientError.encodingFailed
|
||||
}
|
||||
|
||||
logger.debug("Sending: \(method) (id: \(requestId))")
|
||||
|
||||
// session/prompt streams events and can run for minutes — no hard timeout.
|
||||
// Control messages get a 30s watchdog.
|
||||
let timeoutTask: Task<Void, Error>? = if method != "session/prompt" {
|
||||
Task { [weak self] in
|
||||
try await Task.sleep(nanoseconds: 30 * 1_000_000_000)
|
||||
await self?.timeoutRequest(id: requestId, method: method)
|
||||
}
|
||||
} else {
|
||||
nil
|
||||
}
|
||||
|
||||
defer { timeoutTask?.cancel() }
|
||||
|
||||
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<AnyCodable?, Error>) in
|
||||
pendingRequests[requestId] = continuation
|
||||
|
||||
guard let pipe = stdinPipe else {
|
||||
pendingRequests.removeValue(forKey: requestId)
|
||||
continuation.resume(throwing: ACPClientError.notConnected)
|
||||
return
|
||||
}
|
||||
|
||||
var payload = data
|
||||
payload.append(contentsOf: "\n".utf8)
|
||||
// Write in a detached task to avoid blocking the actor's executor.
|
||||
// The continuation is already stored; the response arrives via the read loop.
|
||||
let handle = pipe.fileHandleForWriting
|
||||
Task.detached { handle.write(payload) }
|
||||
}
|
||||
}
|
||||
|
||||
private func timeoutRequest(id: Int, method: String) {
|
||||
guard let continuation = pendingRequests.removeValue(forKey: id) else { return }
|
||||
logger.error("Request timed out: \(method) (id: \(id))")
|
||||
statusMessage = "Request timed out"
|
||||
continuation.resume(throwing: ACPClientError.requestTimeout(method: method))
|
||||
}
|
||||
|
||||
private func writeJSON(_ dict: [String: Any]) {
|
||||
guard let pipe = stdinPipe,
|
||||
let data = try? JSONSerialization.data(withJSONObject: dict) else { return }
|
||||
var payload = data
|
||||
payload.append(contentsOf: "\n".utf8)
|
||||
let handle = pipe.fileHandleForWriting
|
||||
Task.detached { handle.write(payload) }
|
||||
}
|
||||
|
||||
// MARK: - Read Loop
|
||||
|
||||
private func startReadLoop(stdout: Pipe, stderr: Pipe) {
|
||||
// Read stdout for JSON-RPC messages
|
||||
readTask = Task.detached { [weak self] in
|
||||
let handle = stdout.fileHandleForReading
|
||||
var buffer = Data()
|
||||
|
||||
while !Task.isCancelled {
|
||||
let chunk = handle.availableData
|
||||
if chunk.isEmpty { break } // EOF
|
||||
buffer.append(chunk)
|
||||
|
||||
while let newlineIndex = buffer.firstIndex(of: UInt8(ascii: "\n")) {
|
||||
let lineData = Data(buffer[buffer.startIndex..<newlineIndex])
|
||||
buffer = Data(buffer[buffer.index(after: newlineIndex)...])
|
||||
|
||||
guard !lineData.isEmpty else { continue }
|
||||
|
||||
if let lineStr = String(data: lineData, encoding: .utf8) {
|
||||
await self?.logger.debug("ACP recv: \(lineStr.prefix(200))")
|
||||
}
|
||||
|
||||
do {
|
||||
let message = try JSONDecoder().decode(ACPRawMessage.self, from: lineData)
|
||||
await self?.handleMessage(message)
|
||||
} catch {
|
||||
await self?.logger.warning("Failed to decode ACP message: \(error.localizedDescription)")
|
||||
}
|
||||
}
|
||||
}
|
||||
await self?.handleReadLoopEnded()
|
||||
}
|
||||
|
||||
// Read stderr in background for diagnostic logging
|
||||
stderrTask = Task.detached { [weak self] in
|
||||
let handle = stderr.fileHandleForReading
|
||||
while !Task.isCancelled {
|
||||
let data = handle.availableData
|
||||
if data.isEmpty { break }
|
||||
if let text = String(data: data, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines),
|
||||
!text.isEmpty {
|
||||
await self?.logger.info("ACP stderr: \(text.prefix(500))")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func handleMessage(_ message: ACPRawMessage) {
|
||||
if message.isResponse {
|
||||
if let requestId = message.id,
|
||||
let continuation = pendingRequests.removeValue(forKey: requestId) {
|
||||
if let error = message.error {
|
||||
logger.error("ACP RPC error (id: \(requestId)): \(error.message)")
|
||||
statusMessage = "Error: \(error.message)"
|
||||
continuation.resume(throwing: ACPClientError.rpcError(code: error.code, message: error.message))
|
||||
} else {
|
||||
logger.debug("ACP response (id: \(requestId))")
|
||||
continuation.resume(returning: message.result)
|
||||
}
|
||||
} else {
|
||||
logger.warning("ACP response for unknown request id: \(message.id ?? -1)")
|
||||
}
|
||||
} else if message.isNotification {
|
||||
if let event = ACPEventParser.parse(notification: message) {
|
||||
logger.debug("ACP event: \(String(describing: event).prefix(100))")
|
||||
eventContinuation?.yield(event)
|
||||
}
|
||||
} else if message.isRequest {
|
||||
if message.method == "session/request_permission",
|
||||
let event = ACPEventParser.parsePermissionRequest(message) {
|
||||
statusMessage = "Permission required"
|
||||
eventContinuation?.yield(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func handleReadLoopEnded() {
|
||||
guard isConnected else { return } // idempotent with handleTermination
|
||||
logger.warning("ACP read loop ended unexpectedly — cleaning up")
|
||||
isConnected = false
|
||||
statusMessage = "Connection lost"
|
||||
for (_, continuation) in pendingRequests {
|
||||
continuation.resume(throwing: ACPClientError.processTerminated)
|
||||
}
|
||||
pendingRequests.removeAll()
|
||||
eventContinuation?.finish()
|
||||
eventContinuation = nil
|
||||
}
|
||||
|
||||
private func handleTermination(exitCode: Int32) {
|
||||
logger.info("hermes acp process terminated with code \(exitCode)")
|
||||
statusMessage = "Process exited (\(exitCode))"
|
||||
isConnected = false
|
||||
for (_, continuation) in pendingRequests {
|
||||
continuation.resume(throwing: ACPClientError.processTerminated)
|
||||
}
|
||||
pendingRequests.removeAll()
|
||||
eventContinuation?.finish()
|
||||
eventContinuation = nil
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Errors
|
||||
|
||||
enum ACPClientError: Error, LocalizedError {
|
||||
case notConnected
|
||||
case encodingFailed
|
||||
case invalidResponse(String)
|
||||
case rpcError(code: Int, message: String)
|
||||
case processTerminated
|
||||
case requestTimeout(method: String)
|
||||
|
||||
var errorDescription: String? {
|
||||
switch self {
|
||||
case .notConnected: return "ACP client is not connected"
|
||||
case .encodingFailed: return "Failed to encode JSON-RPC request"
|
||||
case .invalidResponse(let msg): return "Invalid ACP response: \(msg)"
|
||||
case .rpcError(let code, let msg): return "ACP error \(code): \(msg)"
|
||||
case .processTerminated: return "ACP process terminated unexpectedly"
|
||||
case .requestTimeout(let method): return "ACP request '\(method)' timed out"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ actor HermesDataService {
|
||||
private var hasV07Schema = false
|
||||
|
||||
func open() -> Bool {
|
||||
if db != nil { return true }
|
||||
let path = HermesPaths.stateDB
|
||||
guard FileManager.default.fileExists(atPath: path) else { return false }
|
||||
let flags = SQLITE_OPEN_READONLY | SQLITE_OPEN_NOMUTEX
|
||||
@@ -219,6 +220,29 @@ actor HermesDataService {
|
||||
|
||||
// MARK: - Single-Row Queries
|
||||
|
||||
struct MessageFingerprint: Equatable, Sendable {
|
||||
let count: Int
|
||||
let maxId: Int
|
||||
let maxTimestamp: Double
|
||||
|
||||
static let empty = MessageFingerprint(count: 0, maxId: 0, maxTimestamp: 0)
|
||||
}
|
||||
|
||||
func fetchMessageFingerprint(sessionId: String) -> MessageFingerprint {
|
||||
guard let db else { return .empty }
|
||||
let sql = "SELECT COUNT(*), COALESCE(MAX(id), 0), COALESCE(MAX(timestamp), 0) FROM messages WHERE session_id = ?"
|
||||
var stmt: OpaquePointer?
|
||||
guard sqlite3_prepare_v2(db, sql, -1, &stmt, nil) == SQLITE_OK else { return .empty }
|
||||
defer { sqlite3_finalize(stmt) }
|
||||
sqlite3_bind_text(stmt, 1, sessionId, -1, sqliteTransient)
|
||||
guard sqlite3_step(stmt) == SQLITE_ROW else { return .empty }
|
||||
return MessageFingerprint(
|
||||
count: Int(sqlite3_column_int(stmt, 0)),
|
||||
maxId: Int(sqlite3_column_int(stmt, 1)),
|
||||
maxTimestamp: sqlite3_column_double(stmt, 2)
|
||||
)
|
||||
}
|
||||
|
||||
func fetchMessageCount(sessionId: String) -> Int {
|
||||
guard let db else { return 0 }
|
||||
let sql = "SELECT COUNT(*) FROM messages WHERE session_id = ?"
|
||||
@@ -241,6 +265,34 @@ actor HermesDataService {
|
||||
return sessionFromRow(stmt!)
|
||||
}
|
||||
|
||||
func fetchMostRecentlyActiveSessionId() -> String? {
|
||||
guard let db else { return nil }
|
||||
let sql = "SELECT session_id FROM messages ORDER BY timestamp DESC LIMIT 1"
|
||||
var stmt: OpaquePointer?
|
||||
guard sqlite3_prepare_v2(db, sql, -1, &stmt, nil) == SQLITE_OK else { return nil }
|
||||
defer { sqlite3_finalize(stmt) }
|
||||
guard sqlite3_step(stmt) == SQLITE_ROW else { return nil }
|
||||
return columnText(stmt!, 0)
|
||||
}
|
||||
|
||||
func fetchMostRecentlyStartedSessionId(after: Date? = nil) -> String? {
|
||||
guard let db else { return nil }
|
||||
let sql: String
|
||||
if after != nil {
|
||||
sql = "SELECT id FROM sessions WHERE parent_session_id IS NULL AND started_at > ? ORDER BY started_at DESC LIMIT 1"
|
||||
} else {
|
||||
sql = "SELECT id FROM sessions WHERE parent_session_id IS NULL ORDER BY started_at DESC LIMIT 1"
|
||||
}
|
||||
var stmt: OpaquePointer?
|
||||
guard sqlite3_prepare_v2(db, sql, -1, &stmt, nil) == SQLITE_OK else { return nil }
|
||||
defer { sqlite3_finalize(stmt) }
|
||||
if let after {
|
||||
sqlite3_bind_double(stmt, 1, after.timeIntervalSince1970)
|
||||
}
|
||||
guard sqlite3_step(stmt) == SQLITE_ROW else { return nil }
|
||||
return columnText(stmt!, 0)
|
||||
}
|
||||
|
||||
// MARK: - Stats
|
||||
|
||||
struct SessionStats: Sendable {
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import Foundation
|
||||
import AppKit
|
||||
import SwiftTerm
|
||||
import os
|
||||
|
||||
@Observable
|
||||
final class ChatViewModel {
|
||||
private let logger = Logger(subsystem: "com.scarf", category: "ChatViewModel")
|
||||
private let dataService = HermesDataService()
|
||||
private let fileService = HermesFileService()
|
||||
|
||||
@@ -15,24 +17,38 @@ final class ChatViewModel {
|
||||
var ttsEnabled = false
|
||||
var isRecording = false
|
||||
var displayMode: ChatDisplayMode = .richChat
|
||||
var activeSessionId: String?
|
||||
let richChatViewModel = RichChatViewModel()
|
||||
private var coordinator: Coordinator?
|
||||
|
||||
// ACP state
|
||||
private var acpClient: ACPClient?
|
||||
private var acpEventTask: Task<Void, Never>?
|
||||
private var acpPromptTask: Task<Void, Never>?
|
||||
private var healthMonitorTask: Task<Void, Never>?
|
||||
private var reconnectTask: Task<Void, Never>?
|
||||
var isACPConnected: Bool { acpClient != nil && hasActiveProcess }
|
||||
var acpStatus: String = ""
|
||||
var acpError: String?
|
||||
|
||||
private static let maxReconnectAttempts = 3
|
||||
private static let reconnectBaseDelay: UInt64 = 1_000_000_000 // 1 second
|
||||
|
||||
var hermesBinaryExists: Bool {
|
||||
FileManager.default.fileExists(atPath: HermesPaths.hermesBinary)
|
||||
}
|
||||
|
||||
// MARK: - Session Lifecycle
|
||||
|
||||
func startNewSession() {
|
||||
voiceEnabled = false
|
||||
ttsEnabled = false
|
||||
isRecording = false
|
||||
richChatViewModel.stopPolling()
|
||||
activeSessionId = nil
|
||||
launchTerminal(arguments: ["chat"])
|
||||
Task {
|
||||
try? await Task.sleep(for: .seconds(1.5))
|
||||
await discoverActiveSessionId()
|
||||
richChatViewModel.reset()
|
||||
|
||||
if displayMode == .richChat {
|
||||
startACPSession(resume: nil)
|
||||
} else {
|
||||
launchTerminal(arguments: ["chat"])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,35 +56,364 @@ final class ChatViewModel {
|
||||
voiceEnabled = false
|
||||
ttsEnabled = false
|
||||
isRecording = false
|
||||
richChatViewModel.stopPolling()
|
||||
activeSessionId = sessionId
|
||||
launchTerminal(arguments: ["chat", "--resume", sessionId])
|
||||
richChatViewModel.startPolling(sessionId: sessionId)
|
||||
richChatViewModel.reset()
|
||||
|
||||
if displayMode == .richChat {
|
||||
startACPSession(resume: sessionId)
|
||||
} else {
|
||||
richChatViewModel.setSessionId(sessionId)
|
||||
launchTerminal(arguments: ["chat", "--resume", sessionId])
|
||||
}
|
||||
}
|
||||
|
||||
func continueLastSession() {
|
||||
voiceEnabled = false
|
||||
ttsEnabled = false
|
||||
isRecording = false
|
||||
richChatViewModel.stopPolling()
|
||||
activeSessionId = nil
|
||||
launchTerminal(arguments: ["chat", "--continue"])
|
||||
if let mostRecent = recentSessions.first {
|
||||
activeSessionId = mostRecent.id
|
||||
richChatViewModel.startPolling(sessionId: mostRecent.id)
|
||||
richChatViewModel.reset()
|
||||
|
||||
if displayMode == .richChat {
|
||||
// Find most recent session and resume via ACP
|
||||
Task { @MainActor in
|
||||
let opened = await dataService.open()
|
||||
guard opened else { return }
|
||||
let sessionId = await dataService.fetchMostRecentlyActiveSessionId()
|
||||
await dataService.close()
|
||||
if let sessionId {
|
||||
startACPSession(resume: sessionId)
|
||||
} else {
|
||||
startACPSession(resume: nil)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Task {
|
||||
try? await Task.sleep(for: .seconds(1.5))
|
||||
await discoverActiveSessionId()
|
||||
launchTerminal(arguments: ["chat", "--continue"])
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Send Message
|
||||
|
||||
func sendText(_ text: String) {
|
||||
if displayMode == .richChat {
|
||||
if let client = acpClient {
|
||||
sendViaACP(client: client, text: text)
|
||||
} else {
|
||||
// Auto-start ACP and send the queued message
|
||||
autoStartACPAndSend(text: text)
|
||||
}
|
||||
} else if let tv = terminalView {
|
||||
sendToTerminal(tv, text: text + "\r")
|
||||
}
|
||||
}
|
||||
|
||||
/// Start ACP for the current or most recent session, then send the queued prompt.
|
||||
private func autoStartACPAndSend(text: String) {
|
||||
// Show the user message immediately
|
||||
richChatViewModel.addUserMessage(text: text)
|
||||
|
||||
Task { @MainActor in
|
||||
// Find a session to resume: prefer current sessionId, then most recent
|
||||
var sessionToResume = richChatViewModel.sessionId
|
||||
if sessionToResume == nil {
|
||||
let opened = await dataService.open()
|
||||
if opened {
|
||||
sessionToResume = await dataService.fetchMostRecentlyActiveSessionId()
|
||||
await dataService.close()
|
||||
}
|
||||
}
|
||||
|
||||
let client = ACPClient()
|
||||
self.acpClient = client
|
||||
|
||||
do {
|
||||
try await client.start()
|
||||
acpStatus = await client.statusMessage
|
||||
startACPEventLoop(client: client)
|
||||
startHealthMonitor(client: client)
|
||||
|
||||
let cwd = NSHomeDirectory()
|
||||
|
||||
hasActiveProcess = true
|
||||
|
||||
let resolvedSessionId: String
|
||||
if let existing = sessionToResume {
|
||||
acpStatus = "Loading session..."
|
||||
do {
|
||||
resolvedSessionId = try await client.loadSession(cwd: cwd, sessionId: existing)
|
||||
} catch {
|
||||
logger.info("Session \(existing) not found in ACP, creating new session")
|
||||
acpStatus = "Creating new session..."
|
||||
resolvedSessionId = try await client.newSession(cwd: cwd)
|
||||
}
|
||||
} else {
|
||||
acpStatus = "Creating session..."
|
||||
resolvedSessionId = try await client.newSession(cwd: cwd)
|
||||
}
|
||||
|
||||
richChatViewModel.setSessionId(resolvedSessionId)
|
||||
acpStatus = "Connected (\(resolvedSessionId.prefix(12)))"
|
||||
|
||||
// Now send the queued prompt
|
||||
sendViaACP(client: client, text: text)
|
||||
} catch {
|
||||
let msg = error.localizedDescription
|
||||
logger.error("Auto-start ACP failed: \(msg)")
|
||||
acpStatus = "Failed"
|
||||
acpError = msg
|
||||
hasActiveProcess = false
|
||||
acpClient = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sendText(_ text: String) {
|
||||
guard let tv = terminalView else { return }
|
||||
sendToTerminal(tv, text: text + "\r")
|
||||
private func sendViaACP(client: ACPClient, text: String) {
|
||||
guard let sessionId = richChatViewModel.sessionId else {
|
||||
acpError = "No session ID — cannot send"
|
||||
return
|
||||
}
|
||||
|
||||
// Don't duplicate user message if autoStartACPAndSend already added it
|
||||
if richChatViewModel.messages.last?.isUser != true
|
||||
|| richChatViewModel.messages.last?.content != text {
|
||||
richChatViewModel.addUserMessage(text: text)
|
||||
}
|
||||
|
||||
acpStatus = "Agent working..."
|
||||
acpPromptTask = Task { @MainActor in
|
||||
do {
|
||||
let result = try await client.sendPrompt(sessionId: sessionId, text: text)
|
||||
acpStatus = "Ready"
|
||||
richChatViewModel.handleACPEvent(
|
||||
.promptComplete(sessionId: sessionId, response: result)
|
||||
)
|
||||
} catch is CancellationError {
|
||||
acpStatus = "Cancelled"
|
||||
} catch {
|
||||
let msg = error.localizedDescription
|
||||
logger.error("ACP prompt failed: \(msg)")
|
||||
acpStatus = "Error"
|
||||
acpError = msg
|
||||
richChatViewModel.handleACPEvent(
|
||||
.promptComplete(sessionId: sessionId, response: ACPPromptResult(
|
||||
stopReason: "error",
|
||||
inputTokens: 0, outputTokens: 0,
|
||||
thoughtTokens: 0, cachedReadTokens: 0
|
||||
))
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - ACP Session Management
|
||||
|
||||
private func startACPSession(resume sessionId: String?) {
|
||||
stopACP()
|
||||
acpError = nil
|
||||
acpStatus = "Starting..."
|
||||
|
||||
let client = ACPClient()
|
||||
self.acpClient = client
|
||||
|
||||
Task { @MainActor in
|
||||
do {
|
||||
// Start ACP process and event loop FIRST
|
||||
try await client.start()
|
||||
acpStatus = await client.statusMessage
|
||||
startACPEventLoop(client: client)
|
||||
startHealthMonitor(client: client)
|
||||
|
||||
let cwd = NSHomeDirectory()
|
||||
|
||||
// Mark active BEFORE setting session ID so .task(id:) sees isACPMode=true
|
||||
// and doesn't wipe messages with a DB refresh
|
||||
hasActiveProcess = true
|
||||
|
||||
let resolvedSessionId: String
|
||||
if let sessionId {
|
||||
acpStatus = "Loading session..."
|
||||
do {
|
||||
resolvedSessionId = try await client.loadSession(cwd: cwd, sessionId: sessionId)
|
||||
} catch {
|
||||
logger.info("Session \(sessionId) not found in ACP, creating new session with history")
|
||||
acpStatus = "Creating new session..."
|
||||
resolvedSessionId = try await client.newSession(cwd: cwd)
|
||||
}
|
||||
// Load messages from both origin CLI session and ACP session
|
||||
await richChatViewModel.loadSessionHistory(
|
||||
sessionId: sessionId,
|
||||
acpSessionId: resolvedSessionId
|
||||
)
|
||||
} else {
|
||||
acpStatus = "Creating session..."
|
||||
resolvedSessionId = try await client.newSession(cwd: cwd)
|
||||
}
|
||||
|
||||
richChatViewModel.setSessionId(resolvedSessionId)
|
||||
acpStatus = "Connected (\(resolvedSessionId.prefix(12)))"
|
||||
|
||||
// Refresh session list so the new ACP session appears in the Resume menu
|
||||
await loadRecentSessions()
|
||||
|
||||
logger.info("ACP session ready: \(resolvedSessionId)")
|
||||
} catch {
|
||||
let msg = error.localizedDescription
|
||||
logger.error("Failed to start ACP session: \(msg)")
|
||||
acpStatus = "Failed"
|
||||
acpError = msg
|
||||
hasActiveProcess = false
|
||||
acpClient = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func startACPEventLoop(client: ACPClient) {
|
||||
acpEventTask = Task { @MainActor [weak self] in
|
||||
let eventStream = await client.events
|
||||
for await event in eventStream {
|
||||
guard !Task.isCancelled else { break }
|
||||
self?.richChatViewModel.handleACPEvent(event)
|
||||
self?.acpStatus = await client.statusMessage
|
||||
}
|
||||
// Stream ended — if we weren't cancelled, the connection died
|
||||
if !Task.isCancelled {
|
||||
self?.handleConnectionDied()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func startHealthMonitor(client: ACPClient) {
|
||||
healthMonitorTask = Task { @MainActor [weak self] in
|
||||
while !Task.isCancelled {
|
||||
try? await Task.sleep(nanoseconds: 5_000_000_000)
|
||||
guard !Task.isCancelled else { break }
|
||||
let healthy = await client.isHealthy
|
||||
if !healthy {
|
||||
self?.handleConnectionDied()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func handleConnectionDied() {
|
||||
guard acpClient != nil else { return } // already handled
|
||||
logger.warning("ACP connection died")
|
||||
|
||||
// Save session ID for reconnection before cleaning up
|
||||
let savedSessionId = richChatViewModel.sessionId
|
||||
|
||||
// Clean up the dead client
|
||||
acpPromptTask?.cancel()
|
||||
acpPromptTask = nil
|
||||
acpEventTask?.cancel()
|
||||
acpEventTask = nil
|
||||
healthMonitorTask?.cancel()
|
||||
healthMonitorTask = nil
|
||||
if let client = acpClient {
|
||||
Task { await client.stop() }
|
||||
}
|
||||
acpClient = nil
|
||||
hasActiveProcess = false
|
||||
|
||||
// Attempt auto-reconnect if we have a session to restore
|
||||
guard let savedSessionId else {
|
||||
showConnectionFailure()
|
||||
return
|
||||
}
|
||||
attemptReconnect(sessionId: savedSessionId)
|
||||
}
|
||||
|
||||
private func attemptReconnect(sessionId: String) {
|
||||
reconnectTask?.cancel()
|
||||
acpError = nil
|
||||
|
||||
reconnectTask = Task { @MainActor [weak self] in
|
||||
guard let self else { return }
|
||||
|
||||
for attempt in 1...Self.maxReconnectAttempts {
|
||||
guard !Task.isCancelled else { return }
|
||||
|
||||
acpStatus = "Reconnecting (\(attempt)/\(Self.maxReconnectAttempts))..."
|
||||
logger.info("Reconnect attempt \(attempt)/\(Self.maxReconnectAttempts) for session \(sessionId)")
|
||||
|
||||
// Backoff delay (skip on first attempt for fast recovery)
|
||||
if attempt > 1 {
|
||||
let delay = Self.reconnectBaseDelay * UInt64(1 << (attempt - 1))
|
||||
try? await Task.sleep(nanoseconds: delay)
|
||||
guard !Task.isCancelled else { return }
|
||||
}
|
||||
|
||||
let client = ACPClient()
|
||||
do {
|
||||
try await client.start()
|
||||
|
||||
let cwd = NSHomeDirectory()
|
||||
let resolvedSessionId: String
|
||||
do {
|
||||
resolvedSessionId = try await client.loadSession(cwd: cwd, sessionId: sessionId)
|
||||
} catch {
|
||||
logger.info("Session \(sessionId) not loadable, creating new: \(error.localizedDescription)")
|
||||
resolvedSessionId = try await client.newSession(cwd: cwd)
|
||||
}
|
||||
|
||||
// Success — wire up the new client
|
||||
self.acpClient = client
|
||||
self.hasActiveProcess = true
|
||||
richChatViewModel.setSessionId(resolvedSessionId)
|
||||
acpStatus = "Reconnected (\(resolvedSessionId.prefix(12)))"
|
||||
acpError = nil
|
||||
|
||||
startACPEventLoop(client: client)
|
||||
startHealthMonitor(client: client)
|
||||
|
||||
logger.info("Reconnected successfully on attempt \(attempt)")
|
||||
return
|
||||
} catch {
|
||||
logger.warning("Reconnect attempt \(attempt) failed: \(error.localizedDescription)")
|
||||
await client.stop()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// All attempts exhausted
|
||||
guard !Task.isCancelled else { return }
|
||||
showConnectionFailure()
|
||||
}
|
||||
}
|
||||
|
||||
private func showConnectionFailure() {
|
||||
richChatViewModel.handleACPEvent(.connectionLost(reason: "The ACP process terminated unexpectedly"))
|
||||
acpStatus = "Connection lost"
|
||||
acpError = "Connection lost. Use the Session menu to reconnect."
|
||||
}
|
||||
|
||||
func stopACP() {
|
||||
reconnectTask?.cancel()
|
||||
reconnectTask = nil
|
||||
acpPromptTask?.cancel()
|
||||
acpPromptTask = nil
|
||||
acpEventTask?.cancel()
|
||||
acpEventTask = nil
|
||||
healthMonitorTask?.cancel()
|
||||
healthMonitorTask = nil
|
||||
if let client = acpClient {
|
||||
Task { await client.stop() }
|
||||
}
|
||||
acpClient = nil
|
||||
hasActiveProcess = false
|
||||
}
|
||||
|
||||
/// Respond to a permission request from the ACP agent.
|
||||
func respondToPermission(optionId: String) {
|
||||
guard let client = acpClient,
|
||||
let permission = richChatViewModel.pendingPermission else { return }
|
||||
Task {
|
||||
await client.respondToPermission(requestId: permission.requestId, optionId: optionId)
|
||||
}
|
||||
richChatViewModel.pendingPermission = nil
|
||||
}
|
||||
|
||||
// MARK: - Recent Sessions
|
||||
|
||||
func loadRecentSessions() async {
|
||||
let opened = await dataService.open()
|
||||
guard opened else { return }
|
||||
@@ -83,6 +428,8 @@ final class ChatViewModel {
|
||||
return session.id
|
||||
}
|
||||
|
||||
// MARK: - Voice (terminal mode only)
|
||||
|
||||
func toggleVoice() {
|
||||
guard let tv = terminalView else { return }
|
||||
if voiceEnabled {
|
||||
@@ -104,31 +451,12 @@ final class ChatViewModel {
|
||||
|
||||
func pushToTalk() {
|
||||
guard let tv = terminalView, voiceEnabled else { return }
|
||||
// Ctrl+B = ASCII 0x02
|
||||
let ctrlB: [UInt8] = [0x02]
|
||||
tv.send(source: tv, data: ctrlB[0..<1])
|
||||
isRecording.toggle()
|
||||
}
|
||||
|
||||
private func discoverActiveSessionId() async {
|
||||
// Capture the session that existed before launch so we can detect the new one
|
||||
let previousSessionId = recentSessions.first?.id
|
||||
for _ in 0..<8 {
|
||||
let opened = await dataService.open()
|
||||
guard opened else {
|
||||
try? await Task.sleep(for: .seconds(1))
|
||||
continue
|
||||
}
|
||||
let sessions = await dataService.fetchSessions(limit: 1)
|
||||
await dataService.close()
|
||||
if let newest = sessions.first, newest.id != previousSessionId {
|
||||
activeSessionId = newest.id
|
||||
richChatViewModel.startPolling(sessionId: newest.id)
|
||||
return
|
||||
}
|
||||
try? await Task.sleep(for: .seconds(1))
|
||||
}
|
||||
}
|
||||
// MARK: - Terminal Mode
|
||||
|
||||
private func sendToTerminal(_ tv: LocalProcessTerminalView, text: String) {
|
||||
let bytes = Array(text.utf8)
|
||||
@@ -136,6 +464,8 @@ final class ChatViewModel {
|
||||
}
|
||||
|
||||
private func launchTerminal(arguments: [String]) {
|
||||
stopACP()
|
||||
|
||||
if let existing = terminalView {
|
||||
existing.terminate()
|
||||
existing.removeFromSuperview()
|
||||
@@ -150,7 +480,6 @@ final class ChatViewModel {
|
||||
self?.hasActiveProcess = false
|
||||
self?.voiceEnabled = false
|
||||
self?.isRecording = false
|
||||
self?.richChatViewModel.stopPolling()
|
||||
Task { await self?.richChatViewModel.refreshMessages() }
|
||||
})
|
||||
terminal.processDelegate = coord
|
||||
|
||||
@@ -31,83 +31,397 @@ final class RichChatViewModel {
|
||||
var currentSession: HermesSession?
|
||||
var messageGroups: [MessageGroup] = []
|
||||
var isAgentWorking = false
|
||||
var pendingPermission: PendingPermission?
|
||||
|
||||
private var lastKnownCount = 0
|
||||
private var pollingTask: Task<Void, Never>?
|
||||
private var sessionId: String?
|
||||
var hasMessages: Bool { !messages.isEmpty }
|
||||
|
||||
func startPolling(sessionId: String) {
|
||||
self.sessionId = sessionId
|
||||
lastKnownCount = 0
|
||||
private(set) var sessionId: String?
|
||||
/// The original CLI session ID when resuming a CLI session via ACP.
|
||||
/// Used to combine old CLI messages with new ACP messages.
|
||||
private(set) var originSessionId: String?
|
||||
private var nextLocalId = -1
|
||||
private var streamingAssistantText = ""
|
||||
private var streamingThinkingText = ""
|
||||
private var streamingToolCalls: [HermesToolCall] = []
|
||||
|
||||
// DB polling state (used in terminal mode fallback)
|
||||
private var lastKnownFingerprint: HermesDataService.MessageFingerprint?
|
||||
private var debounceTask: Task<Void, Never>?
|
||||
private var resetTimestamp: Date?
|
||||
private var userSendPending = false
|
||||
private var activePollingTimer: Timer?
|
||||
|
||||
struct PendingPermission {
|
||||
let requestId: Int
|
||||
let title: String
|
||||
let kind: String
|
||||
let options: [(optionId: String, name: String)]
|
||||
}
|
||||
|
||||
// MARK: - Reset
|
||||
|
||||
func reset() {
|
||||
debounceTask?.cancel()
|
||||
stopActivePolling()
|
||||
Task { await dataService.close() }
|
||||
messages = []
|
||||
messageGroups = []
|
||||
currentSession = nil
|
||||
lastKnownFingerprint = nil
|
||||
sessionId = nil
|
||||
originSessionId = nil
|
||||
isAgentWorking = false
|
||||
userSendPending = false
|
||||
resetTimestamp = Date()
|
||||
nextLocalId = -1
|
||||
streamingAssistantText = ""
|
||||
streamingThinkingText = ""
|
||||
streamingToolCalls = []
|
||||
pendingPermission = nil
|
||||
}
|
||||
|
||||
pollingTask?.cancel()
|
||||
pollingTask = Task { [weak self] in
|
||||
while !Task.isCancelled {
|
||||
await self?.refreshMessages()
|
||||
try? await Task.sleep(for: .milliseconds(750))
|
||||
}
|
||||
func setSessionId(_ id: String?) {
|
||||
sessionId = id
|
||||
lastKnownFingerprint = nil
|
||||
}
|
||||
|
||||
func cleanup() async {
|
||||
stopActivePolling()
|
||||
debounceTask?.cancel()
|
||||
await dataService.close()
|
||||
}
|
||||
|
||||
// MARK: - ACP Event Handling
|
||||
|
||||
/// Add a user message immediately (before DB write) for instant UI feedback.
|
||||
func addUserMessage(text: String) {
|
||||
let id = nextLocalId
|
||||
nextLocalId -= 1
|
||||
let message = HermesMessage(
|
||||
id: id,
|
||||
sessionId: sessionId ?? "",
|
||||
role: "user",
|
||||
content: text,
|
||||
toolCallId: nil,
|
||||
toolCalls: [],
|
||||
toolName: nil,
|
||||
timestamp: Date(),
|
||||
tokenCount: nil,
|
||||
finishReason: nil,
|
||||
reasoning: nil
|
||||
)
|
||||
messages.append(message)
|
||||
isAgentWorking = true
|
||||
streamingAssistantText = ""
|
||||
streamingThinkingText = ""
|
||||
streamingToolCalls = []
|
||||
buildMessageGroups()
|
||||
}
|
||||
|
||||
/// Process a streaming ACP event and update the message list.
|
||||
func handleACPEvent(_ event: ACPEvent) {
|
||||
switch event {
|
||||
case .messageChunk(_, let text):
|
||||
appendMessageChunk(text: text)
|
||||
case .thoughtChunk(_, let text):
|
||||
appendThoughtChunk(text: text)
|
||||
case .toolCallStart(_, let call):
|
||||
handleToolCallStart(call)
|
||||
case .toolCallUpdate(_, let update):
|
||||
handleToolCallComplete(update)
|
||||
case .permissionRequest(_, let requestId, let request):
|
||||
pendingPermission = PendingPermission(
|
||||
requestId: requestId,
|
||||
title: request.toolCallTitle,
|
||||
kind: request.toolCallKind,
|
||||
options: request.options
|
||||
)
|
||||
case .promptComplete:
|
||||
handlePromptComplete()
|
||||
case .connectionLost(let reason):
|
||||
handleConnectionLost(reason: reason)
|
||||
case .availableCommands, .unknown:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func stopPolling() {
|
||||
pollingTask?.cancel()
|
||||
pollingTask = nil
|
||||
private func appendMessageChunk(text: String) {
|
||||
streamingAssistantText += text
|
||||
upsertStreamingMessage()
|
||||
}
|
||||
|
||||
private func appendThoughtChunk(text: String) {
|
||||
streamingThinkingText += text
|
||||
upsertStreamingMessage()
|
||||
}
|
||||
|
||||
private func handleToolCallStart(_ call: ACPToolCallEvent) {
|
||||
let toolCall = HermesToolCall(
|
||||
callId: call.toolCallId,
|
||||
functionName: call.functionName,
|
||||
arguments: call.argumentsJSON
|
||||
)
|
||||
streamingToolCalls.append(toolCall)
|
||||
upsertStreamingMessage()
|
||||
}
|
||||
|
||||
private func handleToolCallComplete(_ update: ACPToolCallUpdateEvent) {
|
||||
// Finalize the streaming assistant message (with its tool calls) as a permanent message
|
||||
finalizeStreamingMessage()
|
||||
|
||||
// Add tool result message
|
||||
let id = nextLocalId
|
||||
nextLocalId -= 1
|
||||
messages.append(HermesMessage(
|
||||
id: id,
|
||||
sessionId: sessionId ?? "",
|
||||
role: "tool",
|
||||
content: update.rawOutput ?? update.content,
|
||||
toolCallId: update.toolCallId,
|
||||
toolCalls: [],
|
||||
toolName: nil,
|
||||
timestamp: Date(),
|
||||
tokenCount: nil,
|
||||
finishReason: nil,
|
||||
reasoning: nil
|
||||
))
|
||||
buildMessageGroups()
|
||||
}
|
||||
|
||||
private func handlePromptComplete() {
|
||||
// Finalize any remaining streaming content
|
||||
finalizeStreamingMessage()
|
||||
isAgentWorking = false
|
||||
buildMessageGroups()
|
||||
}
|
||||
|
||||
func markAgentWorking() {
|
||||
isAgentWorking = true
|
||||
private func handleConnectionLost(reason: String) {
|
||||
finalizeStreamingMessage()
|
||||
let id = nextLocalId
|
||||
nextLocalId -= 1
|
||||
messages.append(HermesMessage(
|
||||
id: id,
|
||||
sessionId: sessionId ?? "",
|
||||
role: "system",
|
||||
content: "Connection lost: \(reason). Use the Session menu to start or resume a session.",
|
||||
toolCallId: nil,
|
||||
toolCalls: [],
|
||||
toolName: nil,
|
||||
timestamp: Date(),
|
||||
tokenCount: nil,
|
||||
finishReason: nil,
|
||||
reasoning: nil
|
||||
))
|
||||
isAgentWorking = false
|
||||
pendingPermission = nil
|
||||
buildMessageGroups()
|
||||
}
|
||||
|
||||
func refreshMessages() async {
|
||||
guard let sessionId else { return }
|
||||
// MARK: - Streaming Message Management
|
||||
|
||||
private static let streamingId = 0
|
||||
|
||||
/// Insert or update the in-progress streaming assistant message (id=0).
|
||||
private func upsertStreamingMessage() {
|
||||
let msg = HermesMessage(
|
||||
id: Self.streamingId,
|
||||
sessionId: sessionId ?? "",
|
||||
role: "assistant",
|
||||
content: streamingAssistantText,
|
||||
toolCallId: nil,
|
||||
toolCalls: streamingToolCalls,
|
||||
toolName: nil,
|
||||
timestamp: Date(),
|
||||
tokenCount: nil,
|
||||
finishReason: nil,
|
||||
reasoning: streamingThinkingText.isEmpty ? nil : streamingThinkingText
|
||||
)
|
||||
|
||||
if let idx = messages.firstIndex(where: { $0.id == Self.streamingId }) {
|
||||
messages[idx] = msg
|
||||
} else {
|
||||
messages.append(msg)
|
||||
}
|
||||
buildMessageGroups()
|
||||
}
|
||||
|
||||
/// Convert the streaming message (id=0) into a permanent message and reset streaming state.
|
||||
private func finalizeStreamingMessage() {
|
||||
guard let idx = messages.firstIndex(where: { $0.id == Self.streamingId }) else { return }
|
||||
|
||||
// Only finalize if there's actual content
|
||||
let hasContent = !streamingAssistantText.isEmpty
|
||||
|| !streamingThinkingText.isEmpty
|
||||
|| !streamingToolCalls.isEmpty
|
||||
|
||||
if hasContent {
|
||||
let id = nextLocalId
|
||||
nextLocalId -= 1
|
||||
messages[idx] = HermesMessage(
|
||||
id: id,
|
||||
sessionId: sessionId ?? "",
|
||||
role: "assistant",
|
||||
content: streamingAssistantText,
|
||||
toolCallId: nil,
|
||||
toolCalls: streamingToolCalls,
|
||||
toolName: nil,
|
||||
timestamp: Date(),
|
||||
tokenCount: nil,
|
||||
finishReason: streamingToolCalls.isEmpty ? "stop" : nil,
|
||||
reasoning: streamingThinkingText.isEmpty ? nil : streamingThinkingText
|
||||
)
|
||||
} else {
|
||||
// Remove empty streaming placeholder
|
||||
messages.remove(at: idx)
|
||||
}
|
||||
|
||||
// Reset streaming state for next chunk
|
||||
streamingAssistantText = ""
|
||||
streamingThinkingText = ""
|
||||
streamingToolCalls = []
|
||||
}
|
||||
|
||||
// MARK: - Load History from DB (for resumed sessions)
|
||||
|
||||
/// Load message history from the DB, optionally combining an origin session
|
||||
/// (e.g., CLI session) with the current ACP session.
|
||||
func loadSessionHistory(sessionId: String, acpSessionId: String? = nil) async {
|
||||
self.sessionId = sessionId
|
||||
let opened = await dataService.open()
|
||||
guard opened else { return }
|
||||
|
||||
let count = await dataService.fetchMessageCount(sessionId: sessionId)
|
||||
var allMessages = await dataService.fetchMessages(sessionId: sessionId)
|
||||
let session = await dataService.fetchSession(id: sessionId)
|
||||
|
||||
if count != lastKnownCount {
|
||||
// If the ACP session is different from the origin, load its messages too
|
||||
// and combine them chronologically
|
||||
if let acpId = acpSessionId, acpId != sessionId {
|
||||
originSessionId = sessionId
|
||||
self.sessionId = acpId
|
||||
let acpMessages = await dataService.fetchMessages(sessionId: acpId)
|
||||
if !acpMessages.isEmpty {
|
||||
allMessages.append(contentsOf: acpMessages)
|
||||
allMessages.sort { ($0.timestamp ?? .distantPast) < ($1.timestamp ?? .distantPast) }
|
||||
}
|
||||
}
|
||||
|
||||
messages = allMessages
|
||||
currentSession = session
|
||||
let minId = allMessages.map(\.id).min() ?? 0
|
||||
nextLocalId = min(minId - 1, -1)
|
||||
buildMessageGroups()
|
||||
}
|
||||
|
||||
// MARK: - DB Polling (terminal mode fallback)
|
||||
|
||||
func markAgentWorking() {
|
||||
isAgentWorking = true
|
||||
userSendPending = true
|
||||
startActivePolling()
|
||||
}
|
||||
|
||||
func scheduleRefresh() {
|
||||
debounceTask?.cancel()
|
||||
debounceTask = Task { @MainActor [weak self] in
|
||||
try? await Task.sleep(for: .milliseconds(100))
|
||||
guard !Task.isCancelled else { return }
|
||||
await self?.refreshMessages()
|
||||
}
|
||||
}
|
||||
|
||||
func refreshMessages() async {
|
||||
let opened = await dataService.open()
|
||||
guard opened else { return }
|
||||
|
||||
if sessionId == nil {
|
||||
if let resetTime = resetTimestamp {
|
||||
if let candidate = await dataService.fetchMostRecentlyStartedSessionId(after: resetTime) {
|
||||
sessionId = candidate
|
||||
}
|
||||
}
|
||||
if sessionId == nil {
|
||||
sessionId = await dataService.fetchMostRecentlyActiveSessionId()
|
||||
}
|
||||
}
|
||||
|
||||
guard let sessionId else { return }
|
||||
|
||||
let fingerprint = await dataService.fetchMessageFingerprint(sessionId: sessionId)
|
||||
|
||||
if fingerprint != lastKnownFingerprint {
|
||||
let fetched = await dataService.fetchMessages(sessionId: sessionId)
|
||||
let session = await dataService.fetchSession(id: sessionId)
|
||||
lastKnownCount = count
|
||||
lastKnownFingerprint = fingerprint
|
||||
|
||||
messages = fetched
|
||||
currentSession = session
|
||||
buildMessageGroups()
|
||||
|
||||
if let last = fetched.last {
|
||||
if last.isAssistant && last.toolCalls.isEmpty {
|
||||
isAgentWorking = false
|
||||
} else if last.isUser {
|
||||
isAgentWorking = false
|
||||
let derivedWorking = deriveAgentWorking(from: fetched)
|
||||
if userSendPending {
|
||||
if fetched.last?.isUser == true {
|
||||
userSendPending = false
|
||||
}
|
||||
isAgentWorking = true
|
||||
} else {
|
||||
let wasWorking = isAgentWorking
|
||||
isAgentWorking = derivedWorking
|
||||
if wasWorking && !derivedWorking {
|
||||
stopActivePolling()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let session = await dataService.fetchSession(id: sessionId)
|
||||
currentSession = session
|
||||
}
|
||||
|
||||
await dataService.close()
|
||||
}
|
||||
|
||||
private func startActivePolling() {
|
||||
stopActivePolling()
|
||||
activePollingTimer = Timer.scheduledTimer(withTimeInterval: 0.5, repeats: true) { [weak self] _ in
|
||||
Task { @MainActor [weak self] in
|
||||
await self?.refreshMessages()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func stopActivePolling() {
|
||||
activePollingTimer?.invalidate()
|
||||
activePollingTimer = nil
|
||||
}
|
||||
|
||||
private func deriveAgentWorking(from fetched: [HermesMessage]) -> Bool {
|
||||
guard let last = fetched.last else { return false }
|
||||
if last.isUser { return true }
|
||||
if last.isToolResult { return true }
|
||||
if last.isAssistant {
|
||||
if !last.toolCalls.isEmpty {
|
||||
let allCallIds = Set(last.toolCalls.map(\.callId))
|
||||
let resultCallIds = Set(fetched.compactMap { $0.isToolResult ? $0.toolCallId : nil })
|
||||
return !allCallIds.subtracting(resultCallIds).isEmpty
|
||||
}
|
||||
return last.finishReason == nil
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// MARK: - Message Grouping
|
||||
|
||||
private func buildMessageGroups() {
|
||||
var groups: [MessageGroup] = []
|
||||
var currentUser: HermesMessage?
|
||||
var currentAssistant: [HermesMessage] = []
|
||||
var currentToolResults: [String: HermesMessage] = [:]
|
||||
var groupIndex = 0
|
||||
|
||||
func flushGroup() {
|
||||
if currentUser != nil || !currentAssistant.isEmpty {
|
||||
// Use stable sequential IDs so SwiftUI doesn't re-create views
|
||||
// when streaming messages finalize (id changes from 0 to -N)
|
||||
groups.append(MessageGroup(
|
||||
id: currentUser?.id ?? currentAssistant.first?.id ?? groups.count,
|
||||
id: groupIndex,
|
||||
userMessage: currentUser,
|
||||
assistantMessages: currentAssistant,
|
||||
toolResults: currentToolResults
|
||||
))
|
||||
groupIndex += 1
|
||||
}
|
||||
currentUser = nil
|
||||
currentAssistant = []
|
||||
|
||||
@@ -27,9 +27,27 @@ struct ChatView: View {
|
||||
Circle()
|
||||
.fill(.green)
|
||||
.frame(width: 6, height: 6)
|
||||
Text("Active")
|
||||
Text(viewModel.acpStatus.isEmpty ? "Active" : viewModel.acpStatus)
|
||||
.font(.caption)
|
||||
.foregroundStyle(.secondary)
|
||||
.lineLimit(1)
|
||||
} else if let error = viewModel.acpError {
|
||||
Circle()
|
||||
.fill(.red)
|
||||
.frame(width: 6, height: 6)
|
||||
Text(error)
|
||||
.font(.caption)
|
||||
.foregroundStyle(.red)
|
||||
.lineLimit(1)
|
||||
.help(error)
|
||||
} else if !viewModel.acpStatus.isEmpty {
|
||||
Circle()
|
||||
.fill(.yellow)
|
||||
.frame(width: 6, height: 6)
|
||||
Text(viewModel.acpStatus)
|
||||
.font(.caption)
|
||||
.foregroundStyle(.secondary)
|
||||
.lineLimit(1)
|
||||
} else {
|
||||
Circle()
|
||||
.fill(.secondary)
|
||||
@@ -41,7 +59,7 @@ struct ChatView: View {
|
||||
|
||||
Spacer()
|
||||
|
||||
if viewModel.hasActiveProcess {
|
||||
if viewModel.hasActiveProcess && viewModel.displayMode == .terminal {
|
||||
voiceControls
|
||||
}
|
||||
|
||||
@@ -63,6 +81,13 @@ struct ChatView: View {
|
||||
}
|
||||
|
||||
Menu {
|
||||
if viewModel.hasActiveProcess, let activeId = viewModel.richChatViewModel.sessionId {
|
||||
Button("Return to Active Session (\(activeId.prefix(8))...)") {
|
||||
// Already active — just ensure we're showing it
|
||||
}
|
||||
.disabled(true)
|
||||
Divider()
|
||||
}
|
||||
Button("New Session") {
|
||||
viewModel.startNewSession()
|
||||
}
|
||||
@@ -183,7 +208,7 @@ struct ChatView: View {
|
||||
@ViewBuilder
|
||||
private var richChatArea: some View {
|
||||
ZStack {
|
||||
// Keep terminal alive in background for process hosting
|
||||
// Keep terminal alive in background if it exists (terminal mode session)
|
||||
if let terminal = viewModel.terminalView {
|
||||
PersistentTerminalView(terminalView: terminal)
|
||||
.frame(width: 0, height: 0)
|
||||
@@ -192,7 +217,11 @@ struct ChatView: View {
|
||||
}
|
||||
|
||||
if viewModel.hermesBinaryExists {
|
||||
RichChatView()
|
||||
RichChatView(
|
||||
richChat: viewModel.richChatViewModel,
|
||||
onSend: { viewModel.sendText($0) },
|
||||
isEnabled: viewModel.hasActiveProcess || viewModel.hermesBinaryExists
|
||||
)
|
||||
} else {
|
||||
ContentUnavailableView(
|
||||
"Hermes Not Found",
|
||||
@@ -202,5 +231,92 @@ struct ChatView: View {
|
||||
.frame(maxWidth: .infinity, maxHeight: .infinity)
|
||||
}
|
||||
}
|
||||
// Permission approval sheet
|
||||
.sheet(item: permissionBinding) { permission in
|
||||
PermissionApprovalView(
|
||||
title: permission.title,
|
||||
kind: permission.kind,
|
||||
options: permission.options,
|
||||
onRespond: { optionId in
|
||||
viewModel.respondToPermission(optionId: optionId)
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private var permissionBinding: Binding<RichChatViewModel.PendingPermission?> {
|
||||
Binding(
|
||||
get: { viewModel.richChatViewModel.pendingPermission },
|
||||
set: { viewModel.richChatViewModel.pendingPermission = $0 }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Permission Approval View
|
||||
|
||||
extension RichChatViewModel.PendingPermission: @retroactive Identifiable {
|
||||
var id: Int { requestId }
|
||||
}
|
||||
|
||||
struct PermissionApprovalView: View {
|
||||
let title: String
|
||||
let kind: String
|
||||
let options: [(optionId: String, name: String)]
|
||||
let onRespond: (String) -> Void
|
||||
@Environment(\.dismiss) private var dismiss
|
||||
|
||||
var body: some View {
|
||||
VStack(spacing: 16) {
|
||||
Image(systemName: kindIcon)
|
||||
.font(.title)
|
||||
.foregroundStyle(kindColor)
|
||||
|
||||
Text("Tool Approval Required")
|
||||
.font(.headline)
|
||||
|
||||
Text(title)
|
||||
.font(.body.monospaced())
|
||||
.foregroundStyle(.secondary)
|
||||
.multilineTextAlignment(.center)
|
||||
.padding(.horizontal)
|
||||
|
||||
HStack(spacing: 12) {
|
||||
ForEach(options, id: \.optionId) { option in
|
||||
if option.optionId == "deny" {
|
||||
Button(option.name) {
|
||||
onRespond(option.optionId)
|
||||
dismiss()
|
||||
}
|
||||
.buttonStyle(.bordered)
|
||||
} else {
|
||||
Button(option.name) {
|
||||
onRespond(option.optionId)
|
||||
dismiss()
|
||||
}
|
||||
.buttonStyle(.borderedProminent)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.padding(24)
|
||||
.frame(minWidth: 350)
|
||||
}
|
||||
|
||||
private var kindIcon: String {
|
||||
switch kind {
|
||||
case "execute": return "terminal"
|
||||
case "edit": return "pencil"
|
||||
case "delete": return "trash"
|
||||
default: return "wrench"
|
||||
}
|
||||
}
|
||||
|
||||
private var kindColor: Color {
|
||||
switch kind {
|
||||
case "execute": return .orange
|
||||
case "edit": return .blue
|
||||
case "delete": return .red
|
||||
default: return .secondary
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,12 +4,22 @@ struct RichChatMessageList: View {
|
||||
let groups: [MessageGroup]
|
||||
let isWorking: Bool
|
||||
|
||||
/// Track the last group's assistant content length to detect streaming updates.
|
||||
private var scrollAnchor: String {
|
||||
if isWorking { return "typing-indicator" }
|
||||
if let last = groups.last { return "group-\(last.id)" }
|
||||
return "scroll-top"
|
||||
}
|
||||
|
||||
var body: some View {
|
||||
ScrollViewReader { proxy in
|
||||
ScrollView {
|
||||
LazyVStack(alignment: .leading, spacing: 16) {
|
||||
Spacer(minLength: 0)
|
||||
.id("scroll-top")
|
||||
ForEach(groups) { group in
|
||||
MessageGroupView(group: group)
|
||||
.id("group-\(group.id)")
|
||||
}
|
||||
|
||||
if isWorking {
|
||||
@@ -19,29 +29,45 @@ struct RichChatMessageList: View {
|
||||
}
|
||||
.padding()
|
||||
}
|
||||
.defaultScrollAnchor(.bottom)
|
||||
// Scroll on new groups
|
||||
.onChange(of: groups.count) {
|
||||
withAnimation(.easeOut(duration: 0.2)) {
|
||||
if isWorking {
|
||||
proxy.scrollTo("typing-indicator", anchor: .bottom)
|
||||
} else if let last = groups.last {
|
||||
proxy.scrollTo(last.id, anchor: .bottom)
|
||||
}
|
||||
}
|
||||
scrollToBottom(proxy: proxy)
|
||||
}
|
||||
// Scroll when agent starts/stops working
|
||||
.onChange(of: isWorking) {
|
||||
if isWorking {
|
||||
withAnimation(.easeOut(duration: 0.2)) {
|
||||
proxy.scrollTo("typing-indicator", anchor: .bottom)
|
||||
}
|
||||
}
|
||||
scrollToBottom(proxy: proxy)
|
||||
}
|
||||
// Scroll on streaming content updates (group content changes)
|
||||
.onChange(of: scrollAnchor) {
|
||||
scrollToBottom(proxy: proxy)
|
||||
}
|
||||
// Scroll on last message content change (streaming text)
|
||||
.onChange(of: groups.last?.assistantMessages.last?.content ?? "") {
|
||||
scrollToBottom(proxy: proxy, animated: false)
|
||||
}
|
||||
// Scroll on tool call count change
|
||||
.onChange(of: groups.last?.toolCallCount ?? 0) {
|
||||
scrollToBottom(proxy: proxy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func scrollToBottom(proxy: ScrollViewProxy, animated: Bool = true) {
|
||||
let target = scrollAnchor
|
||||
if animated {
|
||||
withAnimation(.easeOut(duration: 0.15)) {
|
||||
proxy.scrollTo(target, anchor: .bottom)
|
||||
}
|
||||
} else {
|
||||
proxy.scrollTo(target, anchor: .bottom)
|
||||
}
|
||||
}
|
||||
|
||||
private var typingIndicator: some View {
|
||||
HStack {
|
||||
HStack(spacing: 4) {
|
||||
ForEach(0..<3, id: \.self) { i in
|
||||
ForEach(0..<3, id: \.self) { _ in
|
||||
Circle()
|
||||
.fill(.secondary)
|
||||
.frame(width: 6, height: 6)
|
||||
@@ -76,7 +102,6 @@ struct MessageGroupView: View {
|
||||
toolSummary
|
||||
}
|
||||
}
|
||||
.id(group.id)
|
||||
}
|
||||
|
||||
@ViewBuilder
|
||||
|
||||
@@ -1,18 +1,24 @@
|
||||
import SwiftUI
|
||||
|
||||
struct RichChatView: View {
|
||||
@Environment(ChatViewModel.self) private var viewModel
|
||||
@Bindable var richChat: RichChatViewModel
|
||||
var onSend: (String) -> Void
|
||||
var isEnabled: Bool
|
||||
@Environment(HermesFileWatcher.self) private var fileWatcher
|
||||
@Environment(ChatViewModel.self) private var chatViewModel
|
||||
|
||||
/// In ACP mode, events drive updates directly — no DB polling needed.
|
||||
private var isACPMode: Bool { chatViewModel.isACPConnected }
|
||||
|
||||
var body: some View {
|
||||
VStack(spacing: 0) {
|
||||
SessionInfoBar(
|
||||
session: viewModel.richChatViewModel.currentSession,
|
||||
isWorking: viewModel.richChatViewModel.isAgentWorking
|
||||
session: richChat.currentSession,
|
||||
isWorking: richChat.isAgentWorking
|
||||
)
|
||||
Divider()
|
||||
|
||||
if viewModel.richChatViewModel.messageGroups.isEmpty && !viewModel.richChatViewModel.isAgentWorking {
|
||||
if richChat.messageGroups.isEmpty && !richChat.isAgentWorking {
|
||||
ContentUnavailableView(
|
||||
"Chat Messages",
|
||||
systemImage: "bubble.left.and.text.bubble.right",
|
||||
@@ -21,22 +27,24 @@ struct RichChatView: View {
|
||||
.frame(maxWidth: .infinity, maxHeight: .infinity)
|
||||
} else {
|
||||
RichChatMessageList(
|
||||
groups: viewModel.richChatViewModel.messageGroups,
|
||||
isWorking: viewModel.richChatViewModel.isAgentWorking
|
||||
groups: richChat.messageGroups,
|
||||
isWorking: richChat.isAgentWorking
|
||||
)
|
||||
}
|
||||
|
||||
Divider()
|
||||
RichChatInputBar(
|
||||
onSend: { text in
|
||||
viewModel.sendText(text)
|
||||
viewModel.richChatViewModel.markAgentWorking()
|
||||
onSend(text)
|
||||
},
|
||||
isEnabled: viewModel.hasActiveProcess
|
||||
isEnabled: isEnabled
|
||||
)
|
||||
}
|
||||
// DB polling fallback for terminal mode only — never overwrite ACP messages
|
||||
.onChange(of: fileWatcher.lastChangeDate) {
|
||||
Task { await viewModel.richChatViewModel.refreshMessages() }
|
||||
if !isACPMode, !richChat.hasMessages, richChat.sessionId != nil {
|
||||
richChat.scheduleRefresh()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,13 @@ struct SessionInfoBar: View {
|
||||
}
|
||||
}
|
||||
|
||||
if let title = session.title, !title.isEmpty {
|
||||
Text(title)
|
||||
.font(.caption.bold())
|
||||
.lineLimit(1)
|
||||
.truncationMode(.tail)
|
||||
}
|
||||
|
||||
if let model = session.model {
|
||||
Label(model, systemImage: "cpu")
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ struct ToolUsage: Identifiable {
|
||||
}
|
||||
|
||||
struct NotableSession: Identifiable {
|
||||
var id: String { session.id }
|
||||
var id: String { "\(session.id)-\(label)" }
|
||||
let label: String
|
||||
let value: String
|
||||
let session: HermesSession
|
||||
|
||||
Reference in New Issue
Block a user