Page 1 of 39
MIT 6.824 Distributed Systems
Infrastructure
graph TD
Infrastructure --> Storage
Infrastructure --> Communication
Infrastructure --> Computation
Course Contents
- RPC
- Threads
- Concurrency Control
- Performance
- Scalability
- Fault Tolerance
- Consistency
RPCs & Threads
Why threads?
- I/O concurrency
- Parallelism
- Convenience → ping, background tasks
GFS
graph TD
High_Performance --> Sharding
High_Performance --> Faults
Faults --> Tolerance
Tolerance --> Replication
Replication --> Consistency
Consistency --> Low_Performance
Low_Performance --> High_Performance
MapReduce: Simplified Data Processing on Large Clusters (2004)
Abstract
flowchart LR
A[Map/Reduce Programs] --> B[MAP REDUCE]
B --> C[Parallelized]
C --> D[Output]
- Partitions input data
- Schedules the program's execution across a set of machines
- Handles machine failures
- Manages inter-machine communication
- High performance
Hence,
e.g., Distributed GREP, count of URL access frequency, Inverted Index, Distributed sort, etc.
Implementation
flowchart LR
InputData --> Splits
Splits -->|M sets| ProcessedInParallel
ProcessedInParallel --> IntermediateKeys
IntermediateKeys --> Splits2
Splits2 -->|R sets| Output
- MAP
- REDUCE (user can specify R, partitioning functions)
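As a rough illustration of the flow above, the sketch below runs the URL-access-frequency example on a single machine. The names `map_fn`, `reduce_fn`, `partition` and `run_job` are made up for these notes and are not Google's API; a real run would spread the M map tasks and R reduce tasks across workers.

```python
from collections import defaultdict
from zlib import crc32

def map_fn(log_line):
    # Emit (url, 1) per request line; the URL is assumed to be the first field.
    url = log_line.split()[0]
    yield url, 1

def reduce_fn(url, counts):
    return url, sum(counts)

def partition(key, R):
    # Deterministic hash partitioning: key -> one of R reduce buckets.
    return crc32(key.encode()) % R

def run_job(splits, R):
    # One bucket of intermediate (key -> values) per reduce task.
    buckets = [defaultdict(list) for _ in range(R)]
    for split in splits:                 # each split stands for one map task
        for line in split:
            for key, value in map_fn(line):
                buckets[partition(key, R)][key].append(value)
    output = {}
    for bucket in buckets:               # each bucket stands for one reduce task
        for key in sorted(bucket):
            k, v = reduce_fn(key, bucket[key])
            output[k] = v
    return output

splits = [["/a 200", "/b 200"], ["/a 404", "/c 200", "/a 200"]]
result = run_job(splits, R=2)
```

Because the partitioning function is deterministic, every value for a given key lands in the same reduce bucket, which is what lets each reduce task work independently.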
Implementation (contd.)
The implementation consists of the following:
- Master Data Structure
- For each Map & Reduce task, store the state (idle, in-progress, or completed) and the identity of worker machines for non-idle tasks.
- Fault Tolerance
- Worker (fast reset)
- Master (can make backup to manage the data structure)
- Data Locality & Task Granularity
Refinements
- I/O types
- Monitoring service
MapReduce can solve problems that are trivial but big. Might be difficult for hard problems.
- i.e., batch processing jobs ✓
- Interactive jobs ✗
The Google File System (2003)
Abstract
- Scalable distributed file system on inexpensive commodity hardware.
Traditional file systems + Google's abstractions (extensions) for their workload (distributed).
- (Performance + scalability + reliability + availability)
Introduction
Questioning existing design choices
- Component failures
- Failures must be treated as the norm, since something is always failing across thousands of machines.
- Problems can come from OS (system) bugs, application bugs, human error, and failures of disks, networking, or power supplies.
- Therefore: constant monitoring, error detection, fault tolerance, & automatic recovery.
- Huge file sizes
- Multi-GB files are common, and managing billions of KB-sized files is unwieldy. So I/O operation and block sizes have to be revisited.
- Write / update considerations in files (Think OASIS)
- Most files are mutated by appending; overwriting existing data is practically non-existent.
- Once written, the files are only read, and often only sequentially.
The workloads are often analytics, logs, etc.
- Co-designing application & FS API
- Adding atomic append operation
Design Overview
Operations Supported
- Create, delete, open, close, read & write
- Snapshot & record append
flowchart LR
subgraph Architecture
GFSMaster[ GFS Master\n(File namespace, chunk info) ]
GFSMaster -->|1| FileNamespace
GFSMaster -->|2| ChunkInfo
GFSMaster -->|3| ChunkHandle, Location
GFSMaster -->|4| ClientCodeApp[Client's code App]
ClientCodeApp -->|API| Users
end
GFSMaster -->|Heart beat| ChunkServer[Chunk servers]
ChunkServer -- Replicated --> ChunkServer
ChunkServer -->|Chunk handle, byte ranges| Data
FS metadata includes:
- Namespace
- Access control information
- Mapping from files to chunks
- Current location of chunks
Operations:
- Chunk lease management
- Garbage collection
- Chunk migration
- Single master
- Minimize its involvement in reads & writes
- Chunk size
- 64 MB
- Reduces metadata on the master, which allows it to be kept in memory.
- Reduces the number of client requests to the master.
- Metadata
- All metadata is kept in the master's memory
- The namespace & file-to-chunk mapping are kept persistent by logging mutations to an operation log.
| In-memory DS | Chunk location | Checkpoints & Operation Log |
|---|---|---|
| Scanning periodically | Polls chunk servers at startup for location | Record critical metadata changes |
| Fast & efficient ops | Can update using heartbeat messages | Persistent & replicated to remote machines |
| | Also by periodically asking | Also, master creates a checkpoint whenever log grows beyond a certain size |
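To make the fixed chunk size concrete, here is a minimal sketch of how a byte offset in a file could be mapped to a chunk and then to chunk servers. The table names (`file_to_chunks`, `chunk_locations`), handles and server names are hypothetical; only the 64 MB chunk size comes from the notes.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB, as in the notes

def locate(offset):
    """Translate a byte offset in a file to (chunk index, offset inside the chunk)."""
    return offset // CHUNK_SIZE, offset % CHUNK_SIZE

# Hypothetical in-memory master tables (illustrative values only).
file_to_chunks = {"/logs/web.log": ["handle-0", "handle-1", "handle-2"]}
chunk_locations = {"handle-2": ["cs-a", "cs-b", "cs-c"]}

def lookup(path, offset):
    index, inner = locate(offset)
    handle = file_to_chunks[path][index]          # persistent mapping (operation log)
    servers = chunk_locations.get(handle, [])     # learned from chunk servers, not logged
    return handle, servers, inner

handle, servers, inner = lookup("/logs/web.log", 130 * 1024 * 1024)
```

With large chunks, a client needs only one such lookup per 64 MB of data, which is part of why the single master stays off the read/write path.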
Consistency Model
Operations
sequenceDiagram
participant Client
participant Master
participant Primary
participant Secondary
Client->>Master: append(filename, bytes)
Master->>Client: (chunk handle, lease, version#)
Client->>Primary: push(bytes)
Primary->>Secondary: push(bytes)
Secondary->>Primary: ack
Primary->>Client: ack
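After the data-push "ack", the client sends the write request to the primary.
If all replicas answer "yes", the primary reports "success" → done
else → not done (the client retries)
This can still leave replicas inconsistent: some may have appended the record & some not.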
The Design of a Practical System for Fault-Tolerant Virtual Machines (2010)
Abstract
- Handles fail-stop (crash) failures only
- Fault-tolerant VMs via the primary/backup replication approach.
- Practical issues + Design choices + Implementation + Alternatives
Introduction
To replicate, we have two approaches:
- Continuously send all changes to the primary's state (CPU, memory, etc.) to the backup.
- Bandwidth ↑
- Just send the same inputs/requests to both servers.
Non-deterministic operations, such as reading the clock or system interrupts, can create trouble.
A VM running on top of hypervisor can implement this approach.
flowchart LR
subgraph Replicated State Machine
PrimaryVM[Primary VM] -->|Non-deterministic operations + Input| LoggingChannel[Logging Channel]
LoggingChannel --> BackupVM[Backup VM]
PrimaryVM -->|Input| SharedDisk
BackupVM -->|Output| SharedDisk
end
Deterministic Replay Implementation
- There is a broad set of inputs, including network packets, disk reads, and input from the keyboard & mouse
(Non-deterministic events & operations, e.g. virtual interrupts and reading the clock cycle counter, also affect the VM's state.)
Challenges
- Correctly capturing all the inputs & non-determinism
- Correctly applying the inputs & non-determinism to the backup
- Doing so without degrading performance
- All inputs to the primary VM, and all possible non-determinism associated with the VM's execution, are recorded in a stream of log entries written to a log file.
- For non-deterministic operations, sufficient information is logged as well.
- For events & interrupts, an efficient event recording & event delivery mechanism is implemented.
FT Protocol
To ensure no data is lost when primary fails.
Note: The logs are not written to disk, instead sent via logging channel.
sequenceDiagram
participant Primary
participant Backup
Primary->>Backup: async event
Primary->>Backup: dep output
Primary->>Backup: output
Backup->>Primary: ack for output
Output rule: the primary releases an output only after the backup has acknowledged the log entry that produced it.
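The rule above can be sketched as a tiny simulation. This is only an illustration under assumed names (`Primary`, `Backup`, `log_event`, `produce_output`); the logging channel is modeled as a direct method call, and a delayed ack is simulated with a flag.

```python
class Backup:
    def __init__(self):
        self.log = []

    def receive(self, entry):
        self.log.append(entry)
        return len(self.log)            # ack = number of entries received so far

class Primary:
    def __init__(self, backup):
        self.backup = backup
        self.sent = 0                   # entries sent on the logging channel
        self.acked = 0                  # highest entry acknowledged by the backup
        self.pending = []               # (required_ack, output) pairs held back
        self.released = []              # outputs actually sent to the outside world

    def log_event(self, entry, deliver_ack=True):
        self.sent += 1
        ack = self.backup.receive(entry)
        if deliver_ack:                 # False simulates an ack still in flight
            self.on_ack(ack)

    def on_ack(self, ack):
        self.acked = max(self.acked, ack)
        still_waiting = []
        for need, out in self.pending:
            (self.released if need <= self.acked else still_waiting).append(out)
        self.pending = still_waiting

    def produce_output(self, out):
        # Hold the output until everything logged so far has been acknowledged.
        if self.sent <= self.acked:
            self.released.append(out)
        else:
            self.pending.append((self.sent, out))

backup = Backup()
primary = Primary(backup)
primary.log_event("net packet 1")
primary.log_event("timer interrupt", deliver_ack=False)
primary.produce_output("reply A")       # held: entry 2 is not acked yet
held_before_ack = list(primary.pending)
primary.on_ack(2)                       # the ack arrives, the output is released
```

Note that only the output is delayed; the primary itself keeps executing, which is why the rule costs little performance.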
Detecting & Responding to Failure
- Heartbeat + monitoring log traffic
To avoid split brain, it uses an atomic test-and-set operation on shared storage: only the VM whose operation succeeds may go live.
If the backup dies, the primary leaves recording mode and runs normally.
If the primary dies, the backup keeps replaying until the last log entry is consumed, gets the lock on shared storage, goes live, and gets another backup VM up.
flowchart TD
Backup -->|informs| ClusteringService
ClusteringService -->|clones primary VM| SetUpBackup[set up backup]
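A minimal sketch of the test-and-set idea follows. `SharedStorage` is a hypothetical in-process stand-in for the shared disk, and a Python lock plays the role of the storage's atomicity; the point is only that exactly one contender can win.

```python
import threading

class SharedStorage:
    """Stand-in for the shared disk; only the atomic flag matters here."""
    def __init__(self):
        self._flag = False
        self._mutex = threading.Lock()

    def test_and_set(self):
        # Atomically set the flag; return True only for the first caller.
        with self._mutex:
            if self._flag:
                return False
            self._flag = True
            return True

def try_go_live(name, storage, winners):
    if storage.test_and_set():
        winners.append(name)    # this VM may continue as the sole primary
    # a loser halts itself instead of running alongside the winner

storage = SharedStorage()
winners = []
threads = [threading.Thread(target=try_go_live, args=(n, storage, winners))
           for n in ("primary", "backup")]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Whichever VM loses the race learns that the other is still alive, so a network partition between the two cannot produce two primaries.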
Raft: An Understandable Consensus Algorithm (2014)
- Raft → consensus algorithm
Abstract
- (Works only on majority)
An easier version of Paxos.
It separates the key elements of the consensus such as:
- Leader election
- Log replication
- Safety
- Understandability
- Decomposition
- State reduction
Introduction
flowchart TD
subgraph Consensus Module
c1[Client 1]
c2[Client 2]
c3[Client 3]
s1[Server 1]
s2[Server 2]
s3[Server 3]
Output
c1 --> s1
c2 --> s1
c3 --> s1
s1 --> Output
s1 --> s2
s1 --> s3
end
The consensus algorithm's job is to keep the replicated log consistent.
They (consensus algo.) have the following properties:
- They ensure safety (never returning incorrect results) under all non-Byzantine conditions, including network delays, partitions, and packet loss.
- They are fully functional as long as majority of servers are operational.
- They don't depend on timing to ensure consistency of logs.
Algorithm
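- A cluster of several servers (typically 5), each in one of three states:
- Leader
- Follower
- Candidate
In normal operation, one server is the leader and the rest are followers.
The leader handles all client requests and replicates log entries.
Followers are passive: they only respond to requests from leaders & candidates.
A candidate is a server trying to get elected as the new leader.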
stateDiagram-v2
[*] --> Follower
Follower --> Candidate: timeout
Candidate --> Leader: receive votes from majority
Candidate --> Follower: discover current leader or new term
Leader --> Follower: discover server with higher term
Server States
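- Raft servers communicate using RPCs
- RequestVote RPC
- AppendEntries RPC
- The leader only commits an entry once it has been replicated to a majority of servers.
- During an election, a candidate whose log is less up-to-date than a voter's log (compared by the term, then the index, of the last entry) does not get that vote, so a server with a stale log cannot be elected.
Note: Every term has at most one leader.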
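The election restriction and the majority rule can be sketched as below. This is a simplification for illustration: it ignores term comparison on the RPC itself and the "already voted in this term" check, and the function names are my own.

```python
def candidate_is_up_to_date(cand_last_term, cand_last_index,
                            my_last_term, my_last_index):
    """Voter-side check: is the candidate's log at least as up-to-date as mine?"""
    if cand_last_term != my_last_term:
        return cand_last_term > my_last_term     # later last term wins
    return cand_last_index >= my_last_index      # same term: longer log wins

def wins_election(votes_granted, cluster_size):
    return votes_granted > cluster_size // 2

def count_votes(candidate, voters):
    # candidate and voters are (last_term, last_index) pairs;
    # the candidate always votes for itself.
    return 1 + sum(candidate_is_up_to_date(*candidate, *v) for v in voters)

voters = [(3, 5), (3, 5), (3, 4), (2, 4)]
stale_votes = count_votes((2, 4), voters)   # candidate with an old log
fresh_votes = count_votes((3, 5), voters)   # candidate with the newest log
```

Since a committed entry lives on a majority of servers, any candidate that collects a majority of votes must have a log at least as up-to-date as some server holding that entry.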
Another requirement for Raft is timing.
- Broadcast time << election timeout << MTBF
- Broadcast time: roughly the time for an RPC round trip to every server (think ping)
- MTBF: average time between failures for a single server
Cluster Membership Change
- A direct switch is risky: two leaders could be elected for the same term.
To ensure safety, it uses a two-phase approach.
flowchart LR
OldConfig[Old Configuration] --> JointConsensus[Joint Consensus/Transitional Configuration] --> Commit[Commit] --> NewConfig[New Configuration]
- Log entries are replicated to all servers in both configurations.
- Any server from either configuration may serve as leader.
- Agreement (for elections & entry commit) requires separate majority from both the old & new configurations.
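The "separate majorities" rule is easy to get wrong, so here is a small sketch of it. The server names and the helper names (`has_majority`, `joint_agreement`) are illustrative assumptions.

```python
def has_majority(acks, config):
    # Count only acknowledgements from servers that belong to this configuration.
    return len(acks & config) > len(config) // 2

def joint_agreement(acks, old_config, new_config):
    # During joint consensus a decision needs a majority of the old
    # configuration AND a majority of the new one.
    return has_majority(acks, old_config) and has_majority(acks, new_config)

old = {"s1", "s2", "s3"}
new = {"s3", "s4", "s5"}

only_old = joint_agreement({"s1", "s2", "s3"}, old, new)
only_new = joint_agreement({"s3", "s4", "s5"}, old, new)
both = joint_agreement({"s1", "s3", "s4"}, old, new)
```

Neither configuration can decide on its own during the transition, which is what rules out two disjoint majorities electing two leaders.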
Log Compaction
Snapshot the current system state; the log up to that point is then discarded.
flowchart LR
subgraph LogIndex
1 --> 1
2 --> 1
3 --> 2
4 --> 3
5 --> 3
6 --> 3
end
before --> snapshot
snapshot --> after
snapshot
last included index: 5
last included term: 3
state machine state: _
K = committed
Yahoo Zookeeper (2010)
Abstract
- Zookeeper: coordinating service for distributed systems/applications
Elements of group messaging + shared registers + distributed lock service, in a replicated centralized service.
- It has wait-free aspects with an event-driven mechanism to provide powerful coordination service.
- FIFO execution of each client's requests & linearizability for all requests that change Zookeeper state.
Introduction
All distributed systems require coordination.
graph TD
Coordination --> Configuration
Coordination --> LeaderElection[Leader Election/Group Membership]
Coordination --> Locks
There can be several services for each of different coordination services, but ...
- Zookeeper implemented a coordination service and...
Zookeeper Service
- Zookeeper exposes an API that enables app developers to implement their own primitives.
- It also avoids locks and is used with few data objects organized hierarchically as in file systems.
Zookeeper Architecture
flowchart LR
subgraph Zookeeper Client Library
end
subgraph Zookeeper Servers
end
Zookeeper Client Library <--> Zookeeper Servers
Just like RAFT
Session & Client API
{session}→ Client API
flowchart TD
Client -->|"uses ZK service"| Zookeeper Service
- Zookeeper, instead of implementing a more general-purpose system, provides a file system.
- Replaces concept of "file" with "znode", and provides: create, delete, exists, setData, getData, getChildren API.
/master
master1.foobar.com:2273
/workers
worker-1 "worker1"
worker-2 "worker2"
znode is an in-memory data node of zookeeper data.
Znodes
- There are two types of znodes clients can create:
- Regular (persistent, creation & deletion explicit)
- Ephemeral (let system remove automatically)
- It also allows sequential flag & watchers to allow clients to receive timely notifications of changes without requiring polling.
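To see how ephemeral and sequential znodes behave, here is a toy in-memory model. `ToyZK` is invented for these notes and is not the real Zookeeper client API; it has no watches, no replication and no sessions beyond a plain identifier.

```python
class ToyZK:
    """In-memory stand-in for a znode tree (illustration only)."""
    def __init__(self):
        self.nodes = {}       # path -> (data, owning session or None)
        self.counters = {}    # parent path -> next sequence number

    def create(self, path, data, session=None, ephemeral=False, sequential=False):
        if sequential:
            # Append a per-parent, monotonically increasing counter to the name.
            parent = path.rsplit("/", 1)[0]
            n = self.counters.get(parent, 0)
            self.counters[parent] = n + 1
            path = f"{path}{n:010d}"
        if path in self.nodes:
            raise KeyError(f"{path} already exists")
        self.nodes[path] = (data, session if ephemeral else None)
        return path

    def get_children(self, parent):
        prefix = parent.rstrip("/") + "/"
        return sorted(p[len(prefix):] for p in self.nodes
                      if p.startswith(prefix) and "/" not in p[len(prefix):])

    def close_session(self, session):
        # Ephemeral znodes disappear when the session that created them ends.
        self.nodes = {p: v for p, v in self.nodes.items() if v[1] != session}

zk = ToyZK()
zk.create("/workers", "")
a = zk.create("/workers/worker-", "host-a", session="A", ephemeral=True, sequential=True)
b = zk.create("/workers/worker-", "host-b", session="B", ephemeral=True, sequential=True)
members_before = zk.get_children("/workers")
zk.close_session("A")                      # worker A crashes or disconnects
members_after = zk.get_children("/workers")
```

This is essentially group membership: the children of `/workers` are the live members, and a crashed member is removed without anyone having to clean up after it.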
Zookeeper Guarantees
- Linearizable Writes
- All requests that update the state of Zookeeper are serializable and respect precedence.
- FIFO Client Order
- All requests from a given client are executed in the order they were sent.
Note: The system as a whole is not linearizable; it allows stale data to be read from replicas.
But, since it has FIFO client order, a client will still see its own earlier writes.
Scalability
- Because reads are served locally by each server, read throughput scales roughly linearly as servers are added.
- Only for read-heavy apps, though.
This can be explained via:
- Leader config update + notification
- Sync operation (write before read)
So, guarantees are acceptable. This works as long as majority of servers are alive (just like Raft).
Building Primitives
- Zookeeper looks cool IMO:
- It can do:
- Group Membership
- Efficient Lock Mechanisms
- Read/Write Locks
- etc.
Applications using Zookeeper
- Yahoo Fetching (web crawling) service
- Message Brokers
- Kafka
- & much more...
Zookeeper Implementation
flowchart LR
subgraph Zookeeper Service
RequestProcessor -->|Txn| AtomicBroadcast
AtomicBroadcast -->|Txn| ReplicatedDB
ReplicatedDB -->|response| RequestProcessor
end
Write request → Request Processor → Atomic Broadcast → Replicated DB → response
Read request → Replicated DB → Request Processor → response
Note: Zookeeper can deliver the same message twice, but I think they are idempotent.
Key Components
- Atomic Broadcast
- Update Zookeeper state → Leader → Broadcast
- Request Processor
- setDataTxn if success, errorTxn if fail
- Replicated DB
- In-memory
- Uses snapshots to recover faster
- Client-Server Interactions
- On write request → notification → watchers
- On read request → tagged with zxid → FIFO order (may return stale value)
Bitcoin (2009)
A peer-to-peer version of e-cash allowing online payments without having to go through a financial institution.
Key Features
- The network timestamps transactions
- Batched into blocks
- Based on proof-of-work
Introduction
Motive
- Make it computationally impractical to reverse transactions or commit fraud.
- Allows routine escrow mechanisms so both buyer & seller are satisfied.
- Solves double-spending problem.
Transactions
- Electronic coin = chain of digital signatures
sequenceDiagram
participant Owner1
participant Owner2
participant Owner3
Owner1->>Owner2: Transfer (sign with Owner1's Private Key)
Owner2->>Owner3: Transfer (sign with Owner2's Private Key)
- A coin is basically a chain of digital signatures stored on disk.
- The current owner signs a hash of (previous transaction + public key of the next owner).
- The payee can verify the signatures but can't tell whether the payer double-spent the coin.
Approaches
- Use a bank-like system (Not decentralized)
- Use a P2P system
- Needs a way for participants to agree on a single history of the order in which transactions were received.
Timestamp Server
- To avoid double-spending, it introduces a timestamp server.
- It takes a hash of a block of items to be timestamped and widely publishes the hash; each timestamp includes the previous timestamp in its hash.
- It forms a chain as follows:
flowchart LR
A[Block: List of Items] --> B[Hash]
B --> C[Block: List of Items]
C --> D[Hash]
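The chaining above can be sketched with SHA-256 from the standard library. The block encoding (joining items with commas) and the all-zero starting hash are arbitrary choices for illustration, not Bitcoin's actual block format.

```python
import hashlib

def block_hash(prev_hash, items):
    # Each hash covers the previous hash, so it commits to the whole history.
    payload = prev_hash + "|" + ",".join(items)
    return hashlib.sha256(payload.encode()).hexdigest()

def build_chain(blocks):
    hashes, prev = [], "0" * 64
    for items in blocks:
        prev = block_hash(prev, items)
        hashes.append(prev)
    return hashes

original = build_chain([["tx1", "tx2"], ["tx3"], ["tx4"]])
tampered = build_chain([["tx1", "txX"], ["tx3"], ["tx4"]])
```

Changing one item in the first block changes every later hash as well, so rewriting history means redoing the entire chain after the modified block.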
Proof of Work
For a distributed timestamp server on a P2P basis, Bitcoin uses proof of work.
- Increment a nonce in the block until the block's hash starts with the required number of zero bits.
flowchart LR
subgraph Block
PrevHash --> Nonce
Nonce --> TX1
Nonce --> TXn
end
subgraph Block2
PrevHash2 --> Nonce2
Nonce2 --> TX2_1
Nonce2 --> TX2_n
end
- It avoids a one-IP-address-one-vote system and uses computation-power-based (one-CPU-one-vote) governance.
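A minimal proof-of-work sketch follows. It uses a single SHA-256 over a made-up text header and a small difficulty so it finishes quickly; real Bitcoin hashes a binary header twice and compares against a target, so treat this only as an illustration of the nonce search.

```python
import hashlib

def leading_zero_bits(digest):
    # Number of zero bits before the first 1 bit in the digest.
    value = int.from_bytes(digest, "big")
    return len(digest) * 8 - value.bit_length()

def header_digest(prev_hash, nonce, transactions):
    header = f"{prev_hash}|{nonce}|{transactions}".encode()
    return hashlib.sha256(header).digest()

def mine(prev_hash, transactions, difficulty_bits):
    # Try nonces in order until the hash has enough leading zero bits.
    nonce = 0
    while True:
        digest = header_digest(prev_hash, nonce, transactions)
        if leading_zero_bits(digest) >= difficulty_bits:
            return nonce, digest.hex()
        nonce += 1

nonce, digest_hex = mine("00ab", "tx1;tx2", difficulty_bits=12)
```

Finding the nonce takes about 2^12 attempts on average here, while checking it takes a single hash; that asymmetry is what makes the work easy to verify and expensive to redo.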
Network
- New transactions are broadcast to all nodes.
- Each node collects new transactions into a block.
- Each node works on finding a difficult proof-of-work for its block.
- When a node finds proof-of-work, it broadcasts the block to all nodes.
- Nodes accept the block only if all transactions in it are valid & not already spent.
- Nodes express their acceptance of the block by working on creating the next block in the chain, using the hash of the accepted block as the previous hash.
- Nodes always consider the longest chain to be the correct one. In case of branching, a node keeps both branches and switches to whichever becomes longer.
Incentive
- Block reward (new coins for the node that creates a block) halves roughly every 4 years.
- Plus transaction fees.
Reclaiming Disk Space
- Uses Merkle tree with root included in block hash.
- Allows less space and avoids sequential storing of transactions.
- A user only needs to keep a copy of the block headers of the longest proof-of-work chain, by querying network nodes.
flowchart LR
A[Prev Hash] --> B[Nonce]
B --> C[Merkle Root]
C --> D[Hash01]
C --> E[Hash23]
D --> F[Hash2]
E --> G[Hash3]
F --> H[TX3]
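Here is a short sketch of computing a Merkle root over a block's transactions. It uses single SHA-256 and duplicates the last hash on odd-sized levels; the helper names are my own and the encoding of transactions as strings is an assumption.

```python
import hashlib

def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()

def merkle_root(transactions):
    # Expects a non-empty list of transaction strings.
    level = [h(tx.encode()) for tx in transactions]
    while len(level) > 1:
        if len(level) % 2:              # odd count: pair the last hash with itself
            level.append(level[-1])
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]

txs = ["tx0", "tx1", "tx2", "tx3"]
root = merkle_root(txs)

# The same root built by hand, mirroring the Hash01 / Hash23 layout.
hash01 = h(h(b"tx0") + h(b"tx1"))
hash23 = h(h(b"tx2") + h(b"tx3"))
```

Only the root goes into the block header, so old transactions can be pruned from disk while the header hash, and therefore the chain, stays unchanged.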
Object Storage on CRAQ (2009)
Princeton University
Chain replication with apportioned queries.
Overview
A distributed object storage system focused on improving chain replication's read throughput while maintaining strong consistency.
sequenceDiagram
participant Client
participant Rep1
participant Rep2
participant Rep3
Client->>Rep1: Write request
Rep1->>Rep2: Write request
Rep2->>Rep3: Write request
Rep3-->>Client: Response
Client->>Rep3: Read request
Rep3-->>Client: Response
- Plain chain replication creates a hot spot, since every read goes through one node (the tail).
- In return, it has strong consistency & cheap writes.
Introduction
- Object stores support:
- Read (retrieve the data block stored under an object name)
- Write (change the state of a single object)
- They work just like key-value DBs and are highly scalable.
- The object namespace is partitioned over many machines & each data object is replicated several times.
- Chain replication looks promising, but its read throughput pulls it down. That's where CRAQ comes in.
CRAQ Key Features
- Basically, it divides read operations over all nodes in a chain.
- Supporting read operation while preserving strong consistency (load balancing).
- Allows eventual consistency for read operations, with an optional maximum staleness bound.
- Geographical load balancing, pipelined mini-transactions & use of multicast to improve write performance for large object writes.
CRAQ (uses Zookeeper as coordination service)
It works as follows:
- Each node in the chain can store multiple versions of an object: one clean version and a dirty version post recent write. All versions are initially clean.
- When node receives a new version of an object, it appends this latest version to its list for the object:
- If not tail, mark as dirty.
- Otherwise, mark as clean (committed).
- When an ack message for an object version arrives at a node, the node marks that version as clean and deletes all prior versions of the object.
- On read request:
- If the latest version is clean, return its value.
- Otherwise, contact the tail, ask for its last committed version number, and return that version.
This model allows CRAQ to support three forms of consistency:
- Strong consistency (as above)
- Eventual consistency (don't ask tail for latest committed version no.)
- EC with maximum-bounded inconsistency (as its name suggests)
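The clean/dirty rules for the strongly consistent case can be sketched as follows. `Node` and its methods are hypothetical names; message passing along the chain is replaced by direct method calls, and the caller drives the write and ack propagation by hand.

```python
class Node:
    def __init__(self, is_tail=False, tail=None):
        self.is_tail = is_tail
        self.tail = tail                 # used only for version queries
        self.versions = {}               # key -> list of [version, value, clean]

    def write(self, key, version, value):
        # A new version is clean only at the tail; elsewhere it is dirty until acked.
        self.versions.setdefault(key, []).append([version, value, self.is_tail])
        if self.is_tail:
            self.ack(key, version)

    def ack(self, key, version):
        # Mark the acked version clean and drop all older versions.
        kept = [v for v in self.versions[key] if v[0] >= version]
        for v in kept:
            if v[0] == version:
                v[2] = True
        self.versions[key] = kept

    def last_committed(self, key):
        return max(v[0] for v in self.versions[key] if v[2])

    def read(self, key):
        latest = self.versions[key][-1]
        if latest[2]:                    # clean: answer locally
            return latest[1]
        committed = self.tail.last_committed(key)   # dirty: ask the tail
        return next(v[1] for v in self.versions[key] if v[0] == committed)

tail = Node(is_tail=True)
mid = Node(tail=tail)
for node in (mid, tail):
    node.write("k", 1, "v1")
mid.ack("k", 1)

mid.write("k", 2, "v2")                  # version 2 has not reached the tail yet
read_while_dirty = mid.read("k")
tail.write("k", 2, "v2")
mid.ack("k", 2)
read_after_commit = mid.read("k")
```

Skipping the version query to the tail and returning the latest local value instead would give the eventually consistent variant.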
Problems with CRAQ
- If one node is slow, affects all of the nodes in a chain.
- We can also create multiple chains instead of CRAQ to have even more simplicity.
Aurora (2017)
Design considerations for high throughput cloud-native relational DBs.
Abstract
- A relational DB service for OLTP offered as part of AWS (transaction logs).
- Since the bottleneck has moved from compute & storage to the network, Aurora is designed around this constraint.
- It also uses an efficient asynchronous scheme to achieve consensus without chatty & expensive recovery protocols.
Introduction
Well, the problem nowadays is the network between the DB tier and the storage tier.
Before starting, let's talk about internals of DB (SQL):
- SQL server writes modified data page to the disk asynchronously:
- Lazy writing → flush out some infrequently used pages.
- Eager writing → during non-logged operations.
- Checkpoint → periodic check to scan the buffer cache & write dirty pages to the disk.
On read request/SQL query (btw SQL server stores data in 8 KB pages):
- It locates the data & retrieves the pages that contain the information.
- By retrieving it first from the disk to buffer cache.
- Finally, returns result to the reader.
flowchart LR
User -->|SQL| BufferCache
BufferCache -->|Disk| Disk
Disk -->|Hit| BufferCache
BufferCache --> User
On write request, update query to be precise:
- Locate page in buffer cache, if not found, retrieve into buffer cache from disk.
- Acquire locks on the pages to be updated, then update them.
- The modified pages are now called dirty pages.
- This modification is recorded in the transaction log and an ack is sent to the user.
- The changed page stays in the buffer cache; later, one of the background processes flushes it to disk.
Coming back: a page miss or a background task can cause stalls, context switches & resource contention.
Aurora Architecture
Amazon Aurora addresses these issues by leveraging the redo-log across a highly distributed cloud environment.
flowchart TD
subgraph SQL
Transactions
Caching
end
SQL --> LoggingStorage
LoggingStorage --> AmazonS3
Fig: Move logging & storage off the database engine
flowchart TD
subgraph Compute
SQL
Transactions
Caching
Logs
end
Compute --> AttachedStorage
AttachedStorage -->|Fault tolerant| TraditionalDB
- The network is the bottleneck.
The Aurora architecture offers 3 major advantages:
- Protection of the DB from performance variance & transient or permanent failures.
- Reduce network IOPS by only writing redo log records to storage.
- Expensive operations such as (backup & redo recovery) are offloaded to continuous asynchronous operations across a large distributed fleet.
Durability
Option 1: Replication
- 3-way replication with 1 copy per AZ
- Use read/write quorums of 2/3
Can it survive?
- i) AZ failure ✓ (2/3 still live)
- ii) AZ+1 failure ✗ (lose 2/3, no quorum left)
Option 2: Replicate 6 copies with 2 copies per AZ
- Write quorum of 4/6, read quorum of 3/6
Can it survive?
- i) AZ failure ✓ (4/6 still live, so reads & writes continue)
- ii) AZ+1 failure ✓ for reads (still have 3 copies, so no data is lost; writes wait for repair)
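The two options can be checked with a few lines of arithmetic. The sketch below assumes the usual quorum rules (read and write quorums must overlap, and two write quorums must overlap); the function names are illustrative.

```python
def quorum_rules_ok(V, Vr, Vw):
    # Reads must overlap the latest write, and two writes must overlap each other.
    return Vr + Vw > V and Vw > V / 2

def survives(V, per_az, Vr, Vw, az_lost, extra_lost):
    # Copies left after losing whole AZs plus some additional single copies.
    alive = V - az_lost * per_az - extra_lost
    return {"read": alive >= Vr, "write": alive >= Vw}

option1_az = survives(V=3, per_az=1, Vr=2, Vw=2, az_lost=1, extra_lost=0)
option1_az_plus_1 = survives(V=3, per_az=1, Vr=2, Vw=2, az_lost=1, extra_lost=1)
option2_az = survives(V=6, per_az=2, Vr=3, Vw=4, az_lost=1, extra_lost=0)
option2_az_plus_1 = survives(V=6, per_az=2, Vr=3, Vw=4, az_lost=1, extra_lost=1)
```

With 6 copies, losing an AZ plus one more node still leaves a read quorum, so the data can be read back and the lost copies rebuilt.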
- With 6 copies, how to get the largest DB size?
- Partition the volume into fixed-size segments and replicate each segment 6 ways into a Protection Group (PG).
- If segments are too small, there are more of them and failures are more likely. If they are too big, repair takes too long.
- So, they chose 10 GB as the segment size, since on a 10 Gbps network link it takes only about 10 seconds to repair one (so, a total of 1 minute).
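As a back-of-the-envelope check of the repair time, the sketch below just divides segment size by link speed. It assumes decimal units (1 GB = 8 Gb), a fully available link, and no protocol overhead, so the result is a lower bound that the notes round up to about 10 seconds.

```python
def repair_seconds(segment_gb, link_gbps):
    # Transfer time = size in gigabits / link speed in gigabits per second.
    return segment_gb * 8 / link_gbps

ten_gb_segment = repair_seconds(10, 10)
hundred_gb_segment = repair_seconds(100, 10)
```

The short repair window is the point: a second failure would have to hit the same Protection Group within those few seconds to threaten the quorum.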
Offloading the heavy lifting of a traditional DB to the storage tier. (LOG IS THE DB)
In traditional DBs, replication leads to write amplification & complexity. Aurora, with its different architecture, offloads work by:
flowchart TD
DatabaseInstance -->|log record| StorageNode
StorageNode -->|ack| DatabaseInstance
DatabaseInstance -->|Update| StorageNode
StorageNode -->|ack| DatabaseInstance
StorageNode -->|continuous backup| S3
S3 -->|Backup| StorageNode
StorageNode -->|checksum| DatabaseInstance
Normal Operations: Log Sequence Numbers
- Each log record has a Log Sequence Number (LSN); to decide what is durable & consistent, Aurora compares the latest LSNs.
- Writes:
- VDL = Volume Durable LSN (Trusted data for each replica)
- VCL = Volume Complete LSN (Guarantees availability of all prior records)