Commit 38da212

Anton Golub <antongolub@antongolub.com>
2025-07-31 22:17:34
feat: enable many to one piping (#1300)
* feat: enable many to one piping * docs: describe many-to-one piping * chore: linting
1 parent e18fe85
build/core.cjs
@@ -521,14 +521,14 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
     );
   }
   run() {
-    var _a, _b, _c, _d;
+    var _a, _b, _c;
+    _ProcessPromise.bus.runBack(this);
     if (this.isRunning() || this.isSettled()) return this;
     this._stage = "running";
-    (_a = this._pipedFrom) == null ? void 0 : _a.run();
     const self = this;
     const $2 = self._snapshot;
     const id = self.id;
-    const cwd = (_b = $2.cwd) != null ? _b : $2[CWD];
+    const cwd = (_a = $2.cwd) != null ? _a : $2[CWD];
     if ($2.preferLocal) {
       const dirs = $2.preferLocal === true ? [$2.cwd, $2[CWD]] : [$2.preferLocal].flat();
       $2.env = (0, import_util.preferLocalBin)($2.env, ...dirs);
@@ -536,7 +536,7 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
     this._zurk = (0, import_vendor_core2.exec)({
       cmd: self.fullCmd,
       cwd,
-      input: (_d = (_c = $2.input) == null ? void 0 : _c.stdout) != null ? _d : $2.input,
+      input: (_c = (_b = $2.input) == null ? void 0 : _b.stdout) != null ? _c : $2.input,
       stdin: self._stdin,
       sync: self.sync,
       signal: self.signal,
@@ -605,6 +605,7 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
   finalize(output, legacy = false) {
     if (this.isSettled()) return;
     this._output = output;
+    _ProcessPromise.bus.unpipeBack(this);
     if (output.ok || this.isNothrow()) {
       this._stage = "fulfilled";
       this._resolve(output);
@@ -633,16 +634,23 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
     this._piped = true;
     const { ee } = this._snapshot;
     const output = this.output;
+    const isP = dest instanceof _ProcessPromise;
     const from = new import_vendor_core2.VoidStream();
+    const end = () => {
+      if (!isP) return from.end();
+      setImmediate(() => {
+        _ProcessPromise.bus.unpipe(this, dest);
+        _ProcessPromise.bus.sources(dest).length === 0 && from.end();
+      });
+    };
     const fill = () => {
       for (const chunk of this._zurk.store[source]) from.write(chunk);
     };
     const fillSettled = () => {
-      var _a;
       if (!output) return;
-      if (!output.ok) (_a = dest.break) == null ? void 0 : _a.call(dest, output.exitCode, output.signal, output.cause);
+      if (!output.ok && isP) dest.break(output.exitCode, output.signal, output.cause);
       fill();
-      from.end();
+      end();
     };
     if (!output) {
       const onData = (chunk) => from.write(chunk);
@@ -651,16 +659,17 @@ var _ProcessPromise = class _ProcessPromise extends Promise {
         ee.on(source, onData);
       }).once("end", () => {
         ee.removeListener(source, onData);
-        from.end();
+        end();
       });
     }
-    if (dest instanceof _ProcessPromise) {
+    if (isP) {
       if (dest.isSettled()) throw new Fail("Cannot pipe to a settled process.");
-      dest._pipedFrom = this;
+      _ProcessPromise.bus.pipe(this, dest);
+      from.pipe(dest._stdin);
       if (dest.isHalted() && this.isHalted()) {
-        ee.once("start", () => from.pipe(dest.run()._stdin));
+        ee.once("start", () => dest.run());
       } else {
-        from.pipe(dest.run()._stdin);
+        dest.run();
         this.catch((e) => dest.break(e.exitCode, e.signal, e.cause));
       }
       fillSettled();
@@ -894,6 +903,40 @@ Object.defineProperty(_ProcessPromise.prototype, "pipe", { get() {
   const stdall = getPipeMethod("stdall");
   return Object.assign(stdout, { stderr, stdout, stdall });
 } });
+// prettier-ignore
+_ProcessPromise.bus = {
+  refs: /* @__PURE__ */ new Map(),
+  pipe(from, to) {
+    const set = this.refs.get(from) || this.refs.set(from, /* @__PURE__ */ new Set()).get(from);
+    set.add(to);
+  },
+  unpipe(from, to) {
+    const set = this.refs.get(from);
+    if (!set) return;
+    if (to) set.delete(to);
+    if (set.size) return;
+    this.refs.delete(from);
+    from._piped = false;
+  },
+  unpipeBack(to, from) {
+    if (from) return this.unpipe(from, to);
+    for (const _from of this.refs.keys()) {
+      this.unpipe(_from, to);
+    }
+  },
+  runBack(p) {
+    for (const from of this.sources(p)) {
+      from.run();
+    }
+  },
+  sources(p) {
+    const refs = [];
+    for (const [from, set] of this.refs.entries()) {
+      set.has(p) && refs.push(from);
+    }
+    return refs;
+  }
+};
 var ProcessPromise = _ProcessPromise;
 var _ProcessOutput = class _ProcessOutput extends Error {
   // prettier-ignore
@@ -1072,7 +1115,7 @@ var promisifyStream = (stream, from) => (0, import_util.proxyOverride)(stream, {
   run() {
     return from.run();
   },
-  _pipedFrom: from,
+  // TODO _pipedFrom: from,
   pipe(...args) {
     const piped = stream.pipe.apply(stream, args);
     return piped instanceof ProcessPromise ? piped : promisifyStream(piped, from);
build/core.d.ts
@@ -92,7 +92,6 @@ export declare class ProcessPromise extends Promise<ProcessOutput> {
     private _snapshot;
     private _timeoutId?;
     private _piped;
-    private _pipedFrom?;
     private _stdin;
     private _zurk;
     private _output;
@@ -108,6 +107,7 @@ export declare class ProcessPromise extends Promise<ProcessOutput> {
         [key in keyof TSpawnStore]: PipeMethod;
     };
     private _pipe;
+    private static bus;
     abort(reason?: string): void;
     kill(signal?: NodeJS.Signals | null): Promise<void>;
     /**
build/index.cjs
@@ -53,7 +53,7 @@ var import_vendor = require("./vendor.cjs");
 
 // src/versions.ts
 var versions = {
-  zx: "8.7.2",
+  zx: "8.8.0",
   chalk: "5.4.1",
   depseek: "0.4.1",
   dotenv: "0.2.3",
docs/process-promise.md
@@ -103,16 +103,19 @@ for await (const line of $({
 
 ## `pipe()`
 
-Redirects the output of the process.
+Redirects the output of the process. Almost same as `|` in bash but with enhancements.
+```js
+const greeting = await $`printf "hello"`
+  .pipe($`awk '{printf $1", world!"}'`)
+  .pipe($`tr '[a-z]' '[A-Z]'`)
+```
+
+`pipe()` accepts any kind `Writable`, `ProcessPromise` or a file path.
 
 ```js
 await $`echo "Hello, stdout!"`
   .pipe(fs.createWriteStream('/tmp/output.txt'))
-
-await $`cat /tmp/output.txt`
 ```
-
-`pipe()` accepts any kind `Writable`, `ProcessPromise` or a file path.
 You can pass a string to `pipe()` to implicitly create a receiving file. The previous example is equivalent to:
 
 ```js
@@ -167,16 +170,6 @@ const [o1, o2] = await Process.all([
 ])
 ```
 
-The `pipe()` method can combine `$` processes. Same as `|` in bash:
-
-```js
-const greeting = await $`printf "hello"`
-  .pipe($`awk '{printf $1", world!"}'`)
-  .pipe($`tr '[a-z]' '[A-Z]'`)
-
-echo(greeting)
-```
-
 Use combinations of `pipe()` and [`nothrow()`](#nothrow):
 
 ```js
@@ -185,7 +178,7 @@ await $`find ./examples -type f -print0`
   .pipe($`wc -l`)
 ```
 
-And literals! Pipe does support them too:
+And literals! The `pipe()` does support them too:
 
 ```js
 await $`printf "hello"`
@@ -193,7 +186,25 @@ await $`printf "hello"`
   .pipe`tr '[a-z]' '[A-Z]'`
 ```
 
-By default, `pipe()` API operates with `stdout` stream, but you can specify `stderr` as well:
+The `pipe()` allows not only chain or split stream, but also to merge them.
+```js
+const $h = $({ halt: true })
+const p1 = $`echo foo`
+const p2 = $h`echo a && sleep 0.1 && echo c && sleep 0.2 && echo e`
+const p3 = $h`sleep 0.05 && echo b && sleep 0.1 && echo d`
+const p4 = $`sleep 0.4 && echo bar`
+const p5 = $h`cat`
+
+await p1
+p1.pipe(p5)
+p2.pipe(p5)
+p3.pipe(p5)
+p4.pipe(p5)
+
+const { stdout } = await p5.run() // 'foo\na\nb\nc\nd\ne\nbar\n'
+```
+
+By default, `pipe()` operates with `stdout` stream, but you can specify `stderr` as well:
 
 ```js
 const p = $`echo foo >&2; echo bar`
@@ -201,7 +212,7 @@ const o1 = (await p.pipe.stderr`cat`).toString()  // 'foo\n'
 const o2 = (await p.pipe.stdout`cat`).toString()  // 'bar\n'
 ```
 
-Btw, the signal, if specified, will be transmitted through pipeline.
+The [signal](/api#signal) option, if specified, will be transmitted through the pipeline.
 
 ```js
 const ac = new AbortController()
src/core.ts
@@ -37,7 +37,6 @@ import {
   ps,
   VoidStream,
   type TSpawnStore,
-  type TSpawnResult,
 } from './vendor-core.ts'
 import {
   type Duration,
@@ -250,7 +249,6 @@ export class ProcessPromise extends Promise<ProcessOutput> {
   private _snapshot!: Snapshot
   private _timeoutId?: ReturnType<typeof setTimeout>
   private _piped = false
-  private _pipedFrom?: ProcessPromise
   private _stdin = new VoidStream()
   private _zurk: ReturnType<typeof exec> | null = null
   private _output: ProcessOutput | null = null
@@ -295,9 +293,9 @@ export class ProcessPromise extends Promise<ProcessOutput> {
     ) as string
   }
   run(): ProcessPromise {
+    ProcessPromise.bus.runBack(this)
     if (this.isRunning() || this.isSettled()) return this // The _run() can be called from a few places.
     this._stage = 'running'
-    this._pipedFrom?.run()
 
     const self = this
     const $ = self._snapshot
@@ -398,6 +396,7 @@ export class ProcessPromise extends Promise<ProcessOutput> {
   private finalize(output: ProcessOutput, legacy = false): void {
     if (this.isSettled()) return
     this._output = output
+    ProcessPromise.bus.unpipeBack(this)
     if (output.ok || this.isNothrow()) {
       this._stage = 'fulfilled'
       this._resolve(output)
@@ -444,15 +443,23 @@ export class ProcessPromise extends Promise<ProcessOutput> {
     this._piped = true
     const { ee } = this._snapshot
     const output = this.output!
+    const isP = dest instanceof ProcessPromise
     const from = new VoidStream()
+    const end = () => {
+      if (!isP) return from.end()
+      setImmediate(() => {
+        ProcessPromise.bus.unpipe(this, dest)
+        ProcessPromise.bus.sources(dest).length === 0 && from.end()
+      })
+    }
     const fill = () => {
       for (const chunk of this._zurk!.store[source]) from.write(chunk)
     }
     const fillSettled = () => {
       if (!output) return
-      if (!output.ok) (dest as ProcessPromise).break?.(output.exitCode, output.signal, output.cause)
+      if (!output.ok && isP) dest.break(output.exitCode, output.signal, output.cause)
       fill()
-      from.end()
+      end()
     }
 
     if (!output) {
@@ -464,19 +471,19 @@ export class ProcessPromise extends Promise<ProcessOutput> {
         })
         .once('end', () => {
           ee.removeListener(source, onData)
-          from.end()
+          end()
         })
     }
 
-    if (dest instanceof ProcessPromise) {
+    if (isP) {
       if (dest.isSettled()) throw new Fail('Cannot pipe to a settled process.')
+      ProcessPromise.bus.pipe(this, dest)
 
-      dest._pipedFrom = this
-
+      from.pipe(dest._stdin)
       if (dest.isHalted() && this.isHalted()) {
-        ee.once('start', () => from.pipe(dest.run()._stdin))
+        ee.once('start', () => dest.run())
       } else {
-        from.pipe(dest.run()._stdin)
+        dest.run()
         this.catch((e) => dest.break(e.exitCode, e.signal, e.cause))
       }
       fillSettled()
@@ -489,6 +496,41 @@ export class ProcessPromise extends Promise<ProcessOutput> {
       PromiseLike<ProcessOutput & Writable>
   }
 
+  // prettier-ignore
+  private static bus = {
+    refs: new Map<ProcessPromise, Set<ProcessPromise>>,
+    pipe(from: ProcessPromise, to: ProcessPromise) {
+      const set = this.refs.get(from) || (this.refs.set(from, new Set<ProcessPromise>())).get(from)!
+      set.add(to)
+    },
+    unpipe(from: ProcessPromise, to?: ProcessPromise) {
+      const set = this.refs.get(from)
+      if (!set) return
+      if (to) set.delete(to)
+      if (set.size) return
+      this.refs.delete(from)
+      from._piped = false
+    },
+    unpipeBack(to: ProcessPromise, from?: ProcessPromise) {
+      if (from) return this.unpipe(from, to)
+      for (const _from of this.refs.keys()) {
+        this.unpipe(_from, to)
+      }
+    },
+    runBack(p: ProcessPromise) {
+      for (const from of this.sources(p)) {
+        from.run()
+      }
+    },
+    sources(p: ProcessPromise): ProcessPromise[] {
+      const refs = []
+      for (const [from, set] of this.refs.entries()) {
+        set.has(p) && refs.push(from)
+      }
+      return refs
+    }
+  }
+
   abort(reason?: string) {
     if (this.isSettled()) throw new Fail('Too late to abort the process.')
     if (this.signal !== this.ac.signal)
@@ -1011,7 +1053,7 @@ const promisifyStream = <S extends Writable>(
     run() {
       return from.run()
     },
-    _pipedFrom: from,
+    // TODO _pipedFrom: from,
     pipe(...args: any) {
       const piped = stream.pipe.apply(stream, args)
       return piped instanceof ProcessPromise
src/versions.ts
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 export const versions: Record<string, string> = {
-  zx: '8.7.2',
+  zx: '8.8.0',
   chalk: '5.4.1',
   depseek: '0.4.1',
   dotenv: '0.2.3',
test/core.test.js
@@ -738,6 +738,25 @@ describe('core', () => {
           assert.equal(stdout, 'HELLO WORLD\n')
         })
 
+        test('several $ halted > $ halted', async () => {
+          const $h = $({ halt: true })
+          const p1 = $`echo foo`
+          const p2 = $h`echo a && sleep 0.1 && echo c && sleep 0.2 && echo e`
+          const p3 = $h`sleep 0.05 && echo b && sleep 0.1 && echo d`
+          const p4 = $`sleep 0.4 && echo bar`
+          const p5 = $h`cat`
+
+          await p1
+          p1.pipe(p5)
+          p2.pipe(p5)
+          p3.pipe(p5)
+          p4.pipe(p5)
+
+          const { stdout } = await p5.run()
+
+          assert.equal(stdout, 'foo\na\nb\nc\nd\ne\nbar\n')
+        })
+
         test('$ > stream', async () => {
           const file = tempfile()
           const fileStream = fs.createWriteStream(file)
test/export.test.js
@@ -33,6 +33,7 @@ describe('core', () => {
     assert.equal(typeof core.ProcessOutput.getExitCodeInfo, 'function', 'core.ProcessOutput.getExitCodeInfo')
     assert.equal(typeof core.ProcessOutput.getExitMessage, 'function', 'core.ProcessOutput.getExitMessage')
     assert.equal(typeof core.ProcessPromise, 'function', 'core.ProcessPromise')
+    assert.equal(typeof core.ProcessPromise.bus, 'object', 'core.ProcessPromise.bus')
     assert.equal(typeof core.cd, 'function', 'core.cd')
     assert.equal(typeof core.chalk, 'function', 'core.chalk')
     assert.equal(typeof core.chalk.level, 'number', 'core.chalk.level')
@@ -159,6 +160,7 @@ describe('index', () => {
     assert.equal(typeof index.ProcessOutput.getExitCodeInfo, 'function', 'index.ProcessOutput.getExitCodeInfo')
     assert.equal(typeof index.ProcessOutput.getExitMessage, 'function', 'index.ProcessOutput.getExitMessage')
     assert.equal(typeof index.ProcessPromise, 'function', 'index.ProcessPromise')
+    assert.equal(typeof index.ProcessPromise.bus, 'object', 'index.ProcessPromise.bus')
     assert.equal(typeof index.VERSION, 'string', 'index.VERSION')
     assert.equal(typeof index.YAML, 'object', 'index.YAML')
     assert.equal(typeof index.YAML.Alias, 'function', 'index.YAML.Alias')
.size-limit.json
@@ -15,7 +15,7 @@
       "README.md",
       "LICENSE"
     ],
-    "limit": "123.30 kB",
+    "limit": "124.302 kB",
     "brotli": false,
     "gzip": false
   },
@@ -29,7 +29,7 @@
       "build/globals.js",
       "build/deno.js"
     ],
-    "limit": "813.60 kB",
+    "limit": "814.60 kB",
     "brotli": false,
     "gzip": false
   },
@@ -62,7 +62,7 @@
       "README.md",
       "LICENSE"
     ],
-    "limit": "870.80 kB",
+    "limit": "871.805 kB",
     "brotli": false,
     "gzip": false
   }