iOS port M1: decouple ACPClient from Process via ACPChannel protocol

Introduces the key architectural abstraction that lets iOS share the
ACP state machine with Mac in M4+. ACPClient no longer touches
`Process`, `Pipe`, file descriptors, or SSH sessions directly — it
reads / writes line-oriented JSON-RPC through an `ACPChannel`.

New in ScarfCore/ACP/:
  - ACPChannel.swift (protocol + ACPChannelError enum)
  - ProcessACPChannel.swift (Mac + Linux; `#if !os(iOS)` guard —
    iOS can't spawn subprocesses). Wraps the Process + Pipe +
    raw POSIX write(2) code that used to live inline inside
    ACPClient: SIGPIPE-ignore, partial-write loops, EPIPE →
    `.writeEndClosed`, graceful SIGINT + 2s SIGKILL watchdog.
    Uses `canImport(Darwin)` / `canImport(Glibc)` for the
    platform-specific `write(2)` binding.
  - ACPClient.swift (moved from scarf/Core/Services and refactored).
    Process/Pipe/stdinFd/Darwin.write state replaced with a single
    `channel: any ACPChannel` reference. Construction takes a
    `ChannelFactory = @Sendable (ServerContext) async throws -> any ACPChannel`
    closure — Mac wires ProcessACPChannel, iOS will wire a Citadel
    SSHExecACPChannel in M4.

Mac-side glue (stays in main target):
  - scarf/Core/Services/ACPClient+Mac.swift (new) carries the
    `ACPClient.forMacApp(context:)` factory. Internally spawns
    `hermes acp` locally or `ssh -T host -- hermes acp` remotely
    via SSHTransport.makeProcess, passing the enriched shell env
    (local: full PATH + credentials; remote: just SSH_AUTH_SOCK
    + SSH_AGENT_PID) with TERM stripped. Behaviour identical to
    pre-M1.
  - ChatViewModel updated at 3 sites from `ACPClient(context:)`
    to `ACPClient.forMacApp(context:)`.

Public API change callers need to know about:
  - `ACPClient.respondToPermission(requestId:optionId:)` is now
    `async`. ChatViewModel already `await`ed it, so that upgrade
    is a no-op; no other callers.

Also deleted scarf/Core/Services/ACPClient.swift (605 lines;
replaced by ScarfCore version).

Test coverage (M1ACPTests, 10 tests):
  Using a MockACPChannel actor to script JSON-RPC deterministically,
  not a real subprocess:
  - ACPChannel protocol (mock send/receive, write-after-close,
    error descriptions).
  - ACPClient initial state.
  - start() sends initialize and flips isConnected on reply.
  - RPC error reply surfaces as ACPClientError.rpcError.
  - Mid-flight channel close → pending request resolves with
    .processTerminated, isConnected flips false.
  - session/update notification routes into the `events` stream
    as .messageChunk.
  - Stderr lines feed the recentStderr ring buffer.
  - ACPErrorHint.classify across credential / missing-binary /
    rate-limit / unknown cases.

`swift test` on Linux now reports 62 / 62 passing.

Updated scarf/docs/IOS_PORT_PLAN.md with M1's shipped state, the
behavior-preservation rationale for the Mac factory, and the
iOS hook point M2–M4 will plug into.

https://claude.ai/code/session_019yMRP6mwZWfzVrPTqevx2y
This commit is contained in:
Claude
2026-04-22 22:49:24 +00:00
parent 920c86b4f8
commit bdf31d6781
7 changed files with 972 additions and 253 deletions
@@ -0,0 +1,82 @@
import Foundation
/// The bidirectional line-oriented transport that `ACPClient` speaks
/// JSON-RPC over. Abstracts away whether the other end is a local
/// `hermes acp` subprocess (macOS) or a remote SSH exec channel (iOS via
/// Citadel in M4+). ACPClient never touches `Process`, `Pipe`, file
/// descriptors, or SSH sessions directly it just sends and receives
/// newline-delimited JSON lines over one of these.
///
/// **Line framing.** Senders pass a JSON object serialized to a single
/// line (no embedded `\n`). The channel appends the terminator itself.
/// The receiver yields one complete JSON line per `incoming` element;
/// partial lines are buffered internally until a newline arrives.
///
/// **Lifecycle.** A channel is "already live" when you hold a reference
/// the constructor (or channel-factory call) spawns the subprocess / opens
/// the SSH exec channel. `close()` tears down and causes `incoming` /
/// `stderr` to finish. After `close()`, `send(_:)` throws.
///
/// **Errors.** Transport errors (broken pipe, SSH disconnect, process
/// died) surface as an error-terminated `incoming` stream consumers
/// should be prepared for that, not just for clean `.finished` stream
/// termination. `send(_:)` also throws on these.
public protocol ACPChannel: Sendable {
/// Append `\n` and write atomically. Thread-safe (the actor boundary
/// is on the implementation side, not the protocol).
func send(_ line: String) async throws
/// One complete JSON-RPC line per element, without the trailing
/// newline. Yields in arrival order. Finishes (clean or error) when
/// the underlying transport closes.
var incoming: AsyncThrowingStream<String, Error> { get }
/// Diagnostic stderr. For `ProcessACPChannel` this is the spawned
/// process's stderr, line-buffered. For future SSH-exec channels
/// where stderr folds into events, this is an empty stream.
/// Lines are yielded without the trailing newline.
var stderr: AsyncThrowingStream<String, Error> { get }
/// Request graceful shutdown. Closes stdin first (so the remote side
/// sees EOF and can flush), then waits briefly for the subprocess /
/// exec channel to exit, then force-terminates. Idempotent calling
/// `close()` on an already-closed channel is a no-op.
func close() async
/// Short identifier for logs. Process channels return the child PID;
/// SSH exec channels return the SSH channel id or `nil` when not
/// applicable.
var diagnosticID: String? { get async }
}
/// Errors raised by `ACPChannel` implementations when the underlying
/// transport breaks. JSON-RPC errors (the remote returning an `error`
/// field) are not in this enum they ride as valid `incoming` lines and
/// are ACPClient's problem to decode.
public enum ACPChannelError: Error, LocalizedError {
/// The underlying subprocess or SSH exec channel exited. `exitCode`
/// is the subprocess exit status (or a synthetic value for SSH).
case closed(exitCode: Int32)
/// `send(_:)` was called on a channel whose write end is already
/// closed. Typically means a previous `close()` call or a pipe
/// broken by a remote termination.
case writeEndClosed
/// Bytes sent or received couldn't be encoded/decoded as UTF-8.
/// Hermes emits only UTF-8; hitting this usually means a framing
/// bug or random binary junk on the channel.
case invalidEncoding
/// Failed to launch the subprocess or open the SSH exec channel.
case launchFailed(String)
/// Catch-all for everything else with a context string.
case other(String)
public var errorDescription: String? {
switch self {
case .closed(let code): return "ACP channel closed (exit \(code))"
case .writeEndClosed: return "ACP channel write end is closed"
case .invalidEncoding: return "ACP channel carried non-UTF-8 bytes"
case .launchFailed(let msg): return "Failed to launch ACP channel: \(msg)"
case .other(let msg): return msg
}
}
}
@@ -0,0 +1,555 @@
import Foundation
#if canImport(os)
import os
#endif
/// Manages an ACP (Agent Client Protocol) session with a backing Hermes
/// agent. Talks JSON-RPC over an `ACPChannel` the channel itself owns
/// the transport (subprocess for macOS, SSH exec session for iOS via
/// Citadel in M4+). This actor is transport-agnostic.
///
/// **Channel factory injection.** Construction takes a closure that
/// builds a channel on demand. The Mac target wires this at app launch
/// to produce a `ProcessACPChannel` configured with the enriched
/// shell env (PATH, credentials). iOS will wire a `SSHExecACPChannel`
/// factory at app launch.
///
/// Under iOS the `ProcessACPChannel` implementation is skipped at
/// compile time (`#if !os(iOS)`) an iOS `ACPClient` that tried to
/// spawn a subprocess would be a build error, not a runtime bug.
public actor ACPClient {
#if canImport(os)
private let logger = Logger(subsystem: "com.scarf", category: "ACPClient")
#endif
/// Returns a fresh ACPChannel connected to `hermes acp` for this
/// context. Mac wires this to spawn a `ProcessACPChannel` with the
/// enriched env (so `hermes` can find Homebrew/nvm/asdf binaries
/// on PATH). iOS wires a Citadel-backed channel in M4+.
public typealias ChannelFactory = @Sendable (ServerContext) async throws -> any ACPChannel
private var channel: (any ACPChannel)?
private let channelFactory: ChannelFactory
private var nextRequestId = 1
private var pendingRequests: [Int: CheckedContinuation<AnyCodable?, Error>] = [:]
private var readTask: Task<Void, Never>?
private var stderrTask: Task<Void, Never>?
private var keepaliveTask: Task<Void, Never>?
private var eventContinuation: AsyncStream<ACPEvent>.Continuation?
private var _eventStream: AsyncStream<ACPEvent>?
public private(set) var isConnected = false
public private(set) var currentSessionId: String?
public private(set) var statusMessage = ""
public let context: ServerContext
public init(
context: ServerContext = .local,
channelFactory: @escaping ChannelFactory
) {
self.context = context
self.channelFactory = channelFactory
}
/// Ring buffer of recent stderr lines from the ACP channel used to
/// attach a diagnostic tail to user-visible errors. Capped to avoid
/// unbounded growth when the subprocess logs heavily.
private var stderrBuffer: [String] = []
private static let stderrBufferMaxLines = 50
/// Returns the last ~`stderrBufferMaxLines` stderr lines captured
/// from the ACP channel, joined by newlines.
public var recentStderr: String {
stderrBuffer.joined(separator: "\n")
}
fileprivate func appendStderr(_ text: String) {
for line in text.split(separator: "\n", omittingEmptySubsequences: true) {
stderrBuffer.append(String(line))
}
if stderrBuffer.count > Self.stderrBufferMaxLines {
stderrBuffer.removeFirst(stderrBuffer.count - Self.stderrBufferMaxLines)
}
}
/// True while the underlying channel is alive. Equivalent to the
/// old `process.isRunning` check.
public var isHealthy: Bool {
isConnected && channel != nil
}
// MARK: - Event Stream
/// Access the event stream. Must call `start()` first. Before start,
/// returns an immediately-finished stream so callers can iterate
/// without a nil check.
public var events: AsyncStream<ACPEvent> {
_eventStream ?? AsyncStream { $0.finish() }
}
// MARK: - Lifecycle
public func start() async throws {
guard channel == nil else { return }
// Create the event stream BEFORE anything else so no events are
// lost while the channel is handshaking.
let (stream, continuation) = AsyncStream.makeStream(of: ACPEvent.self)
self._eventStream = stream
self.eventContinuation = continuation
statusMessage = "Starting hermes acp..."
let ch: any ACPChannel
do {
ch = try await channelFactory(context)
} catch {
statusMessage = "Failed to start: \(error.localizedDescription)"
#if canImport(os)
logger.error("Failed to open ACP channel: \(error.localizedDescription)")
#endif
continuation.finish()
throw error
}
self.channel = ch
self.isConnected = true
// Start reading incoming JSON-RPC BEFORE sending initialize so
// we catch the response.
startReadLoops(channel: ch)
#if canImport(os)
if let id = await ch.diagnosticID {
logger.info("ACP channel opened (\(id, privacy: .public))")
} else {
logger.info("ACP channel opened")
}
#endif
statusMessage = "Initializing..."
// Initialize the ACP connection.
let initParams: [String: AnyCodable] = [
"protocolVersion": AnyCodable(1),
"clientCapabilities": AnyCodable([String: Any]()),
"clientInfo": AnyCodable([
"name": "Scarf",
"version": "1.0",
] as [String: Any]),
]
_ = try await sendRequest(method: "initialize", params: initParams)
statusMessage = "Connected"
#if canImport(os)
logger.info("ACP connection initialized")
#endif
startKeepalive()
}
public func stop() async {
readTask?.cancel()
readTask = nil
stderrTask?.cancel()
stderrTask = nil
keepaliveTask?.cancel()
keepaliveTask = nil
eventContinuation?.finish()
eventContinuation = nil
_eventStream = nil
for (_, continuation) in pendingRequests {
continuation.resume(throwing: CancellationError())
}
pendingRequests.removeAll()
if let ch = channel {
await ch.close()
}
channel = nil
isConnected = false
currentSessionId = nil
statusMessage = "Disconnected"
#if canImport(os)
logger.info("ACP client stopped")
#endif
}
// MARK: - Keepalive
private func startKeepalive() {
keepaliveTask = Task { [weak self] in
while !Task.isCancelled {
try? await Task.sleep(nanoseconds: 30_000_000_000) // 30 seconds
guard !Task.isCancelled else { break }
await self?.sendKeepalive()
}
}
}
/// Valid JSON-RPC notification used as a keepalive probe. Plain
/// newlines upstream produce `json.loads("")` errors in the ACP
/// server so we send a real method.
private static let keepalivePayload: String = #"{"jsonrpc":"2.0","method":"$/ping"}"#
private func sendKeepalive() async {
guard let ch = channel else { return }
do {
try await ch.send(Self.keepalivePayload)
} catch {
await handleWriteFailed()
}
}
// MARK: - Session Management
public func newSession(cwd: String) async throws -> String {
statusMessage = "Creating session..."
let params: [String: AnyCodable] = [
"cwd": AnyCodable(cwd),
"mcpServers": AnyCodable([Any]()),
]
let result = try await sendRequest(method: "session/new", params: params)
guard let dict = result?.dictValue,
let sessionId = dict["sessionId"] as? String
else {
throw ACPClientError.invalidResponse("Missing sessionId in session/new response")
}
currentSessionId = sessionId
statusMessage = "Session ready"
#if canImport(os)
logger.info("Created new ACP session: \(sessionId)")
#endif
return sessionId
}
public func loadSession(cwd: String, sessionId: String) async throws -> String {
statusMessage = "Loading session \(sessionId.prefix(12))..."
let params: [String: AnyCodable] = [
"cwd": AnyCodable(cwd),
"sessionId": AnyCodable(sessionId),
"mcpServers": AnyCodable([Any]()),
]
let result = try await sendRequest(method: "session/load", params: params)
// ACP returns {} on success (no sessionId echoed), or an error if
// not found. If we got here without throwing, the session was
// loaded use the ID we sent.
let loadedId = (result?.dictValue?["sessionId"] as? String) ?? sessionId
currentSessionId = loadedId
statusMessage = "Session loaded"
#if canImport(os)
logger.info("Loaded ACP session: \(loadedId)")
#endif
return loadedId
}
public func resumeSession(cwd: String, sessionId: String) async throws -> String {
statusMessage = "Resuming session..."
let params: [String: AnyCodable] = [
"cwd": AnyCodable(cwd),
"sessionId": AnyCodable(sessionId),
"mcpServers": AnyCodable([Any]()),
]
let result = try await sendRequest(method: "session/resume", params: params)
guard let dict = result?.dictValue,
let resumedId = dict["sessionId"] as? String
else {
throw ACPClientError.invalidResponse("Missing sessionId in session/resume response")
}
currentSessionId = resumedId
statusMessage = "Session resumed"
#if canImport(os)
logger.info("Resumed ACP session: \(resumedId)")
#endif
return resumedId
}
// MARK: - Messaging
public func sendPrompt(sessionId: String, text: String) async throws -> ACPPromptResult {
statusMessage = "Sending prompt..."
let messageId = UUID().uuidString
let params: [String: AnyCodable] = [
"sessionId": AnyCodable(sessionId),
"messageId": AnyCodable(messageId),
"prompt": AnyCodable([
["type": "text", "text": text] as [String: Any],
] as [Any]),
]
let result = try await sendRequest(method: "session/prompt", params: params)
let dict = result?.dictValue ?? [:]
let usage = dict["usage"] as? [String: Any] ?? [:]
statusMessage = "Ready"
return ACPPromptResult(
stopReason: dict["stopReason"] as? String ?? "end_turn",
inputTokens: usage["inputTokens"] as? Int ?? 0,
outputTokens: usage["outputTokens"] as? Int ?? 0,
thoughtTokens: usage["thoughtTokens"] as? Int ?? 0,
cachedReadTokens: usage["cachedReadTokens"] as? Int ?? 0
)
}
public func cancel(sessionId: String) async throws {
let params: [String: AnyCodable] = [
"sessionId": AnyCodable(sessionId),
]
_ = try await sendRequest(method: "session/cancel", params: params)
statusMessage = "Cancelled"
}
public func respondToPermission(requestId: Int, optionId: String) async {
let response: [String: Any] = [
"jsonrpc": "2.0",
"id": requestId,
"result": [
"outcome": [
"kind": optionId == "deny" ? "rejected" : "allowed",
"optionId": optionId,
] as [String: Any],
] as [String: Any],
]
await writeJSON(response)
}
// MARK: - JSON-RPC Transport
private func sendRequest(method: String, params: [String: AnyCodable]) async throws -> AnyCodable? {
let requestId = nextRequestId
nextRequestId += 1
let request = ACPRequest(id: requestId, method: method, params: params)
guard let data = try? JSONEncoder().encode(request),
let line = String(data: data, encoding: .utf8)
else {
throw ACPClientError.encodingFailed
}
#if canImport(os)
logger.debug("Sending: \(method) (id: \(requestId))")
#endif
// session/prompt streams events and can run for minutes no hard
// timeout. Control messages get a 30s watchdog.
let timeoutTask: Task<Void, Error>? = if method != "session/prompt" {
Task { [weak self] in
try await Task.sleep(nanoseconds: 30 * 1_000_000_000)
await self?.timeoutRequest(id: requestId, method: method)
}
} else {
nil
}
defer { timeoutTask?.cancel() }
guard let ch = channel else {
throw ACPClientError.notConnected
}
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<AnyCodable?, Error>) in
pendingRequests[requestId] = continuation
// Write in a detached task so the actor can process incoming
// response messages while we're awaiting the send. The
// continuation is already stored; the response arrives via
// the read loop.
Task.detached { [weak self] in
do {
try await ch.send(line)
} catch {
await self?.handleWriteFailedForRequest(id: requestId)
}
}
}
}
private func timeoutRequest(id: Int, method: String) {
guard let continuation = pendingRequests.removeValue(forKey: id) else { return }
#if canImport(os)
logger.error("Request timed out: \(method) (id: \(id))")
#endif
statusMessage = "Request timed out"
continuation.resume(throwing: ACPClientError.requestTimeout(method: method))
}
private func writeJSON(_ dict: [String: Any]) async {
guard let ch = channel,
let data = try? JSONSerialization.data(withJSONObject: dict),
let line = String(data: data, encoding: .utf8)
else { return }
do {
try await ch.send(line)
} catch {
await handleWriteFailed()
}
}
// MARK: - Read Loops
private func startReadLoops(channel ch: any ACPChannel) {
// Consume incoming JSON-RPC lines from the channel.
readTask = Task { [weak self] in
do {
for try await line in ch.incoming {
guard let data = line.data(using: .utf8) else { continue }
do {
let message = try JSONDecoder().decode(ACPRawMessage.self, from: data)
await self?.handleMessage(message)
} catch {
#if canImport(os)
await self?.logParseFailure(error, line: line)
#endif
}
}
await self?.handleReadLoopEnded(cleanly: true)
} catch {
await self?.handleReadLoopEnded(cleanly: false, error: error)
}
}
// Mirror stderr into the diagnostic ring buffer.
stderrTask = Task { [weak self] in
do {
for try await text in ch.stderr {
await self?.appendStderr(text)
#if canImport(os)
await self?.logStderrLine(text)
#endif
}
} catch {
// Stderr errors don't matter we already handle EOF on
// the incoming stream.
}
}
}
#if canImport(os)
private func logParseFailure(_ error: Error, line: String) {
logger.warning("Failed to decode ACP message: \(error.localizedDescription)")
}
private func logStderrLine(_ text: String) {
logger.info("ACP stderr: \(text.prefix(500))")
}
#endif
private func handleMessage(_ message: ACPRawMessage) {
if message.isResponse {
if let requestId = message.id,
let continuation = pendingRequests.removeValue(forKey: requestId) {
if let error = message.error {
#if canImport(os)
logger.error("ACP RPC error (id: \(requestId)): \(error.message)")
#endif
statusMessage = "Error: \(error.message)"
continuation.resume(throwing: ACPClientError.rpcError(code: error.code, message: error.message))
} else {
#if canImport(os)
logger.debug("ACP response (id: \(requestId))")
#endif
continuation.resume(returning: message.result)
}
} else {
#if canImport(os)
logger.warning("ACP response for unknown request id: \(message.id ?? -1)")
#endif
}
} else if message.isNotification {
if let event = ACPEventParser.parse(notification: message) {
eventContinuation?.yield(event)
}
} else if message.isRequest {
if message.method == "session/request_permission",
let event = ACPEventParser.parsePermissionRequest(message) {
statusMessage = "Permission required"
eventContinuation?.yield(event)
}
}
}
// MARK: - Disconnect Cleanup
/// Single idempotent cleanup path for all disconnect scenarios.
private func performDisconnectCleanup(reason: String) {
guard isConnected else { return }
#if canImport(os)
logger.warning("ACP disconnecting: \(reason)")
#endif
isConnected = false
statusMessage = "Connection lost"
for (_, continuation) in pendingRequests {
continuation.resume(throwing: ACPClientError.processTerminated)
}
pendingRequests.removeAll()
eventContinuation?.finish()
eventContinuation = nil
}
private func handleReadLoopEnded(cleanly: Bool, error: Error? = nil) {
let reason = cleanly ? "read loop ended (EOF)" : "read loop failed: \(error?.localizedDescription ?? "unknown")"
performDisconnectCleanup(reason: reason)
}
private func handleWriteFailed() {
performDisconnectCleanup(reason: "write failed (broken pipe)")
}
private func handleWriteFailedForRequest(id: Int) {
if let continuation = pendingRequests.removeValue(forKey: id) {
continuation.resume(throwing: ACPClientError.processTerminated)
}
performDisconnectCleanup(reason: "write failed (broken pipe)")
}
}
// MARK: - Errors
public enum ACPClientError: Error, LocalizedError {
case notConnected
case encodingFailed
case invalidResponse(String)
case rpcError(code: Int, message: String)
case processTerminated
case requestTimeout(method: String)
public var errorDescription: String? {
switch self {
case .notConnected: return "ACP client is not connected"
case .encodingFailed: return "Failed to encode JSON-RPC request"
case .invalidResponse(let msg): return "Invalid ACP response: \(msg)"
case .rpcError(let code, let msg): return "ACP error \(code): \(msg)"
case .processTerminated: return "ACP process terminated unexpectedly"
case .requestTimeout(let method): return "ACP request '\(method)' timed out"
}
}
}
/// Maps a raw error message (RPC message or captured stderr) to a short
/// human-readable hint for the chat UI. Pattern-matches the most common
/// fresh-install failure modes. Returns nil when no known pattern matches.
public enum ACPErrorHint {
public static func classify(errorMessage: String, stderrTail: String) -> String? {
let haystack = errorMessage + "\n" + stderrTail
if haystack.range(of: #"No\s+(Anthropic|OpenAI|OpenRouter|Gemini|Google|Groq|Mistral|XAI)?\s*credentials\s+found"#,
options: .regularExpression) != nil
|| haystack.contains("ANTHROPIC_API_KEY")
|| haystack.contains("ANTHROPIC_TOKEN")
|| haystack.contains("claude setup-token")
|| haystack.contains("claude /login") {
return "Hermes can't find your AI provider credentials. Set `ANTHROPIC_API_KEY` (or similar) in `~/.hermes/.env` or your shell profile, then restart Scarf."
}
if let match = haystack.range(of: #"No such file or directory:\s*'([^']+)'"#,
options: .regularExpression) {
let matched = String(haystack[match])
if let nameStart = matched.range(of: "'"),
let nameEnd = matched.range(of: "'", range: nameStart.upperBound..<matched.endIndex) {
let name = String(matched[nameStart.upperBound..<nameEnd.lowerBound])
return "Hermes couldn't find `\(name)` on PATH. If you use nvm/asdf/mise, make sure it's exported in `~/.zprofile` (not only `~/.zshrc`), then restart Scarf."
}
return "Hermes couldn't find a required binary on PATH. Check that your shell's PATH is exported in `~/.zprofile`, then restart Scarf."
}
if haystack.localizedCaseInsensitiveContains("rate limit")
|| haystack.localizedCaseInsensitiveContains("429") {
return "Your AI provider returned a rate-limit error. Try again in a moment."
}
return nil
}
}
@@ -0,0 +1,253 @@
// iOS can't spawn subprocesses (no `Process`, sandboxed away from fork/exec).
// Everything below only makes sense on platforms that can macOS and Linux.
// iOS gets its ACP transport from a future `SSHExecACPChannel` (Citadel)
// landing in M4.
#if !os(iOS)
import Foundation
/// `ACPChannel` backed by a `Foundation.Process` spawning `hermes acp`
/// (local) or `ssh -T host -- hermes acp` (remote, via
/// `SSHTransport.makeProcess`). Owns the process lifecycle, stdin/stdout
/// pipes, and a small ring-buffered stderr capture for diagnostics.
///
/// The per-call `send(_:)` path uses raw POSIX `write(2)` instead of
/// `FileHandle.write` `FileHandle.write` crashes the whole app on
/// EPIPE (broken pipe) rather than throwing, so the original ACPClient
/// installed a `SIGPIPE` handler and a POSIX-write helper. That logic
/// moves here intact.
public actor ProcessACPChannel: ACPChannel {
private let process: Process
private let stdinPipe: Pipe
private let stdoutPipe: Pipe
private let stderrPipe: Pipe
/// Cached raw file descriptor for the stdin write end. Captured on
/// init because `Process.standardInput` gets nilled after `close()`.
private let stdinFd: Int32
private let incomingContinuation: AsyncThrowingStream<String, Error>.Continuation
/// Retain the stream callers get it lazily; we stash it here so the
/// continuation doesn't outlive its producer.
public nonisolated let incoming: AsyncThrowingStream<String, Error>
private let stderrContinuation: AsyncThrowingStream<String, Error>.Continuation
public nonisolated let stderr: AsyncThrowingStream<String, Error>
private var isClosed = false
private var readerTask: Task<Void, Never>?
private var stderrTask: Task<Void, Never>?
/// The subprocess's PID as a human-readable string.
public var diagnosticID: String? {
"pid=\(process.processIdentifier)"
}
/// Spawn `executable` with `args`, wiring its stdin/stdout/stderr into
/// this channel. `env` is passed verbatim to the subprocess (callers
/// are responsible for running it through whatever enrichment they
/// need this layer doesn't know about `SSH_AUTH_SOCK` or PATH).
///
/// For remote contexts, the Mac caller passes a pre-configured
/// `Process` via `init(process:)` below `SSHTransport.makeProcess`
/// already set up the ssh argv.
public init(
executable: String,
args: [String],
env: [String: String]
) async throws {
let proc = Process()
proc.executableURL = URL(fileURLWithPath: executable)
proc.arguments = args
proc.environment = env
try await Self.launch(process: proc, self_: nil)
try Self.ignoreSIGPIPE_once()
self.process = proc
self.stdinPipe = proc.standardInput as! Pipe
self.stdoutPipe = proc.standardOutput as! Pipe
self.stderrPipe = proc.standardError as! Pipe
self.stdinFd = stdinPipe.fileHandleForWriting.fileDescriptor
let (inStream, inContinuation) = AsyncThrowingStream<String, Error>.makeStream()
self.incoming = inStream
self.incomingContinuation = inContinuation
let (errStream, errContinuation) = AsyncThrowingStream<String, Error>.makeStream()
self.stderr = errStream
self.stderrContinuation = errContinuation
await startReaders()
}
/// Secondary entry point for callers that have a pre-configured
/// `Process` (typically from `SSHTransport.makeProcess`). The process
/// must NOT already be running this initializer calls `run()`.
public init(process: Process) async throws {
try await Self.launch(process: process, self_: nil)
try Self.ignoreSIGPIPE_once()
self.process = process
self.stdinPipe = process.standardInput as! Pipe
self.stdoutPipe = process.standardOutput as! Pipe
self.stderrPipe = process.standardError as! Pipe
self.stdinFd = stdinPipe.fileHandleForWriting.fileDescriptor
let (inStream, inContinuation) = AsyncThrowingStream<String, Error>.makeStream()
self.incoming = inStream
self.incomingContinuation = inContinuation
let (errStream, errContinuation) = AsyncThrowingStream<String, Error>.makeStream()
self.stderr = errStream
self.stderrContinuation = errContinuation
await startReaders()
}
/// Wire fresh stdin/stdout/stderr pipes (overwriting any the caller
/// set) and start the subprocess. `self_` is unused today the
/// placeholder keeps the signature ready for a future hook that
/// captures termination in `proc.terminationHandler` and routes it
/// into the channel's actor state.
private static func launch(process: Process, self_: Any?) async throws {
process.standardInput = Pipe()
process.standardOutput = Pipe()
process.standardError = Pipe()
do {
try process.run()
} catch {
throw ACPChannelError.launchFailed(error.localizedDescription)
}
}
/// Ignore SIGPIPE once per process so a broken-pipe write returns
/// `EPIPE` (which we surface as `.writeEndClosed`) instead of
/// delivering SIGPIPE and tearing the app down. Idempotent; the
/// kernel is fine with repeated `SIG_IGN` installs.
nonisolated private static func ignoreSIGPIPE_once() throws {
signal(SIGPIPE, SIG_IGN)
}
// MARK: - Send
public func send(_ line: String) async throws {
guard !isClosed else { throw ACPChannelError.writeEndClosed }
guard var data = line.data(using: .utf8) else {
throw ACPChannelError.invalidEncoding
}
data.append(0x0A) // '\n'
let fd = stdinFd
// POSIX write, looping on partial writes and surfacing EPIPE as
// `.writeEndClosed`. Crucial: `FileHandle.write(_:)` crashes the
// app on EPIPE rather than throwing; the original ACPClient used
// this same `Darwin.write` (or `Glibc.write` on Linux) technique.
let ok = Self.safeWrite(fd: fd, data: data)
if !ok {
throw ACPChannelError.writeEndClosed
}
}
nonisolated private static func safeWrite(fd: Int32, data: Data) -> Bool {
data.withUnsafeBytes { buf in
guard let base = buf.baseAddress else { return false }
var written = 0
let total = buf.count
while written < total {
#if canImport(Darwin)
let result = Darwin.write(fd, base.advanced(by: written), total - written)
#elseif canImport(Glibc)
let result = Glibc.write(fd, base.advanced(by: written), total - written)
#else
return false
#endif
if result <= 0 { return false }
written += result
}
return true
}
}
// MARK: - Close
public func close() async {
guard !isClosed else { return }
isClosed = true
// Close stdin so the child sees EOF and can flush. readerTask
// will see the pipe close and finish naturally.
stdinPipe.fileHandleForWriting.closeFile()
if process.isRunning {
// SIGINT for graceful Python shutdown raises KeyboardInterrupt
// cleanly instead of aborting in the middle of a JSON write.
process.interrupt()
// Watchdog: force-kill if still running after 2s. A stuck
// child shouldn't keep the app's close() hanging.
let watchdog = process
Task.detached {
try? await Task.sleep(nanoseconds: 2_000_000_000)
if watchdog.isRunning { watchdog.terminate() }
}
}
stdinPipe.fileHandleForReading.closeFile()
stdoutPipe.fileHandleForReading.closeFile()
stderrPipe.fileHandleForReading.closeFile()
readerTask?.cancel()
stderrTask?.cancel()
incomingContinuation.finish()
stderrContinuation.finish()
}
// MARK: - Reader loops
private func startReaders() {
let outHandle = stdoutPipe.fileHandleForReading
let errHandle = stderrPipe.fileHandleForReading
let inCont = incomingContinuation
let errCont = stderrContinuation
readerTask = Task.detached {
var buffer = Data()
while !Task.isCancelled {
let chunk = outHandle.availableData
if chunk.isEmpty { break } // EOF
buffer.append(chunk)
while let nl = buffer.firstIndex(of: 0x0A) {
let lineData = Data(buffer[buffer.startIndex..<nl])
buffer = Data(buffer[buffer.index(after: nl)...])
guard !lineData.isEmpty else { continue }
if let text = String(data: lineData, encoding: .utf8) {
inCont.yield(text)
} else {
inCont.finish(throwing: ACPChannelError.invalidEncoding)
return
}
}
}
inCont.finish()
}
stderrTask = Task.detached {
var buffer = Data()
while !Task.isCancelled {
let chunk = errHandle.availableData
if chunk.isEmpty { break }
buffer.append(chunk)
while let nl = buffer.firstIndex(of: 0x0A) {
let lineData = Data(buffer[buffer.startIndex..<nl])
buffer = Data(buffer[buffer.index(after: nl)...])
guard !lineData.isEmpty else { continue }
if let text = String(data: lineData, encoding: .utf8) {
errCont.yield(text)
}
// Non-UTF-8 stderr lines are dropped silently;
// we're not going to crash the channel over a
// weird byte in a log line.
}
}
errCont.finish()
}
}
}
#endif // !os(iOS)
@@ -0,0 +1,328 @@
import Testing
import Foundation
@testable import ScarfCore
/// Exercises M1's `ACPChannel` abstraction and the refactored
/// `ACPClient`. Uses a `MockACPChannel` to script JSON-RPC responses
/// deterministically no subprocess, no SSH, no timing flakiness.
///
/// `ProcessACPChannel` itself isn't exercised here because spawning a
/// real `hermes acp` subprocess in CI would be brittle; the channel's
/// POSIX-write / pipe-framing behaviour is covered on the Mac side
/// during smoke-run testing.
@Suite struct M1ACPTests {
// MARK: - Mock
/// In-memory `ACPChannel` for tests. Send queue captures outgoing
/// lines so tests can assert what ACPClient wrote; `reply(with:)`
/// / `emit(event:)` script incoming JSON-RPC responses /
/// notifications; `simulateClose()` closes both streams.
actor MockACPChannel: ACPChannel {
nonisolated let incoming: AsyncThrowingStream<String, Error>
nonisolated let stderr: AsyncThrowingStream<String, Error>
private let incomingCont: AsyncThrowingStream<String, Error>.Continuation
private let stderrCont: AsyncThrowingStream<String, Error>.Continuation
private(set) var sent: [String] = []
private(set) var closed = false
public var diagnosticID: String? { "mock-channel" }
init() {
let (inStream, inCont) = AsyncThrowingStream<String, Error>.makeStream()
let (errStream, errCont) = AsyncThrowingStream<String, Error>.makeStream()
self.incoming = inStream
self.incomingCont = inCont
self.stderr = errStream
self.stderrCont = errCont
}
func send(_ line: String) async throws {
if closed { throw ACPChannelError.writeEndClosed }
sent.append(line)
}
func close() async {
guard !closed else { return }
closed = true
incomingCont.finish()
stderrCont.finish()
}
// Test-only scripting entry points.
func reply(with line: String) {
incomingCont.yield(line)
}
func emitStderr(_ line: String) {
stderrCont.yield(line)
}
func simulateEOF() {
incomingCont.finish()
}
func simulateError(_ error: Error) {
incomingCont.finish(throwing: error)
}
func lastSentRequestId() -> Int? {
// Pull the last sent line, decode as JSON-RPC, return id.
guard let last = sent.last,
let data = last.data(using: .utf8),
let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any]
else { return nil }
return obj["id"] as? Int
}
}
// MARK: - ACPChannel protocol basics
@Test func channelMockBasicSendReceive() async throws {
let ch = MockACPChannel()
try await ch.send(#"{"jsonrpc":"2.0","method":"ping"}"#)
let sent = await ch.sent
#expect(sent.count == 1)
await ch.reply(with: #"{"jsonrpc":"2.0","result":{}}"#)
// Drain one incoming line to prove the stream works.
var iterator = ch.incoming.makeAsyncIterator()
let first = try await iterator.next()
#expect(first == #"{"jsonrpc":"2.0","result":{}}"#)
}
@Test func channelWriteFailsAfterClose() async {
let ch = MockACPChannel()
await ch.close()
do {
try await ch.send("should fail")
Issue.record("expected writeEndClosed error")
} catch let error as ACPChannelError {
if case .writeEndClosed = error {} else {
Issue.record("expected .writeEndClosed, got \(error)")
}
} catch {
Issue.record("unexpected error: \(error)")
}
}
@Test func channelErrorDescriptions() {
#expect(ACPChannelError.closed(exitCode: 2).errorDescription?.contains("exit 2") == true)
#expect(ACPChannelError.writeEndClosed.errorDescription?.contains("closed") == true)
#expect(ACPChannelError.invalidEncoding.errorDescription?.contains("UTF-8") == true)
#expect(ACPChannelError.launchFailed("nope").errorDescription?.contains("nope") == true)
#expect(ACPChannelError.other("x").errorDescription == "x")
}
// MARK: - ACPClient state machine
/// Build an ACPClient wired to the mock and kick off `start()`.
/// Returns `(client, mock, startTask)` `startTask` is pending
/// until the mock replies to the initialize request.
@MainActor
private func buildClientWithMock() async -> (ACPClient, MockACPChannel, Task<Void, Error>) {
let mock = MockACPChannel()
let client = ACPClient(context: .local) { _ in mock }
let startTask = Task {
try await client.start()
}
return (client, mock, startTask)
}
@Test @MainActor func clientInitiallyDisconnected() async {
let mock = MockACPChannel()
let client = ACPClient(context: .local) { _ in mock }
let connected = await client.isConnected
let healthy = await client.isHealthy
#expect(connected == false)
#expect(healthy == false)
}
@Test @MainActor func clientStartSendsInitializeAndSetsConnected() async throws {
let (client, mock, startTask) = await buildClientWithMock()
// Wait until the client has sent the initialize request.
try await waitFor { await mock.sent.count >= 1 }
let first = await mock.sent[0]
#expect(first.contains(#""method":"initialize""#))
// Reply to that initialize.
let id = await mock.lastSentRequestId() ?? 1
await mock.reply(with: #"{"jsonrpc":"2.0","id":\#(id),"result":{}}"#)
try await startTask.value
let connected = await client.isConnected
#expect(connected == true)
let status = await client.statusMessage
#expect(status == "Connected")
await client.stop()
}
@Test @MainActor func clientRpcErrorIsSurfaced() async throws {
let (client, mock, startTask) = await buildClientWithMock()
try await waitFor { await mock.sent.count >= 1 }
let id = await mock.lastSentRequestId() ?? 1
await mock.reply(with: #"{"jsonrpc":"2.0","id":\#(id),"error":{"code":-32601,"message":"method not found"}}"#)
do {
try await startTask.value
Issue.record("expected start() to throw")
} catch let error as ACPClientError {
if case .rpcError(let code, let msg) = error {
#expect(code == -32601)
#expect(msg.contains("method not found"))
} else {
Issue.record("expected .rpcError, got \(error)")
}
}
await client.stop()
}
@Test @MainActor func clientChannelCloseSurfacesAsProcessTerminated() async throws {
let (client, mock, startTask) = await buildClientWithMock()
try await waitFor { await mock.sent.count >= 1 }
let id = await mock.lastSentRequestId() ?? 1
await mock.reply(with: #"{"jsonrpc":"2.0","id":\#(id),"result":{}}"#)
try await startTask.value
// Client is connected. Issue a session/new; before the mock
// replies, close the channel. The pending request should
// resolve with `.processTerminated`.
let sessionTask = Task {
try await client.newSession(cwd: "/tmp")
}
try await waitFor { await mock.sent.count >= 2 }
await mock.simulateEOF()
do {
_ = try await sessionTask.value
Issue.record("expected session/new to throw")
} catch let error as ACPClientError {
if case .processTerminated = error {} else {
Issue.record("expected .processTerminated, got \(error)")
}
}
let connected = await client.isConnected
#expect(connected == false)
await client.stop()
}
@Test @MainActor func clientRoutesSessionUpdateNotificationToEventStream() async throws {
let (client, mock, startTask) = await buildClientWithMock()
try await waitFor { await mock.sent.count >= 1 }
let id = await mock.lastSentRequestId() ?? 1
await mock.reply(with: #"{"jsonrpc":"2.0","id":\#(id),"result":{}}"#)
try await startTask.value
// Start event consumption.
let eventTask = Task { () -> ACPEvent? in
var it = await client.events.makeAsyncIterator()
return await it.next()
}
// Emit a session/update notification for an agent_message_chunk.
let notification = #"{"jsonrpc":"2.0","method":"session/update","params":{"sessionId":"s1","update":{"sessionUpdate":"agent_message_chunk","content":{"text":"hello"}}}}"#
await mock.reply(with: notification)
let event = try await withTimeout(seconds: 2) {
await eventTask.value
}
guard case .messageChunk(let sid, let text) = event else {
Issue.record("expected .messageChunk, got \(String(describing: event))")
return
}
#expect(sid == "s1")
#expect(text == "hello")
await client.stop()
}
@Test @MainActor func clientStderrFeedsRecentStderrRingBuffer() async throws {
let (client, mock, startTask) = await buildClientWithMock()
try await waitFor { await mock.sent.count >= 1 }
let id = await mock.lastSentRequestId() ?? 1
await mock.reply(with: #"{"jsonrpc":"2.0","id":\#(id),"result":{}}"#)
try await startTask.value
await mock.emitStderr("WARNING: something")
await mock.emitStderr("ERROR: boom")
// Wait for the read loop to drain.
try await waitFor { await client.recentStderr.contains("boom") }
let tail = await client.recentStderr
#expect(tail.contains("WARNING: something"))
#expect(tail.contains("ERROR: boom"))
await client.stop()
}
// MARK: - ACPErrorHint
@Test func errorHintsClassifyCommonFailures() {
let noCreds = ACPErrorHint.classify(
errorMessage: "No Anthropic credentials found",
stderrTail: ""
)
#expect(noCreds?.contains("ANTHROPIC_API_KEY") == true)
let missingBinary = ACPErrorHint.classify(
errorMessage: "",
stderrTail: "No such file or directory: 'npx'"
)
#expect(missingBinary?.contains("npx") == true)
let rateLimit = ACPErrorHint.classify(
errorMessage: "",
stderrTail: "HTTP 429 Too Many Requests: rate limit"
)
#expect(rateLimit?.contains("rate-limit") == true)
let unknown = ACPErrorHint.classify(
errorMessage: "weird thing",
stderrTail: "other weird thing"
)
#expect(unknown == nil)
}
// MARK: - Helpers
/// Poll `predicate` every ~20ms up to `timeout` seconds. Fails if
/// the condition never becomes true. Used to bridge between
/// ACPClient's detached tasks (send loops, read loop, etc.) and
/// the synchronous test assertions without leaning on Thread.sleep.
private func waitFor(
timeout: TimeInterval = 2.0,
_ predicate: @escaping @Sendable () async -> Bool
) async throws {
let deadline = Date().addingTimeInterval(timeout)
while Date() < deadline {
if await predicate() { return }
try await Task.sleep(nanoseconds: 20_000_000)
}
Issue.record("waitFor timed out after \(timeout)s")
}
/// Run `op` with an awaited timeout if it doesn't finish in time,
/// record an Issue and return `op`'s pending value (cancellation
/// lets the test fail cleanly rather than hang CI).
private func withTimeout<T: Sendable>(
seconds: TimeInterval,
_ op: @escaping @Sendable () async -> T
) async throws -> T {
try await withThrowingTaskGroup(of: T?.self) { group in
group.addTask { await op() }
group.addTask {
try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
return nil
}
let first = try await group.next()
group.cancelAll()
guard let result = first, let value = result else {
throw ACPChannelError.other("withTimeout timed out after \(seconds)s")
}
return value
}
}
}