I've been looking into Kafka lately, because I have a need for "the thing that Kafka is in practice" (not a message queue per se; rather, an indefinitely-durable event store that supports message-queuing API semantics for consumers.)
And, while Kafka does do this, its infrastructure needs are kind of ridiculous when your producers 1. produce events in clean batches, and 2. could re-produce those events, in the exact same batches, if prompted.
Ideally, rather than having to basically have a whole second DBMS in the form of a Kafka cluster (stateful nodes with big HDDs, carefully backed up, needing repair when any die), what I would love is a Kafka-like "event store with MQ semantics", which is in turn backed by an object store. Basically, like Datomic's segment-storage architecture; or like WAL-E in its use-case as an intermediary for Postgres replication. Unlike either of those, though, MQ log-segments don't need any additional metadata (e.g. checkpoints, indices), so they can just be chucked straight in object storage, and MQ nodes can base their view of the world entirely on what happens to be in the object-storage bucket.
Of course, this kind of system only makes sense if you have no need for durability of new messages (because e.g. the producer still has the data that went into them, and can resend them), only old messages. Which is kind of an unusual requirement, I suppose, so I don't really expect anyone else to go building this sort of thing for me.
Agreed. For me, Kafka's reliance on ZooKeeper is the most annoying one. ZooKeeper is difficult to manage. And both Kafka and ZK, being JVM apps, are memory hungry, and the heap is difficult to tune — they need lots of headroom to avoid accidental GC pauses or OOM events.
Kafka also puts some of the complexity burden on the client. It's been a while since I wrote anything for it, and I believe you can tell it to manage the log position for you now. But by default I believe a client needs to talk to both Kafka and ZooKeeper (or store the log position somewhere else).
There's definitely room for a simpler solution for those who can't afford the complexity of managing Kafka.
I looked at Apache BookKeeper [1] recently. It has a nice design and now supports Etcd for consensus. Unfortunately, like Kafka, it relies on a fat client, and that client is currently written in Java, making BK impossible to use from a language like Go or Node.js.
Apache Pulsar [2] is a layer on top of BookKeeper that provides pub/sub semantics and high-level features like functions and schemas, and offers client support for multiple languages. On the other hand, Pulsar requires both BookKeeper and ZooKeeper (and doesn't support Etcd). In theory you could run zetcd to emulate ZK with Etcd, but... it gets a little bit ridiculous at this point.
I've looked at NATS Streaming, but its clustering story looks a little unfinished.
I've previously built a system that built a log on top of Postgres. You can achieve a decent approximation of Kafka using transactions and LISTEN/NOTIFY, and with table partitioning and replication you can even get some scale out of it, especially if you're able to distribute independent logs across completely separate Postgres instances. But it's ultimately not as good as a dedicated store.
I particularly like your idea that — if I understand it correctly — data can be stored independently of the log itself.
As of v0.9 (about 4 years ago), clients have no use for a direct connection to Zookeeper, and don't maintain one.
Fat clients are more effort to make, which is a significant downside, but this is abstracted from the user and Kafka has critical mass - the client story outside Java is improving dramatically. A fat client allows for optimal availability, throughput, latency.
A fat client makes a lot of sense, but if it's written in Java, then only JVM-based languages benefit. A client written in C, C++ or Rust — or any other language with a similarly good C-interop story — could easily be consumed by just about anything except, ironically, Go.
there are pros and cons to having a C dependency. the main benefit is reliability (get it right once, leverage that across languages). Another benefit is usually performance. The main practical gotcha is that security dependencies are incompatible across platforms, and the best you can do is distribute a number of builds of the C dependency compatible with common platforms (but that is workable).
the go story isn't as bad as you might think. there is a performance hit with the interop, which is of the order of 20% iirc - but throughput still massive, and this is not of practical significance. The Confluent go client (a binding to librdkafka), is used to push much of the volume through two of the largest Kafka installations in existence.
there is a PR open on the confluent go client that will include builds of librdkafka in the github repo, meaning this will 'just work', without the need to install librdkafka separately in most common scenarios.
note: i work on kafka clients (including minor contributions to the confluent go client).
To me, a "log" is just a directory of files (= log segments), where only one file (the newest segment) is ever opened for writing (specifically appending), and those appends are only ever of whole, valid records. Given a "log", you can treat everything except for the newest file/segment in it as an object (i.e. object-store backed local cache file, with atomic insert/replace/delete), rather than a file (= inode containing buffer of extents) per se.
Mind you, in Kafka's design, all the old segments get "compacted" together, meaning that they get converted into a single on-disk hash table of the newest value for each key. That's cool and all, but the compacted-log hash table doesn't have object-like semantics; it suffers constant random writes every time new segments get "compacted into" it.
IMHO, rather than this kind of compaction, I'd much rather just have streaming compaction, sort of like LevelDB's level-based compaction. Ideally, every time a broker finalized a segment that caused it to now have 2^N contiguous historical segments of level N in a log, it would queue up a streaming-compaction job into another topic; and then some arbitrary node would work that job by streaming through the relevant segments with a de-duping merge sort, producing a new combined segment of level N+1 and then purging the level-N source segments from the object store.
I had to read your earlier comment a couple of times, and I think I understand! When you said object store, I was thinking something like S3, which wouldn't be sufficient for the semantics you outline.
I don't know the internals of Kafka too well, so I'm surprised it doesn't already do what you suggest. An LSM tree system would be a more natural fit in my mind than fixed-sized segments. I wasn't thinking of compaction at all — I had to read a bit about why Kafka compaction wasn't just about deleting log entries beyond the retention horizon.
I suspect that in any Kafka-type application, one of the main challenges is balancing the storage, access performance and consistency of "old" and "new" objects. The head of the stream tends to be where contention is, everything else is read-only and "baked" with indexes and other mechanisms that make range searches quick, and can be heavily cached with long TTLs.
The reason I mentioned external data storage is that if the data never mutates, but it is the log structure which is mutable, then your log is just a sequence of metadata:
Then you can optimize for the structural aspect of the database — you can have very fast data store handling the sequencing of the data and the querying of ranges and so on — and simply offload the physical storage of the log entry data to something hyperoptimized to take care of large blobs efficiently.
I've not thought about what the overhead of storing many very small objects in something like S3 would be. Like if you asked for a range of log entries from one hour back and it's 1 million objects, you can certainly parallelize the fetching efficiently, but you'd also be incurring 1 million S3 requests and doing quite a lot of HTTP traffic (nothing that a local LRU cache couldn't help you with, but still).
So it's probably worth merging them — so you could have a two-level system where newly inserted objects lived in a "fresh" store and then got slowly merged into bigger chunks that would be offloaded to a "frozen" store.
I wouldn't mind sitting down and trying to build something like this in Go using Badger.
I’m confused on your JVM heap concerns. Not trying to call you out, just want to understand.
You should never need to provision more than 6GB of heap for a Kafka JVM. 90% of the OS memory leftover it will use for file handlers / page cache which is all buffered and which you don’t need to manage. Zookeeper JVM you might need to worry about but I’m running 5k partitions with hundreds of consumers just fine on 6GB heap on those as well. I use on average like 400MB. It’s not like running ElasticSearch.
Resource consumption matters a lot more in a small, self-funded company that can't just throw boxes at a problem.
My data from non-JVM apps is that we can cram a whole lot of them into small 16GB VMs via Kubernetes — Go apps especially. With JVM apps, we typically have to dedicate larger 24-32GB boxes to 1-2 apps because they absolutely need the legroom. Since RAM isn't "elastic" like CPU is, you get less flexibility in managing big pieces than with small ones, especially big pieces that must stay up. We have to preallocate a certain amount and watch our limits.
I've never actually run ZK or Kafka in production, so this is pure speculation, but I didn't get any fuzzy feelings from playing around with a test cluster recently. When you combine ZooKeeper with Kafka we're probably talking about 3+ dedicated boxes because they don't fit into existing ones.
These things add up, and right now I'm trying to reduce our monthly GCP bill, not add to it!
(We do run Elasticsearch. Its memory use seems particularly egregious, indeed. I'm sure these other things are lighterweight.)
> what I would love is a Kafka-like "event store with MQ semantics", which is in turn backed by an object store.
I use Druid for somewhat of an off-kafka store with object store flushing. It is not an MQ log, but the ingestion flushes to object store as each segment gets finalized.
And it will directly pull off Kafka & keep the data ready to be sliced/diced easily.
I set my Druid segments to get flushed out every 24h and the Kafka queue to be kept for 7 days, which means if I lose my Druid, it will rehydrate from Kafka on startup (and I can scale the real time ingestion nodes for a faster start, because "cloud" means money is time).
Though, I've been poking about Pulsar[1] and Pravega[2], which are not exactly Kafka, but red wine/white wine in choice.
That said, there's a fair bit of ZK in it, I'm starting to tire of ZK related issues in general (like you need to declare entire ensemble before JVM startups). Also patching it is harder than usual, since the devs have gotten used to saying "No" to FB asks from ZK. It doesn't make any progress, because patches sit on an "are we sure" pile.
You might be interested in Gazette (https://github.com/gazette/core). It presents durable, append-only, transactional byte streams ("journals") while delegating all actual storage to S3/GCS/Azure etc. Brokers themselves are fully ephemeral. Written journal content ends up in predictably-named "fragments" in a configured bucket path, where each fragment holds only raw written bytes (eg, if you write JSONL messages, your fragments are also just JSONL).
The broker is pure Go, and Etcd is used for distributed consensus. The project has been around for years (as closed source), but still lacks a bunch of polish as an open-source offering. Working on that...
Having read it I love the approach - it's very similar to how we handled data at Common Crawl / Web Archive (WARC) files / inspired by MapReduce + object storage interactions. I'll keep it in mind for the next opportunity :)
> I've been looking into Kafka lately, because I have a need for "the thing that Kafka is in practice" (not a message queue per se; rather, an indefinitely-durable event store that supports message-queuing API semantics for consumers.)
Don't think that is a secret feature. It was an intended feature, from the author himself [1].
You know you can put a key with a null value onto a topic. If your backend object store gives you data by key, this should give you exactly what you want.
Later on, you can put a small subset of the data in value to assist with filtering or caching (if you want).
I'm not clear on what you're trying to suggest, here. I would like a system that presents itself the same as a Kafka broker, to both producers and consumers (who are just working with regular messages and topics, not object-handles), but where the MQ broker they're talking to is internally just a shared-nothing stateful write-through WAL proxy for an object store. (Sort of like how, with actual logging, you could think of the local rsyslog daemon on an instance as a "shared-nothing stateful WAL-writeback-caching proxy for an object store." Except rsyslog lives on the producer, whereas this write-through WAL proxy would live on the broker.)
In such a design, "partitions" and partition ownership would only apply to the newest segment of the log; all older segments would be globally-available, because as soon as a log-segment is finalized, it would get pushed to the object store and become an object.
Every node would then be doing Hierarchical Storage Management, with a local LRU cache of segments, populated just-in-time in response to requests, by the object store. (This is what I meant by the comparison to Datomic's storage architecture.)
A lot of the technical parts of this could be achieved by just FUSE-mounting an S3 bucket to a directory on each Kafka node, and convincing the node to move log segments into that directory once they're done. But Kafka would still be wasting time and resources replicating segments between nodes that already now "have" them; and also, without further hacking, nodes wouldn't realize that they now also "have" 99% of the segments of every log potentially-available to serve. A system that was built this way from the ground up would end up a lot simpler than Kafka.
And, while Kafka does do this, its infrastructure needs are kind of ridiculous when your producers 1. produce events in clean batches, and 2. could re-produce those events, in the exact same batches, if prompted.
Ideally, rather than having to basically have a whole second DBMS in the form of a Kafka cluster (stateful nodes with big HDDs, carefully backed up, needing repair when any die), what I would love is a Kafka-like "event store with MQ semantics", which is in turn backed by an object store. Basically, like Datomic's segment-storage architecture; or like WAL-E in its use-case as an intermediary for Postgres replication. Unlike either of those, though, MQ log-segments don't need any additional metadata (e.g. checkpoints, indices), so they can just be chucked straight in object storage, and MQ nodes can base their view of the world entirely on what happens to be in the object-storage bucket.
Of course, this kind of system only makes sense if you have no need for durability of new messages (because e.g. the producer still has the data that went into them, and can resend them), only old messages. Which is kind of an unusual requirement, I suppose, so I don't really expect anyone else to go building this sort of thing for me.