fix(transport,widgets): code-review fixes for v2.7 + iOS Citadel transport

- CronStatusWidgetView: include jobId + lineCount in `.task(id:)` so widget reload fires when dashboard.json changes either field, not only when the file watcher ticks
- CitadelServerTransport.runScript: enforce the timeout via withThrowingTaskGroup race; propagate transport-level Citadel errors as TransportError.other (so RemoteSQLiteBackend.query maps them to BackendError.transport instead of misclassifying as BackendError.sqlite via a fake -1 exit code); throw TransportError.timeout on the deadline branch with partial stdout preserved
- SSHScriptRunner: close fileHandleForReading on stdout/stderr Pipes in the timeout branch (success path already did); check Task.isCancelled inside the busy-wait so a cancelled parent task terminates the subprocess early instead of waiting out the full timeout. Both runOverSSH and runLocally fixed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alan Wizemann
2026-05-04 21:40:07 +02:00
parent c7bcfd8655
commit 272da6a915
3 changed files with 76 additions and 20 deletions
@@ -124,10 +124,22 @@ public enum SSHScriptRunner {
let deadline = Date().addingTimeInterval(timeout) let deadline = Date().addingTimeInterval(timeout)
while proc.isRunning && Date() < deadline { while proc.isRunning && Date() < deadline {
if Task.isCancelled {
proc.terminate()
try? stdoutPipe.fileHandleForReading.close()
try? stderrPipe.fileHandleForReading.close()
return .connectFailure("Script cancelled")
}
try? await Task.sleep(nanoseconds: 100_000_000) try? await Task.sleep(nanoseconds: 100_000_000)
} }
if proc.isRunning { if proc.isRunning {
proc.terminate() proc.terminate()
// Pipe fds leak otherwise closing on the timeout branch
// matches the success-path discipline (see CLAUDE.md
// "Always close both fileHandleForReading and
// fileHandleForWriting on Pipe objects").
try? stdoutPipe.fileHandleForReading.close()
try? stderrPipe.fileHandleForReading.close()
return .connectFailure("Script timed out after \(Int(timeout))s") return .connectFailure("Script timed out after \(Int(timeout))s")
} }
let out = (try? stdoutPipe.fileHandleForReading.readToEnd()) ?? Data() let out = (try? stdoutPipe.fileHandleForReading.readToEnd()) ?? Data()
@@ -162,10 +174,18 @@ public enum SSHScriptRunner {
} }
let deadline = Date().addingTimeInterval(timeout) let deadline = Date().addingTimeInterval(timeout)
while proc.isRunning && Date() < deadline { while proc.isRunning && Date() < deadline {
if Task.isCancelled {
proc.terminate()
try? stdoutPipe.fileHandleForReading.close()
try? stderrPipe.fileHandleForReading.close()
return .connectFailure("Script cancelled")
}
try? await Task.sleep(nanoseconds: 100_000_000) try? await Task.sleep(nanoseconds: 100_000_000)
} }
if proc.isRunning { if proc.isRunning {
proc.terminate() proc.terminate()
try? stdoutPipe.fileHandleForReading.close()
try? stderrPipe.fileHandleForReading.close()
return .connectFailure("Script timed out after \(Int(timeout))s") return .connectFailure("Script timed out after \(Int(timeout))s")
} }
let out = (try? stdoutPipe.fileHandleForReading.readToEnd()) ?? Data() let out = (try? stdoutPipe.fileHandleForReading.readToEnd()) ?? Data()
@@ -199,11 +199,18 @@ public final class CitadelServerTransport: ServerTransport, @unchecked Sendable
} catch { } catch {
throw TransportError.other(message: "Failed to start exec stream: \(error.localizedDescription)") throw TransportError.other(message: "Failed to start exec stream: \(error.localizedDescription)")
} }
// Drain in a child task and race against a sleep so a wedged remote
// sqlite3 (or a mid-stream Citadel transport failure) can't hang the
// caller indefinitely. Mirrors the busy-wait deadline that
// SSHScriptRunner enforces on Mac.
return try await withThrowingTaskGroup(of: ProcessResult?.self) { group in
group.addTask {
var stdout = Data() var stdout = Data()
var stderr = Data() var stderr = Data()
var exitCode: Int32 = 0 var exitCode: Int32 = 0
do { do {
for try await chunk in stream { for try await chunk in stream {
try Task.checkCancellation()
switch chunk { switch chunk {
case .stdout(var buf): case .stdout(var buf):
if let s = buf.readString(length: buf.readableBytes) { if let s = buf.readString(length: buf.readableBytes) {
@@ -216,13 +223,42 @@ public final class CitadelServerTransport: ServerTransport, @unchecked Sendable
} }
} }
} catch let failed as SSHClient.CommandFailed { } catch let failed as SSHClient.CommandFailed {
// Genuine remote non-zero exit surface as
// ProcessResult so the caller's existing exit-code
// handling fires (mapped to BackendError.sqlite by
// RemoteSQLiteBackend).
exitCode = Int32(failed.exitCode) exitCode = Int32(failed.exitCode)
} catch is CancellationError {
throw TransportError.timeout(seconds: timeout, partialStdout: stdout)
} catch { } catch {
stderr.append(Data(error.localizedDescription.utf8)) // Transport-level failure (host unreachable, channel
exitCode = -1 // dropped, ControlMaster died, NIO read error). Throw
// as a typed TransportError so RemoteSQLiteBackend
// routes it to BackendError.transport rather than
// misclassifying as a sqlite crash via a fake -1 exit.
throw TransportError.other(
message: "SSH stream failed: \(error.localizedDescription)"
)
} }
return ProcessResult(exitCode: exitCode, stdout: stdout, stderr: stderr) return ProcessResult(exitCode: exitCode, stdout: stdout, stderr: stderr)
} }
group.addTask {
try await Task.sleep(nanoseconds: UInt64(timeout * 1_000_000_000))
return nil
}
guard let first = try await group.next() else {
group.cancelAll()
throw TransportError.other(message: "SSH stream produced no result")
}
group.cancelAll()
if let result = first {
return result
}
// Timeout fired first drain task gets cancelled by the
// group cancel above; surface as a typed timeout.
throw TransportError.timeout(seconds: timeout, partialStdout: Data())
}
}
// MARK: - ServerTransport: watching // MARK: - ServerTransport: watching
@@ -38,7 +38,7 @@ struct CronStatusWidgetView: View {
) )
} }
} }
.task(id: fileWatcher.lastChangeDate) { .task(id: "\(jobId ?? "")|\(lineCount)|\(fileWatcher.lastChangeDate.timeIntervalSince1970)") {
await reload() await reload()
} }
} }