Commit 8900e45

Anton Golub <antongolub@antongolub.com>
2024-11-23 10:30:29
fix: autorun halted processes on pipe run (#951)
relates #949
1 parent 15bb135
src/core.ts
@@ -23,6 +23,7 @@ import { type AsyncHook, AsyncLocalStorage, createHook } from 'node:async_hooks'
 import { type Readable, type Writable } from 'node:stream'
 import { inspect } from 'node:util'
 import { EOL as _EOL } from 'node:os'
+import { EventEmitter } from 'node:events'
 import {
   exec,
   buildCmd,
@@ -205,6 +206,10 @@ export class ProcessPromise extends Promise<ProcessOutput> {
   private _resolved = false
   private _halted?: boolean
   private _piped = false
+  private _pipedFrom?: ProcessPromise
+  private _run = false
+  private _ee = new EventEmitter()
+  private _stdin = new VoidStream()
   private _zurk: ReturnType<typeof exec> | null = null
   private _output: ProcessOutput | null = null
   private _reject: Resolve = noop
@@ -225,7 +230,10 @@ export class ProcessPromise extends Promise<ProcessOutput> {
   }
 
   run(): ProcessPromise {
-    if (this.child) return this // The _run() can be called from a few places.
+    if (this._run) return this // The _run() can be called from a few places.
+    this._halted = false
+    this._run = true
+    this._pipedFrom?.run()
 
     const $ = this._snapshot
     const self = this
@@ -255,9 +263,11 @@ export class ProcessPromise extends Promise<ProcessOutput> {
       spawn: $.spawn,
       spawnSync: $.spawnSync,
       store: $.store,
+      stdin: self._stdin,
       stdio: self._stdio ?? $.stdio,
       sync: $[SYNC],
       detached: $.detached,
+      ee: self._ee,
       run: (cb) => cb(),
       on: {
         start: () => {
@@ -326,20 +336,18 @@ export class ProcessPromise extends Promise<ProcessOutput> {
     ...args: any[]
   ): (Writable & PromiseLike<Writable>) | ProcessPromise {
     if (isStringLiteral(dest, ...args))
-      return this.pipe($(dest as TemplateStringsArray, ...args))
+      return this.pipe($({ halt: true })(dest as TemplateStringsArray, ...args))
     if (isString(dest))
       throw new Error('The pipe() method does not take strings. Forgot $?')
 
     this._piped = true
-    const { store, ee, fulfilled } = this._zurk!
+    const ee = this._ee
     const from = new VoidStream()
     const fill = () => {
-      for (const chunk of store.stdout) {
-        from.write(chunk)
-      }
+      for (const chunk of this._zurk!.store.stdout) from.write(chunk)
     }
 
-    if (fulfilled) {
+    if (this._resolved) {
       fill()
       from.end()
     } else {
@@ -354,8 +362,14 @@ export class ProcessPromise extends Promise<ProcessOutput> {
     }
 
     if (dest instanceof ProcessPromise) {
-      this.catch((e) => (dest.isNothrow() ? noop : dest._reject(e)))
-      from.pipe(dest.stdin)
+      dest._pipedFrom = this
+
+      if (dest.isHalted() && this.isHalted()) {
+        ee.once('start', () => from.pipe(dest.run()._stdin))
+      } else {
+        this.catch((e) => (dest.isNothrow() ? noop : dest._reject(e)))
+        from.pipe(dest.run()._stdin)
+      }
       return dest
     }
     from.once('end', () => dest.emit('end-piped-from')).pipe(dest)
test/core.test.js
@@ -436,6 +436,32 @@ describe('core', () => {
           assert.equal(o2, 'HELLO WORLD\n')
         })
 
+        test('$ > $ halted', async () => {
+          const $h = $({ halt: true })
+          const { stdout } = await $`echo "hello"`
+            .pipe($h`awk '{print $1" world"}'`)
+            .pipe($h`tr '[a-z]' '[A-Z]'`)
+
+          assert.equal(stdout, 'HELLO WORLD\n')
+        })
+
+        test('$ halted > $ halted', async () => {
+          const $h = $({ halt: true })
+          const { stdout } = await $h`echo "hello"`
+            .pipe($h`awk '{print $1" world"}'`)
+            .pipe($h`tr '[a-z]' '[A-Z]'`)
+            .run()
+
+          assert.equal(stdout, 'HELLO WORLD\n')
+        })
+
+        test('$ halted > $ literal', async () => {
+          const { stdout } = await $({ halt: true })`echo "hello"`
+            .pipe`awk '{print $1" world"}'`.pipe`tr '[a-z]' '[A-Z]'`.run()
+
+          assert.equal(stdout, 'HELLO WORLD\n')
+        })
+
         test('$ > stream', async () => {
           const file = tempfile()
           const fileStream = fs.createWriteStream(file)
.size-limit.json
@@ -2,7 +2,7 @@
   {
     "name": "zx/core",
     "path": ["build/core.cjs", "build/util.cjs", "build/vendor-core.cjs"],
-    "limit": "71.1 kB",
+    "limit": "72 kB",
     "brotli": false,
     "gzip": false
   },
@@ -30,7 +30,7 @@
   {
     "name": "all",
     "path": "build/*",
-    "limit": "833.6 kB",
+    "limit": "835 kB",
     "brotli": false,
     "gzip": false
   }