Actors for Akka Stream Throughput

The problem

Stream processing at scale frequently brings up issues around concurrency, particularly when writes are being made. Consider what happens when we have two events, A and B, that appear in rapid succession in our stream and both modify the same object. If the two events are processed in parallel then it's possible that B will complete its write first and see its results subsequently overwritten by A, despite B technically being the fresher of the two data points. Alternatively, it's possible that each will read, modify, and write the same object, stomping on the other's changes if a solution like optimistic concurrency is not in place (and one of the two requests is destined to return an error even if it is).
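To make that second hazard concrete, here's a minimal sketch of the read-modify-write race - the in-memory "store" is a hypothetical stand-in for a real database:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object LostUpdateSketch extends App {
  // Stand-in for a database row keyed by object id.
  @volatile var store = Map("object-1" -> 0)

  // Each event reads the row, modifies it, and writes it back.
  def handle(delta: Int): Future[Unit] = Future {
    val current = store("object-1") // read
    Thread.sleep(10)                // widen the race window for demonstration
    store = store.updated("object-1", current + delta) // write: may clobber the other event
  }

  // Events A and B arrive in rapid succession and run in parallel.
  val a = handle(1)
  val b = handle(2)
  Await.result(Future.sequence(Seq(a, b)), 1.second)
  println(store("object-1")) // 3 if serialized; often 1 or 2 when the updates race
}
```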

Solutions to this problem exist in various - though not always obvious - forms. Kafka itself makes our life easier by ensuring that events with the same key always appear on the same partition. So long as the key uniquely identifies the object being modified, and so long as the partition is only processed in one place at a time, then we can be sure that the problem can be managed within the scope of a single instance. Kafka Streams, one popular streaming framework, utilizes these guarantees to solve the problem by mandating that only one event will be processed at a time for each partition. This works, but uses a sledgehammer when a scalpel might suffice - if the events on our partition address different objects then it should be possible for us to process them in parallel, but Kafka Streams outright prevents us from doing so. [1]

In contrast, Akka Streams allows us the flexibility of running multiple events per partition through the use of mapAsync. That method, in short, allows us to map each incoming event in the stream to a future and run a configurable number of those futures in parallel, all with the guarantee that the resulting events will be emitted in-order (regardless of when the futures actually completed). This is a powerful tool, and one that we've used with considerable success in our services that DON'T have to worry about race conditions. However, for those that DO, we run into the problem described at the start of this blog. Without a solution for that, our only defense is to run with a parallelism value of one, locking us into the same one-event-per-partition-at-a-time philosophy that Kafka Streams uses and robbing us of potential throughput.
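As a quick illustration, here's a sketch of mapAsync in action - the Event type and writeToDb are hypothetical stand-ins for a real Kafka payload and database write:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object MapAsyncSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  final case class Event(key: String, value: Int)

  // Hypothetical asynchronous write.
  def writeToDb(event: Event): Future[Event] = Future(event)

  Source(List(Event("a", 1), Event("a", 2), Event("b", 1)))
    .mapAsync(parallelism = 10)(writeToDb) // up to 10 futures in flight; output order preserved
    .runWith(Sink.foreach(e => println(s"wrote $e")))

  // Note: with parallelism > 1, the two "a" events may write concurrently -
  // exactly the race described at the start of this post.
}
```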

Akka actors to the rescue

Akka Streams is interesting in that it's possible to make great use of the framework without having the slightest idea what Akka actually is, or how it operates. It is, by and large, an implementation detail. For the uninitiated: Akka is a framework for building highly-concurrent systems using actors and messaging. The number of tools in that particular toolbox is extensive, as even a brief glimpse at the (ample) documentation will make clear, but for the most part Akka allows you to choose what is relevant to your needs and ignore the rest.

In this case, our needs don't go much further than the actors themselves. So - what are actors? A (probably over-simplified) synopsis is that actors are objects that communicate by sending and receiving messages. For the most part Akka dictates very little about how these actors will be used - it's up to the implementer to determine what data they encapsulate, how many there are, their lifecycle, relationship to each other, and how and what they communicate with each other. What Akka DOES provide is some basic guarantees, most crucially that each actor will only process one message at a time. Any others that it might receive in the interim are entered into a "mailbox" queue to be processed later, in the order they were received, just as soon as the previous message completes.
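To make that concrete, here's about the smallest actor one can write in Akka Typed - the Counter protocol is our own invention for illustration:

```scala
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Behavior}

// A minimal typed actor: it encapsulates a count and changes it only in
// response to messages.
object Counter {
  sealed trait Command
  final case class Increment(by: Int) extends Command

  def apply(count: Int = 0): Behavior[Command] =
    Behaviors.receiveMessage { case Increment(by) =>
      // One message at a time: this state transition can never race with another.
      Counter(count + by)
    }
}

object CounterMain extends App {
  val system = ActorSystem(Counter(), "counter")
  system ! Counter.Increment(5) // queued in the mailbox, handled strictly in order
  system ! Counter.Increment(3)
}
```

Even if both Increment messages arrive back-to-back, the second waits in the mailbox until the first has been fully handled.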

This guarantee is intended first and foremost to eliminate concurrency concerns from actor logic, and underpins a great deal of the framework's power. For us, it has another obvious benefit: it would appear, by all accounts, to be the perfect tool to address our race condition concerns. If we can ensure that each incoming key has a unique actor associated with it then we can use that actor to ensure that events affecting the same object process sequentially while events affecting others run in parallel on separate actors.

Given all that, a solution begins to take form: instead of using mapAsync to execute a future directly, we have it call Akka's ask (which itself returns a future) to message the appropriate actor. That actor can then handle said message by executing the future logic formerly slated for mapAsync, all with the guarantee that only one executes at a time. In theory it's perfect, even if in practice it needs a bit more work.
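In sketch form, that wiring might look something like the following. KeyRouter, Writer, and the message protocol are all hypothetical (and the router assumes keys are valid actor names); for simplicity the writer here does its work synchronously inside the handler - which, as the next section explains, is exactly where things get tricky:

```scala
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.duration._

final case class Event(key: String, value: Int)
final case class Process(event: Event, replyTo: ActorRef[Done])
final case class Done(event: Event)

// Routes each event to the one actor that owns its key, spawning lazily.
object KeyRouter {
  def apply(): Behavior[Process] = route(Map.empty)

  private def route(children: Map[String, ActorRef[Process]]): Behavior[Process] =
    Behaviors.receive { (ctx, msg) =>
      val key = msg.event.key
      val child = children.getOrElse(key, ctx.spawn(Writer(), s"writer-$key"))
      child ! msg
      route(children + (key -> child))
    }
}

// One actor per key; processes exactly one message at a time.
object Writer {
  def apply(): Behavior[Process] =
    Behaviors.receiveMessage { case Process(event, replyTo) =>
      // ... perform the write (synchronously, for now) ...
      replyTo ! Done(event)
      Behaviors.same
    }
}

object StreamMain extends App {
  implicit val system: ActorSystem[Process] = ActorSystem(KeyRouter(), "router")
  implicit val timeout: Timeout = 5.seconds

  Source(List(Event("a", 1), Event("a", 2), Event("b", 1)))
    .mapAsync(parallelism = 100) { event =>
      // ask returns a Future, so mapAsync keeps its in-order guarantee while
      // the per-key actor serializes events that share a key.
      system.ask[Done](replyTo => Process(event, replyTo))
    }
    .runWith(Sink.foreach(d => println(s"done: $d")))
}
```

With this shape, mapAsync's parallelism controls how many asks are in flight at once, while the actors decide how much of that work can truly run in parallel.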

The nitty-gritty details

After the actor receives its message and kicks off the desired future it has two obvious choices: 1.) return from the handler immediately or 2.) block until the future completes. Unfortunately, both have flaws. If we take the first route then the actor will immediately begin processing the next event in its mailbox (assuming one exists). Since we have no guarantee that the previous future has completed at this point (it almost certainly hasn't) we find ourselves right back where we started: executing multiple events for the same object in parallel. The second approach would seem to be better, as it guarantees that the second event won't process until the first completes, but falls flat when one realizes that Akka sticks multiple actors on the same software thread. Though the context switching necessary to manage that shared thread is embedded within the framework itself and generally invisible to the engineer, the ramification here is that blocking that thread for one actor risks robbing other actors of their own access to it, thereby starving them. In short: actors shouldn't block.

It may seem, then, that we're stuck between Scylla and Charybdis, and so we would be if not for two other Akka features: Stashing and Behavior Switching. The first allows us to buffer ("stash") messages for later processing and then unbuffer ("unstash") them in FIFO fashion at a time of our choosing. The second allows the actor to change its behavior in response to different messages, effectively making the actor a sort of finite-state machine. Taken together, they allow our actor to alternate between two roles: a "receiving" mode which processes incoming messages, and a "buffering" mode which stashes them. Each actor starts in receiving mode, but switches to buffering mode as soon as it receives a message and initiates its first future. At that point any further messages are buffered - that is, until the future completes, at which point the actor unstashes the first message in its buffer (if one exists) and returns to receiving mode. From there the pattern repeats. In this way we accomplish our goal of ensuring that only one future gets processed at a time, all without blocking. [2]
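Translated into (hypothetical) code, that state machine might look like the sketch below. It uses Akka Typed's StashBuffer, behavior switching, and pipeToSelf to hand the future's result back to the actor as an ordinary message; it slots in where the Writer from the earlier sketch did:

```scala
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior}
import scala.concurrent.Future
import scala.util.Try

// Event and Done as in the earlier sketch.
final case class Event(key: String, value: Int)
final case class Done(event: Event)

object SequencingWriter {
  sealed trait Command
  final case class Process(event: Event, replyTo: ActorRef[Done]) extends Command
  private final case class WriteFinished(event: Event, replyTo: ActorRef[Done], result: Try[Unit])
      extends Command

  // Hypothetical asynchronous write.
  private def doWrite(event: Event): Future[Unit] = Future.successful(())

  def apply(): Behavior[Command] =
    Behaviors.withStash(capacity = 1000) { buffer => // bounded; overflow throws
      Behaviors.setup { ctx => receiving(ctx, buffer) }
    }

  // "Receiving" mode: kick off the write, then switch to buffering. The actor
  // returns from the handler immediately - it never blocks on the future.
  private def receiving(ctx: ActorContext[Command], buffer: StashBuffer[Command]): Behavior[Command] =
    Behaviors.receiveMessage {
      case Process(event, replyTo) =>
        // Deliver the future's result back to ourselves as a message.
        ctx.pipeToSelf(doWrite(event))(result => WriteFinished(event, replyTo, result))
        buffering(ctx, buffer)
      case _ =>
        Behaviors.unhandled
    }

  // "Buffering" mode: stash everything until the in-flight write completes.
  private def buffering(ctx: ActorContext[Command], buffer: StashBuffer[Command]): Behavior[Command] =
    Behaviors.receiveMessage {
      case WriteFinished(event, replyTo, _) => // a real actor would also inspect the Try
        replyTo ! Done(event)
        // Replay the oldest stashed message (if any) against receiving mode.
        buffer.unstash(receiving(ctx, buffer), numberOfMessages = 1, wrap = identity)
      case incoming =>
        buffer.stash(incoming)
        Behaviors.same
    }
}
```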

Why Akka? A consideration of alternatives

You may be asking: why use Akka for this at all? What value do actors really provide here? You could, after all, attack this problem by mapping each key to a mutex, or possibly a blocking queue. That would meet the broad goal of preventing parallel execution for each key, and might be faster to implement (or at least use more familiar constructs). It would work IF you could guarantee that all incoming events for a given object would be processed on a single box.

But consider if they weren't. What if the key we cared about wasn't the same key used by the topic, so that we might expect events from different partitions? Or what if we were aggregating events from separate topics altogether? If we scale up to multiple boxes then suddenly our mutex doesn't work so well anymore, and we hit a whole host of other problems. Leader election and cross-node coordination become necessary to ensure that all events with the same key end up in a single, correct place. We need logic to ensure that the resulting distribution is balanced. Even the simple act of sending a message across the network to another box becomes a source of complexity, as we have to worry about serialization and networking protocols.

Which brings us back to Akka. Part of the appeal of the framework is that the constructs it uses are innately scalable. Sending a message to another box is easy because, hey, messages are how everything communicates anyway, and Akka does the dirty work of figuring out how to get it there [3]. Ensuring that all instances of a key get handled on a single instance with a single actor is easy with Cluster Sharding (see the Akka documentation), and there are features we haven't even utilized yet for persistence, recovery, and reliable delivery that are just waiting in the wings. So even if we could maybe get away with not using Akka for some of our simpler use-cases, we certainly can't for others, and having a single, shareable approach to support both gives us big advantages.
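As a sketch of that last point (assuming a cluster-enabled configuration, i.e. akka.actor.provider = cluster, plus the akka-cluster-sharding-typed module), swapping the local per-key lookup for Cluster Sharding is a small change - SequencingWriter here is the actor from the earlier sketch:

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}

object ShardedSetup extends App {
  // The guardian does nothing interesting here; sharding is initialized alongside it.
  val system = ActorSystem(Behaviors.empty[Nothing], "writer-cluster")
  val sharding = ClusterSharding(system)

  // At most one SequencingWriter per entity id (our event key), cluster-wide.
  val WriterKey = EntityTypeKey[SequencingWriter.Command]("writer")
  sharding.init(Entity(WriterKey)(_ => SequencingWriter()))

  // The stream-side lookup becomes a sharded entity ref; asks work the same way,
  // and Akka routes the message to whichever node owns the key.
  def actorFor(key: String) = sharding.entityRefFor(WriterKey, key)
}
```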

Conclusion

Our team has utilized this approach in a number of internal streaming services with functions ranging from stream aggregation to database writes. With it, we can safely process hundreds of events at a time from each partition, increasing our throughput far beyond what might have been possible otherwise.