Elixir Dynamo

The document discusses the advantages of using Elixir over C++ for implementing a distributed system like Dynamo, highlighting Elixir's built-in concurrency, fault tolerance, and ease of use. It also provides a detailed overview of the Dynamo paper, outlining its design goals, data partitioning, replication mechanisms, and failure handling techniques. Additionally, it presents a project plan for creating a Dynamo-inspired key-value store in Elixir, detailing system architecture, implementation phases, and testing strategies.


Elixir vs C++ for Dynamo

final-project-suggestions.pdf
amazon-dynamo-sosp2007.pdf

We are planning to do Dynamo as our project, in Elixir or C++.

Which is better, and why?

When implementing a distributed system like Dynamo, Elixir tends to be a more natural fit
than C++ for several reasons:

Built-In Concurrency & Fault Tolerance:


Elixir runs on the BEAM virtual machine, which is designed for lightweight, isolated
processes and robust fault tolerance. These features match well with the distributed
nature of Dynamo, where handling many concurrent operations and failures is key.

Ease of Use & Development Speed:


The course materials even suggest using Elixir because it lets you reuse existing
emulation setups and leverages familiarity you might already have. This can significantly
reduce development complexity compared to setting up similar infrastructure in C++.

Abstraction Over Low-Level Details:


While C++ offers greater control and potential for high performance, it requires
manually handling many low-level aspects of concurrency, memory management, and
error recovery. In contrast, Elixir’s abstractions (like OTP and supervision trees) simplify
building and maintaining a resilient distributed system.

Project Focus:
For a project like Dynamo, the primary challenges are often related to correctly modeling
distributed behaviors (such as gossip protocols, anti-entropy, and vector clocks) rather
than raw performance. Elixir’s ecosystem is well-suited for these kinds of problems.

In summary, unless your project has very specific performance requirements or your team
has deep expertise in C++’s concurrency models, Elixir is generally the better choice for a
Dynamo-like distributed system project. This recommendation is supported by the project
guidelines, which encourage the use of Elixir for its strengths in distributed computing.

Can you explain dynamo paper to me in great detail

The Dynamo paper presents a highly available key-value storage system that Amazon
developed to support internal services with very strict availability and performance
requirements. Here’s a detailed explanation of the paper’s main components and design
ideas:

1. Motivation and Design Goals


High Availability:
Dynamo was built to ensure that write operations are never rejected—even under
conditions of network partitions or node failures. For example, customer-facing services
like shopping carts must remain “always writeable” so that user actions are never lost.

Eventual Consistency:
Instead of enforcing strict consistency, Dynamo allows replicas to become temporarily
inconsistent with the guarantee that, eventually, all updates will propagate. This
trade-off is essential in environments where downtime or rejected operations are
unacceptable.

Scalability and Decentralization:


The system is designed to scale incrementally, handling thousands of servers spread
across multiple data centers. It avoids centralized coordination by using a peer-to-peer
(decentralized) architecture, which eliminates single points of failure.

2. Data Partitioning and Load Balancing


Consistent Hashing:
Dynamo employs consistent hashing to map both data keys and nodes onto a circular
“ring.” This method minimizes the amount of data that needs to be relocated when
nodes join or leave the system.

Virtual Nodes:
To improve load distribution and accommodate heterogeneous node capacities, each
physical node is assigned multiple tokens (or virtual nodes). This technique helps
balance the key-space more uniformly across the system.

Preference Lists:
For every key, Dynamo defines a preference list—a set of nodes (usually N nodes)
responsible for storing that key. This replication strategy supports high durability and
availability by spreading the key’s data across multiple nodes.
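The three ideas above can be sketched together in a few lines of Elixir. This is a toy illustration, not the paper's algorithm or the project's actual code: the module `RingSketch`, its function names, and the ring size are all invented here. Virtual nodes are `{hash, physical_node}` tokens sorted by hash, and a key's preference list is the first N distinct physical nodes found walking clockwise from the key's hash.

```elixir
defmodule RingSketch do
  # Hash space is the integer range [0, @ring_size); keys and virtual-node
  # tokens are mapped into it with the same hash function.
  @ring_size 1_000_000

  defp hash(term), do: :erlang.phash2(term, @ring_size)

  # Each physical node contributes `vnodes_per_node` tokens on the ring.
  def build_ring(nodes, vnodes_per_node) do
    for node <- nodes, v <- 1..vnodes_per_node do
      {hash({node, v}), node}
    end
    |> Enum.sort()
  end

  # Walk clockwise from the key's hash, collecting the first n distinct
  # physical nodes (this is the key's preference list).
  def preference_list(ring, key, n) do
    h = hash(key)
    {before, after_or_at} = Enum.split_while(ring, fn {token, _} -> token < h end)

    (after_or_at ++ before)
    |> Enum.map(fn {_token, node} -> node end)
    |> Enum.uniq()
    |> Enum.take(n)
  end
end
```

Because virtual nodes interleave on the ring, consecutive positions tend to belong to different physical nodes, so `preference_list/3` usually needs only a short walk to find N distinct replicas.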

3. Data Replication and Consistency Mechanisms


Replication Factors and Quorum-like Operations:

N: Total number of replicas for each key.

W: Minimum number of nodes that must acknowledge a write for it to be
considered successful.

R: Minimum number of nodes that must participate in a read operation.

Typically, the parameters are tuned such that R + W > N, which is a “quorum”
approach—but with flexibility to lower R or W when needed to favor availability or
latency.
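The quorum arithmetic is worth making concrete (module and function names here are mine, for illustration): if R + W > N, every read quorum overlaps every write quorum in at least R + W − N replicas, so a read always contacts at least one node that saw the latest successful write.

```elixir
defmodule QuorumSketch do
  # R + W > N guarantees that read and write quorums intersect.
  def overlapping?(n, r, w), do: r + w > n

  # Size of the guaranteed intersection between any read and write quorum.
  def min_overlap(n, r, w), do: max(r + w - n, 0)
end
```

With the common N = 3, R = 2, W = 2 configuration, the quorums are guaranteed to overlap in one replica; dropping to R = 1 trades that guarantee away for lower read latency.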

Versioning with Vector Clocks:


To manage concurrent updates and track causality between versions, Dynamo uses
vector clocks. Each update is tagged with a vector clock that records the history of
writes. When conflicting versions occur (for instance, due to concurrent writes during a
partition), the system returns all divergent versions so that the application can resolve
the conflict.

Conflict Resolution:
Dynamo delegates the responsibility for resolving conflicts to the application. This can
be done using simple strategies like “last write wins” or more complex,
application-specific merge logic. The design ensures that no update is lost, even if
the client must later reconcile multiple versions.
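The causality checks behind this can be sketched with a vector clock represented as a map from node id to counter. This is a simplified stand-in for the paper's list of (node, counter) pairs; `VClockSketch` and its function names are invented here, not from the paper or the project code.

```elixir
defmodule VClockSketch do
  # a dominates b if every counter in b is <= the matching counter in a
  # (missing entries count as 0).
  def dominates?(a, b) do
    Enum.all?(b, fn {node, count} -> Map.get(a, node, 0) >= count end)
  end

  # Two versions are concurrent (conflicting) if neither clock dominates.
  def concurrent?(a, b), do: not dominates?(a, b) and not dominates?(b, a)

  # Reconciling divergent versions keeps the per-node maximum counter.
  def merge(a, b), do: Map.merge(a, b, fn _node, ca, cb -> max(ca, cb) end)

  # Each write by `node` bumps that node's counter.
  def increment(clock, node), do: Map.update(clock, node, 1, &(&1 + 1))
end
```

A read that returns versions with concurrent clocks is exactly the case where Dynamo hands both versions back to the application for semantic reconciliation.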

4. Failure Handling and Recovery Techniques
Gossip-based Membership and Failure Detection:
Nodes use a decentralized, gossip-based protocol to exchange information about the
status of other nodes. This allows the system to quickly detect node failures or
recoveries without a centralized monitor.

Hinted Handoff:
In the event that a node is temporarily unavailable during a write, the coordinator will
send the update to an alternate node along with a “hint” indicating the intended
destination. Once the failed node recovers, these hints are used to repair and
synchronize the data.

Anti-Entropy with Merkle Trees:


To ensure that replicas eventually converge, Dynamo employs a background
synchronization mechanism based on Merkle trees. This technique efficiently identifies
and reconciles differences between replicas without transferring the entire data set.
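To make the Merkle-tree idea concrete, here is a toy version (all names mine, assuming both replicas hold the same sorted key set): each leaf hashes one key-value pair, each internal node hashes its children's hashes, and two replicas only descend into subtrees whose top hashes differ.

```elixir
defmodule MerkleSketch do
  # Build a binary Merkle tree over a sorted, non-empty list of {key, value} pairs.
  def build([pair]), do: {:leaf, hash(pair), pair}

  def build(pairs) do
    {l, r} = Enum.split(pairs, div(length(pairs), 2))
    lt = build(l)
    rt = build(r)
    {:node, hash({top(lt), top(rt)}), lt, rt}
  end

  def top({:leaf, h, _}), do: h
  def top({:node, h, _, _}), do: h

  # Equal hashes prune a whole subtree; only differing paths are walked,
  # so only out-of-sync keys are reported for transfer.
  def diff({:leaf, h, _}, {:leaf, h, _}), do: []
  def diff({:leaf, _, {key, _}}, {:leaf, _, _}), do: [key]
  def diff({:node, h, _, _}, {:node, h, _, _}), do: []
  def diff({:node, _, l1, r1}, {:node, _, l2, r2}), do: diff(l1, l2) ++ diff(r1, r2)

  defp hash(term), do: :erlang.phash2(term)
end
```

The payoff is that two replicas differing in one key exchange only O(log n) hashes plus that one key, instead of streaming their entire stores at each other.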

5. Operational Considerations and Performance Trade-offs


Latency and Performance:
Dynamo’s design is optimized for low latency—especially at the high percentiles (e.g.,
99.9th percentile response times). By allowing operations to complete based on a subset
of replicas, the system maintains high performance even under load.

Trade-off Between Consistency and Availability:


Dynamo deliberately sacrifices strong consistency to ensure that services remain
available. This means that, under certain failure conditions, reads might return stale or
divergent data until the system’s anti-entropy processes reconcile differences.

Tuning Parameters:
The flexibility of configuring N, R, and W allows operators to balance the trade-offs
between availability, consistency, and durability based on the specific needs of each
application.

6. Summary of Key Innovations
Decentralized Architecture:
There is no single master; every node plays an equivalent role in processing requests,
which simplifies scaling and fault tolerance.

Flexible Consistency Model:


By using vector clocks and application-level conflict resolution, Dynamo supports a
system where availability is prioritized while still ensuring eventual consistency.

Robust Failure Handling:


Techniques such as hinted handoff and anti-entropy ensure that even when parts of the
system experience temporary failures, the overall system can recover without data loss.

In essence, the Dynamo paper provides a blueprint for designing a distributed storage
system that remains operational under adverse conditions by carefully balancing the
trade-offs between availability, consistency, and performance. This approach has not
only influenced the design of other modern distributed databases but has also
demonstrated how eventual consistency models can be effectively applied in
large-scale, real-world systems.

If you’d like to dive even deeper into any specific aspect—such as the inner workings of
vector clocks, the detailed performance measurements, or the implementation of
anti-entropy protocols—feel free to ask!


Can you provide a project overview of what to do for this project in Elixir

Below is an overview for a project where you implement a weakly consistent key-value store
inspired by Dynamo—in Elixir. This project plan outlines both the system’s design and your
evaluation strategy.

Project Title
Implementing a Dynamo-Inspired Distributed Key-Value Store in Elixir

1. Introduction and Objectives


Goal:
Build a distributed key-value store that prioritizes high availability and eventual
consistency.

Motivation:
Modern applications (e.g., shopping carts) require that writes are never rejected, even
during network or node failures. Inspired by Dynamo’s design, the system will allow
temporary inconsistencies with the guarantee that updates eventually propagate to all
replicas.

Why Elixir:
Elixir’s lightweight processes, OTP framework, and built-in support for distributed
computing make it an excellent choice for modeling isolated processes and handling
fault tolerance easily.
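To illustrate the OTP point with a minimal sketch (the module `StoreSketch` and its names are illustrative, not part of the project code): each storage partition can be a GenServer holding a map, and a supervisor restarts it automatically if it crashes.

```elixir
defmodule StoreSketch do
  # One storage partition as a GenServer holding an in-memory map.
  use GenServer

  def start_link(name), do: GenServer.start_link(__MODULE__, %{}, name: name)
  def put(name, k, v), do: GenServer.call(name, {:put, k, v})
  def get(name, k), do: GenServer.call(name, {:get, k})

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:put, k, v}, _from, state), do: {:reply, :ok, Map.put(state, k, v)}
  def handle_call({:get, k}, _from, state), do: {:reply, Map.get(state, k), state}
end

# The supervisor is the fault-tolerance building block: if the partition
# process crashes, :one_for_one restarts just that child.
children = [
  %{id: :p1, start: {StoreSketch, :start_link, [:partition1]}}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)
```

In C++ you would have to hand-build the equivalent of this restart-on-crash behavior; in Elixir it is a one-line supervision strategy.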

2. System Architecture and Key Components


A. Data Partitioning and Replication
Consistent Hashing:
Map both keys and nodes onto a logical ring to evenly distribute data. Use virtual nodes
to address load imbalance and support heterogeneous hardware.

Preference Lists & Replication:


Each key will be replicated across N nodes. The preference list determines which nodes
store a given key, ensuring durability and resilience.

B. Consistency and Versioning
Vector Clocks:
Each key-value pair carries a vector clock to capture causal relationships between
updates. This allows your system to detect concurrent updates and manage conflicts.

Conflict Resolution:
The application (or a reconciliation process) will merge divergent versions. You can
experiment with strategies ranging from “last write wins” to custom merge logic.

C. Failure Handling and Recovery


Gossip-Based Membership:
Implement a decentralized protocol that allows nodes to share their status (alive, failed,
or recovering). This ensures the system dynamically adapts to changes.

Hinted Handoff:
When a node is temporarily unreachable, another node will store the update along with
a “hint” so that the original node can be updated once it recovers.

Anti-Entropy:
Use background synchronization (for example, via Merkle trees or an equivalent
mechanism) to detect and reconcile differences among replicas.​
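The hinted-handoff bookkeeping from the section above can be modeled as a map from the intended node's id to its pending writes, replayed when that node recovers (a sketch with invented names, not the project's actual data structures):

```elixir
defmodule HintSketch do
  # hints is %{intended_node => [{key, value}, ...]} held by the stand-in node.
  def store_hint(hints, intended_node, key, value) do
    Map.update(hints, intended_node, [{key, value}], &(&1 ++ [{key, value}]))
  end

  # On recovery, hand the pending writes back and drop them locally.
  def handoff(hints, recovered_node) do
    {Map.get(hints, recovered_node, []), Map.delete(hints, recovered_node)}
  end
end
```

Keeping hints keyed by intended destination is what lets the write stay "always writeable" during the outage while still converging to the right replica afterwards.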

3. Implementation Roadmap
Phase 1: Core Key-Value Operations
Develop the basic get and put operations.

Implement consistent hashing to determine data placement.

Set up replication across a configurable number of nodes (N).

Phase 2: Versioning and Conflict Handling


Integrate vector clocks with each write.

Return all conflicting versions upon a read, letting the application decide on
reconciliation.

Phase 3: Distributed Process Management
Leverage OTP to manage node processes and supervisors.

Implement a gossip protocol for membership and failure detection.

Phase 4: Handling Failures


Add hinted handoff to manage temporary node failures.

Implement anti-entropy routines for background replica synchronization.

Phase 5: Metrics and Testing


Simulation of Faults:
Introduce network latencies, message drop rates, and node failures to test the system’s
robustness.

Metrics Collection:

Stale Data Rate: How frequently a read returns an outdated value.

Latency Measurements: Average and high-percentile latencies for read and write
operations.

Consistency Metrics: Track how quickly and effectively the system converges after
failures.

Hypothesis:
Under increased network delays or failure rates, you expect a higher occurrence of stale
reads; however, the system should maintain high availability and eventually reconcile
inconsistent states.

4. Testing Strategy and Validation


Automated Tests:
Write tests to simulate various failure conditions and measure system performance.

Emulation Environment:
Utilize the provided emulation setup (or create your own test harness) to simulate
real-world conditions.

Data Analysis:
Log operations and use Elixir’s built-in tools to analyze metrics. Compare the observed
behavior against your hypothesis.

5. Conclusion
This project will demonstrate:

How a Dynamo-inspired system can be built using Elixir’s robust concurrent
programming model.

The trade-offs between availability and consistency in a real-world distributed
environment.

How effective fault tolerance mechanisms (gossip, hinted handoff, anti-entropy)
contribute to a resilient system.

By following this roadmap, you’ll not only implement a functional distributed key-value store
but also gain practical insights into the complexities of designing systems that balance
consistency, availability, and performance.

Feel free to ask if you need further clarification on any component of this project plan!

Protocol.ex:

defmodule Dynamo do
  # Importing modules
  import Emulation, only: [send: 2, whoami: 0]

  import Kernel,
    except: [spawn: 3, spawn: 1, spawn_link: 1, spawn_link: 3, send: 2]

  # import Logger

  alias ExHashRing.Ring
  alias ExHashRing.Node
  alias ExHashRing.Hash
  alias __MODULE__

  # Requiring modules
  require Fuzzers

  @enforce_keys [:nodes, :vnodes, :clients, :read_quorum,
                 :write_quorum, :replication_factor, :ring]
  defstruct(
    node: nil,
    # List of server names
    nodes: nil,
    # Number of virtual nodes per server
    vnodes: nil,
    # List of client names
    clients: nil,
    # N: number of nodes to replicate to
    replication_factor: nil,
    # R
    read_quorum: nil,
    # W
    write_quorum: nil,
    # Consistent hash ring
    ring: nil,
    # Key-value store
    kv_store: nil,
    # Sequence number from the coordinator
    seq: nil,
    # Map from seq number to response count
    response_count: nil,
    # Map from seq number to list of responses
    responses: nil,
    # Map from each vnode to the keys it stores
    vnodeToKeys: nil,
    # Map from each server to the status of all nodes
    status_of_nodes: nil,
    # Map from server request to its timer
    requestTimerMap: nil,
    # Map from {seq, client request} to its timer
    clientRequestTimerMap: nil,
    # Map of hinted handoff entries with vnode index
    hintedHandedOffMap: nil,
    # Simulated-failure flag
    inFailedState: nil
  )

  @doc """
  Create a new configuration.
  """
  @spec new_configuration(list(), non_neg_integer(), list(),
          non_neg_integer(), non_neg_integer(), non_neg_integer()) ::
          %Dynamo{
            nodes: list(),
            clients: list(),
            read_quorum: non_neg_integer(),
            write_quorum: non_neg_integer(),
            replication_factor: non_neg_integer()
          }
  def new_configuration(nodes, vnodes, clients, replication_factor, read_quorum, write_quorum) do
    {:ok, ring} = Ring.start_link()

    %Dynamo{
      nodes: nodes,
      vnodes: vnodes,
      clients: clients,
      replication_factor: replication_factor,
      read_quorum: read_quorum,
      write_quorum: write_quorum,
      ring: ring,
      kv_store: %{},
      seq: -1,
      response_count: %{},
      responses: %{},
      vnodeToKeys: %{},
      status_of_nodes: Enum.into(nodes, %{}, fn node -> {node, {"Healthy", 0}} end),
      requestTimerMap: %{},
      clientRequestTimerMap: %{},
      inFailedState: false
    }
  end

  @doc """
  Make a node act as a server.
  """
  @spec make_server(%Dynamo{}) :: no_return()
  def make_server(state) do
    Ring.add_node(state.ring, whoami(), state.vnodes)
    # Start the periodic anti-entropy and gossip timers (return values unused).
    _timer = Emulation.timer(25, :antientropy)
    _timer = Emulation.timer(50, :gossip)

    state = %{state
      | status_of_nodes:
          Map.put(state.status_of_nodes, whoami(), {"Healthy", Emulation.now()})
    }

    server(%{state | node: whoami()})
  end

  def find_start_index(list, candidate) when is_list(list) do
    find_start_index(list, candidate, 0)
  end

  # If the candidate hashes past every token, wrap around to index 0.
  defp find_start_index([], _candidate, _index), do: 0

  defp find_start_index([head | tail], candidate, index) do
    if head > candidate do
      index
    else
      find_start_index(tail, candidate, index + 1)
    end
  end

  def circular_traversal(list, start_index, count, unhealthy_nodes) do
    length = length(list)
    start_index = rem(start_index, length)

    traverse_circular(list, start_index, count, unhealthy_nodes,
      MapSet.new(), [], MapSet.new(), [])
  end

  # Base case: we have collected `count` healthy nodes.
  defp traverse_circular(_list, _index, 0, _unhealthy_nodes, _distinct_items,
         preference_list, _distinct_skipped_items, skipped_items_list) do
    {preference_list, skipped_items_list}
  end

  defp traverse_circular(list, index, count, unhealthy_nodes, distinct_items,
         preference_list, distinct_skipped_items, skipped_items_list) do
    element = Enum.at(list, index)

    case MapSet.member?(distinct_items, element) do
      true ->
        next_index = rem(index + 1, length(list))

        traverse_circular(list, next_index, count, unhealthy_nodes, distinct_items,
          preference_list, distinct_skipped_items, skipped_items_list)

      false ->
        case MapSet.member?(unhealthy_nodes, element) do
          true ->
            case MapSet.member?(distinct_skipped_items, element) do
              true ->
                next_index = rem(index + 1, length(list))

                traverse_circular(list, next_index, count, unhealthy_nodes, distinct_items,
                  preference_list, distinct_skipped_items, skipped_items_list)

              false ->
                skipped_items_list = skipped_items_list ++ [index]
                next_index = rem(index + 1, length(list))
                distinct_items = MapSet.put(distinct_items, element)

                traverse_circular(list, next_index, count, unhealthy_nodes, distinct_items,
                  preference_list, distinct_skipped_items, skipped_items_list)
            end

          false ->
            preference_list = preference_list ++ [index]
            next_index = rem(index + 1, length(list))
            distinct_items = MapSet.put(distinct_items, element)

            traverse_circular(list, next_index, count - 1, unhealthy_nodes, distinct_items,
              preference_list, distinct_skipped_items, skipped_items_list)
        end
    end
  end

  def get_preference_list(state, key) do
    ring = state.ring
    {:ok, nodes} = Ring.get_nodes_with_replicas(ring)
    {hashList, nodeList} = Enum.unzip(Node.expand(nodes))
    start_index = find_start_index(hashList, Hash.of(key))

    unhealthy_nodes =
      state.status_of_nodes
      |> Enum.filter(fn {_node, {status, _timestamp}} -> status == "Failed" end)
      |> Enum.map(fn {node, _} -> node end)
      |> MapSet.new()

    if length(state.nodes) - MapSet.size(unhealthy_nodes) < state.replication_factor do
      raise("Not enough healthy nodes")
    end

    {pref_list_indices, skip_list_indices} =
      circular_traversal(nodeList, start_index, state.replication_factor, unhealthy_nodes)

    pref_list = Enum.map(pref_list_indices, fn index -> Enum.at(nodeList, index) end)
    skip_list = Enum.map(skip_list_indices, fn index -> Enum.at(nodeList, index) end)

    {pref_list, pref_list_indices, skip_list, skip_list_indices}
  end

  # Broadcast a message to all nodes in the configuration (excluding the sender).
  @spec bcast(list(), any()) :: list()
  defp bcast(node_list, msg) do
    node_list
    |> Enum.map(fn node -> if node != whoami(), do: send(node, msg) end)
  end

  # Anti-entropy synchronisation. Same intuition as Merkle trees; can be
  # optimised by comparing Merkle trees instead of full stores.
  def syncronisation(sender, _receiver, sender_kv_store, receiver_kv_store) do
    # Collect all {sender_vnode, receiver_vnode} pairs where the stored
    # server node matches the sender node.
    sender_receiver_vnode_pairs =
      receiver_kv_store
      |> Enum.map(fn {_, {_, _, {server, sender_vnode}, receiver_vnode, _}} ->
        if server == sender, do: {sender_vnode, receiver_vnode}, else: nil
      end)
      |> Enum.reject(&is_nil/1)

    Enum.reduce(sender_receiver_vnode_pairs, receiver_kv_store,
      fn {sender_vnode, receiver_vnode}, acc_kv_store ->
        sender_vnode_kv_store =
          sender_kv_store
          |> Enum.filter(fn {_, {_, _, {_, vnode}, _, _}} -> vnode == sender_vnode end)

        Enum.reduce(sender_vnode_kv_store, acc_kv_store,
          fn {key, sender_key_data}, acc_inner_kv_store ->
            case Map.get(acc_inner_kv_store, key) do
              nil ->
                {v, sender_vector_clock, {s, sv}, _, sender_clock} = sender_key_data
                updated_receiver_data =
                  {v, sender_vector_clock, {s, sv}, receiver_vnode, sender_clock}

                Map.put(acc_inner_kv_store, key, updated_receiver_data)

              receiver_key_data ->
                {v, sender_vector_clock, {s, sv}, _, sender_clock} = sender_key_data
                {_, receiver_vector_clock, {_, _}, _, receiver_clock} = receiver_key_data

                cond do
                  Dynamo.VectorClock.lessThanEqualTo(sender_vector_clock, receiver_vector_clock) ->
                    # Receiver is at least as up to date, so do nothing.
                    acc_inner_kv_store

                  Dynamo.VectorClock.greaterThan(sender_vector_clock, receiver_vector_clock) ->
                    # Receiver is behind; take the sender's version.
                    updated_receiver_data =
                      {v, sender_vector_clock, {s, sv}, receiver_vnode, sender_clock}

                    Map.put(acc_inner_kv_store, key, updated_receiver_data)

                  sender_clock > receiver_clock ->
                    # Receiver and sender are unrelated (concurrent); fall back to
                    # comparing physical clocks (last write wins).
                    updated_receiver_data =
                      {v, sender_vector_clock, {s, sv}, receiver_vnode, sender_clock}

                    Map.put(acc_inner_kv_store, key, updated_receiver_data)

                  true ->
                    # Do nothing.
                    acc_inner_kv_store
                end
            end
          end)
      end)
  end

  # Implementing the gossip protocol: merge the sender's view of node statuses
  # into the receiver's, keeping whichever entry has the newer timestamp.
  def gossipExchange(_sender, _receiver, sender_nodes_status, receiver_nodes_status) do
    Enum.reduce(sender_nodes_status, receiver_nodes_status,
      fn {node, {sender_node_status, sender_node_timestamp}}, acc_nodes_status ->
        case Map.get(acc_nodes_status, node) do
          nil ->
            Map.put(acc_nodes_status, node, {sender_node_status, sender_node_timestamp})

          {_receiver_node_status, receiver_node_timestamp} ->
            if sender_node_timestamp > receiver_node_timestamp do
              Map.put(acc_nodes_status, node, {sender_node_status, sender_node_timestamp})
            else
              acc_nodes_status
            end
        end
      end)
  end

  def makeHealthy(state, sender) do
    %{state
      | status_of_nodes:
          Map.put(state.status_of_nodes, sender, {"Healthy", Emulation.now()})
    }
  end

  def server(state) do
    receive do
      {sender,
       %Dynamo.ServerPutRequest{
         key: key,
         value: value,
         client: client,
         replication: replication,
         seq: seq,
         context: context,
         vnodeIndex: vindex
       }} ->
        IO.puts("Put request received at #{whoami()}")

        if state.inFailedState == true do
          # Simulated failure: bounce non-replication requests to a random node.
          if not replication do
            new_server = Enum.random(state.nodes)

            send(new_server, %Dynamo.ServerPutRequest{
              key: key,
              value: value,
              client: client,
              replication: replication,
              seq: seq,
              context: context,
              vnodeIndex: vindex
            })
          end

          server(state)
        else
          if not replication do
            {preference_list, preference_list_indices, skip_list, _skip_list_indices} =
              get_preference_list(state, key)

            IO.inspect("Done preference list")
            start_index = Enum.at(preference_list_indices, 0)

            if Enum.at(preference_list, 0) == whoami() do
              IO.inspect(preference_list)
              IO.inspect(skip_list)

              # Bind the result of the `if` so the update is not lost outside it.
              state =
                if context != nil do
                  %{state | kv_store:
                    Map.put(state.kv_store, key,
                      {value, context, {whoami(), start_index}, start_index, Emulation.now()})}
                else
                  state
                end

              state = %{state | kv_store:
                Dynamo.VectorClock.updateVectorClock(state.kv_store, key, value,
                  whoami(), state.seq + 1, start_index)}

              state = %{state | vnodeToKeys:
                Map.put(state.vnodeToKeys, start_index,
                  Enum.uniq(Enum.concat(Map.get(state.vnodeToKeys, start_index, []), [key])))}

              t = Emulation.timer(500,
                {:clientPutRequestTimeOut, whoami(), client, key, value, state.seq + 1})

              state = %{state | clientRequestTimerMap:
                Map.put(state.clientRequestTimerMap,
                  {whoami(), client, key, value, state.seq + 1}, t)}

              IO.puts("Put done for key: #{key} and value: #{value} at seq: #{state.seq + 1} at #{whoami()}")

              state =
                if state.write_quorum == 1 do
                  requestTimer = Map.get(state.clientRequestTimerMap,
                    {whoami(), client, key, value, state.seq + 1}, nil)

                  Emulation.cancel_timer(requestTimer)

                  state = %{state | clientRequestTimerMap:
                    Map.delete(state.clientRequestTimerMap,
                      {whoami(), client, key, value, state.seq + 1})}

                  send(client, {:ok, key})
                  state
                else
                  state
                end

              preference_list_tuple = Enum.zip(preference_list, preference_list_indices)

              new_state =
                Enum.reduce(preference_list_tuple, state, fn {node, vnode_index}, acc_state ->
                  if node != whoami() do
                    t = Emulation.timer(25,
                      {:requestTimeout, whoami(), node, key, state.seq + 1})

                    new_state = %{acc_state | requestTimerMap:
                      Map.put(acc_state.requestTimerMap,
                        {whoami(), node, key, state.seq + 1}, t)}

                    send(node, %Dynamo.ServerPutRequest{
                      key: key,
                      value: value,
                      client: client,
                      replication: true,
                      seq: state.seq + 1,
                      context: Map.get(new_state.kv_store, key),
                      vnodeIndex: vnode_index
                    })

                    new_state
                  else
                    acc_state
                  end
                end)

              server(%{new_state | seq: state.seq + 1})
            else
              # Not the coordinator for this key; forward to it.
              send(Enum.at(preference_list, 0), %Dynamo.ServerPutRequest{
                key: key,
                value: value,
                client: client,
                replication: false,
                seq: nil,
                context: context
              })

              server(state)
            end
          else
            # Replication path: LAST WRITE WINS.
            IO.puts("Put replicated for key: #{key} and value: #{value} at seq: #{seq} at #{whoami()}")

            new_context =
              case context do
                {a, b, c, _, e} -> {a, b, c, vindex, e}
                _ -> context
              end

            state = %{state | kv_store: Map.put(state.kv_store, key, new_context)}

            state = %{state | vnodeToKeys:
              Map.put(state.vnodeToKeys, vindex,
                Enum.uniq(Enum.concat(Map.get(state.vnodeToKeys, vindex, []), [key])))}

            send(sender, %Dynamo.ServerPutResponse{
              key: key,
              value: value,
              status: :ok,
              client: client,
              seq: seq
            })

            server(%{state | seq: seq})
          end
        end

      {sender,
       %Dynamo.ServerGetRequest{
         key: key,
         client: client,
         replication: replication,
         seq: seq,
         context: context
       }} ->
        if state.inFailedState == true do
          if not replication do
            new_server = Enum.random(state.nodes)

            send(new_server, %Dynamo.ServerGetRequest{
              key: key,
              client: client,
              replication: replication,
              seq: seq,
              context: context
            })
          end

          server(state)
        else
          if not replication do
            IO.puts("Get request received at #{whoami()}")

            {preference_list, _preference_list_indices, skip_list, _skip_list_indices} =
              get_preference_list(state, key)

            if Enum.member?(preference_list, whoami()) do
              IO.inspect(preference_list)
              IO.inspect(skip_list)

              value =
                if Map.get(state.kv_store, key, nil) == nil do
                  []
                else
                  Map.get(state.kv_store, key, nil)
                end

              state = %{state | responses:
                Map.put(state.responses, state.seq + 1, [value])}

              t = Emulation.timer(500,
                {:clientGetRequestTimeOut, whoami(), client, key, state.seq + 1})

              state = %{state | clientRequestTimerMap:
                Map.put(state.clientRequestTimerMap,
                  {whoami(), client, key, state.seq + 1}, t)}

              IO.puts("Got into co-ordinator and started broadcast")

              state =
                if state.read_quorum == 1 do
                  requestTimer = Map.get(state.clientRequestTimerMap,
                    {whoami(), client, key, state.seq + 1}, nil)

                  Emulation.cancel_timer(requestTimer)

                  state = %{state | clientRequestTimerMap:
                    Map.delete(state.clientRequestTimerMap,
                      {whoami(), client, key, state.seq + 1})}

                  send(client, {key, Map.get(state.responses, state.seq + 1)})
                  state
                else
                  state
                end

              new_state =
                Enum.reduce(preference_list, state, fn node, acc_state ->
                  if node != whoami() do
                    t = Emulation.timer(25,
                      {:requestTimeout, whoami(), node, key, state.seq + 1})

                    new_state = %{acc_state | requestTimerMap:
                      Map.put(acc_state.requestTimerMap,
                        {whoami(), node, key, state.seq + 1}, t)}

                    send(node, %Dynamo.ServerGetRequest{
                      key: key,
                      client: client,
                      replication: true,
                      seq: state.seq + 1
                    })

                    new_state
                  else
                    acc_state
                  end
                end)

              IO.inspect(new_state.clientRequestTimerMap)
              server(%{new_state | seq: state.seq + 1})
            else
              send(Enum.at(preference_list, 0), %Dynamo.ServerGetRequest{
                key: key,
                client: client,
                replication: false,
                seq: nil
              })

              server(state)
            end
          else
            IO.puts("Get replicated for key: #{key} at seq: #{seq} at #{whoami()}")

            value =
              if Map.get(state.kv_store, key, nil) == nil do
                []
              else
                Map.get(state.kv_store, key, nil)
              end

            send(sender, %Dynamo.ServerGetResponse{
              key: key,
              value: value,
              status: :ok,
              client: client,
              seq: seq
            })

            server(%{state | seq: seq})
          end
        end

      {sender,
       %Dynamo.ServerPutResponse{
         key: key,
         status: status,
         client: client,
         value: value,
         seq: seq,
         context: _context
       }} ->
        if state.inFailedState == true do
          server(state)
        else
          IO.puts("Got into server put response at #{whoami()} with seq: #{seq}")
          # Bind the result so the status update is not discarded.
          state = makeHealthy(state, sender)

          if status == :ok and
               Map.get(state.requestTimerMap, {whoami(), sender, key, seq}, nil) != nil do
            requestTimer = Map.get(state.requestTimerMap, {whoami(), sender, key, seq}, nil)
            Emulation.cancel_timer(requestTimer)

            state = %{state | requestTimerMap:
              Map.delete(state.requestTimerMap, {whoami(), sender, key, seq})}

            state =
              if Map.get(state.response_count, seq, nil) == nil do
                %{state | response_count: Map.put(state.response_count, seq, 1)}
              else
                count = Map.get(state.response_count, seq)
                %{state | response_count: Map.put(state.response_count, seq, count + 1)}
              end

            if Map.get(state.response_count, seq, nil) == state.write_quorum - 1 do
              requestTimer = Map.get(state.clientRequestTimerMap,
                {whoami(), client, key, value, seq}, nil)

              IO.puts("Cancelling timer at #{whoami()} for #{inspect(requestTimer)}")
              Emulation.cancel_timer(requestTimer)

              state = %{state | clientRequestTimerMap:
                Map.delete(state.clientRequestTimerMap,
                  {whoami(), client, key, value, seq})}

              send(client, {:ok, key})
              server(state)
            end

            server(state)
          end

          server(state)
        end

      {sender,
       %Dynamo.ServerGetResponse{
         key: key,
         value: value,
         status: status,
         client: client,
         seq: seq,
         context: _context
       }} ->
        if state.inFailedState == true do
          server(state)
        else
          IO.puts("Got into server get response at #{whoami()} with seq: #{seq}")
          # Bind the result so the status update is not discarded.
          state = makeHealthy(state, sender)

          if status == :ok and
               Map.get(state.requestTimerMap, {whoami(), sender, key, seq}, nil) != nil do
            requestTimer = Map.get(state.requestTimerMap, {whoami(), sender, key, seq}, nil)
            Emulation.cancel_timer(requestTimer)

            # Delete with the same key the timer was stored under
            # ({whoami(), sender, key, seq}).
            state = %{state | requestTimerMap:
              Map.delete(state.requestTimerMap, {whoami(), sender, key, seq})}

            state =
              if Map.get(state.response_count, seq, nil) == nil do
                %{state | response_count: Map.put(state.response_count, seq, 1)}
              else
                count = Map.get(state.response_count, seq)
                %{state | response_count: Map.put(state.response_count, seq, count + 1)}
              end

            responses =
              if value == [] do
                Map.get(state.responses, seq, [])
              else
                Map.get(state.responses, seq, []) ++ [value]
              end

            state = %{state | responses: Map.put(state.responses, seq, responses)}

            state =
              if Map.get(state.response_count, seq, 0) == state.read_quorum - 1 do
                requestTimer = Map.get(state.clientRequestTimerMap,
                  {whoami(), client, key, seq}, nil)

                IO.puts("Cancelling timer at #{whoami()} for #{inspect(requestTimer)}")
                Emulation.cancel_timer(requestTimer)

                state = %{state | clientRequestTimerMap:
                  Map.delete(state.clientRequestTimerMap, {whoami(), client, key, seq})}

                reconciledResponses =
                  Dynamo.VectorClock.syntaticReconcilationWithValues(
                    Map.get(state.responses, seq))

                state = %{state | responses:
                  Map.put(state.responses, seq, reconciledResponses)}

                send(client, {key, Map.get(state.responses, seq)})
                state
              else
                state
              end

            server(state)
          end

          server(state)
        end

      {:requestTimeout, node, failed_node, key, seq} ->
        IO.inspect(
          "Request timed out for #{node} with #{failed_node} with key : #{key} for seq : #{seq}"
        )

        state = %{
          state
          | requestTimerMap: Map.delete(state.requestTimerMap, {node, failed_node, key, seq})
        }

        state = %{
          state
          | status_of_nodes:
              Map.put(state.status_of_nodes, failed_node, {"Failed", Emulation.now()})
        }

        server(state)

      {:clientPutRequestTimeOut, node, client, key, value, seq} ->
        IO.inspect(
          "Client Request timed out for Put at #{node} with #{client} with key : #{key} and value : #{value}"
        )

        IO.inspect(Map.get(state.clientRequestTimerMap, {node, client, key, value, seq}))
        IO.inspect(state.clientRequestTimerMap)

        state = %{
          state
          | clientRequestTimerMap:
              Map.delete(state.clientRequestTimerMap, {node, client, key, value, seq})
        }

        server = Enum.random(state.nodes)

        if state.inFailedState == true do
          server(state)
        else
          send(server, %Dynamo.ServerPutRequest{
            key: key,
            value: value,
            client: client,
            replication: false,
            seq: nil,
            context: nil
          })

          server(state)
        end

      {:clientGetRequestTimeOut, node, client, key, seq} ->
        if state.inFailedState == true do
          IO.inspect("Client Request timed out for Get at #{node} with #{client} with key : #{key}")
          IO.inspect(Map.get(state.clientRequestTimerMap, {node, client, key, seq}, nil))
          IO.inspect(state.clientRequestTimerMap)

          state = %{
            state
            | clientRequestTimerMap:
                Map.delete(state.clientRequestTimerMap, {node, client, key, seq})
          }

          server(state)
        else
          IO.inspect("Client Request timed out for Get at #{node} with #{client} with key : #{key}")
          IO.inspect(Map.get(state.clientRequestTimerMap, {node, client, key, seq}, nil))
          IO.inspect(state.clientRequestTimerMap)

          if Map.get(state.clientRequestTimerMap, {node, client, key, seq}, nil) != nil do
            state = %{
              state
              | clientRequestTimerMap:
                  Map.delete(state.clientRequestTimerMap, {node, client, key, seq})
            }

            server = Enum.random(state.nodes)

            send(server, %Dynamo.ServerGetRequest{
              key: key,
              client: client,
              replication: false,
              seq: nil
            })

            server(state)
          else
            server(state)
          end
        end

      {sender, {:failNode, failTime}} ->
        state = %{
          state
          | status_of_nodes: Map.put(state.status_of_nodes, whoami(), {"Failed", Emulation.now()})
        }

        state = %{state | inFailedState: true}
        Emulation.timer(failTime, :recover)
        server(state)

      :recover ->
        state = %{
          state
          | status_of_nodes: Map.put(state.status_of_nodes, whoami(), {"Healthy", Emulation.now()})
        }

        state = %{state | inFailedState: false}
        server(state)

      :antientropy ->
        if state.inFailedState == true do
          server(state)
        else
          other_nodes = List.delete(state.nodes, whoami())
          select_node = Enum.random(other_nodes)
          send(select_node, {:merkle_request, state.kv_store})
          Emulation.timer(50, :antientropy)
          server(state)
        end

      {sender, {:merkle_request, sender_kv_store}} ->
        if state.inFailedState == true do
          server(state)
        else
          makeHealthy(state, sender)
          receiver_kv_store = state.kv_store

          updated_kv_store =
            syncronisation(sender, whoami(), sender_kv_store, receiver_kv_store)

          server(%{state | kv_store: updated_kv_store})
        end

      :gossip ->
        if state.inFailedState == true do
          server(state)
        else
          other_nodes = List.delete(state.nodes, whoami())
          select_node = Enum.random(other_nodes)
          send(select_node, {:gossip_request, state.status_of_nodes})
          Emulation.timer(50, :gossip)
          server(state)
        end

      {sender, {:gossip_request, sender_nodes_status}} ->
        if state.inFailedState == true do
          server(state)
        else
          makeHealthy(state, sender)
          receiver_nodes_status = state.status_of_nodes

          updated_status_of_nodes =
            gossipExchange(sender, whoami(), sender_nodes_status, receiver_nodes_status)

          send(sender, {:gossip_response, updated_status_of_nodes})
          server(%{state | status_of_nodes: updated_status_of_nodes})
        end

      {sender, {:gossip_response, sender_nodes_status}} ->
        if state.inFailedState == true do
          server(state)
        else
          makeHealthy(state, sender)
          receiver_nodes_status = state.status_of_nodes

          updated_status_of_nodes =
            gossipExchange(sender, whoami(), sender_nodes_status, receiver_nodes_status)

          server(%{state | status_of_nodes: updated_status_of_nodes})
        end

      {sender, :state} ->
        send(sender, {whoami(), state})
        server(state)

end
end
end

# --------------------------------------------------------------------------------------------------
defmodule Dynamo.Client do
  import Emulation, only: [send: 2, whoami: 0]

  import Kernel,
    except: [spawn: 3, spawn: 1, spawn_link: 1, spawn_link: 3, send: 2]

  @moduledoc """
  A client that can be used to connect and send requests to the Dynamo servers.
  """
  alias __MODULE__
  @enforce_keys [:client_list]
  defstruct(
    client_list: nil,
    client: nil,
    # Used to store the vector clock of the last get operation, sent back as context
    global_vector_clock: nil
  )

  @doc """
  Construct a new Dynamo Client Configuration with a list of clients.
  """
  @spec new_client_configuration(list()) :: %Client{client_list: list()}
  def new_client_configuration(client_list) do
    %Client{client_list: client_list, global_vector_clock: %{}}
  end

  @doc """
  Make a node a client.
  """
  @spec make_client(%Client{}) :: no_return()
  def make_client(state) do
    client(state)
  end

  @spec client(%Dynamo.Client{}) :: no_return()
  def client(state) do
    receive do
      {sender,
       %Dynamo.ClientPutRequest{
         key: key,
         value: value,
         server_list: server_list
       }} ->
        server = Enum.random(server_list)

        context =
          if Map.get(state.global_vector_clock, key, nil) != nil do
            Dynamo.VectorClock.clientReconcilation(Map.get(state.global_vector_clock, key))
          else
            nil
          end

        send(server, %Dynamo.ServerPutRequest{
          key: key,
          value: value,
          client: whoami(),
          replication: false,
          seq: nil,
          context: context
        })

        client(%{state | client: sender})

      {sender,
       %Dynamo.ClientGetRequest{
         key: key,
         server_list: server_list
       }} ->
        server = Enum.random(server_list)

        send(server, %Dynamo.ServerGetRequest{
          key: key,
          client: whoami(),
          replication: false,
          seq: nil
        })

        client(%{state | client: sender})

      {_sender, {:ok, key}} ->
        # send(state.client, {:put, :ok, key})
        IO.inspect("----------------------------------------")
        IO.inspect("Put done for client:#{whoami()} with key:#{key} ")
        IO.inspect("----------------------------------------")
        client(state)

      {_sender, {key, responses}} ->
        IO.inspect("----------------------------------------")
        IO.inspect("Get done for client:#{whoami()} with key:#{key} and value:#{inspect(Enum.at(responses, 0))} ")
        IO.inspect("----------------------------------------")

        clockList = Enum.map(responses, fn {_first, second, _third, _fourth, _fifth} -> second end)

        clientResponse =
          Enum.map(responses, fn {first, second, _third, _fourth, _fifth} -> {first, second} end)

        send(state.client, {:get, key, clientResponse})

        state = %{
          state
          | global_vector_clock: Map.put(state.global_vector_clock, key, clockList)
        }

        client(state)
    end
  end
end

# --------------------------------------------------------------------------------------------------

defmodule Dynamo.VectorClock do
  def updateVectorClock(store, key, value, node, counter, vnode) do
    case Map.get(store, key) do
      {_, vclock, _, _, _} ->
        if Map.has_key?(vclock, node) do
          oldCounter = Map.get(vclock, node)

          counter =
            if counter > oldCounter do
              counter
            else
              oldCounter
            end

          vclock = Map.put(vclock, node, counter)
          Map.put(store, key, {value, vclock, {node, vnode}, vnode, Emulation.now()})
        else
          vclock = Map.put(vclock, node, counter)
          Map.put(store, key, {value, vclock, {node, vnode}, vnode, Emulation.now()})
        end

      nil ->
        vclock = %{node => counter}
        Map.put(store, key, {value, vclock, {node, vnode}, vnode, Emulation.now()})
    end
  end

  def equalTo(clock1, clock2) do
    Map.equal?(clock1, clock2)
  end

  def notEqualTo(clock1, clock2) do
    not equalTo(clock1, clock2)
  end

  def lessThan(clock1, clock2) do
    if Enum.sort(Map.keys(clock1)) == Enum.sort(Map.keys(clock2)) do
      merged_clock = Map.merge(clock1, clock2, fn _k, c1, c2 -> c1 < c2 end)
      compare_results = Map.values(merged_clock)
      Enum.all?(compare_results, fn x -> x == true end)
    else
      false
    end
  end

  def lessThanEqualTo(clock1, clock2) do
    lessThan(clock1, clock2) or equalTo(clock1, clock2)
  end

  def greaterThan(clock1, clock2) do
    lessThan(clock2, clock1)
  end

  def greaterThanEqualTo(clock1, clock2) do
    greaterThan(clock1, clock2) or equalTo(clock1, clock2)
  end

  # -----------------------------------------------------------------------

  def syntaticReconcilationMerger(currIndex, clock, result) do
    if currIndex == length(result) do
      {result, false}
    else
      cond do
        lessThanEqualTo(clock, Enum.at(result, currIndex)) ->
          {result, true}

        lessThan(Enum.at(result, currIndex), clock) ->
          result = List.update_at(result, currIndex, fn _ -> clock end)
          {result, true}

        true ->
          syntaticReconcilationMerger(currIndex + 1, clock, result)
      end
    end
  end

  defp syntaticReconcilationHelper(currIndex, clockList, result) do
    if currIndex == length(clockList) do
      result
    else
      {result, succ} = syntaticReconcilationMerger(0, Enum.at(clockList, currIndex), result)

      result =
        if succ do
          result
        else
          result ++ [Enum.at(clockList, currIndex)]
        end

      syntaticReconcilationHelper(currIndex + 1, clockList, result)
    end
  end

  def syntaticReconcilation(clockList) do
    syntaticReconcilationHelper(0, clockList, [])
  end

  # ----------------------------------------------------------------------------------------------

  def syntaticReconcilationMergerWithValues(currIndex, clock, result) do
    if currIndex == length(result) do
      {result, false}
    else
      cond do
        lessThanEqualTo(elem(clock, 1), elem(Enum.at(result, currIndex), 1)) ->
          {result, true}

        lessThan(elem(Enum.at(result, currIndex), 1), elem(clock, 1)) ->
          result = List.update_at(result, currIndex, fn _ -> clock end)
          {result, true}

        true ->
          syntaticReconcilationMergerWithValues(currIndex + 1, clock, result)
      end
    end
  end

  defp syntaticReconcilationWithValuesHelper(currIndex, divergedValues, result) do
    if currIndex == length(divergedValues) do
      result
    else
      {result, succ} =
        syntaticReconcilationMergerWithValues(0, Enum.at(divergedValues, currIndex), result)

      result =
        if succ do
          result
        else
          result ++ [Enum.at(divergedValues, currIndex)]
        end

      syntaticReconcilationWithValuesHelper(currIndex + 1, divergedValues, result)
    end
  end

  def syntaticReconcilationWithValues(divergedValues) do
    divergedValues = Enum.reject(divergedValues, &(&1 == []))
    syntaticReconcilationWithValuesHelper(0, divergedValues, [])
  end

  # ----------------------------------------------------------------------------------------------

  # Client reconcilation
  defp clientReconcilationHelper([], result) do
    result
  end

  defp clientReconcilationHelper([clock | rest], result) do
    reconciledClock = Map.merge(clock, result, fn _k, c1, c2 -> max(c1, c2) end)
    clientReconcilationHelper(rest, reconciledClock)
  end

  def clientReconcilation(divergedClocks) do
    clientReconcilationHelper(divergedClocks, %{})
  end
end

messages.ex
defmodule Dynamo.ClientPutRequest do
  @moduledoc """
  Put Request for Client
  """
  alias __MODULE__
  @enforce_keys [:key, :value, :server_list]
  defstruct(
    key: nil,
    value: nil,
    server_list: nil,
    seq: nil,
    context: nil
  )

  @doc """
  Create a new Client Put Request.
  """
  @spec new(non_neg_integer(), non_neg_integer(), list(), non_neg_integer()) ::
          %ClientPutRequest{
            key: non_neg_integer(),
            value: non_neg_integer(),
            server_list: list()
          }
  def new(key, value, server_list, seq) do
    %ClientPutRequest{
      key: key,
      value: value,
      server_list: server_list,
      seq: seq,
      context: nil
    }
  end
end

defmodule Dynamo.ClientGetRequest do
  @moduledoc """
  Get Request for Client
  """
  alias __MODULE__
  @enforce_keys [:key, :server_list]
  defstruct(
    key: nil,
    server_list: nil,
    seq: nil,
    context: nil
  )

  @doc """
  Create a new Client Get Request.
  """
  @spec new(non_neg_integer(), list(), non_neg_integer()) ::
          %ClientGetRequest{
            key: non_neg_integer(),
            server_list: list()
          }
  def new(key, server_list, seq) do
    %ClientGetRequest{
      key: key,
      server_list: server_list,
      seq: seq,
      context: nil
    }
  end
end

defmodule Dynamo.ServerPutRequest do
  @moduledoc """
  Put Request for Server
  """
  alias __MODULE__
  @enforce_keys [:key, :value, :client, :seq]
  defstruct(
    key: nil,
    value: nil,
    client: nil,
    replication: nil,
    seq: nil,
    context: nil,
    vnodeIndex: nil
  )

  @doc """
  Create a new Server Put Request.
  """
  @spec new(non_neg_integer(), non_neg_integer(), atom(), non_neg_integer()) ::
          %ServerPutRequest{
            key: non_neg_integer(),
            value: non_neg_integer(),
            client: atom()
          }
  def new(key, value, client, seq) do
    %ServerPutRequest{
      key: key,
      value: value,
      client: client,
      seq: seq,
      replication: false,
      context: nil,
      vnodeIndex: nil
    }
  end
end

defmodule Dynamo.ServerGetRequest do
  @moduledoc """
  Get Request for Server
  """
  alias __MODULE__
  @enforce_keys [:key, :client, :seq]
  defstruct(
    key: nil,
    client: nil,
    replication: nil,
    seq: nil,
    context: nil
  )

  @doc """
  Create a new Server Get Request.
  """
  @spec new(non_neg_integer(), atom(), non_neg_integer()) ::
          %ServerGetRequest{
            key: non_neg_integer(),
            client: atom()
          }
  def new(key, client, seq) do
    %ServerGetRequest{
      key: key,
      client: client,
      replication: false,
      seq: seq,
      context: nil
    }
  end
end

defmodule Dynamo.ServerPutResponse do
  @moduledoc """
  Put Response from Server
  """
  alias __MODULE__
  @enforce_keys [:key, :value, :status, :client, :seq]
  defstruct(
    key: nil,
    client: nil,
    status: nil,
    seq: nil,
    context: nil,
    vnodeIndex: nil,
    value: nil
  )

  @doc """
  Create a new Server Put Response.
  """
  @spec new(non_neg_integer(), atom(), atom(), non_neg_integer(), non_neg_integer()) ::
          %ServerPutResponse{
            key: non_neg_integer(),
            status: atom(),
            client: atom(),
            seq: non_neg_integer()
          }
  def new(key, value, client, status, seq) do
    %ServerPutResponse{
      key: key,
      client: client,
      status: status,
      value: value,
      seq: seq,
      context: nil,
      vnodeIndex: nil
    }
  end
end

defmodule Dynamo.ServerGetResponse do
  @moduledoc """
  Get Response from Server
  """
  alias __MODULE__
  @enforce_keys [:value, :status, :client, :seq]
  defstruct(
    key: nil,
    value: nil,
    client: nil,
    status: nil,
    seq: nil,
    context: nil
  )

  @doc """
  Create a new Server Get Response.
  """
  @spec new(any(), non_neg_integer(), atom(), atom(), non_neg_integer()) ::
          %ServerGetResponse{
            key: any(),
            value: non_neg_integer(),
            status: atom(),
            client: atom(),
            seq: non_neg_integer()
          }
  def new(key, value, client, status, seq) do
    %ServerGetResponse{
      key: key,
      value: value,
      client: client,
      status: status,
      seq: seq,
      context: nil
    }
  end
end

mix.exs:
defmodule Dynamo.MixProject do
  use Mix.Project

  def project do
    [
      app: :dynamo,
      version: "0.1.0",
      build_path: "../../_build",
      config_path: "../../config/config.exs",
      deps_path: "../../deps",
      lockfile: "../../mix.lock",
      elixir: "~> 1.10",
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger]
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      {:emulation, in_umbrella: true},
      {:dialyxir, "~> 1.0", only: [:dev], runtime: false},
      {:credo, "~> 1.4", only: [:dev, :test], runtime: false},
      {:statistics, "~> 0.6.2"},
      {:prioqueue, "~> 0.2.0"},
      {:ex_hash_ring, "~> 6.0"}
    ]
  end
end

Now create a detailed doc of a report for the project to be done, given the code as well

Below is a detailed project report document that outlines the objectives, design,
implementation details, testing strategies, and future directions for your Dynamo-inspired
distributed key‐value store project in Elixir. The report also references the key parts of the
provided code.

Project Report: Implementing a Dynamo-Inspired Distributed Key-Value Store in Elixir
1. Abstract
This project implements a highly available, weakly consistent distributed key-value store
inspired by Amazon’s Dynamo. Built in Elixir, the system leverages the BEAM virtual machine’s
strengths—lightweight processes, fault tolerance, and robust concurrency—to manage data
partitioning, replication, versioning, and failure recovery. Core techniques include consistent
hashing (with virtual nodes), quorum-based read and write operations, vector clocks for
tracking causality, and mechanisms such as hinted handoff, gossip-based membership, and
anti-entropy synchronization.

2. Introduction
Distributed systems today must balance performance, availability, and consistency. Dynamo
was designed to guarantee an “always writeable” system even in the presence of failures.
This project re-implements key ideas from Dynamo in Elixir with the following goals:

High Availability: Allowing writes to succeed despite network partitions or node failures.

Eventual Consistency: Permitting temporary data divergence while ensuring eventual convergence.

Fault Tolerance: Detecting and recovering from node failures using gossip protocols and
anti-entropy techniques.

Scalability: Using consistent hashing (via virtual nodes) to distribute load evenly and
scale out incrementally.

Elixir is chosen due to its native support for concurrency, fault isolation through lightweight
processes, and the OTP framework, all of which map naturally to the distributed system
challenges addressed by Dynamo.

3. System Architecture and Design


3.1 Overall Architecture
The system is composed of several interrelated components:

Server Nodes: Each server runs a process (or set of processes) that handles client
requests. They maintain a local key-value store, vector clocks for versioning, and manage
replication across a set of nodes.

Client Nodes: Clients send get and put requests. They aggregate responses and perform
local reconciliation based on vector clock information.

Consistent Hash Ring: The project uses the ExHashRing library to assign keys to nodes.
Virtual nodes (vnodes) help balance the load across heterogeneous servers.

Failure Detection & Recovery: Gossip protocols, periodic anti-entropy synchronization, and hinted handoff techniques ensure that even when nodes fail or become partitioned, the system eventually recovers and converges.

3.2 Data Partitioning and Replication


Consistent Hashing:
Keys are hashed and placed on a circular ring. Each node (and its multiple virtual nodes)
is assigned positions on the ring. This technique minimizes key movement when nodes
join or leave.

Preference Lists:
The get_preference_list/2 function in the Dynamo module computes which nodes
(based on their positions in the hash ring) are responsible for a given key. This list is
used to determine the nodes for replication.

Replication Factor & Quorums:


The configuration parameters (replication factor, read quorum, write quorum) dictate
how many nodes hold replicas of each key and how many responses are needed for
successful operations.
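The clockwise ring walk described above can be sketched in a few lines of Elixir. This is a hand-rolled illustration using `:erlang.phash2` as a stand-in hash rather than the ExHashRing library the project actually depends on; the module and function names (`RingSketch`, `ring_positions/2`, `preference_list/3`) are hypothetical, not the project's `get_preference_list/2`.

```elixir
defmodule RingSketch do
  @ring_size 1024

  # Place each physical node at several virtual positions on the ring.
  def ring_positions(nodes, vnodes_per_node) do
    positions =
      for node <- nodes, v <- 1..vnodes_per_node do
        {:erlang.phash2({node, v}, @ring_size), node}
      end

    Enum.sort(positions)
  end

  # Walk clockwise from the key's position, collecting the first
  # `replication_factor` distinct physical nodes.
  def preference_list(key, ring, replication_factor) do
    pos = :erlang.phash2(key, @ring_size)
    {smaller, rest} = Enum.split_while(ring, fn {p, _node} -> p < pos end)

    (rest ++ smaller)
    |> Enum.map(fn {_p, node} -> node end)
    |> Enum.uniq()
    |> Enum.take(replication_factor)
  end
end
```

Because virtual positions are sorted once and the walk wraps around, adding or removing a node only disturbs the keys that hash between its positions and their predecessors.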

3.3 Versioning and Conflict Resolution


Vector Clocks:
The module Dynamo.VectorClock handles the creation, update, and comparison of
vector clocks. Every write updates a vector clock, and these clocks are later used to
detect and resolve conflicts. The reconciliation routines (both syntactic and client-side)
merge multiple concurrent updates.
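The comparisons this module performs can be summarized in a small self-contained sketch: clock A causally precedes clock B iff every entry of A is at most the corresponding entry of B and the clocks differ; otherwise the two updates are concurrent. Note one deliberate simplification here: missing entries are treated as zero, whereas the project's `lessThan/2` requires identical key sets.

```elixir
defmodule VClockSketch do
  # True iff every counter in c1 is <= the matching counter in c2.
  def descends?(c1, c2) do
    Enum.all?(c1, fn {node, count} -> count <= Map.get(c2, node, 0) end)
  end

  # Concurrent: neither clock dominates the other.
  def concurrent?(c1, c2) do
    not descends?(c1, c2) and not descends?(c2, c1)
  end

  # Client-side reconciliation: per-node maximum, as in clientReconcilation/1.
  def merge(c1, c2) do
    Map.merge(c1, c2, fn _node, a, b -> max(a, b) end)
  end
end
```

For example, `%{a: 2, b: 1}` and `%{a: 1, b: 2}` are concurrent, and merging them yields `%{a: 2, b: 2}`.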

3.4 Fault Tolerance Mechanisms

Gossip Protocol:
The server periodically exchanges status information with other nodes. Functions such
as gossipExchange/4 and related message handling in the Dynamo module ensure that
nodes are aware of each other’s health.

Hinted Handoff:
When a node is detected as failed, updates destined for it are temporarily stored at an
alternate node with a “hint” so they can be forwarded later.
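The bookkeeping behind hinted handoff can be sketched as a map from the intended (failed) node to its pending writes, replayed once gossip reports the node healthy again. The representation and names below are illustrative, not the project's exact data structures.

```elixir
defmodule HintedHandoffSketch do
  # Park a write destined for a failed node under a hint.
  def store_hint(hints, intended_node, key, value) do
    Map.update(hints, intended_node, [{key, value}], &[{key, value} | &1])
  end

  # On recovery, return {writes_to_replay, remaining_hints}.
  def handoff(hints, recovered_node) do
    {Map.get(hints, recovered_node, []), Map.delete(hints, recovered_node)}
  end
end
```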

Anti-Entropy Synchronization:
Using a simplified mechanism (inspired by Merkle trees), nodes periodically compare
portions of their key-value store (using timers like :antientropy ) and synchronize
discrepancies.
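The idea behind the simplified Merkle-style check can be sketched as follows: exchange a single digest of the store first, and only diff key by key when the digests disagree. Function names are illustrative, and `:erlang.phash2` stands in for a cryptographic hash.

```elixir
defmodule AntiEntropySketch do
  # A single digest summarizing the whole store.
  def digest(kv_store), do: :erlang.phash2(kv_store)

  # Cheap first-pass check: identical stores hash identically.
  def sync_needed?(store_a, store_b), do: digest(store_a) != digest(store_b)

  # Keys whose values differ (or are missing) between two stores.
  def diff_keys(store_a, store_b) do
    (Map.keys(store_a) ++ Map.keys(store_b))
    |> Enum.uniq()
    |> Enum.filter(fn k -> Map.get(store_a, k) != Map.get(store_b, k) end)
  end
end
```

A full Merkle tree generalizes this by hashing key ranges hierarchically, so two replicas can localize a discrepancy by descending only the mismatching subtrees.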

4. Implementation Details
4.1 Code Organization
Protocol Module ( protocol.ex ):
Contains the main logic for server operations, including:

Request Handling:
Receives and processes put and get requests from clients. It distinguishes between
coordinator actions (first replica) and replication steps.

Consistent Hash Ring Operations:


Uses functions like find_start_index/2 and circular_traversal/4 to determine
the preference list for a key.

Failure and Timeout Management:


Employs timers (using Emulation.timer ) to detect timeouts, cancel them upon
receiving responses, and trigger recovery actions.

Client Module ( Dynamo.Client ):


Implements the client logic:

Request Dispatch:
Clients randomly choose a server from a provided list to send get/put requests.

Vector Clock Reconciliation:
Upon receiving responses, the client updates its global vector clock to maintain
causality.

Vector Clock Module ( Dynamo.VectorClock ):


Provides functions to update vector clocks on each put, compare clocks (using less-than,
greater-than, and equality comparisons), and merge clocks for reconciliation.

Messages Module ( messages.ex ):


Defines the structures for all inter-process messages including:

Client and server put/get requests and responses.

Message structures ensure that all necessary fields (key, value, client, sequence
number, context, etc.) are included.

Mix Project File ( mix.exs ):


Configures the application, specifying dependencies such as emulation (for simulating
distributed behavior), ex_hash_ring (for consistent hashing), and others. It defines the
project structure and build configuration.

4.2 Process Flow Examples


Handling a Put Request:

1. A client sends a Dynamo.ClientPutRequest with a key, value, and list of servers.

2. The coordinator node (determined by the preference list) processes the request,
updates the key’s vector clock using Dynamo.VectorClock.updateVectorClock/5 ,
and stores the key-value pair.

3. The request is replicated to other nodes in the preference list. Timers are set to
detect timeouts.

4. Once the write quorum is met, a success response is sent back to the client.

Handling a Get Request:

1. A client sends a Dynamo.ClientGetRequest .

2. The coordinator gathers responses from the nodes in the preference list.

3. If multiple versions exist (due to concurrent updates), the system performs syntactic
reconciliation using vector clocks.

4. The reconciled value is returned to the client.
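Step 3 of the get flow can be sketched compactly: among the versions gathered from the read quorum, drop any version whose vector clock is dominated by another's, and return whatever survives as siblings. This is a hedged restatement of the idea behind `syntaticReconcilationWithValues/1`, not its exact code.

```elixir
defmodule ReadReconcileSketch do
  # c1 is strictly dominated by c2: the clocks differ and every entry
  # of c1 is <= the matching entry of c2 (missing entries count as 0).
  defp dominated?(c1, c2) do
    c1 != c2 and Enum.all?(c1, fn {node, cnt} -> cnt <= Map.get(c2, node, 0) end)
  end

  # versions: a list of {value, vector_clock} pairs from the read quorum.
  def reconcile(versions) do
    Enum.reject(versions, fn {_v, clock} ->
      Enum.any?(versions, fn {_v2, other} -> dominated?(clock, other) end)
    end)
  end
end
```

Given `[{"old", %{a: 1}}, {"new", %{a: 2}}, {"fork", %{b: 1}}]`, the stale `"old"` version is discarded while `"new"` and `"fork"` survive as concurrent siblings for the client to resolve.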

Failure and Recovery:
The code uses timers to simulate node failures. For example, receiving a {:failNode,
failTime} message transitions a node to a failed state. Recovery is scheduled via a
timer that triggers the :recover message.

5. Testing Strategy and Evaluation


5.1 Testing and Emulation
Simulated Environment:
Using the Emulation library, the project can simulate network delays, message loss, and
node failures. Timers (set with functions like Emulation.timer/2 ) allow you to inject
failures and test recovery.

Metrics Collection:
You can measure:

Latency: Average and high-percentile latencies for get/put operations.

Consistency: Frequency of stale reads or conflicting versions.

Availability: System behavior under simulated node failures and during recovery
phases.

5.2 Experimentation
Failure Injection:
Simulate various failure scenarios (e.g., node crash, network partition) and observe how
the gossip protocol and hinted handoff mechanisms restore consistency.

Load Distribution:
Evaluate the effectiveness of consistent hashing and virtual nodes by examining key
distribution and load balancing across the nodes.

Conflict Resolution Analysis:


Analyze how vector clocks are updated and reconciled when concurrent updates occur.
Use logs and client feedback to verify that no updates are lost and that eventual
consistency is achieved.

6. Conclusion and Future Work
6.1 Achievements
Robust Distributed Key-Value Store:
The project demonstrates a working implementation of a Dynamo-inspired system in
Elixir, with distributed operations, replication, and fault tolerance.

Leveraging Elixir/OTP:
By using lightweight processes and OTP supervision, the system achieves natural
concurrency and resilience.

Failure Recovery and Synchronization:


Techniques such as hinted handoff, anti-entropy synchronization, and gossip-based
failure detection are successfully implemented.

6.2 Future Enhancements


Optimized Anti-Entropy:
Incorporate more advanced anti-entropy techniques (e.g., full Merkle tree comparisons)
to reduce synchronization overhead.

Dynamic Reconfiguration:
Implement dynamic adjustment of replication factors and quorum settings based on
observed load and failure conditions.

Extended Conflict Resolution:


Provide more advanced application-level conflict resolution strategies or automated
merging functions.

Monitoring and Logging:


Enhance the system with detailed logging and metrics collection to enable real-time
monitoring of performance and consistency.

7. References
Dynamo: Amazon’s Highly Available Key-value Store – Provides the foundational
design principles and trade-offs.

ExHashRing Library Documentation – For consistent hashing and virtual node
management.

Elixir and OTP Documentation – For insights into building fault-tolerant distributed
systems in Elixir.

This report offers a comprehensive overview of your project’s goals, design rationale,
implementation specifics, and testing strategies. It ties together the provided code structure
and highlights how each module contributes to the overall system, ensuring a robust,
scalable, and highly available distributed key-value store.

it hasn't been done yet, so the doc should be more like a proposal doc please, in detail

Here's a comprehensive and detailed project proposal document for your Dynamo-inspired
distributed key-value store implementation in Elixir.

Dynamo Proposal Doc

Here's your detailed project proposal document. Let me know if there's anything else you'd
like refined or expanded!

