Haoyi's Programming Blog


Standardizing IO Interfaces for Scala Libraries

Posted 2019-12-28

The latest versions of my open source libraries have standardized on two new interfaces - Writable and Readable - that allow efficient streaming data exchange between libraries. This blog post will explore the origin of these two interfaces, what purpose they serve, and how they can enable more efficient interoperability between a wide range of different libraries and frameworks.


The Problem

The basic issue that Writable and Readable solve is that many libraries and frameworks need to exchange data with one another, but the number of ways they can do so is limited enough to cause inefficiency. For example:

None of these libraries are aware of each other, and thus you end up with only a few options to exchange data between them:

Because InputStream is often too difficult to implement, a developer using these libraries will typically end up materializing many short-lived Strings and Array[Byte]s in memory when sending data between them. While not the end of the world - the garbage collector cleans them up once they are no longer needed - this costs CPU time and memory that should not be needed at all.

One point to note is that many libraries are both data sources and data sinks: uPickle generates JSON data, but it also parses it. Requests-Scala both uploads and downloads data, and Cask HTTP endpoints both receive data from and return data to browsers. In all these cases, it was common to materialize short-lived Strings or Array[Byte]s to do the data exchange between the libraries.

Streaming Workarounds

Because materializing large short-lived Strings and Array[Byte]s in memory is wasteful and inefficient, most of these libraries had already grown a menagerie of ad-hoc ways to stream the data. For example:

// uPickle
def writeTo[T: Writer](t: T,
                       out: java.io.Writer,
                       indent: Int = -1,
                       escapeUnicode: Boolean = false): Unit

def writeBinaryTo[T: Writer](t: T, out: java.io.OutputStream): Unit

// Scalatags
def writeTo(strb: java.io.Writer): Unit

// Requests-Scala
requests.get.stream("https://api.github.com/events")(
  onUpload = outputStream => {...},
  onDownload = inputStream => {...}
)

// FastParse
fastparse.parse(Iterator("i", "am", "cow"), parser(_))

// OS-Lib
os.read.chunks(p: ReadablePath, chunkSize: Int): os.Generator[(Array[Byte], Int)]
os.read.chunks(p: ReadablePath, buffer: Array[Byte]): os.Generator[(Array[Byte], Int)]

os.read.lines.stream(arg: os.ReadablePath): os.Generator[String]
os.read.lines.stream(arg: os.ReadablePath, charSet: Codec): os.Generator[String]

Streaming Interop Complexity

All these APIs work, and serve their purpose of allowing a developer to perform streaming reads and writes in scenarios where efficient data transfer was important. However, the ad-hoc nature of the APIs meant that trying to connect these libraries together was clunky.

For example, let us imagine I wanted to upload a file directly to an HTTP endpoint, download the response into my JSON parser, and then modify the JSON and write the modified JSON to a file. It would look something like this:

val response = requests.put(
  "http://httpbin.org/put",
  data = os.read(os.pwd / "input.txt")
)
val json = ujson.read(response.text())
val interestingJson = json("data")
os.write(os.pwd / "output.json", interestingJson.render())

Looks easy enough. But this code does the unnecessary work of reading input.txt into an in-memory string, aggregating the HTTP response into an in-memory string for parsing into JSON, and then marshalling the JSON structure into an in-memory string before writing it to output.json. That's 3 places where we are aggregating large in-memory strings unnecessarily.

What if we wanted to do this in a streaming fashion, to avoid creating these throw-away in-memory strings? It would look like this:

requests.put.stream("http://httpbin.org/put")(
  onUpload = outputStream => {
    for((buf, n) <- os.read.chunks(os.pwd / "input.txt")) {
      outputStream.write(buf, 0, n)
    }
  },
  onDownload = inputStream => {
    val json = ujson.read(java.nio.channels.Channels.newChannel(inputStream))
    val interestingJson = json("data")
    val out = os.write.outputStream(os.pwd / "output.json")
    val writer = new java.io.OutputStreamWriter(out)
    try ujson.writeTo(interestingJson, writer)
    finally {
      writer.close()
      out.close()
    }
  }
)

Doable, but clunky and verbose: even though we are conceptually doing the same thing as earlier, we end up with tons of boilerplate to deal with chunks, channels, OutputStreams and OutputStreamWriters, and so on.

This is because even though the libraries conceptually all expose the same functionality, "receive streaming data" and "return streaming data", the ad-hoc nature of these APIs means that a good amount of glue code needs to be written to e.g. write a Generator[(Array[Byte], Int)] into a java.io.OutputStream for upload, turn the download java.io.InputStream into a java.nio.channels.Channel so it can be parsed into JSON, and wrap the OutputStream of a file in a java.io.Writer so the modified JSON can be streamed into it.

With the standard Readable and Writable interfaces introduced in the latest versions of all these libraries, this entire flow is as simple as:

val response = requests.put.stream(
  "http://httpbin.org/put",
  data = os.read.stream(os.pwd / "input.txt")
)
val json = ujson.read(response)
val interestingJson = json("data")
os.write(os.pwd / "output.json", interestingJson)

Performing these streaming workflows becomes just as easy as the previous batch equivalents: we simply need to add some .streams, but otherwise the code is almost identical. You can try this out yourself in version 2.0.0 of the Ammonite Scala REPL:

Welcome to the Ammonite Repl 2.0.0
(Scala 2.13.1 Java 11.0.2)
If you like Ammonite, please support our development at www.patreon.com/lihaoyi
@ val response = requests.put.stream(
    "http://httpbin.org/put",
     data = os.read.stream(os.pwd / "input.txt")
  )

response: geny.Readable = requests.Requester$$anon$1@4b770e40

@ val json = ujson.read(response)
json: ujson.Value.Value = Obj(
  LinkedHashMap(
    "args" -> Obj(LinkedHashMap()),
    "data" -> Str("hello"),
...

@ val interestingJson = json("data")
interestingJson: ujson.Value = Str("hello")

@ os.write(os.pwd / "output.json", interestingJson)

@ os.read(os.pwd / "output.json")
res4: String = "\"hello\""

Readable and Writable

The way this works is that rather than every library implementing its own ad-hoc way of receiving or returning streaming data, they all standardize on the following two interfaces:

trait Writable{
  def writeBytesTo(out: OutputStream): Unit
}

trait Readable extends Writable{
  def readBytesThrough[T](f: InputStream => T): T
  def writeBytesTo(out: OutputStream): Unit = readBytesThrough(Internal.transfer(_, out))
}

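These two minimal interfaces capture the ad-hoc conventions that the various libraries' data types had already grown for managing streaming data, and categorize them into two groups: a Writable is a data source that can write its bytes into an OutputStream handed to it, while a Readable is a data source that can additionally expose its bytes as an InputStream for the consumer to read from.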

Note that Readable doesn't provide an InputStream directly, but only within the callback to readBytesThrough. This allows the data source to perform any necessary cleanup actions after the reading is complete: closing files, terminating HTTP connections, and so on.
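To make the cleanup point concrete, here is a minimal sketch of one push-based and one pull-based data source. It assumes local stand-in copies of the two traits shown above so that it compiles without Geny; the transfer helper, Greeting, TrackedBytes and render are hypothetical names invented for illustration and are not part of any library.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

object ReadableWritableSketch {
  // Local stand-ins mirroring the two traits shown above (not the real geny types)
  trait Writable {
    def writeBytesTo(out: OutputStream): Unit
  }
  trait Readable extends Writable {
    def readBytesThrough[T](f: InputStream => T): T
    def writeBytesTo(out: OutputStream): Unit = readBytesThrough(transfer(_, out))
  }

  // Hypothetical helper: copies everything from `in` to `out` through a fixed-size buffer
  def transfer(in: InputStream, out: OutputStream): Unit = {
    val buf = new Array[Byte](8192)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = in.read(buf)
    }
  }

  // Push-based source: it only knows how to write itself into an OutputStream
  class Greeting(name: String) extends Writable {
    def writeBytesTo(out: OutputStream): Unit =
      out.write(s"hello $name".getBytes("UTF-8"))
  }

  // Pull-based source: lends out an InputStream, then cleans up once the callback returns
  class TrackedBytes(bytes: Array[Byte]) extends Readable {
    var closed = false
    def readBytesThrough[T](f: InputStream => T): T = {
      val in = new ByteArrayInputStream(bytes)
      try f(in)
      finally { in.close(); closed = true }
    }
  }

  // Collects whatever a Writable pushes out into a String
  def render(w: Writable): String = {
    val out = new ByteArrayOutputStream()
    w.writeBytesTo(out)
    out.toString("UTF-8")
  }
}
```

Because the InputStream never escapes the callback, the data source always knows when the consumer is done with it, which is what would let a real file or HTTP-backed source release its resources at that point.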

Push vs. Pull

Writable and Readable essentially categorize data sources into push-based and pull-based data sources, respectively. This idea of "push" vs. "pull" is core to streaming data exchange.

Any pull-based Readable can be trivially used as a push-based Writable, and any method that can receive a push-based Writable can also receive a pull-based Readable. This is reflected in the type inheritance hierarchy, with trait Readable extends Writable.
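The subtyping relationship can be sketched as follows. This again uses local stand-in traits rather than the real Geny ones, and sizeOf, firstByte, pushOnly and pullable are hypothetical names made up for the example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

object PushPullSketch {
  // Local stand-ins for the two interfaces (assumption: same shape as the traits above)
  trait Writable { def writeBytesTo(out: OutputStream): Unit }
  trait Readable extends Writable {
    def readBytesThrough[T](f: InputStream => T): T
    def writeBytesTo(out: OutputStream): Unit = readBytesThrough { in =>
      val buf = new Array[Byte](4096)
      var n = in.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
    }
  }

  // A consumer that is happy to have data pushed into it
  def sizeOf(data: Writable): Int = {
    val out = new ByteArrayOutputStream()
    data.writeBytesTo(out)
    out.size()
  }

  // A consumer that must pull: it decides how much to read and when to stop
  def firstByte(data: Readable): Int = data.readBytesThrough(_.read())

  val pushOnly: Writable = new Writable {
    def writeBytesTo(out: OutputStream): Unit = out.write(Array[Byte](1, 2, 3))
  }
  val pullable: Readable = new Readable {
    def readBytesThrough[T](f: InputStream => T): T =
      f(new ByteArrayInputStream(Array[Byte](7, 8, 9, 10)))
  }

  // sizeOf accepts both sources; firstByte(pushOnly) would be rejected by the compiler
}
```

The push-accepting consumer works with either source, while the pull-requiring consumer only type-checks against the Readable: that asymmetry is the whole content of the inheritance relationship.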

Migrating APIs to Readable and Writable

Libraries can both produce as well as receive instances of Readable and Writable. For example, the old requests.stream API that takes callbacks for both streaming upload and download:

def stream(...)(onUpload: OutputStream => Unit,
                onDownload: InputStream => Unit): Unit

Has been replaced by a method that receives a Writable and returns a Readable:

def stream(data: Writable): Readable

This makes it clear that streaming uploads can use any data source - push or pull - and in return provides a streaming download that is a pull-based source suitable for use in APIs that require pull-based streams like ujson.read or fastparse.parse.

In general, Writable and Readable do not attempt to modify how the various libraries work internally: they simply apply a standard interface on libraries with similar properties so they can be used interchangeably. Trying to refactor Scalatags or uPickle to become pull-based data sources that expose InputStreams, or trying to refactor FastParse to allow "push parsing" or "async parsing", is beyond the scope of this standardization.
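As a rough illustration of how a callback-style API can be given the new shape without changing its internals, the sketch below wraps a hypothetical streamOld function in a stream(data: Writable): Readable method. Everything here is an assumption made for the example: streamOld is an in-memory echo standing in for a real HTTP call, the traits are local stand-ins for the Geny ones, and this is not how Requests-Scala is actually implemented.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

object MigrationSketch {
  // Local stand-ins for the two interfaces (not the real geny types)
  trait Writable { def writeBytesTo(out: OutputStream): Unit }
  trait Readable extends Writable {
    def readBytesThrough[T](f: InputStream => T): T
    def writeBytesTo(out: OutputStream): Unit = readBytesThrough { in =>
      val buf = new Array[Byte](4096)
      var n = in.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
    }
  }

  // Hypothetical old-style API: an in-memory "echo server" that hands the
  // uploaded bytes straight back as the download
  def streamOld(onUpload: OutputStream => Unit, onDownload: InputStream => Unit): Unit = {
    val uploaded = new ByteArrayOutputStream()
    onUpload(uploaded)
    onDownload(new ByteArrayInputStream(uploaded.toByteArray))
  }

  // New-style wrapper: accepts any Writable, returns a Readable.
  // Nothing runs until the caller actually reads the result.
  def stream(data: Writable): Readable = new Readable {
    def readBytesThrough[T](f: InputStream => T): T = {
      var result: Option[T] = None
      streamOld(
        onUpload = out => data.writeBytesTo(out),
        onDownload = in => { result = Some(f(in)) }
      )
      result.get
    }
  }
}
```

The wrapper only re-labels what the callback API could already do: the upload callback becomes "write this Writable", and the download callback becomes the body of readBytesThrough.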

Seamless Streaming Interop

Now that we have seen how Readable and Writable work, we can revisit the cleaned-up streaming code snippet from earlier and understand how these interfaces allow seamless interoperability between libraries:

val response = requests.put.stream(
  "http://httpbin.org/put",
  data = os.read.stream(os.pwd / "input.txt")
)
val json = ujson.read(response)
val interestingJson = json("data")
os.write(os.pwd / "output.json", interestingJson)

Essentially, the (simplified) signatures we are looking at are:

os.read.stream(p: os.Path): Readable
requests.put.stream(url: String, data: Writable): Readable
ujson.read(data: Readable): ujson.Value extends Writable
os.write(p: os.Path, data: Writable): Unit

Thus, because all our library methods accept and return Readable/Writable data types, we can seamlessly pass data from one library to another in a streaming fashion without needing any boilerplate to convert data back and forth.

Writable and Readable are provided by the tiny Geny library, and are now broadly supported by all libraries I maintain:

The above listing of which methods accept or return Writable and Readable tells you exactly which streaming workflows are possible: streaming JSON to disk, streaming files on disk to Requests-Scala HTTP requests or Cask HTTP responses, using FastParse directly on a file without loading the whole file into memory, and so on.

You can easily put together pretty elaborate streaming data processing workflows due to the standardization. For example, here is a snippet that streams data from one HTTP service to another, streams the data into a JSON parser, selects a portion of the parsed JSON, and streams the JSON structure to a wc subprocess that performs a word count:

val events = requests.get.stream("https://api.github.com/events")
val httpBinResponse = requests.post.stream("https://httpbin.org/post", data = events)
val mangledJson = ujson.read(httpBinResponse).apply("form")
val wordCount = os.proc("wc").call(stdin = mangledJson).out.text()

These streaming workflows would previously have required a lot of error-prone boilerplate: we are orchestrating multiple web services and a subprocess, along with some in-memory processing, all in a streaming fashion. But thanks to the standard Writable/Readable interfaces, creating them is now just as easy as the previous batch-oriented workflows that allocated temporary Strings or Array[Byte]s: you simply plug your method calls together, and the compiler makes sure the push/pull-based nature of your streams lines up so that everything streams smoothly at run time.

Conclusion

In this blog post, we have seen how two tiny interfaces, Writable and Readable, allow our wide range of libraries to seamlessly exchange data in a streaming fashion. We have also seen how these two interfaces simply standardize the existing conventions that the various libraries already provide for streaming IO, allowing them to interoperate seamlessly using the functionality they already have built in.

Writable and Readable, despite being tiny interfaces, reflect a deep structure of how streaming data works:

Pull-based data sources are harder to write than push-based data sources, while push-based data processors are harder to write than pull-based data processors. The Writable and Readable interfaces do not attempt to force you one way or the other, and instead allow you to declare what kinds of data sources and processors you have so the compiler can check to make sure your data sources and data processors line up.
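A small sketch of why pull-based sources tend to be harder to write: the same stream of digits is produced once by pushing into an OutputStream and once by exposing an InputStream. The names pushDigits and DigitsInputStream are hypothetical and exist only for this illustration.

```scala
import java.io.{InputStream, OutputStream}

object PushVsPullSourceSketch {
  // Push-based: a plain loop; the control flow itself remembers where we are
  def pushDigits(n: Int, out: OutputStream): Unit =
    for (i <- 0 until n) out.write('0' + i % 10)

  // Pull-based: the same data, but progress has to be stored explicitly
  // between read() calls, because the consumer drives the iteration
  class DigitsInputStream(n: Int) extends InputStream {
    private var i = 0
    override def read(): Int =
      if (i >= n) -1
      else { val b = '0' + i % 10; i += 1; b }
  }
}
```

With a single counter the difference is small, but for a source like a JSON serializer walking a nested structure, the pull-based version would have to turn the entire recursive traversal into explicit state, which is why such libraries usually only offer the push-based form.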

The Geny library is tiny and stable, and can be depended upon without any risk of breakage. If you happen to maintain libraries that you would like to have seamless streaming interop with the broader Scala ecosystem, I encourage you to accept and implement the Writable and Readable interfaces in your own code!


About the Author: Haoyi is a software engineer, an early contributor to Scala.js, and the author of many open-source Scala tools such as the Ammonite REPL and FastParse.




Updated 2019-12-29