Why do we need Databus?
No single type of data management system meets every need. In most cases we have a primary source-of-truth system plus several other data systems, and we need to keep those other systems consistent with the primary. There are two common types of solutions:
- Application-driven dual writes: The application writes to both systems. Handling error cases is hard: if the application succeeds in writing to the main system but fails to write to the other one, it needs logic to roll back the change in the main system. This pushes extra complexity into the applications.
- Database Log Mining: We make the database the single source-of-truth and extract changes from its transaction or commit log.
Databus follows the “log mining” option.
Requirements for Databus
- No additional point of failure.
- Source consistency preservation.
- Capture transaction boundaries.
- Capture commit order.
- Capture consistent state: We may miss intermediate changes, but we must never miss the latest update.
- User-space processing. This means the change sets are processed on the consumer side, outside the database server. Benefits are as follows:
- Reduce the load on the database server.
- Avoids affecting the stability of the primary data store.
- Decouples the subscriber implementation from the specifics of the database server implementation.
- Enables independent scaling of the subscribers.
- No assumption about consumer uptime.
- Isolation between Data-source and consumers.
- Allow multiple subscribers.
- Support different types of partitioning for computation tasks.
- Isolate the source database from the number of subscribers. (Increasing the number of subscribers does not impact the performance of the database.)
- Isolate the source database from slow or failing subscribers.
- Isolate the subscribers from the operational aspects of the source database: database system choice, partitioning, schema evolution.
- Low latency of the pipeline.
- Scalable and Highly available.
Architecture
Components
- A fetcher which extracts changes from the data source or another Databus component.
- A log store which caches this change stream.
- A snapshot store which stores a moving snapshot of the stream.
- A subscription client which pulls change events seamlessly across the various components and surfaces them up to the application.
Consistency Semantics
- Supports guaranteed at-least-once delivery semantics.
- An event may be delivered more than once only when there is a failure in the communication channel between the relay and the client, or a hard failure of the consumer application.
- Consumers therefore need to be idempotent when applying the delivered events, as in the sketch below.
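A minimal sketch of what idempotent application can look like, assuming a hypothetical ChangeEvent carrying a key, a value, and the SCN of its transaction (none of these names come from the Databus API): the consumer remembers the highest SCN applied per key and silently skips redeliveries.

```java
// Sketch only: idempotent application of change events.
// Records the highest SCN applied per key, so a redelivered event with an SCN
// at or below the stored watermark is a no-op.
import java.util.HashMap;
import java.util.Map;

public class IdempotentConsumer {

    /** Hypothetical change event: a key, a new value, and the SCN of its transaction. */
    public record ChangeEvent(String key, String value, long scn) {}

    private final Map<String, Long> appliedScnByKey = new HashMap<>();
    private final Map<String, String> store = new HashMap<>();

    /** Applies the event only if it is newer than what has already been applied. */
    public void onEvent(ChangeEvent event) {
        long lastApplied = appliedScnByKey.getOrDefault(event.key(), -1L);
        if (event.scn() <= lastApplied) {
            return; // duplicate delivery after a relay/client failure: safe to ignore
        }
        store.put(event.key(), event.value());
        appliedScnByKey.put(event.key(), event.scn());
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        ChangeEvent e = new ChangeEvent("member:42", "alice", 1001L);
        consumer.onEvent(e);
        consumer.onEvent(e); // redelivered after a failure: applied exactly once
        System.out.println(consumer.store);
    }
}
```

When events are applied strictly in commit order, the same idea works with a single high-water-mark SCN for the whole consumer instead of one per key.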
External Clock and Pull Model
- Each change set is annotated with a monotonically increasing system change number (SCN), which is assigned by the data source and is typically system specific.
- Consumption state is kept on the consumer side, since we want to support a large number of consumers; a checkpoint sketch follows.
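As a rough illustration of consumer-side state, here is a sketch (not the actual Databus client code; the ConsumerCheckpoint class and its file layout are assumptions) of a checkpoint that records only the SCN of the last fully consumed transaction window and is persisted by the subscription client, which is what makes it portable across relays.

```java
// Sketch only: consumer-side checkpoint persisted by the subscription client,
// so the relay stays stateless and the checkpoint is portable across relays.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ConsumerCheckpoint {
    private final Path file;

    public ConsumerCheckpoint(Path file) {
        this.file = file;
    }

    /** SCN of the last transaction window fully applied by this consumer. */
    public long load() throws IOException {
        if (!Files.exists(file)) {
            return -1L; // no state yet: start from the oldest available window
        }
        return Long.parseLong(Files.readString(file, StandardCharsets.UTF_8).trim());
    }

    /** Persisted after each window so a restart resumes from the right SCN. */
    public void save(long windowScn) throws IOException {
        Files.writeString(file, Long.toString(windowScn), StandardCharsets.UTF_8);
    }
}
```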
Source Data Extract
- The mapping from change set to SCN is immutable and is assigned at commit time by the data source.
The Relay
- Changes extracted by the fetcher are serialized into a binary format that is independent of the data source. They are grouped together within transaction window boundaries, annotated with the clock value or SCN associated with the transaction and buffered in the transient log.
- The relay does not maintain consumer-related state. Consumer application progress is tracked through a consumer checkpoint maintained by the subscription client and passed on each pull request. The checkpoint is portable across relays.
- The relay does not know whether a given change set has been processed by all interested consumers, so a time- or size-based retention policy at the relay tier is used to age out old change sets. Consumers that have fallen far behind can still catch up by pulling the missing changes from the bootstrap service; see the pull-loop sketch after this list.
- Relay Cluster Deployment
- Connect all relays to the source: 100% availability as long as at least one relay is up, but this increases the load on the data source.
- One leader connects to the source and the other relays follow the leader: very small impact on the source, but a short period of downtime when the leader fails. (When the leader fails, a follower is elected as the new leader and connects to the source; the other followers disconnect from the failed leader and connect to the new one.)
- New relay servers can be added: some streams are transferred from the existing relays, managed by Helix.
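To make the pull model and the retention policy concrete, here is a sketch under assumed interfaces (Relay, BootstrapService, Window and ScnTooOldException are all hypothetical, not the Databus API): the checkpoint SCN travels with every pull request, advances only after a whole transaction window has been applied, and when the relay has already aged out the requested SCN the client falls back to the bootstrap service.

```java
// Sketch only: client-driven pull loop with a portable checkpoint and a
// fallback to the bootstrap service when the relay has aged out the SCN.
import java.util.List;

public class PullLoopSketch {

    interface Window { long scn(); List<byte[]> events(); }

    interface Relay {
        /** Returns the transaction windows after sinceScn, or throws if aged out. */
        List<Window> pull(long sinceScn) throws ScnTooOldException;
    }

    interface BootstrapService {
        /** Replays snapshot + log so the consumer reaches a recent, consistent SCN. */
        long catchUp(long sinceScn, EventHandler handler);
    }

    interface EventHandler { void onWindow(Window w); }

    static class ScnTooOldException extends Exception {}

    static long runOnce(Relay relay, BootstrapService bootstrap,
                        long checkpointScn, EventHandler handler) {
        try {
            for (Window w : relay.pull(checkpointScn)) {
                handler.onWindow(w);
                checkpointScn = w.scn(); // advance only after the whole window is applied
            }
        } catch (ScnTooOldException e) {
            // The relay's retention policy dropped this SCN: catch up via bootstrap,
            // then resume pulling from the relay at the returned SCN.
            checkpointScn = bootstrap.catchUp(checkpointScn, handler);
        }
        return checkpointScn;
    }
}
```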
The Bootstrap Service
Why do we need the bootstrap service?
- Dumping all data from the database greatly increases the load on a database that is serving online traffic.
- Getting a consistent snapshot of all rows with a single long-running query is difficult.
- It is much more efficient to catch up from the snapshot store, which is a compacted representation of the changes.
How is bootstrap done?
- Reading the whole snapshot in one shot is too large to process at once.
- Instead, we allow batched reads of the snapshot store while new changes continue to be applied to it.
- After reading the snapshot, the consumer only has to apply the changes committed since the snapshot read started, as sketched below.
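A sketch of this two-phase catch-up, with hypothetical SnapshotStore and ChangeLog interfaces: the snapshot is read in key-ordered batches while new changes keep landing in the snapshot store, and afterwards only changes newer than the SCN at which the snapshot read started need to be replayed (safe because consumers are idempotent).

```java
// Sketch only: batched snapshot read followed by log catch-up from the SCN at
// which the snapshot read began.
import java.util.List;

public class BootstrapSketch {

    record Row(String key, byte[] value, long scn) {}

    interface SnapshotStore {
        long currentScn();                           // SCN the snapshot is compacted up to
        List<Row> readBatch(String afterKey, int n); // next batch in key order
    }

    interface ChangeLog {
        List<Row> changesSince(long scn);            // changes newer than the given SCN
    }

    interface Sink { void apply(Row row); }

    static long bootstrap(SnapshotStore snapshot, ChangeLog log, Sink sink, int batchSize) {
        long startScn = snapshot.currentScn();
        String afterKey = null;
        // Phase 1: batched snapshot read; each batch is small enough to process
        // while the snapshot store keeps absorbing new changes.
        while (true) {
            List<Row> batch = snapshot.readBatch(afterKey, batchSize);
            if (batch.isEmpty()) break;
            for (Row row : batch) sink.apply(row);
            afterKey = batch.get(batch.size() - 1).key();
        }
        // Phase 2: catch-up; only changes committed after startScn can be missing.
        // Later batches may already contain some newer rows, which is harmless
        // because the consumer applies events idempotently.
        for (Row row : log.changesSince(startScn)) sink.apply(row);
        return startScn;
    }
}
```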
Partitioned Stream Consumption
Three primary categories of partitioning scenarios
- Single consumer: a consumer subscribing to the change stream of a logical database must be able to do so independently of the physical partitioning of that database. This is supported by simply merging the per-partition streams.
- Partition-aware consumer: the consumer can choose which partitions it is interested in.
- Consumer groups: when the change stream is too fast for a single consumer to process, a group of consumers can split the stream and consume the change events in parallel. A sketch of all three styles follows.
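The three styles can be illustrated with a simple hash-partitioning sketch; the helper names are made up, and Databus's real partition assignment is richer than a modulo rule.

```java
// Sketch only: three ways a consumer can select events from a partitioned stream.
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class PartitionedConsumptionSketch {

    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Single consumer: accept every partition (the client merges the streams). */
    static Predicate<String> singleConsumer() {
        return key -> true;
    }

    /** Partition-aware consumer: accept only the chosen partitions. */
    static Predicate<String> partitionAware(Set<Integer> wanted, int numPartitions) {
        return key -> wanted.contains(partitionOf(key, numPartitions));
    }

    /** Consumer group: member i of n takes every partition p with p % n == i. */
    static Predicate<String> groupMember(int memberIndex, int groupSize, int numPartitions) {
        return key -> partitionOf(key, numPartitions) % groupSize == memberIndex;
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        List<String> keys = List.of("member:1", "member:2", "member:3", "member:4");
        Predicate<String> mine = groupMember(0, 2, numPartitions); // first of two members
        System.out.println(keys.stream().filter(mine).collect(Collectors.toList()));
    }
}
```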
Implementation Notes
Oracle Adapter
Oracle 10g and later versions provide the ora_rowscn pseudo-column, which contains the internal Oracle clock (SCN) at transaction commit time, but this column is not indexable. To capture transactions that span multiple tables, we add a txn column to every table we want to capture changes from, plus a table called TxLog that is updated by a trigger on every transaction.
The TxLog table has the following columns: txn, scn, ts, mask, ora_rowscn.
Changes can be pulled with the query:
```sql
select src.*
from T src, TxLog
where scn > lastScn
  and ora_rowscn > lastScn
  and src.txn = TxLog.txn;
```
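For illustration, a fetcher might execute this query over JDBC roughly as sketched below; the connection URL is a placeholder, the column qualification and the extra TxLog.scn in the select list are additions so the sketch can advance lastScn, and none of this is the actual Databus fetcher code.

```java
// Sketch only: running the pull query with lastScn bound as a parameter.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class OracleFetcherSketch {

    private static final String PULL_QUERY =
        "SELECT src.*, TxLog.scn FROM T src, TxLog " +
        "WHERE TxLog.scn > ? AND TxLog.ora_rowscn > ? AND src.txn = TxLog.txn";

    /** Pulls all rows changed since lastScn and returns the highest SCN seen. */
    static long pullSince(Connection conn, long lastScn) throws SQLException {
        long maxScn = lastScn;
        try (PreparedStatement stmt = conn.prepareStatement(PULL_QUERY)) {
            stmt.setLong(1, lastScn);
            stmt.setLong(2, lastScn);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    long scn = rs.getLong("scn");
                    maxScn = Math.max(maxScn, scn);
                    // serialize the row into the source-independent binary format here
                }
            }
        }
        return maxScn;
    }

    public static void main(String[] args) throws SQLException {
        // Placeholder connection details.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//dbhost:1521/service", "user", "password")) {
            long lastScn = 0L;
            lastScn = pullSince(conn, lastScn);
            System.out.println("pulled up to SCN " + lastScn);
        }
    }
}
```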
Drawbacks of trigger-based approach:
- It can miss intermediate changes to a row, because it is only guaranteed to return the latest state of every changed row. This is not ideal, but acceptable.
- Triggers and the associated tables that they update cause additional read and write load on the source database.