Elixir Dynamo
final-project-suggestions.pdf
amazon-dynamo-sosp2007.pdf
When implementing a distributed system like Dynamo, Elixir tends to be a more natural fit
than C++ for several reasons:
Project Focus:
For a project like Dynamo, the primary challenges are often related to correctly modeling
distributed behaviors (such as gossip protocols, anti-entropy, and vector clocks) rather
than raw performance. Elixir’s ecosystem is well-suited for these kinds of problems.
In summary, unless your project has very specific performance requirements or your team
has deep expertise in C++’s concurrency models, Elixir is generally the better choice for a
Dynamo-like distributed system project. This recommendation is supported by the project
guidelines, which encourage the use of Elixir for its strengths in distributed computing.
Can you explain dynamo paper to me in great detail
The Dynamo paper presents a highly available key-value storage system that Amazon
developed to support internal services with very strict availability and performance
requirements. Here’s a detailed explanation of the paper’s main components and design
ideas:
Eventual Consistency:
Instead of enforcing strict consistency, Dynamo allows replicas to become temporarily
inconsistent with the guarantee that, eventually, all updates will propagate. This
trade-off is essential in environments where downtime or rejected operations are
unacceptable.
Virtual Nodes:
To improve load distribution and accommodate heterogeneous node capacities, each
physical node is assigned multiple tokens (or virtual nodes). This technique helps
balance the key-space more uniformly across the system.
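The token assignment described above can be sketched in a few lines of Elixir. This is an illustrative sketch, not the paper's exact scheme: the module name `HashRing`, the use of `:erlang.phash2/1` as the hash, and the token count are all assumptions.

```elixir
defmodule HashRing do
  # Build a ring: each physical node gets `vnodes` tokens on the hash space,
  # sorted by token so lookups can walk clockwise.
  def new(nodes, vnodes) do
    for node <- nodes, i <- 1..vnodes do
      {:erlang.phash2({node, i}), node}
    end
    |> Enum.sort()
  end

  # A key is owned by the first token clockwise from its hash.
  def owner(ring, key) do
    h = :erlang.phash2(key)

    case Enum.find(ring, fn {token, _node} -> token >= h end) do
      # Past the last token: wrap around to the first one on the ring.
      nil -> elem(hd(ring), 1)
      {_token, node} -> node
    end
  end
end
```

With more tokens per node, the key-space slices owned by each physical node become smaller and more numerous, which is what evens out the load.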
Preference Lists:
For every key, Dynamo defines a preference list—a set of nodes (usually N nodes)
responsible for storing that key. This replication strategy supports high durability and
availability by spreading the key’s data across multiple nodes.
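As a sketch of how a preference list falls out of the ring (the ring representation as `{token, node}` pairs is an illustrative assumption, not the ExHashRing API used later): walk clockwise from the key's hash and keep the first N distinct physical nodes.

```elixir
# Illustrative preference-list computation over a ring of {token, node} pairs.
preference_list = fn ring, key_hash, n ->
  sorted = Enum.sort(ring)
  # Split the ring at the key's position; tokens at or past the hash come first.
  {before, at_or_after} = Enum.split_with(sorted, fn {token, _} -> token < key_hash end)

  (at_or_after ++ before)
  |> Enum.map(fn {_token, node} -> node end)
  # Skip virtual nodes that map to an already-chosen physical node.
  |> Enum.uniq()
  |> Enum.take(n)
end
```

For example, on the ring `[{10, :a}, {20, :b}, {30, :c}, {40, :a}]` a key hashing to 15 gets the preference list `[:b, :c]` for N = 2.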
Conflict Resolution:
Dynamo delegates the responsibility for resolving conflicts to the application. This can
be done using simple strategies like “last write wins” or more complex,
application-specific merge logic. The design ensures that no update is lost, even if the
client must later reconcile multiple versions.
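The simplest of those strategies, “last write wins”, reduces to picking the version with the greatest timestamp. A minimal sketch, assuming versions are `{value, timestamp}` pairs (the tuple shape is illustrative):

```elixir
# "Last write wins": keep only the version with the largest timestamp,
# silently discarding the others.
last_write_wins = fn versions ->
  Enum.max_by(versions, fn {_value, timestamp} -> timestamp end)
end
```

This is lossy by construction, which is why shopping-cart-style applications prefer a semantic merge over timestamps.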
4. Failure Handling and Recovery Techniques
Gossip-based Membership and Failure Detection:
Nodes use a decentralized, gossip-based protocol to exchange information about the
status of other nodes. This allows the system to quickly detect node failures or
recoveries without a centralized monitor.
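One way to sketch the merge step of such a gossip exchange; the `{status, heartbeat}` entry shape is an assumption for illustration, not the paper's wire format:

```elixir
# Merge two membership views: for each node, keep the entry with the
# higher heartbeat counter, since it reflects fresher information.
merge_views = fn local, remote ->
  Map.merge(local, remote, fn _node, {_s1, h1} = left, {_s2, h2} = right ->
    if h1 >= h2, do: left, else: right
  end)
end
```

Repeated pairwise exchanges of this kind let status information spread epidemically through the cluster without any central coordinator.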
Hinted Handoff:
In the event that a node is temporarily unavailable during a write, the coordinator will
send the update to an alternate node along with a “hint” indicating the intended
destination. Once the failed node recovers, these hints are used to repair and
synchronize the data.
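The bookkeeping behind hinted handoff can be sketched as a map from the intended node to its pending writes; the `Hints` module name and data shapes here are illustrative assumptions:

```elixir
defmodule Hints do
  # When `intended_node` is down, the coordinator stores the write locally,
  # tagged with a hint naming the node it was meant for.
  def store(hints, intended_node, key, value) do
    Map.update(hints, intended_node, [{key, value}], &[{key, value} | &1])
  end

  # Once the node recovers, drain every hinted write destined for it so the
  # replicas can be handed back and the hint discarded.
  def drain(hints, recovered_node) do
    {Map.get(hints, recovered_node, []), Map.delete(hints, recovered_node)}
  end
end
```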
Tuning Parameters:
The flexibility of configuring N, R, and W allows operators to balance the trade-offs
between availability, consistency, and durability based on the specific needs of each
application.
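The interaction between these parameters can be captured as a one-line invariant (the helper name is illustrative):

```elixir
# R + W > N forces every read quorum to intersect every write quorum,
# so a successful read contacts at least one replica that saw the latest write.
overlapping? = fn n, r, w -> r + w > n end

# A commonly cited Dynamo production setting is N = 3, R = 2, W = 2,
# which satisfies the invariant while tolerating one slow or failed replica.
```

Lowering R or W below that threshold trades consistency for latency and availability, which is exactly the knob the paper describes.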
6. Summary of Key Innovations
Decentralized Architecture:
There is no single master; every node plays an equivalent role in processing requests,
which simplifies scaling and fault tolerance.
In essence, the Dynamo paper provides a blueprint for designing a distributed storage
system that remains operational under adverse conditions by carefully balancing the
trade-offs between availability, consistency, and performance. This approach has not only
influenced the design of other modern distributed databases but has also demonstrated
how eventual consistency models can be effectively applied in large-scale, real-world
systems.
If you’d like to dive even deeper into any specific aspect—such as the inner workings of
vector clocks, the detailed performance measurements, or the implementation of
anti-entropy protocols—feel free to ask!
Below is an overview for a project where you implement a weakly consistent key-value store
inspired by Dynamo—in Elixir. This project plan outlines both the system’s design and your
evaluation strategy.
Project Title
Implementing a Dynamo-Inspired Distributed Key-Value Store in Elixir
Motivation:
Modern applications (e.g., shopping carts) require that writes are never rejected, even
during network or node failures. Inspired by Dynamo’s design, the system will allow
temporary inconsistencies with the guarantee that updates eventually propagate to all
replicas.
Why Elixir:
Elixir’s lightweight processes, OTP framework, and built-in support for distributed
computing make it an excellent choice for modeling isolated processes and handling
fault tolerance easily.
B. Consistency and Versioning
Vector Clocks:
Each key-value pair carries a vector clock to capture causal relationships between
updates. This allows your system to detect concurrent updates and manage conflicts.
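The core vector-clock operations the system needs can be sketched as follows; the module name `VClock` and the map-of-counters representation are illustrative assumptions, distinct from the `Dynamo.VectorClock` module shown later:

```elixir
defmodule VClock do
  # A clock is a map from node id to a monotonically increasing counter.
  def increment(clock, node), do: Map.update(clock, node, 1, &(&1 + 1))

  # c1 dominates (descends from or equals) c2 when every counter in c2
  # is less than or equal to the matching counter in c1.
  def dominates?(c1, c2) do
    Enum.all?(c2, fn {node, n} -> Map.get(c1, node, 0) >= n end)
  end

  # Concurrent when neither dominates: these versions must be kept as
  # siblings and surfaced to the application for reconciliation.
  def concurrent?(c1, c2), do: not dominates?(c1, c2) and not dominates?(c2, c1)
end
```

Two writes incremented on different coordinators compare as concurrent, which is precisely the case your read path must expose as multiple versions.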
Conflict Resolution:
The application (or a reconciliation process) will merge divergent versions. You can
experiment with strategies ranging from “last write wins” to custom merge logic.
Hinted Handoff:
When a node is temporarily unreachable, another node will store the update along with
a “hint” so that the original node can be updated once it recovers.
Anti-Entropy:
Use background synchronization (for example, via Merkle trees or an equivalent
mechanism) to detect and reconcile differences among replicas.
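A deliberately simplified stand-in for the Merkle-tree comparison, showing only the digest-then-compare idea (module and function names are illustrative): two replicas exchange a cheap digest first and only ship keys when the digests disagree.

```elixir
defmodule AntiEntropy do
  # Digest of an entire store; a real implementation would build a Merkle
  # tree of per-range digests so only divergent ranges are exchanged.
  def digest(kv_store) do
    kv_store |> Enum.sort() |> :erlang.phash2()
  end

  def diverged?(store_a, store_b), do: digest(store_a) != digest(store_b)
end
```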
3. Implementation Roadmap
Phase 1: Core Key-Value Operations
Develop the basic get and put operations.
Return all conflicting versions upon a read, letting the application decide on
reconciliation.
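The Phase 1 API shape can be sketched in a few lines; the `KV` module is a hypothetical illustration in which `put` accumulates sibling versions and `get` returns all of them for the caller to reconcile:

```elixir
defmodule KV do
  # Store the new version alongside any existing siblings for the key.
  def put(store, key, value, clock) do
    Map.update(store, key, [{value, clock}], &[{value, clock} | &1])
  end

  # Return every surviving {value, clock} version; empty list if absent.
  def get(store, key), do: Map.get(store, key, [])
end
```

A fuller implementation would prune siblings whose clocks are dominated by another version at write time.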
Phase 3: Distributed Process Management
Leverage OTP to manage node processes and supervisors.
Metrics Collection:
Latency Measurements: Average and high-percentile latencies for read and write
operations.
Consistency Metrics: Track how quickly and effectively the system converges after
failures.
Hypothesis:
Under increased network delays or failure rates, you expect a higher occurrence of stale
reads; however, the system should maintain high availability and eventually reconcile
inconsistent states.
Emulation Environment:
Utilize the provided emulation setup (or create your own test harness) to simulate
real-world conditions.
Data Analysis:
Log operations and use Elixir’s built-in tools to analyze metrics. Compare the observed
behavior against your hypothesis.
5. Conclusion
This project will demonstrate the core Dynamo techniques working end to end. By
following this roadmap, you’ll not only implement a functional distributed key-value store
but also gain practical insights into the complexities of designing systems that balance
consistency, availability, and performance.
Feel free to ask if you need further clarification on any component of this project plan!
protocol.ex:
defmodule Dynamo do
# Importing modules
import Emulation, only: [send: 2, whoami: 0]
import Kernel,
except: [spawn: 3, spawn: 1, spawn_link: 1, spawn_link: 3,
send: 2]
# import Logger
alias ExHashRing.Ring
alias ExHashRing.Node
alias ExHashRing.Hash
alias __MODULE__
# Requiring modules
require Fuzzers
)
#
@doc """
Create a new configuration
"""
@spec new_configuration(list(), non_neg_integer(), list(),
non_neg_integer(), non_neg_integer(), non_neg_integer()) ::
%Dynamo{
nodes: list(),
clients: list(),
read_quorum: non_neg_integer(),
write_quorum: non_neg_integer(),
replication_factor: non_neg_integer()
}
def new_configuration(nodes, vnodes, clients,
replication_factor, read_quorum, write_quorum) do
{:ok, ring} = Ring.start_link()
%Dynamo{
nodes: nodes,
vnodes: vnodes,
clients: clients,
replication_factor: replication_factor,
read_quorum: read_quorum,
write_quorum: write_quorum,
ring: ring,
kv_store: %{},
seq: -1,
response_count: %{},
responses: %{},
vnodeToKeys: %{},
status_of_nodes: Enum.into(nodes, %{}, fn node ->
{node, {"Healthy", 0}} end),
requestTimerMap: %{},
clientRequestTimerMap: %{},
inFailedState: false
}
end
#
@doc """
Make a node as server
11/50
"""
@spec make_server(%Dynamo{}) :: no_return()
def make_server(state) do
Ring.add_node(state.ring, whoami(), state.vnodes)
timer = Emulation.timer(25, :antientropy)
timer = Emulation.timer(50, :gossip)
state = %{state | status_of_nodes:
Map.put(state.status_of_nodes, whoami(), {"Healthy",
Emulation.now()})}
server(%{state | node: whoami()})
end
case MapSet.member?(distinct_items, element) do
true ->
next_index = rem(index + 1, length(list))
traverse_circular(list, next_index, count,
unhealthy_nodes, distinct_items, preference_list,
distinct_skipped_items, skipped_items_list)
false ->
case MapSet.member?(unhealthy_nodes, element) do
true ->
case MapSet.member?(distinct_skipped_items,
element) do
true ->
next_index = rem(index + 1, length(list))
traverse_circular(list, next_index, count,
unhealthy_nodes, distinct_items, preference_list,
distinct_skipped_items, skipped_items_list)
false ->
skipped_items_list = skipped_items_list ++ [index]
next_index = rem(index + 1, length(list))
distinct_items = MapSet.put(distinct_items,
element)
traverse_circular(list, next_index, count,
unhealthy_nodes, distinct_items, preference_list,
distinct_skipped_items, skipped_items_list)
end
false ->
preference_list = preference_list ++ [index]
next_index = rem(index + 1, length(list))
distinct_items = MapSet.put(distinct_items, element)
traverse_circular(list, next_index, count - 1,
unhealthy_nodes, distinct_items, preference_list,
distinct_skipped_items, skipped_items_list)
end
end
end
{:ok, nodes} = Ring.get_nodes_with_replicas(ring)
{hashList, nodeList} = Enum.unzip(Node.expand(nodes))
start_index = find_start_index(hashList, Hash.of(key))
unhealthy_nodes = state.status_of_nodes
|> Enum.filter(fn {_node, {status, _timestamp}}
-> status == "Failed" end)
|> Enum.map(fn {node, _} -> node end)
unhealthy_nodes = MapSet.new(unhealthy_nodes)
if length(state.nodes) - MapSet.size(unhealthy_nodes) <
state.replication_factor do
raise("Not enough healthy nodes")
end
{pref_list_indices, skip_list_indices} =
circular_traversal(nodeList, start_index,
state.replication_factor, unhealthy_nodes)
pref_list = Enum.map(pref_list_indices, fn index ->
Enum.at(nodeList, index) end)
skip_list = Enum.map(skip_list_indices, fn index ->
Enum.at(nodeList, index) end)
{pref_list, pref_list_indices, skip_list, skip_list_indices}
end
end)
|> Enum.reject(&is_nil/1)
Enum.reduce(sender_receiver_vnode_pairs,
receiver_kv_store, fn {sender_vnode, receiver_vnode},
acc_kv_store ->
sender_vnode_kv_store = sender_kv_store
|> Enum.filter(fn {_, {_, _, {_, vnode}, _, _}} -> vnode ==
sender_vnode end)
Enum.reduce(sender_vnode_kv_store, acc_kv_store, fn
{key, sender_key_data}, acc_inner_kv_store ->
case Map.get(acc_inner_kv_store, key) do
nil ->
{v, sender_vector_clock, {s, sv}, _, sender_clock} =
sender_key_data
updated_receiver_data = {v, sender_vector_clock, {s,
sv}, receiver_vnode, sender_clock}
acc_inner_kv_store = Map.put(acc_inner_kv_store,
key, updated_receiver_data)
receiver_key_data ->
{v, sender_vector_clock, {s, sv}, _, sender_clock} =
sender_key_data
{_, receiver_vector_clock, {_, _}, _, receiver_clock} =
receiver_key_data
if
Dynamo.VectorClock.lessThanEqualTo(sender_vector_clock,
receiver_vector_clock) do
# Receiver is more (or equally) updated, so do nothing
acc_inner_kv_store
else
if
Dynamo.VectorClock.greaterThan(sender_vector_clock,
receiver_vector_clock) do
# Receiver can be less updated.
updated_receiver_data = {v, sender_vector_clock,
{s, sv}, receiver_vnode, sender_clock}
acc_inner_kv_store = Map.put(acc_inner_kv_store,
key, updated_receiver_data)
else
# Receiver and sender are unrelated (concurrent); compare physical clocks
if sender_clock > receiver_clock do
updated_receiver_data = {v, sender_vector_clock,
{s, sv}, receiver_vnode, sender_clock}
acc_inner_kv_store = Map.put(acc_inner_kv_store,
key, updated_receiver_data)
else
# Do nothing
acc_inner_kv_store
end
end
end
end
end)
end)
end
Map.put(state.status_of_nodes, sender, {"Healthy",
Emulation.now()})}
end
def server(state) do
receive do
{sender,
%Dynamo.ServerPutRequest{
key: key,
value: value,
client: client,
replication: replication,
seq: seq,
context: context,
vnodeIndex: vindex
}} ->
if context != nil do
state = %{state | kv_store:
Map.put(state.kv_store,key,{value, context,{whoami(),
start_index}, start_index, Emulation.now()})}
end
state = %{state | kv_store:
Dynamo.VectorClock.updateVectorClock(state.kv_store, key,
value, whoami(), state.seq + 1, start_index)}
state = %{state | vnodeToKeys:
Map.put(state.vnodeToKeys, start_index,
Enum.uniq(Enum.concat(Map.get(state.vnodeToKeys,
start_index,[]), [key])))}
t = Emulation.timer(500,
{:clientPutRequestTimeOut, whoami(), client, key, value,
state.seq+1})
state = %{state | clientRequestTimerMap:
Map.put(state.clientRequestTimerMap, {whoami(), client,
key, value, state.seq+1}, t)}
IO.puts("Put done for key:#{key} and value:#{value} at seq:#{state.seq + 1} at #{whoami()}")
state =
if state.write_quorum == 1 do
requestTimer =
Map.get(state.clientRequestTimerMap, {whoami(), client,
key, value, state.seq + 1}, nil)
Emulation.cancel_timer(requestTimer)
state = %{state | clientRequestTimerMap:
Map.delete(state.clientRequestTimerMap, {whoami(), client,
key, value, state.seq + 1})}
send(client, {:ok, key})
state
else
state
end
preference_list_tuple = Enum.zip(preference_list,
preference_list_indices)
new_state = Enum.reduce(preference_list_tuple,
state, fn {node, vnode_index}, acc_state ->
if node != whoami() do
t = Emulation.timer(25,
{:requestTimeout, whoami(), node, key, state.seq + 1})
new_state = %{acc_state |
requestTimerMap:
Map.put(acc_state.requestTimerMap, {whoami(), node, key,
state.seq + 1}, t)
}
send(node,
%Dynamo.ServerPutRequest{
key: key,
value: value,
client: client,
replication: true,
seq: state.seq + 1,
context:
Map.get(new_state.kv_store, key),
vnodeIndex: vnode_index
})
new_state
else
acc_state
end
end)
IO.puts("Put replicated for key:#{key} and value:#{value} at seq:#{seq} at #{whoami()}")
new_context = case context do
{a, b, c, _, e} -> {a, b, c, vindex, e}
_ -> context end
state = %{state | kv_store: Map.put(state.kv_store,
key, new_context)}
state = %{state | vnodeToKeys:
Map.put(state.vnodeToKeys, vindex,
Enum.uniq(Enum.concat(Map.get(state.vnodeToKeys,
vindex,[]), [key])))}
send(sender, %Dynamo.ServerPutResponse{
key: key,
value: value,
status: :ok,
client: client,
seq: seq
})
server(%{state | seq: seq})
end
end
{sender,
%Dynamo.ServerGetRequest{
key: key,
client: client,
replication: replication,
seq: seq,
context: context
}} ->
if state.inFailedState == true do
if not replication do
new_server = Enum.random(state.nodes)
send(new_server, %Dynamo.ServerGetRequest{
key: key,
client: client,
replication: replication,
seq: seq,
context: context
})
end
server(state)
else
if not replication do
IO.puts("Get request received at #{whoami()}")
# IO.inspect("Getting preference list")
{preference_list, preference_list_indices, skip_list,
skip_list_indices} = get_preference_list(state,key)
# IO.inspect("Done preference list")
# if(Enum.at(preference_list,0) == whoami()) do
if Enum.member?(preference_list, whoami()) do
IO.inspect(preference_list)
IO.inspect(skip_list)
value =
if Map.get(state.kv_store,key,nil) == nil do
[]
else
Map.get(state.kv_store,key,nil)
end
state = %{state | responses:
Map.put(state.responses, state.seq + 1, [value])}
t = Emulation.timer(500,
{:clientGetRequestTimeOut, whoami(), client, key,
state.seq+1})
state = %{state | clientRequestTimerMap:
Map.put(state.clientRequestTimerMap, {whoami(), client,
key, state.seq+1}, t)}
IO.puts("Got into coordinator and started broadcast")
# IO.inspect(state.clientRequestTimerMap)
state =
if state.read_quorum == 1 do
requestTimer =
Map.get(state.clientRequestTimerMap, {whoami(), client,
key, state.seq + 1}, nil)
Emulation.cancel_timer(requestTimer)
state = %{state | clientRequestTimerMap:
Map.delete(state.clientRequestTimerMap, {whoami(), client,
key, state.seq + 1})}
send(client, {key, Map.get(state.responses,
state.seq + 1)})
state
else
state
end
new_state = Enum.reduce(preference_list, state, fn
node, acc_state ->
if node != whoami() do
t = Emulation.timer(25,
{:requestTimeout, whoami(), node, key, state.seq + 1})
new_state = %{acc_state |
requestTimerMap:
Map.put(acc_state.requestTimerMap, {whoami(), node, key,
state.seq + 1}, t)
}
send(node,
%Dynamo.ServerGetRequest{
key: key,
client: client,
replication: true,
seq: state.seq + 1
})
new_state
else
acc_state
end
end)
IO.inspect(new_state.clientRequestTimerMap)
server(%{ new_state | seq: state.seq + 1})
else
send(Enum.at(preference_list,0),
%Dynamo.ServerGetRequest{
key: key,
client: client,
replication: false,
seq: nil
})
server(state)
end
else
IO.puts("Get replicated for key:#{key} at seq:#{seq} at #{whoami()}")
value = if Map.get(state.kv_store,key,nil) == nil do
[]
else
Map.get(state.kv_store,key,nil)
end
send(sender, %Dynamo.ServerGetResponse{
key: key,
value: value,
status: :ok,
client: client,
seq: seq
})
server(%{ state | seq: seq})
end
end
{sender,
%Dynamo.ServerPutResponse{
key: key,
status: status,
client: client,
value: value,
seq: seq,
context: context
}} ->
if state.inFailedState == true do
server(state)
else
IO.puts("Got into server put response at #{whoami()} with seq: #{seq}")
state = makeHealthy(state, sender)
if status == :ok and Map.get(state.requestTimerMap,
{whoami(), sender, key, seq}, nil) != nil do
requestTimer = Map.get(state.requestTimerMap,
{whoami(), sender, key, seq}, nil)
Emulation.cancel_timer(requestTimer)
state = %{state | requestTimerMap:
Map.delete(state.requestTimerMap, {whoami(), sender, key,
seq})}
state = if Map.get(state.response_count, seq, nil) ==
nil do
%{state | response_count:
Map.put(state.response_count, seq, 1)}
else
count = Map.get(state.response_count, seq)
%{state | response_count:
Map.put(state.response_count, seq, count + 1)}
end
if Map.get(state.response_count, seq, nil) ==
state.write_quorum - 1 do
requestTimer =
Map.get(state.clientRequestTimerMap, {whoami(), client,
key, value, seq}, nil)
IO.inspect("----------------------------------------------------------")
IO.puts("Cancelling timer at #{whoami()} for #{inspect(requestTimer)}")
IO.inspect("----------------------------------------------------------")
Emulation.cancel_timer(Map.get(state.clientRequestTimerMap, {whoami(), client, key, value, seq}, nil))
state = %{state | clientRequestTimerMap:
Map.delete(state.clientRequestTimerMap, {whoami(), client,
key, value, seq})}
IO.inspect(state.clientRequestTimerMap)
send(client, {:ok, key})
server(state)
end
server(state)
end
server(state)
end
{sender,
%Dynamo.ServerGetResponse{
key: key,
value: value,
status: status,
client: client,
seq: seq,
context: context
}} ->
if state.inFailedState == true do
server(state)
else
IO.puts("Got into server get response at #{whoami()} with seq: #{seq}")
state = makeHealthy(state, sender)
if status == :ok and Map.get(state.requestTimerMap,
{whoami(), sender, key, seq}, nil) != nil do
requestTimer = Map.get(state.requestTimerMap,
{whoami(), sender, key, seq}, nil)
Emulation.cancel_timer(requestTimer)
state = %{state | requestTimerMap:
Map.delete(state.requestTimerMap, {whoami(), sender, key, seq})}
state = if Map.get(state.response_count, seq, nil) ==
nil do
%{state | response_count:
Map.put(state.response_count, seq, 1)}
else
count = Map.get(state.response_count, seq)
%{state | response_count:
Map.put(state.response_count, seq, count + 1)}
end
responses =
if value == [] do
Map.get(state.responses, seq, [])
else
Map.get(state.responses, seq, []) ++ [value]
end
state = %{state | responses:
Map.put(state.responses, seq, responses)}
state =
if Map.get(state.response_count, seq, 0) ==
state.read_quorum - 1 do
requestTimer =
Map.get(state.clientRequestTimerMap, {whoami(), client,
key, seq}, nil)
IO.inspect("----------------------------------------------------------")
IO.puts("Cancelling timer at #{whoami()} for #{inspect(requestTimer)}")
IO.inspect("----------------------------------------------------------")
Emulation.cancel_timer(requestTimer)
state = %{state | clientRequestTimerMap:
Map.delete(state.clientRequestTimerMap, {whoami(), client,
key, seq})}
reconciledResponses =
Dynamo.VectorClock.syntaticReconcilationWithValues(Map.get(state.responses, seq))
state = %{state | responses:
Map.put(state.responses, seq, reconciledResponses)}
send(client, {key, Map.get(state.responses, seq)})
state
else
state
end
server(state)
end
server(state)
end
Map.delete(state.clientRequestTimerMap, {node, client, key,
value, seq})}
server = Enum.random(state.nodes)
if state.inFailedState == true do
server(state)
else
send(server, %Dynamo.ServerPutRequest{
key: key,
value: value,
client: client,
replication: false,
seq: nil,
context: nil
})
server(state)
end
if state.inFailedState == true do
IO.inspect("Client Request timed out for Get at #{node} with #{client} with key: #{key}")
IO.inspect(Map.get(state.clientRequestTimerMap,
{node, client, key, seq},nil))
IO.inspect(state.clientRequestTimerMap)
state = %{state | clientRequestTimerMap:
Map.delete(state.clientRequestTimerMap, {node, client, key,
seq})}
server = Enum.random(state.nodes)
server(state)
else
IO.inspect("Client Request timed out for Get at #{node} with #{client} with key: #{key}")
IO.inspect(Map.get(state.clientRequestTimerMap,
{node, client, key, seq},nil))
IO.inspect(state.clientRequestTimerMap)
if Map.get(state.clientRequestTimerMap, {node, client,
key, seq},nil) != nil do
state = %{state | clientRequestTimerMap:
Map.delete(state.clientRequestTimerMap, {node, client, key,
seq})}
server = Enum.random(state.nodes)
send(server, %Dynamo.ServerGetRequest {
key: key,
client: client,
replication: false,
seq: nil
})
server(state)
else
server(state)
end
end
:recover ->
state = %{state | status_of_nodes:
Map.put(state.status_of_nodes, whoami(), {"Healthy",
Emulation.now()})}
state = %{state | inFailedState: false}
server(state)
:antientropy ->
if state.inFailedState == true do
server(state)
else
other_nodes = List.delete(state.nodes, whoami())
select_node = Enum.random(other_nodes)
send(select_node, {:merkle_request, state.kv_store})
timer = Emulation.timer(50, :antientropy)
server(state)
end
server(state)
else
state = makeHealthy(state, sender)
receiver_kv_store = state.kv_store
updated_kv_store = syncronisation(sender, whoami(),
sender_kv_store, receiver_kv_store)
server(%{state | kv_store: updated_kv_store})
end
:gossip ->
if state.inFailedState == true do
server(state)
else
other_nodes = List.delete(state.nodes, whoami())
select_node = Enum.random(other_nodes)
send(select_node, {:gossip_request,
state.status_of_nodes})
timer = Emulation.timer(50, :gossip)
server(state)
end
whoami(), sender_nodes_status, receiver_nodes_status)
server(%{state | status_of_nodes:
updated_status_of_nodes})
end
end
end
end
#----------------------------------------------------------------------
defmodule Dynamo.Client do
import Emulation, only: [send: 2, whoami: 0]
import Kernel,
except: [spawn: 3, spawn: 1, spawn_link: 1, spawn_link: 3,
send: 2]
@moduledoc """
A client that can be used to connect and send
requests to the Dynamo servers.
"""
alias __MODULE__
@enforce_keys [:client_list]
defstruct(
client_list: nil,
client: nil,
# Stores the last get operation's vector clock, sent back as context
global_vector_clock: nil
)
@doc """
Construct a new Dynamo Client Configuration with list of
clients.
"""
@spec new_client_configuration(list()) :: %Client{client_list: list()}
def new_client_configuration(client_list) do
%Client{client_list: client_list,
global_vector_clock: %{}}
end
#
@doc """
Make a node act as a client.
"""
@spec make_client(%Client{}) :: no_return()
def make_client(state) do
client(state)
end
{sender,
%Dynamo.ClientPutRequest{
key: key,
value: value,
server_list: server_list
}} ->
server = Enum.random(server_list)
context =
if Map.get(state.global_vector_clock, key, nil) != nil do
Dynamo.VectorClock.clientReconcilation(Map.get(state.global_vector_clock, key))
else
nil
end
send(server, %Dynamo.ServerPutRequest{
key: key,
value: value,
client: whoami(),
replication: false,
seq: nil,
context: context
})
client(%{state | client: sender})
{sender,
%Dynamo.ClientGetRequest{
key: key,
server_list: server_list
}} ->
server = Enum.random(server_list)
send(server, %Dynamo.ServerGetRequest {
key: key,
client: whoami(),
replication: false,
seq: nil
})
client(%{state | client: sender})
end
#----------------------------------------------------------------------
defmodule Dynamo.VectorClock do
def lessThan(clock1, clock2) do
if Enum.sort(Map.keys(clock1)) ==
Enum.sort(Map.keys(clock2)) do
merged_clock = Map.merge(clock1, clock2, fn _k, c1,
c2 -> c1 < c2 end)
compare_results = Map.values(merged_clock)
Enum.all?(compare_results, fn x -> x == true end)
else
false
end
end
#-----------------------------------------------------------------------
true ->
syntaticReconcilationMerger(currIndex + 1, clock, result)
end
end
end
result =
if succ do
result
else
result ++ [Enum.at(clockList, currIndex)]
end
syntaticReconcilationHelper(currIndex + 1, clockList,
result)
end
end
def syntaticReconcilation(clockList) do
syntaticReconcilationHelper(0, clockList, [])
end
#----------------------------------------------------------------------
def syntaticReconcilationMergerWithValues(currIndex,
clock, result) do
if currIndex == length(result) do
{result, false}
else
cond do
lessThanEqualTo(elem(clock, 1), elem(Enum.at(result,
currIndex), 1)) ->
{result, true}
lessThan(elem(Enum.at(result, currIndex), 1),
elem(clock, 1)) ->
result = List.update_at(result, currIndex, fn _ -> clock
end)
{result, true}
true ->
syntaticReconcilationMergerWithValues(currIndex + 1,
clock, result)
end
end
end
defp syntaticReconcilationWithValuesHelper(currIndex,
divergedValues, result) do
if currIndex == length(divergedValues) do
result
else
{result, succ} =
syntaticReconcilationMergerWithValues(0,
Enum.at(divergedValues, currIndex), result)
result =
if succ do
result
else
result ++ [Enum.at(divergedValues, currIndex)]
end
syntaticReconcilationWithValuesHelper(currIndex + 1,
divergedValues, result)
end
end
def syntaticReconcilationWithValues(divergedValues) do
divergedValues = Enum.reject(divergedValues, &(&1 ==
[]))
syntaticReconcilationWithValuesHelper(0,
divergedValues, [])
end
#----------------------------------------------------------------------
#Client reconcilation
defp clientReconcilationHelper([], result) do
result
end
def clientReconcilation(divergedClocks) do
clientReconcilationHelper(divergedClocks, %{})
end
end
messages.ex:
defmodule Dynamo.ClientPutRequest do
@moduledoc """
Put Request for Client
"""
alias __MODULE__
@enforce_keys [:key, :value, :server_list]
defstruct(
key: nil,
value: nil,
server_list: nil,
seq: nil,
context: nil
)
@doc """
Create a new Client Put Request.
"""
@spec new(non_neg_integer(), non_neg_integer(), list(),
non_neg_integer()) ::
%ClientPutRequest{
key: non_neg_integer(),
value: non_neg_integer(),
server_list: list()
}
def new(key, value, server_list, seq) do
%ClientPutRequest{
key: key,
value: value,
server_list: server_list,
seq: seq,
context: nil
}
end
end
defmodule Dynamo.ClientGetRequest do
@moduledoc """
Get Request for Client
"""
alias __MODULE__
@enforce_keys [:key, :server_list]
defstruct(
key: nil,
server_list: nil,
seq: nil,
context: nil
)
@doc """
Create a new Client Get Request.
"""
@spec new(non_neg_integer(), list(), non_neg_integer()) ::
%ClientGetRequest{
key: non_neg_integer(),
server_list: list()
}
def new(key, server_list, seq) do
%ClientGetRequest{
key: key,
server_list: server_list,
seq: seq,
context: nil
}
end
end
defmodule Dynamo.ServerPutRequest do
@moduledoc """
Put Request for a Server
"""
alias __MODULE__
@enforce_keys [:key, :value, :client, :seq]
defstruct(
key: nil,
value: nil,
client: nil,
replication: nil,
seq: nil,
context: nil,
vnodeIndex: nil
)
@doc """
Create a new Server Put Request.
"""
@spec new(non_neg_integer(), non_neg_integer(), atom(),
non_neg_integer()) ::
%ServerPutRequest{
key: non_neg_integer(),
value: non_neg_integer(),
client: atom()
}
def new(key, value, client, seq) do
%ServerPutRequest{
key: key,
value: value,
client: client,
seq: seq,
replication: false,
context: nil,
vnodeIndex: nil
}
end
end
defmodule Dynamo.ServerGetRequest do
@moduledoc """
Get Request for a Server
"""
alias __MODULE__
@enforce_keys [:key, :client, :seq]
defstruct(
key: nil,
client: nil,
replication: nil,
seq: nil,
context: nil
)
@doc """
Create a new Server Get Request.
"""
@spec new(non_neg_integer(), atom(), non_neg_integer())
::
%ServerGetRequest{
key: non_neg_integer(),
client: atom()
}
def new(key, client, seq) do
%ServerGetRequest{
key: key,
client: client,
replication: false,
seq: seq,
context: nil
}
end
end
defmodule Dynamo.ServerPutResponse do
@moduledoc """
Put Response from a Server
"""
alias __MODULE__
@enforce_keys [:key, :value, :status, :client, :seq]
defstruct(
key: nil,
client: nil,
status: nil,
seq: nil,
context: nil,
vnodeIndex: nil,
value: nil
)
@doc """
Create a new Server Put Response.
"""
@spec new(non_neg_integer(), atom(), atom(),
non_neg_integer(), non_neg_integer()) ::
%ServerPutResponse{
key: non_neg_integer(),
status: atom(),
client: atom(),
seq: non_neg_integer()
}
def new(key, value, client, status, seq) do
%ServerPutResponse{
key: key,
client: client,
status: status,
value: value,
seq: seq,
context: nil,
vnodeIndex: nil
}
end
end
defmodule Dynamo.ServerGetResponse do
@moduledoc """
Get Response from a Server
"""
alias __MODULE__
@enforce_keys [:value, :status, :client, :seq]
defstruct(
key: nil,
value: nil,
client: nil,
status: nil,
seq: nil,
context: nil
)
@doc """
Create a new Server Get Response.
"""
@spec new(any(), non_neg_integer(), atom(), atom(),
non_neg_integer()) ::
%ServerGetResponse{
key: any(),
value: non_neg_integer(),
status: atom(),
client: atom(),
seq: non_neg_integer()
}
def new(key, value, client, status, seq) do
%ServerGetResponse{
key: key,
value: value,
client: client,
status: status,
seq: seq,
context: nil
}
end
end
mix.exs:
defmodule Dynamo.MixProject do
use Mix.Project
def project do
[
app: :dynamo,
version: "0.1.0",
build_path: "../../_build",
config_path: "../../config/config.exs",
deps_path: "../../deps",
lockfile: "../../mix.lock",
elixir: "~> 1.10",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end
Below is a detailed project report document that outlines the objectives, design,
implementation details, testing strategies, and future directions for your Dynamo-inspired
distributed key-value store project in Elixir. The report also references the key parts of the
provided code.
1. Abstract
This project implements a highly available, weakly consistent distributed key-value store
inspired by Amazon’s Dynamo. Built in Elixir, the system leverages the BEAM virtual machine’s
strengths—lightweight processes, fault tolerance, and robust concurrency—to manage data
partitioning, replication, versioning, and failure recovery. Core techniques include consistent
hashing (with virtual nodes), quorum-based read and write operations, vector clocks for
tracking causality, and mechanisms such as hinted handoff, gossip-based membership, and
anti-entropy synchronization.
2. Introduction
Distributed systems today must balance performance, availability, and consistency. Dynamo
was designed to guarantee an “always writeable” system even in the presence of failures.
This project re-implements key ideas from Dynamo in Elixir with the following goals:
High Availability: Allowing writes to succeed despite network partitions or node failures.
Fault Tolerance: Detecting and recovering from node failures using gossip protocols and
anti-entropy techniques.
Scalability: Using consistent hashing (via virtual nodes) to distribute load evenly and
scale out incrementally.
Elixir is chosen due to its native support for concurrency, fault isolation through lightweight
processes, and the OTP framework, all of which map naturally to the distributed system
challenges addressed by Dynamo.
3. System Architecture
Server Nodes: Each server runs a process (or set of processes) that handles client
requests. They maintain a local key-value store, vector clocks for versioning, and manage
replication across a set of nodes.
Client Nodes: Clients send get and put requests. They aggregate responses and perform
local reconciliation based on vector clock information.
Consistent Hash Ring: The project uses the ExHashRing library to assign keys to nodes.
Virtual nodes (vnodes) help balance the load across heterogeneous servers.
Preference Lists:
The get_preference_list/2 function in the Dynamo module computes which nodes
(based on their positions in the hash ring) are responsible for a given key. This list is
used to determine the nodes for replication.
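A minimal sketch of this computation, assuming the ring is a list of {position, node} tuples sorted by position (the real project delegates to ExHashRing; the module and function names below are hypothetical):

```elixir
defmodule PreferenceListSketch do
  # Illustrative: walk the ring clockwise from the key's hash position
  # and collect the first n distinct physical nodes. Duplicate entries
  # for the same node (its vnodes) are skipped so replicas land on
  # distinct machines.
  def preference_list(ring, key, n) do
    hash = :erlang.phash2(key)
    {before, rest} = Enum.split_while(ring, fn {pos, _node} -> pos < hash end)

    # rotate the ring so traversal starts at the first position >= hash,
    # wrapping around to the beginning of the ring
    (rest ++ before)
    |> Enum.map(fn {_pos, node} -> node end)
    |> Enum.uniq()
    |> Enum.take(n)
  end
end
```

For example, `PreferenceListSketch.preference_list([{10, :a}, {20, :b}, {30, :c}], "cart-42", 2)` yields two distinct nodes starting clockwise from the key's hash.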
Gossip Protocol:
The server periodically exchanges status information with other nodes. Functions such
as gossipExchange/4 and related message handling in the Dynamo module ensure that
nodes are aware of each other’s health.
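One common way to realize such an exchange, shown here as a sketch rather than the project's actual gossipExchange/4, is to track a heartbeat counter per node and merge two views by keeping the highest counter seen on either side:

```elixir
defmodule GossipSketch do
  # Illustrative: a view is a map of node => heartbeat counter.
  # Merging two views element-wise by max lets liveness information
  # spread transitively through pairwise exchanges.
  def merge_views(local, remote) do
    Map.merge(local, remote, fn _node, hb_local, hb_remote ->
      max(hb_local, hb_remote)
    end)
  end

  # A node bumps its own counter before each gossip round; peers that
  # see the counter stop advancing can suspect the node has failed.
  def tick(view, self_node) do
    Map.update(view, self_node, 1, &(&1 + 1))
  end
end
```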
Hinted Handoff:
When a node is detected as failed, updates destined for it are temporarily stored at an
alternate node with a “hint” so they can be forwarded later.
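The bookkeeping can be sketched as follows, assuming the stand-in node holds hints in a plain map keyed by the intended replica (module and function names are illustrative, not the project's API):

```elixir
defmodule HintedHandoffSketch do
  # Illustrative: record a write destined for a failed replica under
  # that replica's name, so it can be forwarded later.
  def store_hint(hints, intended_node, key, value) do
    Map.update(hints, intended_node, [{key, value}], &[{key, value} | &1])
  end

  # When gossip reports the intended node alive again, return the
  # writes to forward and the remaining hint table.
  def handoff(hints, recovered_node) do
    {Map.get(hints, recovered_node, []), Map.delete(hints, recovered_node)}
  end
end
```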
Anti-Entropy Synchronization:
Using a simplified mechanism (inspired by Merkle trees), nodes periodically compare
portions of their key-value store (using timers like :antientropy ) and synchronize
discrepancies.
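A simplified digest comparison in that spirit might look like the following sketch (not the project's code; it hashes whole buckets of the store instead of building a real Merkle tree, so only buckets whose digests differ need key-by-key exchange):

```elixir
defmodule AntiEntropySketch do
  # Illustrative: partition the key-value store into buckets by key
  # hash and compute one digest per bucket.
  def bucket_digests(store, num_buckets) do
    store
    |> Enum.group_by(fn {key, _v} -> :erlang.phash2(key, num_buckets) end)
    |> Map.new(fn {bucket, kvs} -> {bucket, :erlang.phash2(Enum.sort(kvs))} end)
  end

  # Two replicas exchange digest tables and synchronize only the
  # buckets whose digests disagree.
  def differing_buckets(digests_a, digests_b) do
    (Map.keys(digests_a) ++ Map.keys(digests_b))
    |> Enum.uniq()
    |> Enum.filter(fn b -> Map.get(digests_a, b) != Map.get(digests_b, b) end)
  end
end
```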
4. Implementation Details
4.1 Code Organization
Protocol Module ( protocol.ex ):
Contains the main logic for server operations, including:
Request Handling:
Receives and processes put and get requests from clients. It distinguishes between
coordinator actions (first replica) and replication steps.
Request Dispatch:
Clients randomly choose a server from a provided list to send get/put requests.
Vector Clock Reconciliation:
Upon receiving responses, the client updates its global vector clock to maintain
causality.
Message structures ensure that all necessary fields (key, value, client, sequence
number, context, etc.) are included.
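Treating a vector clock as a map from node name to counter, the client-side merge can be sketched as below (an illustrative module, not the project's Dynamo.VectorClock API):

```elixir
defmodule VectorClockSketch do
  # Illustrative: merging two clocks takes the element-wise maximum,
  # producing a clock that causally follows both inputs. This is what
  # a client does when folding server responses into its global clock.
  def merge(vc1, vc2) do
    Map.merge(vc1, vc2, fn _node, c1, c2 -> max(c1, c2) end)
  end

  # A coordinator increments its own entry when it applies a write.
  def increment(vc, node) do
    Map.update(vc, node, 1, &(&1 + 1))
  end
end
```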
Put Operation Flow:
1. A client sends the put request to a randomly chosen server, which routes it to
the coordinator at the head of the key’s preference list.
2. The coordinator node (determined by the preference list) processes the request,
updates the key’s vector clock using Dynamo.VectorClock.updateVectorClock/5 ,
and stores the key-value pair.
3. The request is replicated to other nodes in the preference list. Timers are set to
detect timeouts.
4. Once the write quorum is met, a success response is sent back to the client.
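The quorum bookkeeping in step 4 reduces to counting distinct replica acknowledgements, e.g. (a sketch with illustrative names):

```elixir
defmodule QuorumSketch do
  # Illustrative: the coordinator records each replica acknowledgement
  # in a set (so duplicates from retries are not double-counted) and
  # reports success as soon as the write quorum w is reached, without
  # waiting for all n replicas.
  def record_ack(acks, node), do: MapSet.put(acks, node)

  def quorum_met?(acks, w), do: MapSet.size(acks) >= w
end
```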
Get Operation Flow:
1. A client sends the get request to a randomly chosen server, which forwards it to
the coordinator for that key.
2. The coordinator gathers responses from the nodes in the preference list.
3. If multiple versions exist (due to concurrent updates), the system performs syntactic
reconciliation using vector clocks.
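Syntactic reconciliation keeps exactly the versions whose clocks are not dominated by another version's clock; a sketch (illustrative names, clocks as node => counter maps):

```elixir
defmodule ReconcileSketch do
  # Clock a descends clock b when every counter in b is <= the matching
  # counter in a (missing entries count as 0).
  def descends?(a, b) do
    Enum.all?(b, fn {node, count} -> Map.get(a, node, 0) >= count end)
  end

  # Two versions are concurrent when neither clock descends the other;
  # such versions must both be returned to the client.
  def concurrent?(a, b), do: not descends?(a, b) and not descends?(b, a)

  # Drop every version dominated by some other version's clock.
  def reconcile(versions) do
    Enum.reject(versions, fn {_v, vc} ->
      Enum.any?(versions, fn {_v2, vc2} ->
        vc2 != vc and descends?(vc2, vc)
      end)
    end)
  end
end
```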
Failure and Recovery:
The code uses timers to simulate node failures. For example, receiving a {:failNode,
failTime} message transitions a node to a failed state. Recovery is scheduled via a
timer that triggers the :recover message.
5. Testing and Evaluation
5.1 Metrics Collection
You can measure:
Availability: System behavior under simulated node failures and during recovery
phases.
5.2 Experimentation
Failure Injection:
Simulate various failure scenarios (e.g., node crash, network partition) and observe how
the gossip protocol and hinted handoff mechanisms restore consistency.
Load Distribution:
Evaluate the effectiveness of consistent hashing and virtual nodes by examining key
distribution and load balancing across the nodes.
6. Conclusion and Future Work
6.1 Achievements
Robust Distributed Key-Value Store:
The project demonstrates a working implementation of a Dynamo-inspired system in
Elixir, with distributed operations, replication, and fault tolerance.
Leveraging Elixir/OTP:
By using lightweight processes and OTP supervision, the system achieves natural
concurrency and resilience.
6.2 Future Work
Dynamic Reconfiguration:
Implement dynamic adjustment of replication factors and quorum settings based on
observed load and failure conditions.
7. References
Dynamo: Amazon’s Highly Available Key-value Store – Provides the foundational
design principles and trade-offs.
ExHashRing Library Documentation – For consistent hashing and virtual node
management.
Elixir and OTP Documentation – For insights into building fault-tolerant distributed
systems in Elixir.
This report offers a comprehensive overview of your project’s goals, design rationale,
implementation specifics, and testing strategies. It ties together the provided code structure
and highlights how each module contributes to the overall system, ensuring a robust,
scalable, and highly available distributed key-value store.
Here's a comprehensive and detailed project proposal document for your Dynamo-inspired
distributed key-value store implementation in Elixir.