Accumulating Responses from Child Actors and Transitive Message Ordering
When using actors, a parent actor sometimes needs to aggregate a result from a list of child actors. If the parent doesn’t want to wait around forever for the children to get their responses together, it might hand off the responsibility to a temporary actor. Here’s our first stab at this pattern at GA-CCRi:
[code language="scala"]
trait Accumulator[T, ACC] {
  def accumulate(newT: T): Unit
  def count: Int
  def accumulant: ACC
}

def accumulateChildResponse[T, ACC](accumulator: Accumulator[T, ACC],
                                    childMessage: ActorRef => Any): Future[ACC] = {
  val tempActor = actor(new Act {
    val acc = accumulator
    become {
      case AccumulateStart =>
        children.foreach(_ ! childMessage(sender))
      case AccumulateData(req, resp) => {
        acc.accumulate(resp.asInstanceOf[T])
        if (acc.count == numChildren) req ! acc.accumulant
      }
    }
  })
  val future: Future[ACC] = (tempActor ? AccumulateStart).mapTo[ACC]
  // stop the temporary actor once the ask completes, whether it succeeded or failed
  future onComplete {
    case _ => context.stop(tempActor)
  }
  future
}
[/code]
The first thing some people might notice is that we’re using ask, which has certain negative performance implications. However, there’s a bigger problem.
Our parent P wants information from its children Ci, and for each request it creates a new temporary actor Tk to handle that accumulation. This approach sends a message from P to T1 to start accumulating, and then T1 sends a message to each child: C1, C2, C3, and so on. Then, when the parent has another request, P sends a message to T2, which sends messages to each child. The problem is that we no longer have any guarantee about the order in which the children receive these messages. C1 could receive a message first from T1, then from T2, while C2 could receive first from T2, then from T1. The more requests there are, the more possible orders, and each child could receive its messages in a different order! Akka does guarantee the order of messages between any given sender-receiver pair, but that guarantee is not transitive: it says nothing about messages that reach a child through different intermediaries, and this lack of transitive message ordering is a known problem.
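To make the interleaving concrete, here is a toy model in plain Scala, with no Akka involved. It assumes only that each sender-receiver pair behaves like a FIFO channel, and it lets a made-up "schedule" decide which channel delivers next. The names `Msg`, `deliver`, and the schedules are invented for this illustration and say nothing about how Akka's dispatcher actually picks messages.

```scala
object OrderingDemo {
  // A request as seen by a child: who delivered it and which request it belongs to.
  final case class Msg(from: String, request: Int)

  // Each key is a sender with a FIFO queue of messages bound for one child.
  // `schedule` names the sender whose next message is delivered at each step.
  def deliver(channels: Map[String, List[Msg]], schedule: List[String]): List[Msg] = {
    val (_, delivered) = schedule.foldLeft((channels, Vector.empty[Msg])) {
      case ((chans, seen), sender) =>
        chans.getOrElse(sender, Nil) match {
          case head :: tail => (chans.updated(sender, tail), seen :+ head)
          case Nil          => (chans, seen) // nothing left from this sender
        }
    }
    delivered.toList
  }

  // Requests 1 and 2 relayed through temporary actors T1 and T2:
  // two independent channels per child, so either interleaving is legal.
  val viaTemps = Map("T1" -> List(Msg("T1", 1)), "T2" -> List(Msg("T2", 2)))
  val c1Sees = deliver(viaTemps, List("T1", "T2"))
  val c2Sees = deliver(viaTemps, List("T2", "T1"))

  // The same two requests sent by P itself: one channel per child, one possible order.
  val direct = Map("P" -> List(Msg("P", 1), Msg("P", 2)))
  val anyChildSees = deliver(direct, List("P", "P"))
}
```

In this model `c1Sees` and `c2Sees` disagree about which request came first, even though every individual channel stayed in order, while `anyChildSees` can only ever be request 1 followed by request 2.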
We could try to remove the temporary actors entirely, but that runs into problems of its own. The obvious approach is for the parent to become an aggregator temporarily, stashing unrelated messages in a queue for handling once the aggregation is complete. This isn’t truly asynchronous, though, and if one of the children goes down we risk overflowing the queue while waiting for a response that never arrives.
A lesser problem is the way this method infects the code around it with its own interface. The child message object must be written with a place to put the temporary actor’s reference, the calling code must present not a message object but a method for creating the message object, and the child actors’ receive methods must be written taking the alternative reply into account.
Instead, this is the solution we came up with:
[code language="scala"]
trait Accumulator[T, ACC] {
  def accumulate(newT: T): Unit
  def count: Int
  def accumulant: ACC
}

def accumulateChildResponse[T, ACC](accumulator: Accumulator[T, ACC],
                                    childMessage: Any): Future[ACC] = {
  val result = Promise[ACC]()
  val accumulatingActor = actor(new Act {
    val acc = accumulator
    become {
      case AccumulateData(resp) => {
        // the type T of AccumulateData returned is lost to type erasure
        acc.accumulate(resp.asInstanceOf[T])
        if (acc.count == numChildren) {
          result.success(acc.accumulant)
          context.stop(self)
        }
      }
    }
  })
  // the parent sends the requests itself, but names the accumulating actor as the sender
  children.foreach(_.tell(childMessage, accumulatingActor))
  result.future
}
[/code]
We instantiate a Promise[ACC] to produce a response of type ACC, and a temporary actor to accumulate the responses. Instead of telling the temporary actor to send messages, the parent sends messages to its children directly. Because every request to a given child is now sent by the parent itself, each child receives the requests in the order the parent sent them. But we tell the children that these messages were sent from the accumulatingActor, so when they respond to the sender, the response actually goes to the correct temporary actor. Using the tell method lets us change this return address without including it in the message itself, so the change is invisible to the client code.
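For a feel of what an Accumulator might look like in practice, here is a minimal sketch in plain Scala. `MeanAccumulator`, `AccumulatorDemo`, `expectedResponses`, and `onResponse` are hypothetical names invented for this example; `onResponse` simply walks through the same accumulate, count, and fulfill steps as the actor's message handler, without any actor around it.

```scala
import scala.concurrent.Promise

trait Accumulator[T, ACC] {
  def accumulate(newT: T): Unit
  def count: Int
  def accumulant: ACC
}

// Hypothetical accumulator: averages the scores reported by the children.
final class MeanAccumulator extends Accumulator[Double, Double] {
  private var sum = 0.0
  private var n   = 0
  def accumulate(newT: Double): Unit = { sum += newT; n += 1 }
  def count: Int = n
  def accumulant: Double = if (n == 0) 0.0 else sum / n
}

object AccumulatorDemo {
  val expectedResponses = 3 // stands in for the number of children
  val acc    = new MeanAccumulator
  val result = Promise[Double]()

  // Same steps as the accumulating actor's handler, minus the actor.
  def onResponse(resp: Double): Unit = {
    acc.accumulate(resp)
    if (acc.count == expectedResponses) result.success(acc.accumulant)
  }

  // Three pretend child responses; the third one fulfills the promise.
  List(1.0, 2.0, 3.0).foreach(onResponse)
}
```

The accumulator keeps plain mutable fields here. That is only reasonable because a single actor processes its messages one at a time, so nothing else touches the accumulator while it is being updated.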
There’s an extra benefit, which might be of more general interest: we have removed a use of the ask pattern! Asking implicitly sets up its own temporary actor, and the general-purpose way it’s written incurs certain performance penalties. Instead, we write our own Promise and fulfill it ourselves. We also know exactly when the temporary actor can shut itself down, allowing the ActorContext to dispose of it cleanly.
There is still room to complain here: what happens if a child locks up and we never receive enough responses to fulfill the promise? This is room for future work, where we might alter the completion conditions. For instance, we might stop after receiving a certain portion of the child responses, or possibly set a timeout after which we give the best response we can at that point.
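As a rough sketch of the timeout idea, the following plain-Scala helper fulfills its promise either when all expected responses have arrived or when a deadline passes, whichever comes first. It is not what we run: `BestEffortCollector` and `TimeoutDemo` are hypothetical names, and the helper uses a lock and a JDK scheduler where an actor-based version would presumably handle a timeout message inside the accumulating actor instead.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka or of the code above.
final class BestEffortCollector[T](expected: Int, timeout: FiniteDuration) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private val promise   = Promise[List[T]]()
  private var received  = List.empty[T] // guarded by `this`

  // When the deadline passes, answer with whatever has arrived so far.
  scheduler.schedule(new Runnable { def run(): Unit = finish() },
                     timeout.toMillis, TimeUnit.MILLISECONDS)

  // Record one child response; late responses after completion are ignored.
  def offer(response: T): Unit = synchronized {
    if (!promise.isCompleted) {
      received = response :: received
      if (received.size == expected) finish()
    }
  }

  private def finish(): Unit = synchronized {
    promise.trySuccess(received.reverse) // no effect if already completed
    scheduler.shutdownNow()              // cancel the pending deadline, if any
  }

  def future: Future[List[T]] = promise.future
}

object TimeoutDemo {
  def run(): (List[Int], List[Int]) = {
    // Every child answers: completes as soon as the last response arrives.
    val all = new BestEffortCollector[Int](expected = 2, timeout = 5.seconds)
    all.offer(1); all.offer(2)

    // Two of three children never answer: completes at the deadline.
    val some = new BestEffortCollector[Int](expected = 3, timeout = 100.millis)
    some.offer(7)

    (Await.result(all.future, 1.second), Await.result(some.future, 2.seconds))
  }
}
```

Using `trySuccess` rather than `success` matters in a design like this one, because the deadline and the final response can race and only one of them should win. Stopping after a fixed portion of the responses would be a small change to the condition in `offer`.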