Large amounts of “log” data are generated every day, including
- user activities like logins, page views, clicks, likes, and other queries
- machine metrics like CPU and memory usage.
This data is not only for offline analytics but also very useful in online services. Use cases include
- search relevance
- recommendation performance
- ad targeting and reporting
- security applications.
The traditional way is to dump the log file on each machine, but that is time-consuming, inefficient, and only works for offline analytics. Other distributed log aggregators exist, including Facebook’s Scribe and Yahoo’s Data Highway, but they are primarily designed for data warehouse and Hadoop usage. We need online usage, with delays of no more than a few seconds, which requires:
- Distributed and scalable, offering high throughput.
- API similar to a messaging system, so applications can consume it in real time.
Why don’t traditional messaging systems work?
- Mismatch in features: they focus on offering delivery guarantees, which is overkill for collecting log data.
- Cannot meet throughput requirements: the per-message cost of sending is very high.
- Weak distributed support.
- Assumption of near-immediate consumption: the queue of unconsumed messages is expected to stay small. If it grows, their performance degrades.
Architecture and Design
- A stream of messages of a particular type is defined by a topic.
- A producer can publish messages to a topic.
- Published messages are stored at a set of servers called brokers.
- A consumer can subscribe to one or more topics from the brokers and consume the subscribed messages by pulling data from the brokers.
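The four concepts above can be sketched as a toy in-memory system. The class and method names here are illustrative assumptions, not the paper’s actual API; real brokers persist logs to disk and each topic has multiple partitions:

```python
from collections import defaultdict

class Broker:
    """Toy in-memory stand-in for a broker (one partition per topic)."""
    def __init__(self):
        self.logs = defaultdict(list)  # topic -> list of messages (the "log")

    def append(self, topic, messages):
        self.logs[topic].extend(messages)

    def fetch(self, topic, offset, max_count):
        # Consumers pull by offset; the broker keeps no per-consumer state.
        return self.logs[topic][offset:offset + max_count]

class Producer:
    def __init__(self, broker):
        self.broker = broker
    def send(self, topic, message_set):
        # Producers publish a set of messages in one request.
        self.broker.append(topic, message_set)

class Consumer:
    def __init__(self, broker, topic):
        self.broker, self.topic, self.offset = broker, topic, 0
    def poll(self, max_count=100):
        batch = self.broker.fetch(self.topic, self.offset, max_count)
        self.offset += len(batch)  # the consumer tracks its own position
        return batch

broker = Broker()
Producer(broker).send("topic1", [b"login", b"page_view"])
print(Consumer(broker, "topic1").poll())  # [b'login', b'page_view']
```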
How is load balanced?
- Topic is divided into multiple partitions and each broker stores one or more of those partitions.
Why is it efficient?
- Simple Storage
- Each partition of a topic corresponds to a logical log, and each log is implemented as a set of segment files of approximately the same size.
- When a new message arrives from a producer, the broker simply appends it to the last segment file.
- Only flush after a configurable number of messages.
- A message is only exposed to consumers after it’s flushed.
- No message id: each message is addressed by its logical offset in the log.
- Consumer always consumes messages from a particular partition sequentially:
- If a particular message offset is acknowledged, it means all messages before this offset have been consumed.
- Pull request contains the offset of the message and acceptable number of bytes to fetch.
- The broker keeps a sorted list of offsets, namely the offset of the first message in every segment file.
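The lookup over that sorted list can be sketched with a binary search. The per-segment starting offsets below are made up for illustration:

```python
import bisect

# Hypothetical starting offset of each segment file, kept sorted.
segment_start_offsets = [0, 1000, 2000, 3000]

def segment_for(offset):
    """Index of the segment file containing the requested offset."""
    # bisect_right finds the first segment that starts AFTER the offset;
    # the one before it is the segment holding the message.
    return bisect.bisect_right(segment_start_offsets, offset) - 1

print(segment_for(999))   # 0
print(segment_for(1000))  # 1
print(segment_for(2500))  # 2
```

The broker resolves a pull request’s offset to a segment this way, then reads sequentially from that file.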
- Efficient transfer
- Producers can submit a set of messages in a single send request, and consumers receive several messages per request even when processing them one by one.
- No application-level caching: it relies on the file system page cache. Both producers and consumers access the segment files sequentially, and a consumer often lags the producer by only a small amount.
- The “sendfile” API is used to avoid two unnecessary copies and one system call.
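A minimal sketch of that zero-copy transfer using Python’s `os.sendfile` (Linux-style semantics assumed); the socket pair stands in for the broker-to-consumer connection, and the temporary file stands in for a segment file:

```python
import os, socket, tempfile

payload = b"message-bytes"

# Write some bytes to a temporary file standing in for a segment file.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(payload)
    path = f.name

# A connected socket pair stands in for the broker-to-consumer connection.
server, client = socket.socketpair()
with open(path, "rb") as segment:
    # The kernel copies file bytes straight into the socket buffer:
    # no read() into a user-space buffer, no extra write() back down.
    sent = os.sendfile(server.fileno(), segment.fileno(), 0, len(payload))

data = client.recv(64)
server.close(); client.close(); os.unlink(path)
print(sent, data)
```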
- Stateless Broker
- How much each consumer has consumed is not maintained by the broker, but by the consumer itself.
- But how are messages deleted? A time-based retention SLA: a message is automatically deleted if it has been retained in the broker longer than a certain period, typically 7 days.
- Side benefit: A consumer can deliberately rewind back to an old offset and re-consume the data.
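The time-based retention policy can be sketched as a sweep over segment files; the segment names and modification times below are hypothetical:

```python
RETENTION_SECONDS = 7 * 24 * 3600  # the paper's typical 7-day SLA

def expired_segments(segments, now):
    """Return names of segment files older than the retention SLA.

    `segments` maps a hypothetical segment file name to its
    last-modified time in epoch seconds.
    """
    return [name for name, mtime in sorted(segments.items())
            if now - mtime > RETENTION_SECONDS]

now = 10_000_000
segments = {"00000000.log": now - 8 * 24 * 3600,  # 8 days old: delete
            "00001000.log": now - 3600}           # 1 hour old: keep
print(expired_segments(segments, now))  # ['00000000.log']
```

Because deletion is driven only by time, a consumer that rewinds to an old offset can still re-read anything inside the retention window.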
- Each producer can publish a message to either a randomly selected partition or a partition semantically determined by a partitioning key and partitioning function.
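That partition choice could be sketched as follows; the partition count and the CRC-based partitioning function are illustrative assumptions, not Kafka’s actual defaults:

```python
import random
import zlib

NUM_PARTITIONS = 8  # assumed partition count for the topic

def choose_partition(key=None):
    """Random partition when no key is given; otherwise a deterministic
    hash of the partitioning key (zlib.crc32 is just one possible
    partitioning function)."""
    if key is None:
        return random.randrange(NUM_PARTITIONS)
    return zlib.crc32(key) % NUM_PARTITIONS

# Messages with the same key always land in the same partition,
# so they are consumed in order by a single consumer.
print(choose_partition(b"user-42") == choose_partition(b"user-42"))  # True
```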
- Consumer groups:
- Each message is delivered to only one of the consumers within the group.
- Different groups each independently consume the full set of subscribed messages, and no coordination is needed across consumer groups. (This reduces the complexity of locking and state maintenance. To keep load truly balanced, a topic needs many more partitions than there are consumers in each group.)
- No central “master” node; Zookeeper is used instead. Zookeeper’s features:
- Create a path, set the value of path, read the value of path, delete path, and list the children of a path.
- One can register a watcher on a path and get notified when the children of a path or the value of a path has changed.
- A path can be created as ephemeral, which means it is automatically removed when the creating client is gone.
- It replicates data to multiple servers.
- What is Zookeeper used for?
- Detecting the addition and removal of brokers and consumers.
- Triggering the rebalance process in each consumer.
- Maintaining the consumption relationship and keeping track of the consumed offset of each partition.
- Detail about Zookeeper:
- When broker or consumer starts up, it stores its information in a broker or consumer registry in Zookeeper. Broker registry contains host name and port, and the set of topics and partitions stored on it. The consumer registry includes the consumer group to which a consumer belongs and the set of topics that it subscribes to.
- Each consumer group is associated with an ownership registry and an offset registry. Ownership registry has one path for every subscribed partition and the path value is the id of the consumer currently consuming from this partition. The offset registry stores for each subscribed partition, the offset of the last consumed message in the partition.
- Paths created in Zookeeper are ephemeral for the broker registry, the consumer registry, and the ownership registry, and persistent for the offset registry.
- Rebalancing happens at the initial startup of a consumer, or when the consumer is notified about a broker/consumer change through a watcher.
- Rebalancing algorithm:
- Calculate the set of available partitions for each subscribed topic T.
- Calculate the set of consumers subscribing to T.
- Calculate N = (number of available partitions) / (number of consumers in the group).
- Each consumer is assigned N partitions: it writes itself into the ownership registry and starts consuming from the offset stored in the offset registry.
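The assignment step can be sketched as a deterministic range assignment. This is a simplification: the real algorithm runs per subscribed topic and records ownership in Zookeeper, but sorting both lists already shows why every consumer in the group reaches the same answer without talking to the others:

```python
def assign_partitions(partitions, consumers):
    """Give each consumer a contiguous chunk of roughly N partitions.

    N = ceil(|partitions| / |consumers|). Both lists are sorted so each
    consumer computes the identical assignment independently.
    """
    partitions, consumers = sorted(partitions), sorted(consumers)
    n = -(-len(partitions) // len(consumers))  # ceiling division
    return {c: partitions[i * n:(i + 1) * n] for i, c in enumerate(consumers)}

print(assign_partitions(["p0", "p1", "p2", "p3"], ["c1", "c0"]))
# {'c0': ['p0', 'p1'], 'c1': ['p2', 'p3']}
```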
- Delivery guarantees
- Only guarantee at-least-once delivery.
- Consumers can implement their own de-duplication logic if they care about duplicates.
- Messages from a single partition are delivered to a consumer in order, but there is no guarantee on the ordering of messages coming from different partitions.
- A CRC for each message is used to detect log corruption.
- If a broker goes down, any messages stored on it that are not yet consumed become unavailable. If the storage system is permanently damaged, those messages are lost forever.
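Consumer-side de-duplication under at-least-once delivery can be sketched as below; the integer message ids are hypothetical, since the paper leaves de-duplication entirely to the application:

```python
def deduplicate(messages, seen_ids):
    """Drop messages whose id was already processed.

    `seen_ids` persists across pulls; together with at-least-once
    delivery this gives effectively-once processing.
    """
    fresh = []
    for msg_id, payload in messages:
        if msg_id not in seen_ids:
            seen_ids.add(msg_id)
            fresh.append(payload)
    return fresh

seen = set()
batch1 = [(1, b"click"), (2, b"view")]
batch2 = [(2, b"view"), (3, b"like")]  # message 2 redelivered after a failure
print(deduplicate(batch1, seen))  # [b'click', b'view']
print(deduplicate(batch2, seen))  # [b'like']
```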
Usage in LinkedIn
- Online consuming and offline jobs.
- Tracking: monitoring events are used to validate that there is no data loss.
- Avro is used as serialization protocol.
Why is it more efficient than other products in real experiments?
- The producer doesn’t wait for acknowledgments from the broker.
- A more efficient storage format.
What can be improved? / Next steps:
- Data replication.
- Window-based counting.