Haoyi's Programming Blog

How to work with Subprocesses in Scala

Posted 2019-06-06

Most complex systems are made of multiple processes: often the tool you need is not easily usable as a library within your program, but can be easily started as a subprocess to accomplish the task you need it to do. This tutorial will walk through how to easily work with such subprocesses from the Scala programming language, to allow you to interact with the rich ecosystem of third-party tools and utilities that subprocesses make available.


About the Author: Haoyi is a software engineer, and the author of many open-source Scala tools such as the Ammonite REPL and the Mill Build Tool. If you enjoyed the contents on this blog, you may also enjoy Haoyi's book Hands-on Scala Programming


The easiest way to work with subprocesses in Scala is through the OS-Lib library. OS-Lib is available on Maven Central for you to use with any version of Scala:

// SBT
"com.lihaoyi" %% "os-lib" % "0.2.7"

// Mill
ivy"com.lihaoyi::os-lib:0.2.7"

OS-Lib also comes bundled with Ammonite, and can be used within the REPL and *.sc script files.

All functionality within this library comes from the os package, e.g. os.Path, os.read, os.list, and so on. To begin with, I will install Ammonite:

$ sudo sh -c '(echo "#!/usr/bin/env sh" && curl -L https://github.com/lihaoyi/Ammonite/releases/download/1.6.7/2.12-1.6.7) > /usr/local/bin/amm && chmod +x /usr/local/bin/amm'

And open the Ammonite REPL, using os.<tab> to see the list of available operations:

$ amm
Loading...
Welcome to the Ammonite Repl 1.6.7
(Scala 2.12.8 Java 11.0.2)
@ os.<tab>
/                                RelPath                          list
BasePath                         ResourceNotFoundException        makeDir
BasePathImpl                     ResourcePath                     move
BasicStatInfo                    ResourceRoot                     mtime
Bytes                            SeekableSource                   owner
CommandResult                    SegmentedPath                    perms
FilePath                         Shellable                        proc
...

Most of the functionality we want will be in the os.proc function. From here, we can begin our tutorial.

os.proc.call

The most common thing to do with subprocesses is to spawn one and wait for it to complete. This is done through the os.proc.call function:

os.proc(command: os.Shellable*)
  .call(cwd: Path = null,
        env: Map[String, String] = null,
        stdin: ProcessInput = Pipe,
        stdout: ProcessOutput = Pipe,
        stderr: ProcessOutput = Pipe,
        mergeErrIntoOut: Boolean = false,
        timeout: Long = Long.MaxValue,
        check: Boolean = true,
        propagateEnv: Boolean = true): os.CommandResult

os.proc.call takes a lot of optional parameters, but in the simplest case you just pass in the command you want to execute:

@ val gitStatus = os.proc("git", "status").call()
gitStatus: os.CommandResult = CommandResult(
...

This gives you an os.CommandResult object, which contains the exit code, stdout, and stderr of the completed process:

@ gitStatus.exitCode
res3: Int = 0
@ gitStatus.out.string
res4: String = """On branch master
Your branch is up to date with 'origin/master'.

Changes to be committed:
  (use "git reset HEAD <file>..." to unstage)

	new file:   post/37 - How to work with Subprocesses in Scala.md
...
@ gitStatus.err.string
res5: String = ""

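Common things to customize include cwd, to set the working directory of the subprocess; env, to set its environment variables; stdin, to pass data into its standard input; stdout and stderr, to redirect where its output goes; and check = false, to avoid throwing an exception when the subprocess exits with a non-zero code.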

While here we are passing Strings into os.proc(), you can also pass in Seq[String]s, Option[String]s, and os.Paths.
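
As a rough sketch of these customizations, the Ammonite script below sets env, check, and cwd on a few calls and passes a Seq[String] of arguments. It assumes a Unix-like system with sh and ls on the PATH; the GREETING variable and the val names are made up for illustration.

```scala
// Ammonite script (*.sc): os-lib is bundled, so no import is needed.
// Assumes a Unix-like system with `sh` and `ls` available.

// env: add an environment variable for the subprocess (GREETING is a made-up name)
val greeting = os.proc("sh", "-c", "echo $GREETING")
  .call(env = Map("GREETING" -> "hello"))
  .out.string.trim

// check = false: return the CommandResult instead of throwing on a non-zero exit code
val failed = os.proc("sh", "-c", "exit 3").call(check = false)

// cwd plus a Seq[String] of arguments: list the contents of the home directory
val flags = Seq("-l", "-a")
val listing = os.proc("ls", flags).call(cwd = os.home).out.lines

println(s"greeting=$greeting exitCode=${failed.exitCode} entries=${listing.length}")
```

With the default check = true, the second call would throw an os.SubprocessException instead of returning a result.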

These can be used in a variety of ways:

Find distinct contributors in a Git repo history

We can do this in two steps: first, get the git log of the current working directory as a string:

@ val gitLog = os.proc("git", "log").call().out.string
gitLog: String = """commit 160bf840ec70756b1dc0ea06036ada5a9993cd7f
Author: Li Haoyi
Date:   Mon Jun 3 09:32:05 2019 +0800

    tweaks

commit 32ccf9fe457f625243abf6903fd6095ff8d825c3
Author: Li Haoyi
Date:   Mon Jun 3 09:27:14 2019 +0800
...

Next, pass the string into a separate grep "Author: " subprocess, as that process's standard input:

@ val authorLines = os.proc("grep", "Author: ").call(stdin = gitLog).out.lines
authorLines: Vector[String] = Vector(
  "Author: Li Haoyi",
  "Author: Li Haoyi",
  "Author: Li Haoyi",
  "Author: Li Haoyi",
...

Lastly, call .distinct on the lines to get the unique contributors:

@ authorLines.distinct
res15: Vector[String] = Vector(
  "Author: Li Haoyi",
  "Author: jlncrnt",
  "Author: chasets",
  "Author: n4to4",
  "Author: George Shakhnazaryan",
  "Author: Oleg Skovpen",
  "Author: Prasanna Swaminathan",
  "Author: nafg",
...

This retrieves the distinct contributors using two subprocesses that run one after the other, buffering the intermediate output in memory. Later, we will see how to do so in a streaming fashion, with both subprocesses running in parallel and without storing the intermediate data in memory.
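
Once the log is in memory, the grep step can also be done in Scala itself. The sketch below is one way to do that; distinctAuthors is a hypothetical helper and the sample log is invented for illustration. It only keeps lines that start with "Author: ", which is slightly stricter than grep "Author: ".

```scala
// Hypothetical helper: extract the distinct authors from `git log` output held in memory.
def distinctAuthors(gitLog: String): Vector[String] =
  gitLog.split("\n").toVector
    .filter(_.startsWith("Author: "))   // header lines only; indented commit messages are skipped
    .map(_.stripPrefix("Author: ").trim)
    .distinct                            // keeps first-seen order

// Invented sample input in the same shape as the `git log` output above
val sampleLog =
  """commit 160bf84
    |Author: Li Haoyi
    |
    |    tweaks
    |
    |commit 32ccf9f
    |Author: nafg
    |
    |commit 0a1b2c3
    |Author: Li Haoyi
    |""".stripMargin

val authors = distinctAuthors(sampleLog)
println(authors)
```

Against a real repository, the input would come from os.proc("git", "log").call().out.string as shown above.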

Remove non-master branches from a Git Repo

Often, someone working with git ends up creating one branch for every task they are working on. After merging that branch into master, the branch names still hang around, and it can be a bit tedious to run git branch -D over and over to remove them. Let's use Git subprocesses to help remove all branches except the current branch from the local Git repo.

To do this, first we run git branch to see the current branches, and get the output as a series of lines:

@ val gitBranches = os.proc("git", "branch").call().out.lines
gitBranches: Vector[String] = Vector(
  "  561",
  "  571",
  "  595",
  "  599",
  "  600",
  "  609",
  "* master"
)

Next, we find all the branches whose lines start with two spaces ("  "), and remove that leading whitespace:

@ val otherBranches = gitBranches.filter(_.startsWith("  ")).map(_.drop(2))
otherBranches: Vector[String] = Vector("561", "571", "595", "599", "600", "609")

Lastly, we run git branch -D on each such branch, to remove them:

@ for(branch <- otherBranches) os.proc("git", "branch", "-D", branch).call()

Now, we can see the other branches have been removed, leaving only the current * master branch:

@ val gitBranches = os.proc("git", "branch").call().out.lines
gitBranches: Vector[String] = Vector("* master")
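
Since git branch -D is destructive, it can help to separate the selection logic from the deletion so it can be checked on its own. Below is a minimal sketch of that idea; branchesToDelete and its keep parameter are hypothetical, and the sample lines are invented. Unlike the steps above, it also protects master when you happen to be on a different branch.

```scala
// Hypothetical helper: choose which branches to delete from the lines printed by `git branch`.
def branchesToDelete(branchLines: Seq[String], keep: Set[String] = Set("master")): Seq[String] =
  branchLines
    .filterNot(_.startsWith("* "))   // never delete the currently checked-out branch
    .map(_.trim)                     // drop the two-space indentation
    .filterNot(keep.contains)        // never delete explicitly protected branches

// Invented sample: we are on `feature-x`, and `master` must survive
val sample = Vector("  561", "  571", "* feature-x", "  master")
val doomed = branchesToDelete(sample)
println(doomed)

// The deletion itself would then be the same loop as above:
//   for (branch <- doomed) os.proc("git", "branch", "-D", branch).call()
```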

Curl to a local file

We have already seen how to configure the standard input we pass into os.proc.call. You can also redirect the output: here we use it to spawn a curl subprocess that saves its output to a local file:

@ val url = "https://api.github.com/repos/lihaoyi/mill/releases"
url: String = "https://api.github.com/repos/lihaoyi/mill/releases"

@ os.proc("curl", url).call(stdout = os.pwd / "github.json")
res7: os.CommandResult = CommandResult(
  0,
...

We can now spawn a ls -lh subprocess to get the metadata of the file we just downloaded:

@ os.proc("ls", "-lh", "github.json").call().out.string
res10: String = """-rw-r--r--  1 lihaoyi  staff   607K Jun  3 13:16 github.json
"""

Streaming Gzip

os.proc.call allows you to set both stdin and stdout, using the subprocess to process data from one file to another in a streaming fashion:

@ os.proc("gzip").call(stdin = os.pwd / "github.json", stdout = os.pwd / "github.json.gz")
res11: os.CommandResult = CommandResult(0, ArrayBuffer())

@ os.proc("ls", "-lh", "github.json.gz").call().out.string
res12: String = """-rw-r--r--  1 lihaoyi  staff    23K Jun  3 13:30 github.json.gz
"""

This lets you use subprocesses to handle large files and large amounts of data without having to load either the input or the output into the host process's memory, which is useful when the files are large and memory is limited.
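
As a quick sanity check of this file-to-file pattern, the sketch below compresses a temporary file and then decompresses it again to confirm the round trip. It is an Ammonite script that assumes gzip is on the PATH; the file contents and val names are invented for illustration.

```scala
// Ammonite script (*.sc): os-lib is bundled. Assumes `gzip` is on the PATH.
val original   = os.temp("hello subprocess\n" * 1000)   // temporary input file
val compressed = os.temp()                               // empty temporary output file

// Compress file-to-file, streaming through the gzip subprocess
os.proc("gzip").call(stdin = original, stdout = compressed)

// Decompress again (-d), writing to stdout (-c), and capture the result in memory
val roundTripped = os.proc("gzip", "-dc").call(stdin = compressed).out.string

println(s"${os.size(original)} bytes -> ${os.size(compressed)} bytes")
println(s"round trip intact: ${roundTripped == os.read(original)}")
```

The decompression step reads its output into memory only to make the comparison easy; for large files you would redirect stdout to another file instead.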

os.proc.spawn

While os.proc.call allows you to pass concrete input data and receive concrete output data from a subprocess, and allows some degree of streaming input and output, it has one core limitation: the spawned subprocess must terminate before os.proc.call returns. This means you cannot use it to set up pipelines where two or more processes are running in parallel and feeding data into each other to process, or to start a subprocess that runs in the background for you to interact with. For these use cases, you need os.proc.spawn:

os.proc(command: os.Shellable*)
  .spawn(cwd: Path = null,
         env: Map[String, String] = null,
         stdin: os.ProcessInput = os.Pipe,
         stdout: os.ProcessOutput = os.Pipe,
         stderr: os.ProcessOutput = os.Pipe,
         mergeErrIntoOut: Boolean = false,
         propagateEnv: Boolean = true): os.SubProcess

os.proc.spawn takes a similar set of arguments to os.proc.call, but instead of returning a completed os.CommandResult, it returns an os.SubProcess object. This represents a subprocess that may or may not have completed, and that you can interact with.
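
To make the difference from os.proc.call concrete, here is a small sketch that spawns a short-lived sleep process and inspects it before and after it finishes. It assumes a Unix-like sleep command, and the val names are illustrative.

```scala
// Ammonite script (*.sc): os-lib is bundled. Assumes a Unix-like `sleep` command.
val sleeper = os.proc("sleep", "1").spawn()

// spawn() returned without waiting, so the process is still running here
val aliveAfterSpawn = sleeper.isAlive()

sleeper.waitFor()                    // block until the subprocess exits
val aliveAfterWait = sleeper.isAlive()
val code = sleeper.exitCode()        // only meaningful once the process has exited

println(s"alive after spawn: $aliveAfterSpawn, after wait: $aliveAfterWait, exit code: $code")
```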

Streaming distinct contributors in a Git repo history

The first use case we will visit is to find the distinct contributors to a Git repository, using git log and grep. The earlier solution does so in a sequential fashion: running one process after the other, and accumulating the intermediate data in the host process memory in between. This can cause problems if the intermediate data is large.

To run these steps in parallel, in a streaming pipeline, you can use os.proc.spawn:

@ {
  val gitLog = os.proc("git", "log").spawn()
  val grepAuthor = os.proc("grep", "Author: ").spawn(stdin = gitLog.stdout)
  val output = grepAuthor.stdout.lines.distinct
  }
gitLog: os.SubProcess = os.SubProcess@604c7e9b
grepAuthor: os.SubProcess = os.SubProcess@70485aa
output: Vector[String] = Vector(
  "Author: Li Haoyi",
  "Author: Guillaume Galy",
  "Author: Nik Vanderhoof",
...

Here, we spawn two subprocesses, passing the stdout of gitLog into the stdin of grepAuthor. At that point, both os.SubProcesses are running in the background, one feeding into the other. The grepAuthor subprocess exposes a grepAuthor.stdout attribute that you can use to read the output, which (similar to os.CommandResult) exposes helper methods like .string or .lines that wait for the subprocess to complete and aggregate the output.

Here we aggregate the filtered output as one big list, and then call .distinct on it. We can also avoid accumulating the filtered output by iterating over the lines of stdout and adding each one directly to a Set for de-duplication (here we use a LinkedHashSet to preserve the ordering of the input):

@ {
  val gitLog = os.proc("git", "log").spawn()
  val grepAuthor = os.proc("grep", "Author: ").spawn(stdin = gitLog.stdout)
  val distinct = collection.mutable.LinkedHashSet.empty[String]
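  while(grepAuthor.stdout.available() > 0 || grepAuthor.isAlive()){
    // readLine() returns null once the stream is exhausted; skip it so null never enters the set
    val line = grepAuthor.stdout.readLine()
    if (line != null) distinct.add(line)
  }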
  }
gitLog: os.SubProcess = os.SubProcess@6693818c
grepAuthor: os.SubProcess = os.SubProcess@a1e578f
distinct: collection.mutable.LinkedHashSet[String] = Set(
  "Author: Li Haoyi",
  "Author: Guillaume Galy",
  "Author: Nik Vanderhoof",
...

Streaming download-process-reupload

Subprocess pipelines do not need to start or end in memory, or even on your local computer. Here is an example of downloading some data from api.github.com, and re-uploading it to httpbin.org, in a streaming fashion using curl on both ends:

@ {
  val download = os.proc(
    "curl",
    "https://api.github.com/repos/lihaoyi/mill/releases"
  ).spawn()

  val upload = os.proc(
    "curl", "-X", "PUT",
    "-H", "Content-Type:application/octet-stream",
    "-d", "@-",
    "https://httpbin.org/anything"
  ).spawn(stdin = download.stdout)

  val contentLength = upload.stdout.lines.filter(_.contains("Content-Length"))
  }
download: os.SubProcess = os.SubProcess@19370af1
upload: os.SubProcess = os.SubProcess@37e967df
contentLength: Vector[String] = Vector("    \"Content-Length\": \"609216\", ")

We look at the JSON output of the final upload response to see the "Content-Length" of the output, which at 609216 bytes roughly matches the 607K file size we saw earlier.

We can add even more stages to the pipeline if we wish, e.g. compressing the data using gzip between downloading and re-uploading it:

@ {
  val download = os.proc(
    "curl",
    "https://api.github.com/repos/lihaoyi/mill/releases"
  ).spawn()

  val gzip = os.proc("gzip").spawn(stdin = download.stdout)

  val upload = os.proc(
    "curl", "-X", "PUT",
    "-H", "Content-Type:application/octet-stream",
    "-d", "@-",
    "https://httpbin.org/anything"
  ).spawn(stdin = gzip.stdout)

  val contentLength = upload.stdout.lines.filter(_.contains("Content-Length"))
  }
download: os.SubProcess = os.SubProcess@4f45290f
gzip: os.SubProcess = os.SubProcess@56511eda
upload: os.SubProcess = os.SubProcess@3b9d85c2
contentLength: Vector[String] = Vector("    \"Content-Length\": \"17191\", ")

With the added gzip step in the pipeline, the uploaded data has been compressed from 609216 bytes to 17191 bytes. Again, all three stages of the pipeline {download,gzip,upload} are running in parallel, and at no point are we buffering the entire data set in the memory of the host process.

os.proc.spawn thus allows you to put together quite sophisticated subprocess pipelines with minimal effort, letting you perform streaming data processing with pipelined parallelism without needing to accumulate large amounts of data in memory.
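
The wiring in these examples follows a regular pattern: each stage takes the previous stage's stdout as its stdin. A minimal sketch of that pattern as a fold is shown below. The pipeline helper is hypothetical, not part of OS-Lib, and the script is written against the same OS-Lib version as the rest of this post (where stdout.lines takes no parentheses). It assumes printf, sort, and uniq are available.

```scala
// Ammonite script (*.sc): os-lib is bundled. Assumes `printf`, `sort` and `uniq` exist.

// Hypothetical helper: spawn every command, wiring each stdout into the next stdin.
// Returns the last subprocess, whose stdout carries the result of the whole pipeline.
def pipeline(commands: Seq[Seq[String]]): os.SubProcess = {
  require(commands.nonEmpty, "need at least one command")
  // stderr is inherited so that an unread stderr pipe can never stall a stage
  val first = os.proc(commands.head).spawn(stderr = os.Inherit)
  commands.tail.foldLeft(first) { (upstream, command) =>
    os.proc(command).spawn(stdin = upstream.stdout, stderr = os.Inherit)
  }
}

val last = pipeline(Seq(
  Seq("printf", "b\\na\\nb\\n"),   // prints three lines: b, a, b
  Seq("sort"),
  Seq("uniq")
))
val result = last.stdout.lines   // as in the examples above, waits for the output to finish
println(result)
```

Returning only the last os.SubProcess keeps the sketch short; a fuller version would probably return every stage so that each exit code can be checked.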

Interacting with a Python process

The second big use case for os.proc.spawn is to spawn a long-lived worker process which you want to keep running concurrently with the host process. This background process may not be streaming large amounts of data, but simply has its own in-memory state you want to interact with.

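A simple example of this is keeping a Python process running in the background that you want to interact with. The one-liner below uses raw_input, which exists only in Python 2, so it assumes the python command resolves to a Python 2 interpreter: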

@ val sub = os.proc("python", "-u", "-c", "while True: print(eval(raw_input()))").spawn()
sub: os.SubProcess = os.SubProcess@22a7d4a2

Rather than passing in data through the stdin = argument and accumulating the output via .stdout.lines, we can instead write to the subprocess using sub.stdin.write/sub.stdin.writeLine and read from it using sub.stdout.readLine():

@ sub.stdin.write("1 + 2")

@ sub.stdin.writeLine("+ 4")

@ sub.stdin.flush()

@ sub.stdout.readLine()
res42: String = "7"

@ sub.stdin.write("'1' + '2'")

@ sub.stdin.writeLine("+ '4'")

@ sub.stdin.flush()

@ sub.stdout.readLine()
res46: String = "124"

You can also exchange binary data with the subprocess via .write/.read:

@ sub.stdin.write("1 * 2".getBytes)

@ sub.stdin.write("* 4\n".getBytes)

@ sub.stdin.flush()

@ sub.stdout.read()
res50: Int = 56

@ res50.toChar
res51: Char = '8'

When we are done with the subprocess, we can destroy it:

@ sub.isAlive()
res1: Boolean = true

@ sub.destroy()

@ sub.isAlive()
res3: Boolean = false
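
The write, flush, and read steps above form a small request/response protocol, and it is easy to forget the flush. One way to package it is sketched below. The ask helper is hypothetical, and cat is used as a stand-in worker that echoes each line back, so the sketch does not depend on which Python version is installed.

```scala
// Ammonite script (*.sc): os-lib is bundled. Assumes a Unix-like `cat` command.

// Hypothetical helper: send one line to the subprocess and read one line back.
def ask(sub: os.SubProcess, line: String): String = {
  sub.stdin.writeLine(line)
  sub.stdin.flush()          // without the flush the line may sit in a buffer and readLine() would hang
  sub.stdout.readLine()
}

// `cat` echoes each line it receives, standing in for a real worker process
val worker = os.proc("cat").spawn()
val replies =
  try Vector(ask(worker, "ping"), ask(worker, "pong"))
  finally worker.destroy()   // always clean up, even if a request throws

println(replies)
```

This only works for workers that answer every request with exactly one line; anything chattier needs a real framing protocol.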

This usage pattern is handy whenever you want to keep a long-lived subprocess around and interact with its in-memory state over time.

Conclusion

Working with subprocesses in Scala can be easy and convenient: with OS-Lib's os package, you can quickly call subprocesses with the input you have and extract the output you need, and you can set up non-trivial pipelines that process data in a parallel streaming fashion. Compared to using the native java.lang.Process APIs, you can usually achieve what you want in far less code, making the code easier both to write and to maintain, without getting lost in boilerplate.
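
For a sense of that boilerplate, here is a rough sketch of what a bare-bones equivalent of os.proc("echo", "hello").call() might look like using only the JDK. callWithJavaApi is a made-up name, the sketch assumes Java 9 or later (for readAllBytes), and it leaves out the timeouts, exit-code checking, and separate stderr handling that os.proc.call provides.

```scala
import java.nio.charset.StandardCharsets

// Run a command with the plain JDK API, wait for it, and return (exit code, output).
// Assumes Java 9+ for InputStream.readAllBytes.
def callWithJavaApi(command: String*): (Int, String) = {
  val process = new ProcessBuilder(command: _*)
    .redirectErrorStream(true) // merge stderr into stdout so one read drains both
    .start()
  process.getOutputStream.close() // we send no input, so close the subprocess's stdin
  val output = new String(process.getInputStream.readAllBytes(), StandardCharsets.UTF_8)
  val exitCode = process.waitFor()
  (exitCode, output)
}

val (exitCode, output) = callWithJavaApi("echo", "hello")
println(s"exit code $exitCode, output: ${output.trim}")
```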

Effectively working with subprocesses exposes a whole world of possibilities you do not have access to within a single monolithic JVM, and is something that all programmers should have in their toolbox.

This tutorial just gives a quick tour of how to work with subprocesses in Scala. For a more thorough reference, check out the OS-Lib documentation.

If you're interested in learning about the filesystem side of OS-Lib, check out this other blog post: How to work with Files in Scala

