Commit 237844e

Anton Medvedev <anton@medv.io>
2021-05-21 00:56:10
Add pipes
1 parent 9ad9c34
index.mjs
@@ -51,25 +51,29 @@ export function $(pieces, ...args) {
 
   if ($.verbose) console.log('$', colorize(cmd))
 
-  return new Promise((resolve, reject) => {
-    let options = {
-      windowsHide: true,
-    }
-    if (typeof $.shell !== 'undefined') options.shell = $.shell
-    if (typeof $.cwd !== 'undefined') options.cwd = $.cwd
-
-    let child = exec($.prefix + cmd, options)
-    let stdout = '', stderr = '', combined = ''
-    child.stdout.on('data', data => {
-      if ($.verbose) process.stdout.write(data)
-      stdout += data
-      combined += data
-    })
-    child.stderr.on('data', data => {
-      if ($.verbose) process.stderr.write(data)
-      stderr += data
-      combined += data
-    })
+  let options = {
+    windowsHide: true,
+    maxBuffer: 100e6,
+  }
+  if (typeof $.shell !== 'undefined') options.shell = $.shell
+  if (typeof $.cwd !== 'undefined') options.cwd = $.cwd
+
+  let child = exec($.prefix + cmd, options)
+  process.stdin.pipe(child.stdin)
+
+  let stdout = '', stderr = '', combined = ''
+  child.stdout.on('data', data => {
+    if ($.verbose) process.stdout.write(data)
+    stdout += data
+    combined += data
+  })
+  child.stderr.on('data', data => {
+    if ($.verbose) process.stderr.write(data)
+    stderr += data
+    combined += data
+  })
+
+  let promise = new ProcessPromise((resolve, reject) => {
     child.on('exit', code => {
       child.on('close', () => {
         (code === 0 ? resolve : reject)
@@ -77,6 +81,8 @@ export function $(pieces, ...args) {
       })
     })
   })
+  promise.child = child
+  return promise
 }
 
 $.verbose = true
@@ -134,6 +140,40 @@ export async function fetch(url, init) {
 
 export const sleep = promisify(setTimeout)
 
+export class ProcessPromise extends Promise {
+  child = undefined
+
+  get stdin() {
+    return this.child.stdin
+  }
+
+  get stdout() {
+    return this.child.stdout
+  }
+
+  get stderr() {
+    return this.child.stderr
+  }
+
+  pipe(dest) {
+    if (typeof dest === 'string') return this.pipe($([dest]))
+    if (dest instanceof ProcessPromise) {
+      process.stdin.unpipe(dest.stdin)
+      this.stdout.pipe(dest.stdin)
+      let combinedPromise = new ProcessPromise((resolve, reject) => {
+        Promise.all([this, dest])
+          .then(() => resolve(this))
+          .catch(reject)
+      })
+      combinedPromise.child = this.child
+      return combinedPromise
+    } else {
+      this.stdout.pipe(dest)
+    }
+    return this
+  }
+}
+
 export class ProcessOutput {
   #code = 0
   #stdout = ''
test.mjs
@@ -87,6 +87,12 @@ import {strict as assert} from 'assert'
   await $`node zx.mjs examples/index.md`
 }
 
+{ // Pipes works both ways
+  let p = $`read foo; echo "$foo"`.pipe($`cat | wc`).pipe('wc -c')
+  p.stdin.write('hello\n')
+  assert((await p).stdout === 'hello\n')
+}
+
 { // require() is working in ESM
   const {name, version} = require('./package.json')
   assert(typeof name === 'string')
zx.mjs
@@ -16,7 +16,7 @@
 
 import {join, basename, extname, resolve, dirname} from 'path'
 import os, {tmpdir} from 'os'
-import {promises as fs} from 'fs'
+import {promises as fs, createWriteStream, createReadStream} from 'fs'
 import {createRequire} from 'module'
 import url from 'url'
 import {$, cd, question, fetch, chalk, sleep, ProcessOutput} from './index.mjs'
@@ -28,7 +28,7 @@ Object.assign(global, {
   question,
   chalk,
   sleep,
-  fs,
+  fs: {...fs, createWriteStream, createReadStream},
   os
 })