Commit 4bb470b

Anton Golub <antongolub@antongolub.com>
2024-11-24 14:49:58
feat: let $ be piped from stream (#953)
continues #949
1 parent 8900e45
src/core.ts
@@ -48,7 +48,6 @@ import {
   once,
   parseDuration,
   preferLocalBin,
-  promisifyStream,
   quote,
   quotePowerShell,
 } from './util.js'
@@ -373,7 +372,7 @@ export class ProcessPromise extends Promise<ProcessOutput> {
       return dest
     }
     from.once('end', () => dest.emit('end-piped-from')).pipe(dest)
-    return promisifyStream(dest)
+    return promisifyStream(dest, this)
   }
 
   abort(reason?: string) {
@@ -544,6 +543,32 @@ export class ProcessPromise extends Promise<ProcessOutput> {
   ): Promise<ProcessOutput | T> {
     return super.catch(onrejected)
   }
+
+  // Stream-like API
+  private writable = true
+  private emit(event: string, ...args: any[]) {
+    return this
+  }
+  private on(event: string, cb: any) {
+    this._stdin.on(event, cb)
+    return this
+  }
+  private once(event: string, cb: any) {
+    this._stdin.once(event, cb)
+    return this
+  }
+  private write(data: any, encoding: BufferEncoding, cb: any) {
+    this._stdin.write(data, encoding, cb)
+    return this
+  }
+  private end(chunk: any, cb: any) {
+    this._stdin.end(chunk, cb)
+    return this
+  }
+  private removeListener(event: string, cb: any) {
+    this._stdin.removeListener(event, cb)
+    return this
+  }
 }
 
 type GettersRecord<T extends Record<any, any>> = { [K in keyof T]: () => T[K] }
@@ -841,3 +866,33 @@ export function log(entry: LogEntry) {
       process.stderr.write(entry.error + '\n')
   }
 }
+
+export const promisifyStream = <S extends Writable>(
+  stream: S,
+  from?: ProcessPromise
+): S & PromiseLike<S> =>
+  new Proxy(stream as S & PromiseLike<S>, {
+    get(target, key) {
+      if (key === 'run') return from?.run.bind(from)
+      if (key === 'then') {
+        return (res: any = noop, rej: any = noop) =>
+          new Promise((_res, _rej) =>
+            target
+              .once('error', (e) => _rej(rej(e)))
+              .once('finish', () => _res(res(target)))
+              .once('end-piped-from', () => _res(res(target)))
+          )
+      }
+      const value = Reflect.get(target, key)
+      if (key === 'pipe' && typeof value === 'function') {
+        return function (...args: any) {
+          const piped = value.apply(target, args)
+          piped._pipedFrom = from
+          return piped instanceof ProcessPromise
+            ? piped
+            : promisifyStream(piped, from)
+        }
+      }
+      return value
+    },
+  })
src/util.ts
@@ -16,7 +16,6 @@ import os from 'node:os'
 import path from 'node:path'
 import fs from 'node:fs'
 import { chalk } from './vendor-core.js'
-import type { Writable } from 'node:stream'
 
 export { isStringLiteral } from './vendor-core.js'
 
@@ -450,27 +449,3 @@ export const once = <T extends (...args: any[]) => any>(fn: T) => {
     return (result = fn(...args))
   }
 }
-
-export const promisifyStream = <S extends Writable>(
-  stream: S
-): S & PromiseLike<S> =>
-  new Proxy(stream as S & PromiseLike<S>, {
-    get(target, key) {
-      if (key === 'then') {
-        return (res: any = noop, rej: any = noop) =>
-          new Promise((_res, _rej) =>
-            target
-              .once('error', (e) => _rej(rej(e)))
-              .once('finish', () => _res(res(target)))
-              .once('end-piped-from', () => _res(res(target)))
-          )
-      }
-      const value = Reflect.get(target, key)
-      if (key === 'pipe' && typeof value === 'function') {
-        return function (...args: any) {
-          return promisifyStream(value.apply(target, args))
-        }
-      }
-      return value
-    },
-  })
test/core.test.js
@@ -425,6 +425,13 @@ describe('core', () => {
       })
 
       describe('supports chaining', () => {
+        const getUpperCaseTransform = () =>
+          new Transform({
+            transform(chunk, encoding, callback) {
+              callback(null, String(chunk).toUpperCase())
+            },
+          })
+
         test('$ > $', async () => {
           const { stdout: o1 } = await $`echo "hello"`
             .pipe($`awk '{print $1" world"}'`)
@@ -466,13 +473,7 @@ describe('core', () => {
           const file = tempfile()
           const fileStream = fs.createWriteStream(file)
           const p = $`echo "hello"`
-            .pipe(
-              new Transform({
-                transform(chunk, encoding, callback) {
-                  callback(null, String(chunk).toUpperCase())
-                },
-              })
-            )
+            .pipe(getUpperCaseTransform())
             .pipe(fileStream)
 
           assert.ok(p instanceof WriteStream)
@@ -481,6 +482,39 @@ describe('core', () => {
           await fs.rm(file)
         })
 
+        test('$ halted > stream', async () => {
+          const file = tempfile()
+          const fileStream = fs.createWriteStream(file)
+          const p = $({ halt: true })`echo "hello"`
+            .pipe(getUpperCaseTransform())
+            .pipe(fileStream)
+
+          assert.ok(p instanceof WriteStream)
+          assert.ok(p.run() instanceof ProcessPromise)
+          await p
+          assert.equal((await p.run()).stdout, 'hello\n')
+          assert.equal((await fs.readFile(file)).toString(), 'HELLO\n')
+          await fs.rm(file)
+        })
+
+        test('stream > $', async () => {
+          const file = tempfile()
+          await fs.writeFile(file, 'test')
+          const { stdout } = await fs
+            .createReadStream(file)
+            .pipe(getUpperCaseTransform())
+            .pipe($`cat`)
+
+          assert.equal(stdout, 'TEST')
+        })
+
+        test('$ > stream > $', async () => {
+          const p = $`echo "hello"`
+          const { stdout } = await p.pipe(getUpperCaseTransform()).pipe($`cat`)
+
+          assert.equal(stdout, 'HELLO\n')
+        })
+
         test('$ > stdout', async () => {
           const p = $`echo 1`.pipe(process.stdout)
           assert.equal(await p, process.stdout)
.size-limit.json
@@ -2,7 +2,7 @@
   {
     "name": "zx/core",
     "path": ["build/core.cjs", "build/util.cjs", "build/vendor-core.cjs"],
-    "limit": "72 kB",
+    "limit": "73 kB",
     "brotli": false,
     "gzip": false
   },
@@ -16,7 +16,7 @@
   {
     "name": "dts libdefs",
     "path": "build/*.d.ts",
-    "limit": "36 kB",
+    "limit": "37 kB",
     "brotli": false,
     "gzip": false
   },
@@ -30,7 +30,7 @@
   {
     "name": "all",
     "path": "build/*",
-    "limit": "835 kB",
+    "limit": "840 kB",
     "brotli": false,
     "gzip": false
   }