Haoyi's Programming Blog

Table of Contents

Easy Parallel Programming with Scala Futures

Posted 2019-09-03
How to create Build Pipelines in ScalaBuild your own Programming Language with Scala

The Scala programming language comes with a Futures API. Futures make parallel programming much easier to handle than working with traditional techniques of threads, locks, and callbacks. This blog post dives into Scala's Futures: how to use them, how they work, and how they can give you much more flexibility to leverage parallelism in your code.


About the Author: Haoyi is a software engineer, and the author of many open-source Scala tools such as the Ammonite REPL and the Mill Build Tool. If you enjoyed the contents on this blog, you may also enjoy Haoyi's book Hands-on Scala Programming


The easiest way to start trying out Futures is via the Ammonite Scala REPL. To begin with, I will install Ammonite:

$ sudo sh -c '(echo "#!/usr/bin/env sh" && curl -L https://github.com/lihaoyi/Ammonite/releases/download/1.6.8/2.13-1.6.8) > /usr/local/bin/amm && chmod +x /usr/local/bin/amm'

And then open the REPL using amm --class-based:

$ amm --class-based
Loading...
Welcome to the Ammonite Repl 1.6.8
(Scala 2.13.0 Java 11.0.2)
@ scala.concurrent.<tab>
Await                            Channel                          Promise
AwaitPermission                  DelayedLazyVal                   SyncChannel
Awaitable                        ExecutionContext                 SyncVar`
BatchingExecutorStatics          ExecutionException               blocking
BlockContext                     Future                           duration
...

@ import scala.concurrent._, ExecutionContext.Implicits.global, duration._
import scala.concurrent._

Note the --class-based flag, which is necessary in order to use Futures in the REPL. This flag is not necessary when using Scala in larger projects (e.g. built with Mill or SBT)

Creating Futures

The basic building block is scala.concurrent.Future, which we have imported under as simply Future:

@ Future
res7: Future.type = scala.concurrent.Future$@2cdb1581

Futures define lightweight tasks that run on a thread pool, performing some computation, and returning a value. We imported the default thread pool above (ExecutionContext.Implicits.global) but you can also define your own thread pool if you want to customize it.

To create a future, use the Future{ ... } syntax:

@ val f = Future{ "hello" + 123 + "world" }
f: Future[String] = Future(Success(hello123world))

Here we can see that the Future is already completed when printed out. That makes sense, as the computation is simple and would complete almost instantly. We create Futures that take a bit longer using Thread.sleep:

@ val f = Future{ Thread.sleep(10000); "hello" + 123 + "world" }
f: Future[String] = Future(<not completed>)

Here, we can see that the Future is listed as <not completed>, which makes sense since it should take about 10,000 milliseconds to run. However, the line of code val f = completes instantly! This is because when we create a Future, it runs in the background on a thread pool, and we can continue doing other things while it is working.

Awaiting Futures

While you can easily make Futures that perform their work via side effects - writing to disk, storing data in global variables, etc. - the simplest and most common case is for a Future's computation to return a single value at the end of the Future{ ... } block. When we want the return value of the Future, we use Await.result to get it:

@ val f = Future{ Thread.sleep(10000); "hello" + 123 + "world" }
f: Future[String] = Future(<not completed>)

@ Await.result(f, Duration.Inf)
res17: String = "hello123world"

Await.result waits for a future to complete before extracting its value. Above, you should see the val f = complete instantly, but the aAwait.result should have waited about 10 seconds (10,000 milliseconds) before the future became ready.

Even just creating a single Future to run in the background can be useful, as you do not need to call Await.result immediately: instead, you can have your code do other things while the Future is running, and only calling Await.result when it's other work is done and it finally needs to make use of the value returned from the Future.

Creating Multiple Futures

The fact that Futures run in the background means you can run multiple Futures running at once. Consider the following code, where slowFunction is a stand-in for some compute-intensive operation:

@ {
    def slowFunction(i: Int) = { Thread.sleep(10000); "hello" + i + "world" }
    println(slowFunction(123))
    println(slowFunction(456))
  }

Here, we expect this to take about 20 seconds to run, because each call to slowFunction happens sequentially. We can verify this using the time{} REPL builtin function:

@ time{
    def slowFunction(i: Int) = { Thread.sleep(10000); "hello" + i + "world" }
    println(slowFunction(123))
    println(slowFunction(456))
  }
hello123world
hello456world
res21: (Unit, FiniteDuration) = ((), 20005278617 nanoseconds)

Here, we can see it took 20,005,278,617 nanoseconds, or about 20 seconds, as expected.

Using Futures, we can instead spin off multiple background processes, running the two 10-second slowFunction calls in parallel, and waiting for their results together:

@ time{
    def slowFunction(i: Int) = { Thread.sleep(10000); "hello" + i + "world" }
    val f1 = Future{ slowFunction(123) }
    val f2 = Future{ slowFunction(456) }
    println(Await.result(f1, Duration.Inf))
    println(Await.result(f2, Duration.Inf))
  }
hello123world
hello456world
res22: (Unit, FiniteDuration) = ((), 10003783961 nanoseconds)

This time we'll see hello123world and hello456world printed almost simultaneously after 10 secnds, since the two Future{ slowFunction(...) } calls run parallel in the background.

Parallel Computation

While working with dummy Thread.sleep functions is simple, it doesn't reflect what we really do day-to-day as programmers, so let us consider a slightly more realistic example: I want to run an expensive hash function (BCrypt) over a bunch of files. In this case, over every file in the post/ folder of this blog:

@ {
  import $ivy.`org.springframework.security:spring-security-crypto:5.1.6.RELEASE`
  import org.springframework.security.crypto.bcrypt.BCrypt
  val (hashes, duration) = time{
    val base64 = java.util.Base64.getEncoder()
    for(p <- os.walk(os.pwd / "post") if os.isFile(p)) yield {
      println(p)
      BCrypt.hashpw(base64.encodeToString(os.read.bytes(p)), BCrypt.gensalt())
    }
  }
  }
/Users/lihaoyi/Github/blog/post/9 - Micro-optimizing your Scala code.md
/Users/lihaoyi/Github/blog/post/24 - How to conduct a good Programming Interview.md
/Users/lihaoyi/Github/blog/post/23 - Scala Vector operations aren't "Effectively Constant" time.md
...
/Users/lihaoyi/Github/blog/post/37 - How to work with Subprocesses in Scala.md
/Users/lihaoyi/Github/blog/post/18 - What's Functional Programming All About?.md
/Users/lihaoyi/Github/blog/post/ammonite-releases.json

hashes: IndexedSeq[String] = ArraySeq(
  "$2a$10$soScdNKZkK97H3U9Y.E7keBUBRNdapcmWCuiwrEsqhzHofq7L7Ut.",
  "$2a$10$xa7lXv1plFSNgSeHk27/b.uXAIcZ1pITRttO9oH0pucIfqD4v8IJq",
  "$2a$10$WPLPwIp39z6rrj16RB/HUenJUrgc8WngOEdhBwm/I5NYDA/H1wV8i",
...
duration: FiniteDuration = 15214007639 nanoseconds

Here, we use import $ivy to pull in the BCrypt implementation from org.sprintframework.security, and use that on every file in the os.pwd / "post" folder. Note that we have to base64-encode the binary contents of each file into a string before hashing, since this particular BCrypt library can only work with strings and not arbitrary byte arrays.

In this case, we can see that the process overall took about 15 seconds (15,214,007,639 nanoseconds). Let's see how long it takes running the computations in parallel with Futures:

     val base64 = java.util.Base64.getEncoder()
-    for(p <- os.walk(os.pwd / "post") if os.isFile(p)) yield {
+    val futures = for(p <- os.walk(os.pwd / "post") if os.isFile(p)) yield Future{
     println(p)
     }
+    futures.map(Await.result(_, Duration.Inf))
   }
@
{
  import $ivy.`org.springframework.security:spring-security-crypto:5.1.6.RELEASE`
  import org.springframework.security.crypto.bcrypt.BCrypt
  val (hashes, duration) = time{
    val base64 = java.util.Base64.getEncoder()
    val futures = for(p <- os.walk(os.pwd / "post") if os.isFile(p)) yield Future{
      println(p)
      BCrypt.hashpw(base64.encodeToString(os.read.bytes(p)), BCrypt.gensalt())
    }
    futures.map(Await.result(_, Duration.Inf))
  }
  }
/Users/lihaoyi/Github/blog/post/9 - Micro-optimizing your Scala code.md
/Users/lihaoyi/Github/blog/post/24 - How to conduct a good Programming Interview.md
/Users/lihaoyi/Github/blog/post/23 - Scala Vector operations aren't "Effectively Constant" time.md
...
/Users/lihaoyi/Github/blog/post/37 - How to work with Subprocesses in Scala.md
/Users/lihaoyi/Github/blog/post/18 - What's Functional Programming All About?.md
/Users/lihaoyi/Github/blog/post/ammonite-releases.json

hashes: IndexedSeq[String] = ArraySeq(
  "$2a$10$1kb1dETuW.12Uby.Edt0Gecj9.kEv2eDuZIxZTqwqtjG8yGnovUSC",
  "$2a$10$V8eEKaorYm9/L.zCOsFv0uQ7zcTInuveKLYFnFMk/OHxIGHqv.ENa",
  "$2a$10$qXd/OAKb7kBQjzTxEvo3RuZw.71iFskTRo9MJZiIwdThQsc00O31C",
...
duration: FiniteDuration = 2542520755 nanoseconds

Note that while earlier we had two Futures which we Awaited on individually, now we have a whole list of Futures (val futures) that we map over to await on all of them.

Here, we can see that the total is down from 15 seconds to 2.5 seconds (2,542,520,755 nanoseconds). This is about expected, given the Futures running on a background threadpool should be able to utilize all CPU cores in order to run in parallel. In many cases, using Futures is an easy way to make use of parallelism for minimal extra effort: here a three-line code change!

Unlike Threads, which are relatively expensive (you generally want to stay at <1000 threads for performance/memory reasons), Futures are cheap - you can easily have 100,000s or 1,000,000s of them without issue - so you generally do not need to worry about how many you allocate.

Parallel HTTP requests

Apart from CPU-bound tasks, Futures can be useful for parallelizing slow network-bound operations as well.

A Single HTTP Request

For example, consider the following code to make a HTTP request and fetch all the links on a Wikipedia page:

@ {
  val resp = requests.get(
    "https://en.wikipedia.org/w/api.php",
    params = Seq(
      "action" -> "query",
      "titles" -> "Albert Einstein",
      "prop" -> "links",
      "format" -> "json"
    )
  )
  val links = ujson.read(resp.text()).render(indent = 2)
  }
links: String = """{
  "continue": {
    "plcontinue": "736|0|Absorption_refrigerator",
    "continue": "||"
  },
  "query": {
    "pages": {
      "736": {
        "pageid": 736,
        "ns": 0,
        "title": "Albert Einstein",
        "links": [
          {
            "ns": 0,
            "title": "20th Century Press Archives"
          },
          {
            "ns": 0,
            "title": "2dF Galaxy Redshift Survey"
          },
...

We can extract the actual list of links as follows:

@ {
  val resp = requests.get(
    "https://en.wikipedia.org/w/api.php",
    params = Seq(
      "action" -> "query",
      "titles" -> "Albert Einstein",
      "prop" -> "links",
      "format" -> "json"
    )
  )
  val links = ujson
    .read(resp.text())("query")("pages")
    .obj
    .values
    .filter(_.obj.contains("links"))
    .flatMap(_("links").arr).map(_("title").str)
    .toSeq
  }

links: Seq[String] = Seq(
  "20th Century Press Archives",
  "2dF Galaxy Redshift Survey",
  "A priori and a posteriori",
  "Aage Bohr",
  "Aarau",
  "Aargau",
  "Abba Eban",
  "Abdominal aortic aneurysm",
  "Abdus Salam",
  "Absent-minded professor"
)

Here, links is a list of page-titles, which we can then feed back into the "titles" param of the call to api.php to fetch that page's metadata. For simplicity, let's ignore the fact that Wikipedia only returns the first 10 links on each page by default.

Sequential HTTP Requests

If we wanted to fetch all the outgoing links from those pages, we could simply repeat the code in a loop:

@ val (nextLinks, duration) = time{
    for (link <- links) yield {
      println(link)
      val resp = requests.get(
        "https://en.wikipedia.org/w/api.php",
        params = Seq(
          "action" -> "query",
          "titles" -> "Albert Einstein",
          "prop" -> "links",
          "format" -> "json"
        )
      )
      ujson
        .read(resp.text())("query")("pages")
        .obj
        .values
        .filter(_.obj.contains("links"))
        .flatMap(_("links").arr).map(_("title").str)
    }
  }
20th Century Press Archives
2dF Galaxy Redshift Survey
A priori and a posteriori
...
Abdominal aortic aneurysm
Abdus Salam
Absent-minded professor

nextLinks: Seq[Iterable[String]] = List(
  View(
    "20th Century Press Archives",
    "2dF Galaxy Redshift Survey",
    ...
    "Abdus Salam",
    "Absent-minded professor"
  ),
  View(
    "20th Century Press Archives",
    "2dF Galaxy Redshift Survey",
    ...
    "Abdominal aortic aneurysm",
    "Abdus Salam",
...
duration: FiniteDuration = 4125358026 nanoseconds

Here, we see the process took about 4 seconds (4,125,358,026 nanoseconds). Each of the 10 page's links are fetched in sequential HTTP requests, one after the other.

Parallel HTTP using Futures

With Futures, we can do some of this fetching in parallel:

@ val (nextLinks, duration) = time{
    val futures = for (link <- links) yield Future{
      println(link)
      val resp = requests.get(
        "https://en.wikipedia.org/w/api.php",
        params = Seq(
          "action" -> "query",
          "titles" -> "Albert Einstein",
          "prop" -> "links",
          "format" -> "json"
        )
      )
      ujson
        .read(resp.text())("query")("pages")
        .obj
        .values
        .filter(_.obj.contains("links"))
        .flatMap(_("links").arr).map(_("title").str)
        .toSeq
    }
    futures.map(Await.result(_, Duration.Inf))
  }
A priori and a posteriori
Absent-minded professor
Abdus Salam
...
20th Century Press Archives
Aargau
2dF Galaxy Redshift Survey

nextLinks: Seq[Iterable[String]] = List(
  Seq(
    "20th Century Press Archives",
    "2dF Galaxy Redshift Survey",
    ...
    "Abdus Salam",
    "Absent-minded professor"
  ),
  Seq(
    "20th Century Press Archives",
    "2dF Galaxy Redshift Survey",
    ...
    "Abdominal aortic aneurysm",
    "Abdus Salam",
...
duration: FiniteDuration = 622621511 nanoseconds

Again, we are spinning off a new Future for each link in links, and then maping over the list of Futures with Await.result to get the final output. From 4 seconds, the total duration has dropped to about 0.6 seconds

Note that while the actual requests may happen in different orders due to parallelism (e.g. where previously we saw "20th Century Press Archives" being fetched first, this time it's "A priori and a posteriori") the results are aggregated back into the order you requested them in before being returned. That means that as long as your Futures do not perform side effects and simply return a single return value, you do not need to worry at all about the fact that they are running in parallel in arbitrary orders.

Writing a Parallel Web Crawler

So far we have learned to use Futures in a variety of ways: running code in the background, running two background tasks in parallel, and then running arbitrary lists of parallel tasks (whether compute- or network-bound) and aggregating their results. We will now leverage these skills to show how Futures can solve one common task (and a common interview question!): writing a parallel web crawler.

Let us assume task is as follows:

Given the title of a page on Wikipedia (e.g. "Albert Einstein"), write a program that will fetch the set of all pages within a certain number depth of links from the root page, and for performance do so in parallel.

There are several ways of approaching this exercise, which is essentially to implement a traversal of the graph of Wikipedia pages and the links between them. You can do the traversal either breadth-first or depth-first, and then there are many ways you could implement parallelism. For now I will consider just one approach: a breadth first traversal, parallelized using Futures.

To begin with, let us take the code we had earlier to fetch the links from a single Wikipedia page, and put it into a function:

@ def fetchLinks(title: String): Seq[String] = {
    val resp = requests.get(
      "https://en.wikipedia.org/w/api.php",
      params = Seq(
        "action" -> "query",
        "titles" -> title,
        "prop" -> "links",
        "format" -> "json"
      )
    )
    ujson
      .read(resp.text())("query")("pages")
      .obj
      .values
      .filter(_.obj.contains("links"))
      .flatMap(_("links").arr).map(_("title").str)
      .toSeq
  }
defined function fetchLinks

We can now call this one any page we'd like:

@ fetchLinks("Albert Einstein")
res54: Seq[String] = List(
  "20th Century Press Archives",
  "2dF Galaxy Redshift Survey",
  "A priori and a posteriori",
  "Aage Bohr",
  "Aarau",
  "Aargau",
  "Abba Eban",
  "Abdominal aortic aneurysm",
  "Abdus Salam",
  "Absent-minded professor"
)
@ fetchLinks("Singapore")
res53: Seq[String] = List(
  "+65",
  ".sg",
  "126 Squadron, Republic of Singapore Air Force",
  "16th Summit of the Non-Aligned Movement",
  "1915 Singapore Mutiny",
  "1954 National Service riots",
  "1959 Singaporean general election",
  "1962 Merger Referendum of Singapore",
  "1964 race riots in Singapore",
  "1969 race riots of Singapore"
)

Sequential Crawling

Next, we will write the function that does a simple breadth-first traversal of the page-link graph:

@ def fetchAllLinks(startTitle: String, depth: Int) = {
    val seen = collection.mutable.Set(startTitle)
    var currentTitles = Set(startTitle)
    for(i <- 0 until depth){
      val nextTitleLists = for(title <- currentTitles) yield fetchLinks(title)
      currentTitles = nextTitleLists.flatten.filter(!seen.contains(_))
      currentTitles.foreach(seen.add)
    }
    seen.toSet
  }

Now we can call this on various pages, traversing the page-link graph to various depths:

@ fetchAllLinks("Singapore", 0)
res73: Set[String] = Set("Singapore")
@ fetchAllLinks("Singapore", 1)
res74: Set[String] = HashSet(
  "1962 Merger Referendum of Singapore",
  "1954 National Service riots",
  "16th Summit of the Non-Aligned Movement",
  "126 Squadron, Republic of Singapore Air Force",
  "+65",
  "1969 race riots of Singapore",
  "1915 Singapore Mutiny",
  "Singapore",
  "1964 race riots in Singapore",
  "1959 Singaporean general election",
  ".sg"
)
@ fetchAllLinks("Singapore", 2)
res75: Set[String] = HashSet(
  "Telephone numbers in Singapore",
  "14th G-15 summit",
  "2007 Iranian petrol rationing riots",
  "Cougar",
  "1915 Singapore Mutiny",
  "1895 Singaporean Municipal Commission election",
  ".ai",
  "1892 Singaporean Municipal Commission election",
  "Abdelillah Benkirane",
  "A. K. Fazlul Huq",
  "2013 Little India riots",
  ".ad",
  "1962 Merger Referendum of Singapore",
  "1889 Singaporean Municipal Commission election",
  "Eurocopter AS532 Cougar",
  "1894 Singaporean Municipal Commission election",
...

Here we do not maintain an explicit Queue of links to process, and instead simply process all depth 1 links, then all depth 2 links, etc. until we get to the depth we want.

Parallel Crawling

Adding parallelism is as simple as performing each batch of fetches in parallel using Future{ ... }, then aggregating the results using Await.result:

-       val nextTitleLists = for(title <- currentTitles) yield fetchLinks(title)
+       val futures = for(title <- currentTitles) yield Future{ fetchLinks(title) }
+       val nextTitleLists = futures.map(Await.result(_, Duration.Inf))

The final code looks like this:

@ def fetchAllLinksParallel(startTitle: String, depth: Int) = {
    val seen = collection.mutable.Set.empty[String]
    var currentTitles = Set(startTitle)
    for(i <- 0 until depth){
      val futures = for(title <- currentTitles) yield Future{ fetchLinks(title) }
      val nextTitleLists = futures.map(Await.result(_, Duration.Inf))
      currentTitles = nextTitleLists.flatten.filter(!seen.contains(_))
      currentTitles.foreach(seen.add)
    }
    seen.toSet
  }

We can then use this function the same way as the non-parallel version, returning the same results:

@ fetchAllLinksParallel("Singapore", 0)
res79: Set[String] = Set("Singapore")
@ fetchAllLinksParallel("Singapore", 1)
res80: Set[String] = HashSet(
  "1962 Merger Referendum of Singapore",
  "1954 National Service riots",
  "16th Summit of the Non-Aligned Movement",
  "126 Squadron, Republic of Singapore Air Force",
  "+65",
  "1969 race riots of Singapore",
  "1915 Singapore Mutiny",
  "Singapore",
  "1964 race riots in Singapore",
  "1959 Singaporean general election",
  ".sg"
)
@ fetchAllLinksParallel("Singapore", 2)
res81: Set[String] = HashSet(
  "Telephone numbers in Singapore",
  "14th G-15 summit",
  "2007 Iranian petrol rationing riots",
  "Cougar",
  "1915 Singapore Mutiny",
  "1895 Singaporean Municipal Commission election",
  ".ai",
  "1892 Singaporean Municipal Commission election",
  "Abdelillah Benkirane",
  "A. K. Fazlul Huq",
  "2013 Little India riots",
  ".ad",
  "1962 Merger Referendum of Singapore",
  "1889 Singaporean Municipal Commission election",
  "Eurocopter AS532 Cougar",
  "1894 Singaporean Municipal Commission election",
...

In all cases the output of the two functions is identical:

@ fetchAllLinks("Singapore", 2) == fetchAllLinksParallel("Singapore", 2)
res102: Boolean = true

@ fetchAllLinks("Singapore", 3) == fetchAllLinksParallel("Singapore", 3)
res103: Boolean = true

@ fetchAllLinks("Singapore", 4) == fetchAllLinksParallel("Singapore", 4)
res104: Boolean = true

Except the parallel version proceeds significantly faster:

@ time{fetchAllLinks("Singapore", 2)}._2
res82: FiniteDuration = 4719789996 nanoseconds

@ time{fetchAllLinksParallel("Singapore", 2)}._2
res83: FiniteDuration = 1342978751 nanoseconds
@ time{fetchAllLinks("Singapore", 3)}._2
res96: FiniteDuration = 31061249346 nanoseconds

@ time{fetchAllLinksParallel("Singapore", 3)}._2
res97: FiniteDuration = 4569134866 nanoseconds
@ time{fetchAllLinks("Singapore", 4)}._2
res98: FiniteDuration = 102334529065 nanoseconds

@ time{fetchAllLinksParallel("Singapore", 4)}._2
res99: FiniteDuration = 13377793844 nanoseconds

At depth 2, the parallel crawler takes 1.3s instead of 4.7s. At depth 3, the parallel crawler takes 4.6s instead of 31.0s. And at depth 4, the parallel crawler takes 13.3s instead of 102.3s. This could be a significant amount of speedup if your web-crawler is to be used in real-world scenarios, requiring only a three-line code change to enable parallelism!

As written, the amount of parallelism we can get is still sub-optimal: If one HTTP request takes a long time, all other parallel HTTP requests from that batch may have completed, but the next batch of requests cannot start even though some of the pages are already known and some of the threads in the thread pool are already free. Improving fetchAllLinksParallel to better handle this case is left as an exercise to the reader.

Conclusion

In this blog post, we have introduced the usage of Scala's Futures as a way of easily starting parallel background tasks and aggregating their results. We have covered:

This blog post intentionally does not cover all there is to know about Futures: apart from parallel programming, Futures also are a good model for asynchronous code, which is useful in event-based code or especially high-performance/high-concurrency scenarios. Even regarding parallelism, there is a lot you can learn about configuring the threadpools and ExecutionContexts, and controlling parallelism so e.g. you do not get rate-limited by the service you are querying.

Nevertheless, for many common use cases, what has been presented here is enough. Hopefully this blog post gives you a good feel for what Futures are, and how they can be used to simplify parallel programming to make parallelism easy and intuitive.

The complete code listing for the parallel web crawler is reproduced below:

@ {
  import scala.concurrent._, ExecutionContext.Implicits.global, duration._

  def fetchLinks(title: String): Seq[String] = {
    val resp = requests.get(
      "https://en.wikipedia.org/w/api.php",
      params = Seq(
        "action" -> "query",
        "titles" -> title,
        "prop" -> "links",
        "format" -> "json"
      )
    )
    ujson
      .read(resp.text())("query")("pages")
      .obj
      .values
      .filter(_.obj.contains("links"))
      .flatMap(_("links").arr).map(_("title").str)
      .toSeq
  }

  def fetchAllLinksParallel(startTitle: String, depth: Int) = {
    val seen = collection.mutable.Set.empty[String]
    var currentTitles = Set(startTitle)
    for(i <- 0 until depth){
      val futures = for(title <- currentTitles) yield Future{ fetchLinks(title) }
      val nextTitleLists = futures.map(Await.result(_, Duration.Inf))
      currentTitles = nextTitleLists.flatten.filter(!seen.contains(_))
      currentTitles.foreach(seen.add)
    }
    seen.toSet
  }

  val fetched = fetchAllLinksParallel("Singapore", 3)
  }

fetched: Set[String] = HashSet(
  "1895 Singaporean Municipal Commission election",
  "November 2015 Paris attacks",
  ".ai",
  "1948 Arab\u2013Israeli War",
  "Fort Canning Hill",
  "10th G-15 summit",
  "1892 Singaporean Municipal Commission election",
  "1953 Iranian coup d'\u00e9tat",
  "Airbus A321",
  "Iraq",
  "Abdelillah Benkirane",
  "Wikipedia:Maintenance",
  "Fansur",
  "Counter-Terrorism",
  "1981 Thai military rebellion",
  "1925 Iranian Constituent Assembly election",
  "A. K. Fazlul Huq",
  "Abdelilah Benkirane",
  "1973 Thai popular uprising",
  "Celcom",
  "Wikipedia:Categorization",
  "Ayer Rajah",
...

About the Author: Haoyi is a software engineer, and the author of many open-source Scala tools such as the Ammonite REPL and the Mill Build Tool. If you enjoyed the contents on this blog, you may also enjoy Haoyi's book Hands-on Scala Programming


How to create Build Pipelines in ScalaBuild your own Programming Language with Scala

Updated 2019-09-03