Commit 3e6d8ea

Anton Medvedev <anton@medv.io>
2021-05-21 20:33:39
Add better pipelines
1 parent d0e889c
examples/interactive.mjs
@@ -0,0 +1,19 @@
+#!/usr/bin/env zx
+
+// Copyright 2021 Google LLC
+// 
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// 
+//     https://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+let out = fs.createWriteStream('log.txt')
+process.stdin.pipe(out)
+await $`npm init`.pipe(out)
examples/pipelines.md
@@ -0,0 +1,42 @@
+# Pipelines
+
+> You can run this markdown file: `zx examples/pipelines.md`
+
+The `zx` supports Node.js streams and special `pipe()` method can be used to
+redirect stdout.
+
+```js
+await $`echo "Hello, stdout!"`
+  .pipe(fs.createWriteStream('/tmp/output.txt'))
+
+await $`cat /tmp/output.txt`
+```
+
+Processes created with `$` gets stdin from `process.stdin`, but we can also
+write to child process too:
+
+```js
+let p = $`read var; echo "$var";`
+p.stdin.write('Hello, stdin!\n')
+
+let {stdout} = await p
+```
+
+Pipes can be used to show real-time output of programs:
+
+```js
+$.verbose = false
+
+await $`echo 1; sleep 1; echo 2; sleep 1; echo 3;`
+  .pipe(process.stdout)
+```
+
+Also, the `pipe()` method can combine `$` programs. Same as `|` in bash:
+
+```js
+let greeting = await $`printf "hello"`
+  .pipe($`awk '{printf $1", world!"}'`)
+  .pipe($`tr '[a-z]' '[A-Z]'`)
+
+console.log(greeting.stdout)
+```
index.d.ts
@@ -12,8 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+import {ChildProcess} from 'child_process'
+import {Readable, Writable} from 'stream'
+
 interface $ {
-  (pieces: TemplateStringsArray, ...args: any[]): Promise<ProcessOutput>
+  (pieces: TemplateStringsArray, ...args: any[]): ProcessPromise<ProcessOutput>
   verbose: boolean
   shell: string
   cwd: string
@@ -30,6 +33,14 @@ export type QuestionOptions = { choices: string[] }
 
 export function sleep(ms: number): Promise<void>
 
+export interface ProcessPromise<T> extends Promise<T> {
+  child: ChildProcess
+  readonly stdin: Writable
+  readonly stdout: Readable
+  readonly stderr: Readable
+  pipe(dest: ProcessPromise<ProcessOutput>|Writable): ProcessPromise<ProcessOutput>
+}
+
 export class ProcessOutput {
   readonly exitCode: number
   readonly stdout: string
index.mjs
@@ -13,10 +13,10 @@
 // limitations under the License.
 
 import {existsSync} from 'fs'
+import {promisify} from 'util'
 import {exec} from 'child_process'
 import {createInterface} from 'readline'
 import {default as nodeFetch} from 'node-fetch'
-import {promisify} from 'util'
 import which from 'which'
 import chalk from 'chalk'
 import shq from 'shq'
@@ -76,8 +76,11 @@ export function $(pieces, ...args) {
   let promise = new ProcessPromise((resolve, reject) => {
     child.on('exit', code => {
       child.on('close', () => {
-        (code === 0 ? resolve : reject)
-        (new ProcessOutput({code, stdout, stderr, combined, __from}))
+        let output = new ProcessOutput({
+          code, stdout, stderr, combined,
+          message: `${stderr || '\n'}    at ${__from}`
+        });
+        (code === 0 ? resolve : reject)(output)
       })
     })
   })
@@ -101,7 +104,7 @@ export function cd(path) {
   if (!existsSync(path)) {
     let __from = (new Error().stack.split('at ')[2]).trim()
     console.error(`cd: ${path}: No such directory`)
-    console.error(`  at ${__from}`)
+    console.error(`    at ${__from}`)
     process.exit(1)
   }
   $.cwd = path
@@ -162,33 +165,25 @@ export class ProcessPromise extends Promise {
     if (dest instanceof ProcessPromise) {
       process.stdin.unpipe(dest.stdin)
       this.stdout.pipe(dest.stdin)
-      let combinedPromise = new ProcessPromise((resolve, reject) => {
-        Promise.all([this, dest])
-          .then(() => resolve(this))
-          .catch(reject)
-      })
-      combinedPromise.child = this.child
-      return combinedPromise
-    } else {
-      this.stdout.pipe(dest)
+      return dest
     }
+    this.stdout.pipe(dest)
     return this
   }
 }
 
-export class ProcessOutput {
+export class ProcessOutput extends Error {
   #code = 0
   #stdout = ''
   #stderr = ''
   #combined = ''
-  #__from = ''
 
-  constructor({code, stdout, stderr, combined, __from}) {
+  constructor({code, stdout, stderr, combined, message}) {
+    super(message)
     this.#code = code
     this.#stdout = stdout
     this.#stderr = stderr
     this.#combined = combined
-    this.#__from = __from
   }
 
   toString() {
@@ -206,8 +201,4 @@ export class ProcessOutput {
   get exitCode() {
     return this.#code
   }
-
-  get __from() {
-    return this.#__from
-  }
 }
package.json
@@ -14,6 +14,7 @@
     "test": "node zx.mjs test.mjs"
   },
   "dependencies": {
+    "@types/node": "^15.3",
     "chalk": "^4.1.1",
     "node-fetch": "^2.6.1",
     "shq": "^1.0.2",
README.md
@@ -60,7 +60,7 @@ When using `zx` via the executable or a shebang, all of the functions
 ### ``$`command` ``
 
 Executes a given string using the `exec` function from the
-`child_process` package and returns `Promise<ProcessOutput>`.
+`child_process` package and returns `ProcessPromise<ProcessOutput>`.
 
 ```js
 let count = parseInt(await $`ls -1 | wc -l`)
@@ -88,6 +88,25 @@ try {
 }
 ```
 
+### `ProcessPromise`
+
+```ts
+class ProcessPromise<T> extends Promise<T> {
+  readonly stdin: Writable
+  readonly stdout: Readable
+  readonly stderr: Readable
+  pipe(dest): ProcessPromise<T>
+}
+```
+
+The `pipe()` method can be used to redirect stdout:
+
+```js
+await $`cat file.txt`.pipe(process.stdout)
+```
+
+Read more about [pipelines](examples/pipelines.md).
+
 ### `ProcessOutput`
 
 ```ts
test.mjs
@@ -87,10 +87,36 @@ import {strict as assert} from 'assert'
   await $`node zx.mjs examples/index.md`
 }
 
-{ // Pipes works both ways
-  let p = $`read foo; echo "$foo"`.pipe($`cat | wc`).pipe($`wc -c`)
-  p.stdin.write('hello\n')
-  assert((await p).stdout === 'hello\n')
+{ // Pipes are working
+  let {stdout} = await $`echo "hello"`
+    .pipe($`awk '{print $1" world"}'`)
+    .pipe($`tr '[a-z]' '[A-Z]'`)
+  assert(stdout === 'HELLO WORLD\n')
+
+  try {
+    let w = await $`echo foo`
+      .pipe(fs.createWriteStream('/tmp/output.txt'))
+    assert((await w).stdout === 'foo\n')
+
+    let r = $`cat`
+    fs.createReadStream('/tmp/output.txt')
+      .pipe(r.stdin)
+    assert((await r).stdout === 'foo\n')
+  } finally {
+    await fs.rm('/tmp/output.txt')
+  }
+}
+
+{ // ProcessOutput thrown as error
+  let err
+  try {
+    await $`wtf`
+  } catch (p) {
+    err = p
+  }
+  console.log(err)
+  assert(err.exitCode > 0)
+  console.log('☝️ Error above is expected')
 }
 
 { // require() is working in ESM
zx.mjs
@@ -62,7 +62,7 @@ try {
 
 } catch (p) {
   if (p instanceof ProcessOutput) {
-    console.error('  at ' + p.__from)
+    console.error('Error: ' + p.message)
     process.exit(1)
   } else {
     throw p