Commit 2a9e46f

Anton Golub <antongolub@antongolub.com>
2025-07-30 15:52:14
feat: let piping of rejected processes (#1296)
* feat: let piping of rejected processes * test: increase coverage * test: check `pipe()` throws if dest is settled
1 parent 4cd0980
build/core.cjs
@@ -574,17 +574,21 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
           $2.log({ kind: "stderr", data, verbose: !self.isQuiet(), id });
         },
         end: (data, c) => {
-          const { error, status, signal, duration, ctx: { store } } = data;
+          const { error: _error, status, signal: __signal, duration, ctx: { store } } = data;
           const { stdout, stderr } = store;
+          const { cause, exitCode, signal: _signal } = __spreadValues({}, self._breakData);
+          const signal = _signal != null ? _signal : __signal;
+          const code = exitCode != null ? exitCode : status;
+          const error = cause != null ? cause : _error;
           const output = new ProcessOutput({
-            code: status,
+            code,
             signal,
             error,
             duration,
             store,
             from: $2.from
           });
-          $2.log({ kind: "end", signal, exitCode: status, duration, error, verbose: self.isVerbose(), id });
+          $2.log({ kind: "end", signal, exitCode: code, duration, error, verbose: self.isVerbose(), id });
           if (stdout.length && (0, import_util.getLast)((0, import_util.getLast)(stdout)) !== BR_CC) c.on.stdout(EOL, c);
           if (stderr.length && (0, import_util.getLast)((0, import_util.getLast)(stderr)) !== BR_CC) c.on.stderr(EOL, c);
           self.finalize(output);
@@ -593,6 +597,11 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
     });
     return this;
   }
+  break(exitCode, signal, cause) {
+    if (!this.isRunning()) return;
+    this._breakData = { exitCode, signal, cause };
+    this.kill(signal);
+  }
   finalize(output, legacy = false) {
     if (this.isSettled()) return;
     this._output = output;
@@ -609,9 +618,13 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
       if (this.sync) throw output;
     }
   }
+  // prettier-ignore
   _pipe(source, dest, ...args) {
+    if ((0, import_util.isString)(dest))
+      return this._pipe(source, import_node_fs.default.createWriteStream(dest));
     if ((0, import_util.isStringLiteral)(dest, ...args))
-      return this.pipe[source](
+      return this._pipe(
+        source,
         $({
           halt: true,
           signal: this.signal
@@ -619,13 +632,19 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
       );
     this._piped = true;
     const { ee } = this._snapshot;
+    const output = this.output;
     const from = new import_vendor_core2.VoidStream();
     const fill = () => {
       for (const chunk of this._zurk.store[source]) from.write(chunk);
-      return true;
     };
-    const fillEnd = () => this.isSettled() && fill() && from.end();
-    if (!this.isSettled()) {
+    const fillSettled = () => {
+      var _a;
+      if (!output) return;
+      if (!output.ok) (_a = dest.break) == null ? void 0 : _a.call(dest, output.exitCode, output.signal, output.cause);
+      fill();
+      from.end();
+    };
+    if (!output) {
       const onData = (chunk) => from.write(chunk);
       ee.once(source, () => {
         fill();
@@ -635,20 +654,20 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
         from.end();
       });
     }
-    if ((0, import_util.isString)(dest)) dest = import_node_fs.default.createWriteStream(dest);
     if (dest instanceof _ProcessPromise) {
+      if (dest.isSettled()) throw new Fail("Cannot pipe to a settled process.");
       dest._pipedFrom = this;
       if (dest.isHalted() && this.isHalted()) {
         ee.once("start", () => from.pipe(dest.run()._stdin));
       } else {
-        this.catch((e) => !dest.isNothrow() && dest._reject(e));
         from.pipe(dest.run()._stdin);
+        this.catch((e) => dest.break(e.exitCode, e.signal, e.cause));
       }
-      fillEnd();
+      fillSettled();
       return dest;
     }
     from.once("end", () => dest.emit(EPF)).pipe(dest);
-    fillEnd();
+    fillSettled();
     return promisifyStream(dest, this);
   }
   abort(reason) {
build/core.d.ts
@@ -101,13 +101,15 @@ export declare class ProcessPromise extends Promise<ProcessOutput> {
     constructor(executor: PromiseCallback);
     private build;
     run(): ProcessPromise;
+    private _breakData?;
+    private break;
     private finalize;
     pipe: PipeMethod & {
         [key in keyof TSpawnStore]: PipeMethod;
     };
     private _pipe;
     abort(reason?: string): void;
-    kill(signal?: NodeJS.Signals): Promise<void>;
+    kill(signal?: NodeJS.Signals | null): Promise<void>;
     /**
      *  @deprecated Use $({halt: true})`cmd` instead.
      */
build/vendor-core.d.ts
@@ -272,7 +272,7 @@ export type TSpawnStore = {
 	stderr: TSpawnStoreChunks;
 	stdall: TSpawnStoreChunks;
 };
-type TSpawnResult = {
+export type TSpawnResult = {
 	stderr: string;
 	stdout: string;
 	stdall: string;
src/core.ts
@@ -37,6 +37,7 @@ import {
   ps,
   VoidStream,
   type TSpawnStore,
+  type TSpawnResult,
 } from './vendor-core.ts'
 import {
   type Duration,
@@ -351,10 +352,15 @@ export class ProcessPromise extends Promise<ProcessOutput> {
           $.log({ kind: 'stderr', data, verbose: !self.isQuiet(), id })
         },
         end: (data, c) => {
-          const { error, status, signal, duration, ctx: {store} } = data
+          const { error: _error, status, signal: __signal, duration, ctx: { store }} = data
           const { stdout, stderr } = store
+          const { cause, exitCode, signal: _signal } = {...self._breakData}
+
+          const signal = _signal ?? __signal
+          const code = exitCode ?? status
+          const error = cause ?? _error
           const output = new ProcessOutput({
-            code: status,
+            code,
             signal,
             error,
             duration,
@@ -362,7 +368,7 @@ export class ProcessPromise extends Promise<ProcessOutput> {
             from: $.from,
           })
 
-          $.log({ kind: 'end', signal, exitCode: status, duration, error, verbose: self.isVerbose(), id })
+          $.log({ kind: 'end', signal, exitCode: code, duration, error, verbose: self.isVerbose(), id })
 
           // Ensures EOL
           if (stdout.length && getLast(getLast(stdout)) !== BR_CC) c.on.stdout!(EOL, c)
@@ -375,6 +381,19 @@ export class ProcessPromise extends Promise<ProcessOutput> {
 
     return this
   }
+  private _breakData?: Partial<
+    Pick<ProcessOutput, 'exitCode' | 'signal' | 'cause'>
+  >
+
+  private break(
+    exitCode?: ProcessOutput['exitCode'],
+    signal?: ProcessOutput['signal'],
+    cause?: ProcessOutput['cause']
+  ): void {
+    if (!this.isRunning()) return
+    this._breakData = { exitCode, signal, cause }
+    this.kill(signal)
+  }
 
   private finalize(output: ProcessOutput, legacy = false): void {
     if (this.isSettled()) return
@@ -408,13 +427,14 @@ export class ProcessPromise extends Promise<ProcessOutput> {
       return Object.assign(stdout, { stderr, stdout, stdall })
     }})
   }
-  private _pipe(
-    source: keyof TSpawnStore,
-    dest: PipeDest,
-    ...args: any[]
-  ): PromisifiedStream<Writable> | ProcessPromise {
+  // prettier-ignore
+  private _pipe(source: keyof TSpawnStore, dest: PipeDest, ...args: any[]): PromisifiedStream<Writable> | ProcessPromise {
+    if (isString(dest))
+      return this._pipe(source, fs.createWriteStream(dest))
+
     if (isStringLiteral(dest, ...args))
-      return this.pipe[source](
+      return this._pipe(
+        source,
         $({
           halt: true,
           signal: this.signal,
@@ -423,41 +443,48 @@ export class ProcessPromise extends Promise<ProcessOutput> {
 
     this._piped = true
     const { ee } = this._snapshot
+    const output = this.output!
     const from = new VoidStream()
     const fill = () => {
       for (const chunk of this._zurk!.store[source]) from.write(chunk)
-      return true
     }
-    const fillEnd = () => this.isSettled() && fill() && from.end()
+    const fillSettled = () => {
+      if (!output) return
+      if (!output.ok) (dest as ProcessPromise).break?.(output.exitCode, output.signal, output.cause)
+      fill()
+      from.end()
+    }
 
-    if (!this.isSettled()) {
+    if (!output) {
       const onData = (chunk: string | Buffer) => from.write(chunk)
-      ee.once(source, () => {
-        fill()
-        ee.on(source, onData)
-      }).once('end', () => {
-        ee.removeListener(source, onData)
-        from.end()
-      })
+      ee
+        .once(source, () => {
+          fill()
+          ee.on(source, onData)
+        })
+        .once('end', () => {
+          ee.removeListener(source, onData)
+          from.end()
+        })
     }
 
-    if (isString(dest)) dest = fs.createWriteStream(dest)
-
     if (dest instanceof ProcessPromise) {
+      if (dest.isSettled()) throw new Fail('Cannot pipe to a settled process.')
+
       dest._pipedFrom = this
 
       if (dest.isHalted() && this.isHalted()) {
         ee.once('start', () => from.pipe(dest.run()._stdin))
       } else {
-        this.catch((e) => !dest.isNothrow() && dest._reject(e))
         from.pipe(dest.run()._stdin)
+        this.catch((e) => dest.break(e.exitCode, e.signal, e.cause))
       }
-      fillEnd()
+      fillSettled()
       return dest
     }
 
     from.once('end', () => dest.emit(EPF)).pipe(dest)
-    fillEnd()
+    fillSettled()
     return promisifyStream(dest, this) as Writable &
       PromiseLike<ProcessOutput & Writable>
   }
@@ -472,7 +499,7 @@ export class ProcessPromise extends Promise<ProcessOutput> {
     this.ac.abort(reason)
   }
 
-  kill(signal?: NodeJS.Signals): Promise<void> {
+  kill(signal?: NodeJS.Signals | null): Promise<void> {
     if (this.isSettled()) throw new Fail('Too late to kill the process.')
     if (!this.child)
       throw new Fail('Trying to kill a process without creating one.')
src/vendor-core.ts
@@ -20,6 +20,7 @@ import { bus } from './internals.ts'
 export {
   type TSpawnStore,
   type TSpawnStoreChunks,
+  type TSpawnResult,
   exec,
   buildCmd,
   isStringLiteral,
test/core.test.js
@@ -665,6 +665,14 @@ describe('core', () => {
         assert.equal(p.stdout.trim(), 'foo')
       })
 
+      test('throws if dest ProcessPromise is settled', async () => {
+        const dest = $`echo bar`
+        await dest
+        const p = $`echo foo`
+        assert.throws(() => p.pipe(dest), /Cannot pipe to a settled process/)
+        await p
+      })
+
       test('detects inappropriate ProcessPromise', async () => {
         const foo = $`echo foo`
         const p1 = $`cat`
@@ -830,6 +838,31 @@ describe('core', () => {
         assert.equal((await piped2).toString(), '1\n2\n3\n')
       })
 
+      it('fulfilled piping', async () => {
+        const p1 = $`echo foo && sleep 0.1 && echo bar`
+        await p1
+        const p2 = p1.pipe`cat`
+        await p2
+
+        assert.equal(p1.output.toString(), 'foo\nbar\n')
+        assert.equal(p2.output.toString(), 'foo\nbar\n')
+      })
+
+      it('rejected piping', async () => {
+        const p1 = $({ nothrow: true })`echo foo && exit 1`
+        await p1
+        const p2 = p1.pipe($({ nothrow: true })`cat`)
+        await p2
+
+        assert.equal(p1.output.toString(), 'foo\n')
+        assert.equal(p1.output.ok, false)
+        assert.equal(p1.output.exitCode, 1)
+
+        assert.equal(p2.output.toString(), 'foo\n')
+        assert.equal(p2.output.ok, false)
+        assert.equal(p2.output.exitCode, 1)
+      })
+
       test('propagates rejection', async () => {
         const p1 = $`exit 1`
         const p2 = p1.pipe($`echo hello`)
@@ -845,7 +878,7 @@ describe('core', () => {
           await p2
         } catch (e) {
           assert.equal(e.exitCode, 1)
-          assert.equal(e.stdout, '')
+          assert.equal(e.ok, false)
         }
 
         const p3 = await $({ nothrow: true })`echo hello && exit 1`.pipe($`cat`)
@@ -857,18 +890,39 @@ describe('core', () => {
           await p4
         } catch (e) {
           assert.equal(e.exitCode, 1)
-          assert.equal(e.stdout, '')
+          assert.equal(e.ok, false)
         }
 
-        const p5 = $`echo foo && exit 1`
-        const [r1, r2] = await Promise.allSettled([
-          p5.pipe($({ nothrow: true })`cat`),
+        const p5 = $`echo bar && sleep 0.1 && exit 1`
+        const [r1, r2, r3] = await Promise.allSettled([
           p5.pipe($`cat`),
+          p5.pipe($({ nothrow: true })`cat`),
+          p5.pipe($({ nothrow: true, halt: true })`cat`),
         ])
-        assert.equal(r1.value.stdout, 'foo\n')
-        assert.equal(r1.value.exitCode, 0)
-        assert.equal(r2.reason.stdout, 'foo\n')
-        assert.equal(r2.reason.exitCode, 1)
+        assert.equal(r1.reason.stdout, 'bar\n')
+        assert.equal(r1.reason.exitCode, 1)
+        assert.equal(r1.reason.ok, false)
+
+        assert.equal(r2.value.stdout, 'bar\n')
+        assert.equal(r2.value.exitCode, 1)
+        assert.equal(r2.value.ok, false)
+
+        assert.equal(r3.value.stdout, 'bar\n')
+        assert.equal(r3.value.exitCode, 1)
+        assert.equal(r3.value.ok, false)
+
+        const p6 = $`echo bar && exit 1`
+        const [r4, r5] = await Promise.allSettled([
+          p6.pipe($`cat`),
+          p6.pipe($({ nothrow: true })`cat`),
+        ])
+        assert.equal(r4.reason.stdout, 'bar\n')
+        assert.equal(r4.reason.exitCode, 1)
+        assert.equal(r4.reason.ok, false)
+
+        assert.equal(r5.value.stdout, 'bar\n')
+        assert.equal(r5.value.exitCode, 1)
+        assert.equal(r5.value.ok, false)
       })
 
       test('pipes particular stream: stdout, stderr, stdall', async () => {
test/deps.test.js
@@ -14,7 +14,7 @@
 
 import assert from 'node:assert'
 import { test, describe } from 'node:test'
-import { $, tmpfile, fs, path } from '../build/index.js'
+import { $, tmpfile, tmpdir, fs, path } from '../build/index.js'
 import { installDeps, parseDeps } from '../build/deps.js'
 
 const __dirname = new URL('.', import.meta.url).pathname
@@ -84,6 +84,12 @@ describe('deps', () => {
         }
       )
     })
+
+    test('does nothing on empty deps', async () => {
+      const cwd = tmpdir()
+      await installDeps({}, cwd)
+      assert(!fs.existsSync(path.join(cwd, 'node_modules')))
+    })
   })
 
   describe('parseDeps()', () => {
test/log.test.ts
@@ -86,6 +86,15 @@ describe('log', () => {
       )
     })
 
+    test('custom', () => {
+      log({
+        kind: 'custom',
+        data: 'test',
+        verbose: true,
+      })
+      assert.equal(data.join(''), 'test')
+    })
+
     test('retry', () => {
       log({
         kind: 'retry',
.size-limit.json
@@ -15,7 +15,7 @@
       "README.md",
       "LICENSE"
     ],
-    "limit": "122.50 kB",
+    "limit": "123.30 kB",
     "brotli": false,
     "gzip": false
   },
@@ -29,14 +29,14 @@
       "build/globals.js",
       "build/deno.js"
     ],
-    "limit": "813.00 kB",
+    "limit": "813.75 kB",
     "brotli": false,
     "gzip": false
   },
   {
     "name": "libdefs",
     "path": "build/*.d.ts",
-    "limit": "40.20 kB",
+    "limit": "40.25 kB",
     "brotli": false,
     "gzip": false
   },
@@ -62,7 +62,7 @@
       "README.md",
       "LICENSE"
     ],
-    "limit": "870.05 kB",
+    "limit": "870.85 kB",
     "brotli": false,
     "gzip": false
   }