mirror of
https://github.com/awizemann/scarf.git
synced 2026-05-10 10:36:35 +00:00
fix(transport): drain ssh stdout/stderr concurrently to unwedge >64KB payloads
Issue #77 — Sessions screen rendered empty even though Dashboard
reported 161 sessions and Activity reported 116. Root cause was a
classic pipe-buffer deadlock in SSHScriptRunner: stdout was read via
`readToEnd()` AFTER the subprocess had exited. macOS pipes default to
a 16–64 KB kernel buffer; once the remote `sqlite3 -json` script wrote
more than that to its stdout, ssh back-pressured across the wire,
sshd back-pressured sqlite3, sqlite3 blocked, the script never
finished, the 30-second timeout fired, `streamScript` threw, and
`HermesDataService.sessionListSnapshot()` swallowed the failure into
an empty array. Empty Sessions list. Dashboard kept working because
its smaller LIMIT 5 payload fit under the threshold.
Why this was a v2.7 regression specifically: 20cc3a2 folded the
previously-separate sessions + previews queries into a single batched
round-trip (perf win for remote users). The new combined payload for
~150+ sessions crossed the buffer threshold for the first time.
Fix: drain stdout/stderr concurrently with the running process via
Foundation's `FileHandle.readabilityHandler`, accumulating chunks
into an NSLock-guarded `Data` buffer. The kernel pipe never fills,
the subprocess never blocks, the script returns the full payload.
Same change applied to both the SSH path (`runOverSSH`) and the
local path (`runLocally`) — they had identical bug shapes.
Adds SSHScriptRunnerTests with three regression checks: a 256 KB
synthetic payload that would have wedged pre-fix, a small-payload
sanity round-trip, and a non-zero exit propagation check.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -49,6 +49,34 @@ public enum SSHScriptRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Lock-protected `Data` accumulator used by the stdout/stderr
|
||||||
|
/// readability handlers below. Two of these per script run, one per
|
||||||
|
/// stream. `@unchecked Sendable` because mutation goes through the
|
||||||
|
/// `NSLock` — Swift can't see that.
|
||||||
|
///
|
||||||
|
/// Why this exists (issue #77): the previous implementation read
|
||||||
|
/// stdout/stderr via `readToEnd()` *after* the subprocess exited.
|
||||||
|
/// On macOS pipes default to a 16–64 KB kernel buffer; once
|
||||||
|
/// `sqlite3 -json` writes more than that, the SSH client back-
|
||||||
|
/// pressures over the wire, the remote sqlite3 blocks, the script
|
||||||
|
/// never finishes, the 30 s timeout fires, and the caller sees
|
||||||
|
/// "Script timed out" + an empty result set. v2.7's
|
||||||
|
/// `sessionListSnapshot(limit: 500)` crossed that threshold for
|
||||||
|
/// any user with ~150+ sessions. Draining concurrently with
|
||||||
|
/// `readabilityHandler` removes the back-pressure.
|
||||||
|
private final class LockedData: @unchecked Sendable {
|
||||||
|
private let lock = NSLock()
|
||||||
|
private var buf = Data()
|
||||||
|
func append(_ chunk: Data) {
|
||||||
|
lock.lock(); defer { lock.unlock() }
|
||||||
|
buf.append(chunk)
|
||||||
|
}
|
||||||
|
func snapshot() -> Data {
|
||||||
|
lock.lock(); defer { lock.unlock() }
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public enum Outcome: Sendable {
|
public enum Outcome: Sendable {
|
||||||
/// Couldn't even reach the remote (process spawn failed,
|
/// Couldn't even reach the remote (process spawn failed,
|
||||||
/// timeout before any output, network refused). Carries the
|
/// timeout before any output, network refused). Carries the
|
||||||
@@ -151,9 +179,35 @@ public enum SSHScriptRunner {
|
|||||||
proc.standardOutput = stdoutPipe
|
proc.standardOutput = stdoutPipe
|
||||||
proc.standardError = stderrPipe
|
proc.standardError = stderrPipe
|
||||||
|
|
||||||
|
// Drain stdout/stderr concurrently with the running process —
|
||||||
|
// see the LockedData docstring above for the issue-#77
|
||||||
|
// back-story. Without these handlers a >64 KB script output
|
||||||
|
// wedges the pipe + ssh + remote sqlite3 chain and the only
|
||||||
|
// visible symptom is a timeout.
|
||||||
|
let outBuf = LockedData()
|
||||||
|
let errBuf = LockedData()
|
||||||
|
stdoutPipe.fileHandleForReading.readabilityHandler = { handle in
|
||||||
|
let chunk = handle.availableData
|
||||||
|
if chunk.isEmpty {
|
||||||
|
handle.readabilityHandler = nil
|
||||||
|
} else {
|
||||||
|
outBuf.append(chunk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stderrPipe.fileHandleForReading.readabilityHandler = { handle in
|
||||||
|
let chunk = handle.availableData
|
||||||
|
if chunk.isEmpty {
|
||||||
|
handle.readabilityHandler = nil
|
||||||
|
} else {
|
||||||
|
errBuf.append(chunk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
try proc.run()
|
try proc.run()
|
||||||
} catch {
|
} catch {
|
||||||
|
stdoutPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
|
stderrPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
return .connectFailure("Failed to launch ssh: \(error.localizedDescription)")
|
return .connectFailure("Failed to launch ssh: \(error.localizedDescription)")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -172,6 +226,8 @@ public enum SSHScriptRunner {
|
|||||||
// belt-and-suspenders.
|
// belt-and-suspenders.
|
||||||
if cancelFlag.isCancelled || Task.isCancelled {
|
if cancelFlag.isCancelled || Task.isCancelled {
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
|
stdoutPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
|
stderrPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
try? stdoutPipe.fileHandleForReading.close()
|
try? stdoutPipe.fileHandleForReading.close()
|
||||||
try? stderrPipe.fileHandleForReading.close()
|
try? stderrPipe.fileHandleForReading.close()
|
||||||
return .connectFailure("Script cancelled")
|
return .connectFailure("Script cancelled")
|
||||||
@@ -180,6 +236,8 @@ public enum SSHScriptRunner {
|
|||||||
}
|
}
|
||||||
if proc.isRunning {
|
if proc.isRunning {
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
|
stdoutPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
|
stderrPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
// Pipe fds leak otherwise — closing on the timeout branch
|
// Pipe fds leak otherwise — closing on the timeout branch
|
||||||
// matches the success-path discipline (see CLAUDE.md
|
// matches the success-path discipline (see CLAUDE.md
|
||||||
// "Always close both fileHandleForReading and
|
// "Always close both fileHandleForReading and
|
||||||
@@ -188,8 +246,14 @@ public enum SSHScriptRunner {
|
|||||||
try? stderrPipe.fileHandleForReading.close()
|
try? stderrPipe.fileHandleForReading.close()
|
||||||
return .connectFailure("Script timed out after \(Int(timeout))s")
|
return .connectFailure("Script timed out after \(Int(timeout))s")
|
||||||
}
|
}
|
||||||
let out = (try? stdoutPipe.fileHandleForReading.readToEnd()) ?? Data()
|
// Detach the readabilityHandlers and capture whatever the
|
||||||
let err = (try? stderrPipe.fileHandleForReading.readToEnd()) ?? Data()
|
// accumulator has. The handler may have already seen EOF
|
||||||
|
// (`chunk.isEmpty`) and self-cleared, but assigning nil is
|
||||||
|
// idempotent and guards against a late tick from the queue.
|
||||||
|
stdoutPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
|
stderrPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
|
let out = outBuf.snapshot()
|
||||||
|
let err = errBuf.snapshot()
|
||||||
// Best-effort fd close — Pipe leaks fd's otherwise.
|
// Best-effort fd close — Pipe leaks fd's otherwise.
|
||||||
try? stdoutPipe.fileHandleForReading.close()
|
try? stdoutPipe.fileHandleForReading.close()
|
||||||
try? stderrPipe.fileHandleForReading.close()
|
try? stderrPipe.fileHandleForReading.close()
|
||||||
@@ -213,15 +277,43 @@ public enum SSHScriptRunner {
|
|||||||
let stderrPipe = Pipe()
|
let stderrPipe = Pipe()
|
||||||
proc.standardOutput = stdoutPipe
|
proc.standardOutput = stdoutPipe
|
||||||
proc.standardError = stderrPipe
|
proc.standardError = stderrPipe
|
||||||
|
|
||||||
|
// Drain concurrently — same pipe-buffer fix as runOverSSH.
|
||||||
|
// Local scripts can also blow past the 16–64 KB pipe buffer
|
||||||
|
// (e.g. local `sqlite3 -json` over a fat result set) and
|
||||||
|
// would wedge in exactly the same way.
|
||||||
|
let outBuf = LockedData()
|
||||||
|
let errBuf = LockedData()
|
||||||
|
stdoutPipe.fileHandleForReading.readabilityHandler = { handle in
|
||||||
|
let chunk = handle.availableData
|
||||||
|
if chunk.isEmpty {
|
||||||
|
handle.readabilityHandler = nil
|
||||||
|
} else {
|
||||||
|
outBuf.append(chunk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stderrPipe.fileHandleForReading.readabilityHandler = { handle in
|
||||||
|
let chunk = handle.availableData
|
||||||
|
if chunk.isEmpty {
|
||||||
|
handle.readabilityHandler = nil
|
||||||
|
} else {
|
||||||
|
errBuf.append(chunk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
try proc.run()
|
try proc.run()
|
||||||
} catch {
|
} catch {
|
||||||
|
stdoutPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
|
stderrPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
return .connectFailure("Failed to launch /bin/sh: \(error.localizedDescription)")
|
return .connectFailure("Failed to launch /bin/sh: \(error.localizedDescription)")
|
||||||
}
|
}
|
||||||
let deadline = Date().addingTimeInterval(timeout)
|
let deadline = Date().addingTimeInterval(timeout)
|
||||||
while proc.isRunning && Date() < deadline {
|
while proc.isRunning && Date() < deadline {
|
||||||
if cancelFlag.isCancelled || Task.isCancelled {
|
if cancelFlag.isCancelled || Task.isCancelled {
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
|
stdoutPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
|
stderrPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
try? stdoutPipe.fileHandleForReading.close()
|
try? stdoutPipe.fileHandleForReading.close()
|
||||||
try? stderrPipe.fileHandleForReading.close()
|
try? stderrPipe.fileHandleForReading.close()
|
||||||
return .connectFailure("Script cancelled")
|
return .connectFailure("Script cancelled")
|
||||||
@@ -230,12 +322,16 @@ public enum SSHScriptRunner {
|
|||||||
}
|
}
|
||||||
if proc.isRunning {
|
if proc.isRunning {
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
|
stdoutPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
|
stderrPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
try? stdoutPipe.fileHandleForReading.close()
|
try? stdoutPipe.fileHandleForReading.close()
|
||||||
try? stderrPipe.fileHandleForReading.close()
|
try? stderrPipe.fileHandleForReading.close()
|
||||||
return .connectFailure("Script timed out after \(Int(timeout))s")
|
return .connectFailure("Script timed out after \(Int(timeout))s")
|
||||||
}
|
}
|
||||||
let out = (try? stdoutPipe.fileHandleForReading.readToEnd()) ?? Data()
|
stdoutPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
let err = (try? stderrPipe.fileHandleForReading.readToEnd()) ?? Data()
|
stderrPipe.fileHandleForReading.readabilityHandler = nil
|
||||||
|
let out = outBuf.snapshot()
|
||||||
|
let err = errBuf.snapshot()
|
||||||
try? stdoutPipe.fileHandleForReading.close()
|
try? stdoutPipe.fileHandleForReading.close()
|
||||||
try? stderrPipe.fileHandleForReading.close()
|
try? stderrPipe.fileHandleForReading.close()
|
||||||
return .completed(
|
return .completed(
|
||||||
|
|||||||
@@ -0,0 +1,85 @@
|
|||||||
|
import Testing
|
||||||
|
import Foundation
|
||||||
|
@testable import ScarfCore
|
||||||
|
|
||||||
|
/// Regression tests for `SSHScriptRunner`. Mac-only because the
|
||||||
|
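/// implementation relies on `Foundation.Process`, which isn't available
|
||||||
|
/// on every platform Swift targets (e.g. iOS). NOTE(review): `Process`
/// does exist in swift-corelibs-foundation on Linux — confirm which
/// platforms this guard is meant to exclude. Drives the `runLocally` path so we don't need an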
|
||||||
|
/// SSH endpoint in CI.
|
||||||
|
#if os(macOS)
|
||||||
|
@Suite struct SSHScriptRunnerTests {
|
||||||
|
|
||||||
|
/// Issue #77 regression. Pre-fix the runner read stdout via
|
||||||
|
/// `readToEnd()` *after* the subprocess exited; once the script's
|
||||||
|
/// output crossed the kernel's pipe buffer (16–64 KB on macOS) the
|
||||||
|
/// process wedged because nothing was draining the read end. The
|
||||||
|
/// only visible symptom was a 30-second timeout and an empty
|
||||||
|
/// result.
|
||||||
|
///
|
||||||
|
/// This script writes ~256 KB of output — comfortably past every
|
||||||
|
/// pipe-buffer threshold. With the readabilityHandler drain in
|
||||||
|
/// place the run should complete in well under a second and
|
||||||
|
/// return the full payload.
|
||||||
|
@Test func drainsLargeStdoutWithoutTimeout() async throws {
|
||||||
|
// 256 lines × 1024 bytes/line = 256 KB.
|
||||||
|
let script = """
|
||||||
|
for i in $(seq 1 256); do
|
||||||
|
printf '%04d:' "$i"
|
||||||
|
printf '%.0sx' $(seq 1 1018)
|
||||||
|
printf '\\n'
|
||||||
|
done
|
||||||
|
"""
|
||||||
|
let outcome = await SSHScriptRunner.run(
|
||||||
|
script: script,
|
||||||
|
context: .local,
|
||||||
|
timeout: 10
|
||||||
|
)
|
||||||
|
switch outcome {
|
||||||
|
case .completed(let stdout, _, let exitCode):
|
||||||
|
#expect(exitCode == 0)
|
||||||
|
// 256 lines, plus one empty trailing element produced by the final newline.
|
||||||
|
let lines = stdout.split(separator: "\n", omittingEmptySubsequences: false)
|
||||||
|
#expect(lines.count >= 256)
|
||||||
|
#expect(stdout.utf8.count >= 256 * 1024)
|
||||||
|
case .connectFailure(let reason):
|
||||||
|
Issue.record("Expected completion, got connectFailure: \(reason)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sanity check that small scripts still come back the way they
|
||||||
|
/// did before the drain refactor. Guards against an off-by-one in
|
||||||
|
/// the readability handler that swallowed trailing bytes.
|
||||||
|
@Test func smallScriptPayloadRoundTrips() async throws {
|
||||||
|
let outcome = await SSHScriptRunner.run(
|
||||||
|
script: "printf 'hello\\n' && printf 'world\\n' >&2 && exit 0",
|
||||||
|
context: .local,
|
||||||
|
timeout: 5
|
||||||
|
)
|
||||||
|
switch outcome {
|
||||||
|
case .completed(let stdout, let stderr, let exitCode):
|
||||||
|
#expect(exitCode == 0)
|
||||||
|
#expect(stdout == "hello\n")
|
||||||
|
#expect(stderr == "world\n")
|
||||||
|
case .connectFailure(let reason):
|
||||||
|
Issue.record("Expected completion, got connectFailure: \(reason)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Non-zero exit codes should still be reported as `.completed`
|
||||||
|
/// with the captured stdout/stderr — unchanged contract.
|
||||||
|
@Test func nonZeroExitIsReportedAsCompleted() async throws {
|
||||||
|
let outcome = await SSHScriptRunner.run(
|
||||||
|
script: "echo nope >&2 && exit 7",
|
||||||
|
context: .local,
|
||||||
|
timeout: 5
|
||||||
|
)
|
||||||
|
switch outcome {
|
||||||
|
case .completed(_, let stderr, let exitCode):
|
||||||
|
#expect(exitCode == 7)
|
||||||
|
#expect(stderr.contains("nope"))
|
||||||
|
case .connectFailure(let reason):
|
||||||
|
Issue.record("Expected completion, got connectFailure: \(reason)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
Reference in New Issue
Block a user