Commit 3871229

Anton Golub <antongolub@antongolub.com>
2024-10-29 21:51:37
fix: properly promisify pipe stream args (#921)
* fix: properly promisify pipe stream args * test: extend Duplex pipe test asserts
1 parent c38363f
src/core.ts
@@ -317,10 +317,13 @@ export class ProcessPromise extends Promise<ProcessOutput> {
   }
 
   // Essentials
+  pipe(dest: TemplateStringsArray, ...args: any[]): ProcessPromise
+  pipe<D extends Writable>(dest: D): D & PromiseLike<void>
+  pipe<D extends ProcessPromise>(dest: D): D
   pipe(
     dest: Writable | ProcessPromise | TemplateStringsArray,
     ...args: any[]
-  ): ProcessPromise {
+  ): (Writable & PromiseLike<void>) | ProcessPromise {
     if (isStringLiteral(dest, ...args))
       return this.pipe($(dest as TemplateStringsArray, ...args))
     if (isString(dest))
@@ -355,8 +358,8 @@ export class ProcessPromise extends Promise<ProcessOutput> {
       return dest
     }
 
-    from.pipe(dest as Writable)
-    return this
+    from.pipe(dest)
+    return ProcessPromise.promisifyStream(dest)
   }
 
   abort(reason?: string) {
@@ -527,6 +530,38 @@ export class ProcessPromise extends Promise<ProcessOutput> {
   ): Promise<ProcessOutput | T> {
     return super.catch(onrejected)
   }
+
+  private static promisifyStream<D extends Writable>(
+    dest: D
+  ): D & PromiseLike<void>
+  private static promisifyStream(
+    dest: Writable | ProcessPromise
+  ): (Writable & PromiseLike<void>) | ProcessPromise {
+    return dest instanceof ProcessPromise
+      ? dest
+      : (new Proxy(dest as Writable, {
+          get(target, key) {
+            if (key === 'then') {
+              return (res: any = noop, rej: any = noop) =>
+                new Promise((_res, _rej) =>
+                  target
+                    .once('error', () => _rej(rej()))
+                    .once('finish', () => _res(res()))
+                )
+            }
+            if (key === 'pipe') {
+              const pipe = Reflect.get(target, key)
+              if (typeof pipe === 'function')
+                return function (...args: any) {
+                  return ProcessPromise.promisifyStream(
+                    pipe.apply(target, args) as Writable
+                  )
+                }
+            }
+            return Reflect.get(target, key)
+          },
+        }) as Writable & PromiseLike<void>)
+  }
 }
 
 type GettersRecord<T extends Record<any, any>> = { [K in keyof T]: () => T[K] }
test/core.test.js
@@ -16,7 +16,8 @@ import assert from 'node:assert'
 import { test, describe, before, after, it } from 'node:test'
 import { inspect } from 'node:util'
 import { basename } from 'node:path'
-import { Readable, Writable } from 'node:stream'
+import { WriteStream } from 'node:fs'
+import { Readable, Transform, Writable } from 'node:stream'
 import { Socket } from 'node:net'
 import { ProcessPromise, ProcessOutput } from '../build/index.js'
 import '../build/globals.js'
@@ -329,11 +330,12 @@ describe('core', () => {
             next()
           },
         })
-        let p = $`echo 'test'`.pipe(stream)
-        await p
-        assert.ok(p._piped)
+        let p1 = $`echo 'test'`
+        let p2 = p1.pipe(stream)
+        await p2
+        assert.ok(p1._piped)
+        assert.ok(p1.stderr instanceof Socket)
         assert.equal(contents, 'test\n')
-        assert.ok(p.stderr instanceof Socket)
       })
 
       test('accepts WriteStream', async () => {
@@ -365,7 +367,6 @@ describe('core', () => {
       test('accepts stdout', async () => {
         const p1 = $`echo pipe-to-stdout`
         const p2 = p1.pipe(process.stdout)
-        assert.equal(p1, p2)
         assert.equal((await p1).stdout.trim(), 'pipe-to-stdout')
       })
 
@@ -382,7 +383,7 @@ describe('core', () => {
         )
       })
 
-      test('is chainable', async () => {
+      test('is chainable ($)', async () => {
         let { stdout: o1 } = await $`echo "hello"`
           .pipe($`awk '{print $1" world"}'`)
           .pipe($`tr '[a-z]' '[A-Z]'`)
@@ -393,6 +394,25 @@ describe('core', () => {
         assert.equal(o2, 'HELLO WORLD\n')
       })
 
+      test('is chainable (Transform/Duplex)', async () => {
+        const p = $`echo "hello"`
+          .pipe(
+            new Transform({
+              transform(chunk, encoding, callback) {
+                callback(null, String(chunk).toUpperCase())
+              },
+            })
+          )
+          .pipe(fs.createWriteStream('/tmp/output2.txt'))
+
+        assert.ok(p instanceof WriteStream)
+        assert.equal(await p, undefined)
+        assert.equal(
+          (await fs.readFile('/tmp/output2.txt')).toString(),
+          'HELLO\n'
+        )
+      })
+
       it('supports multipiping', async () => {
         const result = $`echo 1; sleep 1; echo 2; sleep 1; echo 3`
         const piped1 = result.pipe`cat`
test-d/core.test-d.ts
@@ -26,6 +26,8 @@ expectType<Readable>(p.stderr)
 expectType<ProcessPromise>(p.nothrow())
 expectType<ProcessPromise>(p.quiet())
 expectType<ProcessPromise>(p.pipe($`cmd`))
+expectType<ProcessPromise>(p.pipe`cmd`)
+expectType<typeof process.stdout & PromiseLike<void>>(p.pipe(process.stdout))
 expectType<ProcessPromise>(p.stdio('pipe'))
 expectType<ProcessPromise>(p.timeout('1s'))
 expectType<Promise<void>>(p.kill())
.size-limit.json
@@ -2,7 +2,7 @@
   {
     "name": "zx/core",
     "path": ["build/core.cjs", "build/util.cjs", "build/vendor-core.cjs"],
-    "limit": "71 kB",
+    "limit": "71.1 kB",
     "brotli": false,
     "gzip": false
   },