- 26.1 Correctness and Performance Issues with Blocking
- 26.2 Callbacks
- 26.3 Higher-Order Functions on Futures
- 26.4 Function flatMap on Futures
- 26.5 Illustration: Parallel Server Revisited
- 26.6Functional-Concurrent Programming Patterns
- 26.7 Summary
26.6Functional-Concurrent Programming Patterns
Both futures, on the one hand, and higher-order functions, on the other hand, are powerful abstractions. Together, they form a potent combination, though one that can take some effort to master. Even so, it is a worthwhile effort. This section illustrates a few guidelines you should keep in mind as you venture into functional-concurrent programming.
flatMap as an Alternative to Blocking
Higher-order functions are abstractions for code you don’t have to write. They are convenient but could often be replaced with handwritten implementations. Ifoptis an option, for instance,opt.map(f)could also be written:
Scala
optmatch caseSome(value) => Some(f(value))caseNone => None
In the case of futures, however, higher-order functions are an alternative to computations that would be hard to implement directly. Iffutis a future, what can you replacefut.map(f)with? A future cannot simply be “opened” to access its value, since the value may not yet exist. Short of creating—and blocking—additional threads, there is no alternative to using higher-order functions to act inside a future.
You can leverage your functional programming skills with higher-order functions when working with futures. Earlier, for instance, we usedflatMapon options to chain computations that may or may not produce a value. You can useflatMapin a similar way on futures to chain computations that may or may not be asynchronous. Instead of “optional” stages, fromAtoOption[B], you define asynchronous stages, as functions fromAtoFuture[B].
As an illustration, the three optional functions used in Section 10.3 can be changed to represent asynchronous steps:
Scala
defparseRequest(request: Request): Future[User] = ...defgetAccount(user: User): Future[Account] = ...defapplyOperation(account: Account, op: Operation): Future[Int] = ...
The steps can then be chained usingflatMap:
Scala
Listing 26.9: A pipeline of futures usingflatMap.
parseRequest(request) .flatMap(user => getAccount(user)) .flatMap(account => applyOperation(account, op))
The expression in Listing 26.9 isexactlythe same as that in Listing 10.5, except that it produces a value of typeFuture[Int]instead ofOption[Int].
Uniform Treatment of Synchronous and Asynchronous Computations
You could mix synchronous and asynchronous operations by combining steps of typeA => B—usingmap—and steps of typeA => Future[B]—usingflatMap. Instead, it is often more convenient to use only steps of the formA => Future[B]combined withflatMap. When needed, synchronous steps can be implemented as already completed futures. This design increases flexibility: It makes it easier to replace synchronous steps with asynchronous steps, and vice versa.
For instance, if accounts are simply stored in a map, thegetAccountfunction from the earlier example can be implemented synchronously, within the calling thread:
Scala
valallAccounts: Map[User, Account] = ...defgetAccount(user: User): Future[Account] = Future.successful(allAccounts(user))
This function returns an already completed future and does not involve any additional thread. If a need to fetch accounts asynchronously then arises, you can reimplement the function without modifying its signature, and leave all the code that uses it—such as Listing 26.9—unchanged.
Functional Handling of Failures
Exceptions are typically thrown and caught within a thread. They don’t naturally travel from thread to thread, and they are ill suited for multithreaded programming. Instead, you are better off following the functional approach to error handling discussed in Chapter 13.
An added benefit of relying on computations of typeA => Future[B]instead ofA => Bis that futures can also carry failures—in Scala, you can think ofFutureas an asynchronousTry. For example, you can improve thegetAccountfunction by making sure it always produces a future, even when a user is not found:
Scala
defgetAccount(user: User): Future[Account] = Future.fromTry(Try(allAccounts(user)))
This way, an expression likegetAccount(user).onComplete(...)still executes a callback action, which is not true ifgetAccountthrows an exception. Failed futures can be handled functionally, using dedicated functions such asrecoverin Scala orexceptionallyin Java.
For simplicity, the connection-handling function from Listing 26.8 does not deal with errors. You could use standard future functions to add robustness to the server. For instance, failure to create a page could be handled by transforming thepageFfuture:
Scala
valsafePageF: Future[Page] = pageF.recover {caseex: PageException => errorPage(ex) }
or by adding a failure callback:
Scala
pageF.failed.foreach{= >交货connection.write(埃罗rPage(ex)) connection.close() }
使用指定的回调操作pageF.foreachor those specified usingpageF.failed.foreachwill run, but not both.
Non-Blocking “Join” Pattern
In the server example,pageFis created by combining two futures,dataFandadF, usingflatMap. You can use the same approach to combine three or more futures:
Scala
valf1: Future[Int] = ...valf2: Future[String] = ...valf3: Future[Double] = ...valf: Future[(Int,String,Double)] = f1.flatMap(n => f2.flatMap(s => f3.map(d => (n, s, d))))
This won’t scale to larger numbers of futures, though. An interesting and not uncommon case is to combineNfutures of the same type into a single one, for an arbitrary numberN. In the server example, a client might obtain data fromNdatabase queries, which are executed in parallel:
Scala
defqueryDB(requests: List[Request]): Future[Page] =valfutures: List[Future[Data]] = requests.map(request => Future(dbLookup(request)))valdataListF: Future[List[Data]] = Future.sequence(futures) dataListF.map(makeBigPage)
The first line usesmapto create a list of database-querying tasks, one for each request. These tasks, which run in parallel, form a list of futures. The key step inqueryDBis the call toFuture.sequence. This function uses an input of typeList[Future[A]]to produce an output of typeFuture[List[A]]. The future it returns is completed when all the input futures are completed, and it contains all their values as a list (assuming no errors). InvokingFuture.sequenceserves the same purpose as the “join” part of a fork-join pattern, but does so without blocking. The last step uses a functionmakeBigPagefromList[Data]toPageto build the final page.
As of this writing, there is no standardsequencefunction forCompletableFuture, but you can implement your own usingthenCompose(equivalent toflatMap) andthenApply(equivalent tomap):
Scala
Listing 26.10: Joining a list ofCompletableFutures into one without blocking.
defsequence[A](futures: List[CompletableFuture[A]]): CompletableFuture[List[A]] = futuresmatch caseNil => CompletableFuture.completedFuture(List.empty)casefuture :: more => future.thenCompose(first => sequence(more).thenApply(others => first::others))
This function uses recursion to nest calls tothenCompose(flatMap). In the recursive branch,sequence(more)is a future that will contain the values of all the input futures, except the first. This future and the first input future are then combined usingthenComposeandthenApply(flatMapandmap), according to the pattern used earlier to merge two futures (as in Listings 26.5, 26.7, and 26.8).
Non-Blocking “Fork-Join” Pattern
FunctionqueryDBuses a fork-join pattern in whichsequenceimplements the “join” part without blocking. Fork-join is a common enough pattern that Scala defines a functiontraversethat implements both the “fork” and “join” parts of a computation. You can use it for a simpler implementation ofqueryDB:
Scala
defqueryDB(requests: List[Request]): Future[Page] = Future.traverse(requests)(request => Future(dbLookup(request))).map(makeBigPage)
Instead of working on a list of futures, assequencedoes, functiontraverseuses a list of inputs and a function from input to future output. It “forks” a collection of tasks by applying the function to all inputs, and then “joins” the tasks into a single future, as insequence, without blocking.