Backpressure
Or, How to Melt Your CPU Doing ETL With Clojure core.async
The job was simple. Come to think of it, half my work lately has been simple in the same way: ETL. Or ELT, no one cares. We need to get (extract) data from one place and load it someplace else, prolly transformed in there somewhere.
How did ETL become half the work we do? There is a moral in there somewhere.
Come to think of it, in this case I did not need to transform the data. I just needed to move a crap ton of shapefile data out of the Postgres database where it was being handcrafted by geodata experts into an ElasticSearch cloud instance. Daily.
Shapefile? What it says:
“points, lines, and polygons representing, eg, water wells, rivers, and lakes.”
Do not know how they missed coastlines. Speaking of coastlines, and we will before we are done, I got a huge kick out of the ones that crashed the JVM on my 16 GB MacBook Pro. In order:
- Canada, 202,000 km: duh. Look at a map;
- Indonesia, 99,000 km: they cheat. They have a zillion islands;
- Norway, 58,000 km: they cheat. Think “fjords”; and
- Russia, 38,000 km: just a big-ass country.
How did these nice countries crash my JVM? I was handling these things in parallel, and if my memory does not fail me, the bigger shapefiles were over a hundred megabytes.
Really, JVM? Even after tweaking memory allocation? We shoulda used Lisp.
Why was I handling the shapefiles in parallel? Why not? They can go in any order, there are a lot of them, this has to run every day, and incremental loading “need not apply” for reasons outside the scope of this jam.
We Don’t Need No Stinkin’ Go!
And parallel is easy, with Clojure core.async channels. We can do blocking and non-blocking puts and takes, specify a buffer size of things to hold in a channel, and process the channels in trivial go loops. Channels can have multiple writers and multiple readers.
Yes, Go was the inspiration. Lisps tend to do that. They see an interesting idea and implement it. Without changing the language. See CLOS.
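A minimal sketch of those moving parts, nothing from the real job (channel name and payload invented):

```clojure
(require '[clojure.core.async :as a :refer [chan go-loop <! >!! close!]])

;; A channel that buffers up to 16 items; puts park or block once it fills.
(def work-chan (chan 16))

;; A go-loop reader: parks (no thread tied up) until data arrives,
;; and stops when the channel is closed and drained (take returns nil).
(go-loop []
  (when-some [item (<! work-chan)]
    (println "processing" item)
    (recur)))

;; A blocking put from a plain thread; blocks if the buffer is full.
(>!! work-chan {:id 1 :payload "pretend this is a shapefile"})

;; Writer signals it is done; readers wind down on their own.
(close! work-chan)
```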
Postgres Cursors
OK, great. I will use a Postgres cursor to pull shapefiles, then put them on a channel where multiple readers are waiting to package them up for relay to something we have not discussed yet: a generic data-aggregation EC2 instance that accepts data from many providers, not just us, and forwards it along to the ElasticSearch instance.
Anyway, my job was not really to write to ElasticSearch, it was to write to the data aggregator.
Spoiler alert: I ended up beating on the aggregator anyway.
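In spirit, the producer end looked something like this. The `fetch-shapefiles` helper is a stand-in for the actual JDBC-with-cursor incantation (fetch size set, auto-commit off so Postgres streams rows instead of handing you the whole table), and the channel is invented for the sketch:

```clojure
(require '[clojure.core.async :as a :refer [chan thread >!! close!]])

(declare fetch-shapefiles)   ; stand-in: lazily yields rows from a PG cursor

(def shapefile-chan (chan 16))   ; raw rows headed for the packagers

(defn start-extractor!
  "One real thread (not a go block; JDBC is blocking I/O) walks the cursor,
   pushes each row onto the channel, then closes it so readers know to stop."
  [db-spec]
  (thread
    (doseq [row (fetch-shapefiles db-spec)]
      (>!! shapefile-chan row))
    (close! shapefile-chan)))
```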
ElasticSearch
Earlier I provided lame excuses for doing this in parallel. Secretly I just wanted to play with core.async. But another justification is that ElasticSearch is wicked cool about accepting data and indexing it when it can get around to it, thank you very much. We wanted to leverage that by shoving shapefiles down its endpoint in parallel. So that is how I ended up beating on the aggregator, but I am getting ahead of myself.
Bucket Brigades
So now I have four go-loops pulling from the channel fed by the loop pulling from the Postgres cursor, packaging things up nicely for the aggregator, and…hang on! They need to grab the next shapefile from that channel. They have no time to deal with the aggregator! Just dump the packaged shapefile in another channel! Create four workers to pull the packages off and send them out on the Interweb to the data aggregator!
Can you say “bucket brigade”? Sher ya can.
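The brigade, roughly. The worker counts and the `package` / `send-to-aggregator!` functions are stand-ins for the real work:

```clojure
(require '[clojure.core.async :as a :refer [chan go-loop thread <! >! <!!]])

(declare package send-to-aggregator!)   ; stand-ins for the real work

(def shapefile-chan (chan 16))   ; fed by the Postgres reader
(def packaged-chan  (chan 16))   ; fed by the packagers

;; Stage one: n packagers pull raw shapefiles, package them, pass them on.
(defn start-packagers! [n]
  (dotimes [_ n]
    (go-loop []
      (when-some [shp (<! shapefile-chan)]
        (>! packaged-chan (package shp))
        (recur)))))

;; Stage two: m transmitters push packages out over HTTP.
;; Real threads, because the HTTP call is blocking I/O.
(defn start-transmitters! [m]
  (dotimes [_ m]
    (thread
      (loop []
        (when-some [pkg (<!! packaged-chan)]
          (send-to-aggregator! pkg)
          (recur))))))

(comment
  (start-packagers! 4)      ; one magic 4...
  (start-transmitters! 4))  ; ...and the other
```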
The Astute Reader
My astute reader protests, “Who says four packagers mean four transmitters? Maybe one task takes longer than another. Maybe…”
At some point I got the damn thing working, as far as shuttling over a couple hundred shapefiles. (Little did I know what Norwegian fjords lurked.) And then I wondered about these magic numbers 4 and 4.
I considered for a nanosecond experimenting with different numbers before deciding, “Science!”
Enter Numbers
I had been here before in other applications, twiddling with how many bucket emptiers to have on each brigade, and I knew I could fiddle for hours like that.
Instead, it occurred to me that I could have each worker jot down the time before it did a blocking take from a channel and then, when it got some data, record how long it had waited before going ahead and processing its data.
Neato! I now can eyeball the numbers and reallocate workers accurately in one or two goes!
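The metering is just a timestamp on either side of the blocking take, dumped into an atom per worker. Something like this (names invented):

```clojure
(require '[clojure.core.async :as a :refer [<!!]])

(def wait-stats (atom {}))   ; worker-id -> vector of waits, in ms

(defn metered-take!
  "Blocking take that records how long this worker sat idle waiting for data."
  [worker-id ch]
  (let [start     (System/nanoTime)
        item      (<!! ch)
        waited-ms (/ (- (System/nanoTime) start) 1e6)]
    (swap! wait-stats update worker-id (fnil conj []) waited-ms)
    item))

;; Eyeball it later: long average waits mean too many workers on that side;
;; near-zero waits mean that side could use more hands.
(defn mean-wait [worker-id]
  (when-let [ws (seq (get @wait-stats worker-id))]
    (/ (reduce + ws) (count ws))))
```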
The Bad News
Duh. It turned out the first layer of workers was waiting on the one worker pulling from the Postgres DB.
Golly, I/O bound, when has that ever happened?
The Good News: Postgres Shenanigans
The good news is that I rule the world. Specifically, I can give the source PG table a new serial column, id, and a second column, channel (sound familiar?), decide on n PG readers (not just one), then issue a single update setting each row's channel to its id mod n.
Now each worker n reads only the rows where channel = n. Too easy?
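Hedged sketch of those shenanigans; the table and column names are made up, and the JDBC plumbing is whatever was already lying around:

```clojure
;; One-time setup, plain SQL, run however you run migrations:
;;   alter table shapefiles add column id serial;
;;   alter table shapefiles add column channel integer;
;;   update shapefiles set channel = id % 4;   -- n = 4 readers
;;
;; Then reader k pulls only its own slice:

(defn reader-query
  "Parameterized SQL vector (java.jdbc / next.jdbc style) for worker k's slice."
  [k]
  ["select * from shapefiles where channel = ?" k])
```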
Goodbye, JVM
The good news is, we are cooking with gas. The bad news is, we let ‘er rip and the JVM goes belly up.
Hours later…
Parallelism: The Two-edged Sword
Remember when I said core.async would let me jam multiple shapefiles into one channel by specifying a buffer size? Cough. Well, not if there is no room in the channel. So I had each channel accept what I guessed would be “a lot” so the putters could put and move on instead of waiting. Cannot hurt, right?
Tell that to the JVM crashed and burning at my feet.
Thanks to the bizarrely named Mac OS X Activity Monitor, sufficient print statements, and prolly something that gave me the size of a JSON object, I eventually worked out that my problem was the JVM running out of memory.
The stacktrace mentioning OOM was another hint.
I worked out the incantations to provision the JVM out the wazoo, but somehow it was never enough. And I still did not know what Indonesia had in mind for me.
I did know I was staring single-threaded ETL in the face. Because what number of objects in a channel would at once move data in parallel and avoid crashing the JVM?
It was too late to switch to Common Lisp.
One
One. One shapefile in each bucket brigade channel at a time.
But there goes our parallelism! We will block on a put until the prior shapefile gets processed! Run away!
Backpressure
Run away? No, that is why we metered the wait time of the pullers. If they are not waiting for data, there are not enough of them. Specify more. Hell, I might have metered the pushers as well. At any rate, we can achieve all the parallelism we can handle by tuning the number of workers at each end of each bucket brigade.
We will not overwhelm the JVM because only one shapefile occupies a channel at a time. OK, if we have a thousand workers we will have a thousand shapefiles in memory at some point, but we are tuning the worker numbers, we can still control that kind of excess.
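So the whole tuning surface boils down to a handful of numbers and some 1-slot channels (counts invented, obviously):

```clojure
(require '[clojure.core.async :as a])

;; Channels hold exactly one shapefile; parallelism comes from worker counts.
(def shapefile-chan (a/chan 1))
(def packaged-chan  (a/chan 1))

(def worker-config
  {:pg-readers   4    ; pulling from Postgres
   :packagers    6    ; tune by watching their wait times
   :transmitters 8})  ; blocking HTTP usually wants more hands

;; Worst case in flight is roughly the sum of the worker counts plus the two
;; buffered items, so memory stays proportional to numbers we control.
```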
Bye-bye Aggregator
Now we lost the aggregator. It got overwhelmed trying to move data along to ElasticSearch. My extractor-transformer was so powerful that it flooded the aggregator with more async HTTP requests than it could absorb.
Rinse, Repeat: More Backpressure
Once more into the backpressure breach. Contrary to the backpressure link cited above, yes, we artificially generate backpressure by making puts to the aggregator instance synchronous. Likewise the puts to the ElasticSearch instance.
Optimal parallelism through synchronous operations. Inconceivable? Nope. The synchronicity lets us casually load workers with data out the wazoo, knowing backpressure will prevent more data than the JVM can handle from getting into the pipelines at the same time.
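Concretely, “synchronous” just means the transmitter does not fire and forget: it does the POST and waits for the answer before taking the next package. A sketch, assuming clj-http (the real client is lost to memory) and an invented serializer:

```clojure
(require '[clojure.core.async :as a :refer [thread <!!]]
         '[clj-http.client :as http])

(declare ->json)   ; stand-in serializer

(defn start-sync-transmitters!
  "Each worker blocks on the full HTTP round trip, so it cannot take another
   package until the aggregator has swallowed the last one. That pause
   propagates backward through the 1-slot channels: backpressure."
  [m packaged-chan aggregator-url]
  (dotimes [_ m]
    (thread
      (loop []
        (when-some [pkg (<!! packaged-chan)]
          (http/post aggregator-url {:body         (->json pkg)
                                     :content-type :json})
          (recur))))))
```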
Until the JVM crashes. Mommas…sigh
Blame Canada!
Thanks again to the Activity Monitor, sufficient print statements, and prolly something that gave me the size of a JSON object, I eventually worked out that my problem was…Canada! Or one of those big-ass geometries.
With four workers on each of two bucket brigades, we would never have Clojure handling more than eight shapefiles. Which is fine, until Russia comes along.
The good news is that we just had a few of these beasts, so I simply:
- sorted the shapefiles by size (Postgres could see their size);
- processed them by increasing size; and
- once we got to a certain size, abandoned all parallelism and even core.async.
The good news? Single-threaded, the JVM could handle Canada.
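In spirit (size column, threshold, and helpers all invented; pg_column_size is the kind of thing Postgres will tell you):

```clojure
;; Ask Postgres for rows smallest-first, e.g.:
;;   select *, pg_column_size(geom) as size_bytes
;;   from shapefiles order by size_bytes asc;

(declare feed-pipeline! process-single-threaded!)   ; stand-ins

(def big-shapefile-bytes (* 50 1024 1024))   ; invented threshold: ~50 MB

(defn route-shapefile!
  "Small shapes ride the parallel bucket brigades; whales (Canada, Russia,
   and friends) get handled one at a time, no core.async involved."
  [row]
  (if (< (:size_bytes row) big-shapefile-bytes)
    (feed-pipeline! row)
    (process-single-threaded! row)))
```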
Summary
I mentioned melting my CPU. I have been at this game a while and, shy of a tight infinite loop, had never made a multi-core system really sweat. Peaks of 40–50% overall were about it. With core.async and backpressure, I saw my MacBook Pro at 90% CPU with fans roaring. That was fun.
Left as an exercise? Having the system dynamically tune the number of workers on each bucket brigade by watching the statistics. Maybe we could hit 95% CPU?
Postfix
Looking for a nice image to go with this story I googled “backpressure”, and inadvertently discovered an astonishing amount of literature on backpressure in software. If anyone can read, let me know what I reinvented badly in the comments.