Reading Notes for [On Brewing Fresh Espresso: LinkedIn’s Distributed Data Serving Platform]

Why Espresso?

  • RDBMSs have several shortcomings and are expensive in terms of both licensing and hardware.
    1. A relational database installation requires costly, specialized hardware and extensive caching to meet scale and latency requirements.
    2. Adding capacity requires a long planning cycle and cannot be done with 100% uptime.
    3. The data model (or schema) does not readily map to normalized relational forms. Schema changes on the production database consume a lot of DBA time and machine time when the datasets are large.
  • The Voldemort store (inspired by Dynamo) is an eventually consistent key-value store. It was initially used for soft-state and derived data sets, and it is increasingly being used for primary data that does not require a timeline-consistent change capture stream.
  • Essential requirements for Espresso
    • Scale and Elasticity.
    • Consistency
    • Integration: ability to consume a timeline-consistent change stream directly from a source-of-truth system
    • Bulk Operations: ability to load/copy all or part of a database from/to other instances, Hadoop and other datacenters, without downtime
    • Secondary Indexing: keyword search, relational predicates
    • Schema Evolution: forward and backward compatible schema evolution
    • Cost to serve: RAM provisioning proportional to active data rather than total data size

Espresso Features

  • Transaction support: Most NoSQL stores do not support transactions beyond a single record. Espresso supports a hierarchical data model and provides transaction support on related entities.
  • Consistency model: Reads and writes are served by the master node. Replication between master and slave is either asynchronous or semi-synchronous, which keeps the slaves timeline consistent with the master. When a master fails, the cluster manager promotes one of the slave replicas to be the new master to maintain availability.
  • Integration with the complete data ecosystem: Providing out-of-the-box access to the change stream, both online and offline.
  • Schema awareness and rich functionality: Unlike some other NoSQL stores, Espresso is not schema-free. Enforcing a schema keeps the data consistent and enables key features such as secondary indexing and search, partial updates to documents, and projection of fields within a document.

External Interface

Data Model

  • Common use case
    • Nested Entities.
      • Example: All messages that belong to a mailbox and any statistics associated with the mailbox. All comments that belong to a discussion and the metadata associated with the discussion.
      • Primary write pattern: Creating new entities and/or updating existing entities. Mutations often happen in a group and atomicity guarantees here are helpful in simplifying the application logic.
      • Read pattern: Unique-key based lookups of the entities, filtering queries on a collection of like entities or consistent read of related entities.
    • Independent Entities.
      • Example: People and Jobs.
      • Write pattern: Tends to be independent inserts/updates. The application is more forgiving of atomicity constraints but needs a guarantee that updates to both entities eventually happen.
  • Data hierarchy
    • Document: Smallest unit of data represented in Espresso. Analogous to a row in a SQL table.
    • Table: Collection of documents that share a schema. Analogous to a table in the SQL world.
    • Document Group: A collection of documents that live within the same database and share a common partitioning key. It is not explicitly represented. Document groups span tables and form the largest unit of transactionality.
    • Database: Largest unit of data management. Analogous to a database in any RDBMS.
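
The hierarchy above can be sketched as follows. This is a minimal illustration, not Espresso's API: `DocumentKey`, `document_groups`, and the table names are hypothetical, and the only idea taken from the notes is that a document group is defined by a shared partitioning key within one database, regardless of table.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class DocumentKey:
    """Hypothetical address of one document (field names are illustrative)."""
    database: str
    table: str
    partition_key: str  # shared by every document in a document group
    sub_key: str = ""   # distinguishes documents inside the group


def document_groups(keys):
    """Group keys by (database, partition_key); the table is deliberately ignored."""
    groups = defaultdict(list)
    for key in keys:
        groups[(key.database, key.partition_key)].append(key)
    return dict(groups)


keys = [
    DocumentKey("MailboxDB", "Messages", "bob", "msg-1"),
    DocumentKey("MailboxDB", "Messages", "bob", "msg-2"),
    DocumentKey("MailboxDB", "MailboxStats", "bob"),
    DocumentKey("MailboxDB", "Messages", "tom", "msg-1"),
]
groups = document_groups(keys)
```

Here the group for `bob` holds documents from two tables, which is the sense in which a document group spans tables while remaining a single unit of transactionality.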


API

  • Read
  • Write
  • Conditionals: Rarely used.
  • Multi Operations: Batch operations.
  • Change Stream Listener: Databus is used here to let an observer see all mutations happening on the database while preserving the commit order of the mutations within a document group.

Bulk Load and Export

  • Load from Hadoop job
    • The Hadoop job writes its output in a specialized format
    • The Hadoop job notifies the Espresso cluster to load the data
    • Progress can be monitored
  • Data Export
    • Databus is used to provide real-time stream of updates which we persist in HDFS.
    • Periodic jobs additionally compact these incremental updates to provide snapshots for downstream consumption.
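
The compaction step in the export path can be pictured as folding an ordered stream of updates into the previous snapshot. The sketch below is an assumption-laden illustration: the `(seq, key, document)` event shape, the use of `None` as a delete marker, and the function name `compact` are all hypothetical, and it runs in memory rather than as a Hadoop job.

```python
def compact(snapshot, updates):
    """Fold change events into a new snapshot without mutating the old one.

    snapshot: dict mapping key -> document.
    updates:  iterable of hypothetical (seq, key, document) tuples, where
              seq reflects commit order and document=None marks a delete.
    """
    result = dict(snapshot)
    # Apply events in commit order so the last write to a key wins.
    for _seq, key, document in sorted(updates, key=lambda event: event[0]):
        if document is None:
            result.pop(key, None)
        else:
            result[key] = document
    return result


old_snapshot = {"a": {"v": 1}, "b": {"v": 1}}
incremental = [(3, "b", None), (1, "a", {"v": 2}), (2, "c", {"v": 1})]
new_snapshot = compact(old_snapshot, incremental)
```

Downstream consumers would then read `new_snapshot` instead of replaying every incremental update.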

System Architecture


  • Clients and Routers
    • A client sends a request to an Espresso endpoint by sending an HTTP request to a router.
    • The router forwards the request to the appropriate storage nodes and assembles a response.
    • The routing logic uses the partitioning method specified in the database schema and applies the appropriate partitioning function.
    • If the request does not contain a partitioning key, such as an index search query over the whole data set, the router queries all storage nodes and sends the merged result set back.
  • Storage Nodes
    • Replicas are maintained using a change log stream.
    • Consistency checking is performed between master and slave partitions and backups.
  • Databus relays
    • Low replication latency and high throughput.
  • Cluster Managers
    • Apache Helix is used. Given a cluster state model and system constraints as input, it computes an ideal state of resource distribution, monitors the cluster health and redistributes resources upon node failure.
    • Helix assigns partitions to storage nodes with these constraints:
      • Only one master per partition.
      • Master and slave partitions are assigned evenly across all storage nodes.
      • No two replicas of the same partition may be located on the same node or rack.
      • Minimize partition migration during cluster expansion.
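
The router behavior described above can be sketched roughly as follows. The notes only say that the partitioning method comes from the database schema, so hash partitioning is an assumption here, as are the partition count, the `ROUTING_TABLE` contents, and every function name.

```python
import hashlib

NUM_PARTITIONS = 8  # assumed value, for illustration only

# Hypothetical routing table: partition id -> node currently mastering it.
ROUTING_TABLE = {p: f"node-{p % 3}" for p in range(NUM_PARTITIONS)}


def partition_for(partition_key, num_partitions=NUM_PARTITIONS):
    """Map a partitioning key to a partition id with a stable hash."""
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def nodes_for(partition_key=None):
    """Return the storage nodes a request must be sent to."""
    if partition_key is None:
        # No partitioning key: scatter the query to every storage node.
        return sorted(set(ROUTING_TABLE.values()))
    return [ROUTING_TABLE[partition_for(partition_key)]]


def merge(per_node_rows):
    """Gather step: concatenate the partial results returned by each node."""
    merged = []
    for rows in per_node_rows:
        merged.extend(rows)
    return merged
```

A keyed request such as `nodes_for("bob")` touches exactly one node, while `nodes_for()` fans out to all of them and relies on `merge` to assemble the response.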


Secondary Index

  • Use case: Selecting a set of documents from a document group based on matching certain predicates on the fields of the documents.
  • In key-value model, there are two ways to achieve this:
    • Fetch all rows and perform filtering: Slow.
    • Maintain the primary relationship and reverse mappings for every secondary key: Risks divergence between the primary data and the reverse mappings.
  • Key requirements
    1. Real-time indexes.
    2. Ease of Schema Evolution.
    3. Query flexibility.
    4. Text search.
  • First attempt with Apache Lucene:
    • Fulfills requirements 2, 3 and 4.
    • Drawbacks
      • Not designed for realtime indexing requirements.
      • Entire index needs to be memory-resident to support low latency query response times.
      • Updates to documents require deleting the old document and re-indexing the new one.
  • Second attempt with Prefix Index.

Partitions and replicas

  • Load balancing during request processing time
  • Efficient and predictable cluster expansion

Internal Clock and the Timeline

  • Each change set is annotated with a monotonically increasing system change number (SCN).
  • The SCN has two parts: a generation number and a sequence number.
    • On each mastership transfer, the generation number increases by one.
    • On each new transaction, the sequence number increases by one.
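
A minimal sketch of the two-part SCN, assuming it can be modeled as an ordered pair. The class and method names are hypothetical; restarting the sequence at 1 on a mastership transfer follows the (g, s) to (g + 1, 1) transition described under Fault tolerance in these notes.

```python
from typing import NamedTuple


class SCN(NamedTuple):
    """System change number as a (generation, sequence) pair.

    Tuples compare field by field, so a higher generation always
    orders after any sequence number of an earlier generation.
    """
    generation: int
    sequence: int

    def next_transaction(self):
        """A new transaction bumps the sequence number."""
        return SCN(self.generation, self.sequence + 1)

    def next_generation(self):
        """A mastership transfer bumps the generation and restarts the sequence."""
        return SCN(self.generation + 1, 1)


scn = SCN(1, 5)
after_commit = scn.next_transaction()
after_transfer = after_commit.next_generation()
```

Because the pair is compared lexicographically, the timeline stays monotonic across mastership transfers even though the sequence number restarts.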

Replication and Consistency

  • The replication layer is designed to address shortcomings of MySQL's native replication.
  • Consistency checker: Calculates the checksum of a certain number of rows of the master partition and compares it to the checksum of the slave partition. When errors are detected, recovery mechanisms are applied.
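
The checksum comparison can be sketched as below. The chunk size, the choice of SHA-256, and the row representation as `(key, value_bytes)` pairs are assumptions made for illustration; the notes do not say how Espresso computes its checksums or which recovery mechanism follows.

```python
import hashlib


def chunk_checksums(rows, chunk_size):
    """Checksum consecutive chunks of rows; rows are (key, value_bytes) pairs."""
    checksums = []
    for start in range(0, len(rows), chunk_size):
        hasher = hashlib.sha256()
        for key, value in rows[start:start + chunk_size]:
            key_bytes = key.encode("utf-8")
            # Length prefixes keep ("ab", b"c") distinct from ("a", b"bc").
            hasher.update(len(key_bytes).to_bytes(8, "big"))
            hasher.update(key_bytes)
            hasher.update(len(value).to_bytes(8, "big"))
            hasher.update(value)
        checksums.append(hasher.hexdigest())
    return checksums


def divergent_chunks(master_rows, slave_rows, chunk_size=2):
    """Return indexes of master chunks whose checksum the slave does not match."""
    master = chunk_checksums(sorted(master_rows), chunk_size)
    slave = chunk_checksums(sorted(slave_rows), chunk_size)
    return [
        index
        for index, checksum in enumerate(master)
        if index >= len(slave) or slave[index] != checksum
    ]


master = [("a", b"1"), ("b", b"2"), ("c", b"3"), ("d", b"4")]
stale_slave = [("a", b"1"), ("b", b"2"), ("c", b"3"), ("d", b"old")]
bad = divergent_chunks(master, stale_slave)
```

Only the chunk containing the stale row is flagged, so a recovery step would need to re-copy that range rather than the whole partition.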

Fault tolerance

  • When a storage node fails, each master partition on the failed node has to be failed over. A slave partition on a healthy node is selected to take over. The slave partition drains any outstanding change events from Databus and then transitions into a master partition. The SCN changes from (g, s) to (g + 1, 1).
  • How are storage node failures detected?
    • ZooKeeper heartbeat.
    • Performance monitoring.
  • The partitions mastered on the failed node are transiently unavailable. Helix always promotes the slave partition that is closest in the timeline to the failed master. Routers can optionally enable slave reads to eliminate read unavailability. After the master transition finishes for a partition, Helix updates the routing table in ZooKeeper, which allows the routers to direct requests accordingly.
  • Databus is fault-tolerant. Each relay has several replicas. One relay is designated as leader and others are followers.
    • The leader relay connects to the data source.
    • The follower relays pull changes from the leader.
    • When the leader fails, one of the followers is elected as the new leader and connects to the data source. The other followers disconnect from the failed leader and connect to the new leader.
  • Helix itself is stateless.
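
The failover sequence for a single partition can be sketched as: pick the slave closest to the failed master in the timeline, drain outstanding events, then start a new generation. All names below are hypothetical, SCNs are modeled as `(generation, sequence)` pairs, and the relay is reduced to a list of SCNs, so this illustrates the ordering of steps rather than how Helix or Databus implement it.

```python
from typing import NamedTuple


class SCN(NamedTuple):
    generation: int
    sequence: int


def drain(last_applied, relay_events):
    """Apply outstanding relay events in order; return the new high-water mark."""
    for scn in sorted(relay_events):
        if scn > last_applied:
            last_applied = scn
    return last_applied


def fail_over(slave_scns, relay_events):
    """Promote one slave replica of a partition.

    slave_scns:   dict node -> last SCN applied by its slave replica.
    relay_events: SCNs of change events still available from the relay.
    Returns (new_master, drained_scn, first_scn_as_master).
    """
    if not slave_scns:
        raise ValueError("no healthy slave replica to promote")
    # The slave with the highest SCN is closest in the timeline to the master.
    new_master = max(slave_scns, key=lambda node: slave_scns[node])
    drained = drain(slave_scns[new_master], relay_events)
    # Mastership transfer: (g, s) becomes (g + 1, 1).
    return new_master, drained, SCN(drained.generation + 1, 1)


slaves = {"node-b": SCN(3, 40), "node-c": SCN(3, 42)}
relay = [SCN(3, 41), SCN(3, 42), SCN(3, 43)]
new_master, drained, first_scn = fail_over(slaves, relay)
```

In this example `node-c` is promoted because it has applied more of the timeline, and it only needs to apply one outstanding event before taking writes.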

Cluster Expansion

  • Certain master and slave partitions are selected to migrate to new nodes.
  • Helix will calculate the smallest set of partitions to migrate to minimize the data movement and cluster expansion time.
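
One way to picture minimal-movement expansion is a greedy rebalance that moves replicas only from overloaded nodes onto the new nodes. This is a hypothetical sketch, not the algorithm Helix uses: master and slave roles are not modeled, each node is just a set of partition ids, and the only constraint enforced is that a node never hosts two replicas of the same partition.

```python
def expand(assignment, new_nodes):
    """Add empty nodes and move just enough replicas to even out the load.

    assignment: dict node -> set of partition ids hosted on that node.
    Returns (new_assignment, moves), where each move is
    (partition, from_node, to_node).
    """
    result = {node: set(parts) for node, parts in assignment.items()}
    old_nodes = sorted(assignment)
    for node in new_nodes:
        result[node] = set()
    total = sum(len(parts) for parts in result.values())
    target = total // len(result)  # even share per node, rounded down
    moves = []
    for new_node in new_nodes:
        while len(result[new_node]) < target:
            # Donors are above the target and hold a partition the new node lacks.
            donors = [
                node for node in old_nodes
                if len(result[node]) > target and result[node] - result[new_node]
            ]
            if not donors:
                break
            donor = max(donors, key=lambda node: len(result[node]))
            partition = min(result[donor] - result[new_node])
            result[donor].remove(partition)
            result[new_node].add(partition)
            moves.append((partition, donor, new_node))
    return result, moves


before = {"n1": {0, 1, 2, 3}, "n2": {2, 3, 4, 5}, "n3": {0, 1, 4, 5}}
after, moves = expand(before, ["n4"])
```

With twelve replicas on four nodes, the new node needs three replicas, so three moves is the least any rebalance could do; everything that is not moved stays where it was.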

Use cases at LinkedIn

  • Company pages
  • MailboxDB
  • USCP (User social content platform)

Compare with others

  • Espresso does not support global distributed transactions.
  • Transaction support within an entity group is richer than in most distributed data systems, such as MongoDB, HBase and PNUTS.
  • Among the well-known NoSQL systems, MongoDB is the only one that offers rich secondary indexing comparable to Espresso's, but Espresso has better RAM:disk utilization.
  • HBase and BigTable follow a shared-storage paradigm by using a distributed replicated file system for storing data blocks. Espresso uses local, shared-nothing storage and log shipping between masters and slaves with automatic failover, similar to MongoDB. This guarantees that queries are always served out of local storage and delivers better latency on write operations.
  • The multi-DC operation of Espresso differs significantly from other systems.
    • Voldemort and Cassandra implement quorums that span geographic regions.
    • MegaStore and Spanner implement synchronous replication using Paxos.
    • PNUTS implements record-level mastership and allows writes only on the geographic master.
    • Espresso relaxes consistency across data centers and allows concurrent writes to the same data in multiple data centers, relying on the application layers to minimize write conflicts.
    • Conflict detection and resolution schemes are employed to ensure that data in different data centers eventually converges.