I recently came across a blog post about distributed counters and what it would take to build one in Java, and I thought: let’s see what it takes to build the same thing in Elixir ❤ and Go 🙂
Distributed systems are inherently complex, but Elixir’s actor model and OTP (Open Telecom Platform) framework make building robust distributed applications surprisingly elegant. Today, let’s see how to build a distributed counter that can handle failures gracefully while maintaining consistency across multiple nodes.
Why Distributed Counters Matter
Before diving into the implementation, let’s understand why we need distributed counters. In modern applications, we often need to track metrics like page views, API calls, or user actions across multiple servers. A simple in-memory counter won’t work because:
- It’s a single point of failure
- It’s limited by one machine’s resources
- It loses its state on restart
- Instances have no way to coordinate with each other
Our distributed counter will solve these problems by spreading the load across multiple nodes while maintaining consistency.
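For contrast, here’s the naive single-node version we’re trying to replace: a minimal sketch using Agent (the module name is just for illustration). Everything that follows exists to fix its shortcomings.

# A naive in-memory counter: simple, but it's a single point of failure,
# its state dies with the process, and other nodes never see it.
defmodule NaiveCounter do
  use Agent

  def start_link(_opts), do: Agent.start_link(fn -> 0 end, name: __MODULE__)

  def increment(amount \\ 1), do: Agent.update(__MODULE__, &(&1 + amount))

  def value, do: Agent.get(__MODULE__, & &1)
end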
The Elixir Approach: Embracing the Actor Model
Elixir’s strength lies in its actor model implementation through lightweight processes. Each process is isolated, communicates via message passing, and can fail without affecting others. This makes it perfect for distributed systems.
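To make that concrete, here’s a throwaway sketch of the model in raw primitives: an isolated process that owns a count and changes it only in response to messages. GenServer, which we use below, is a battle-tested wrapper around exactly this pattern.

# A process holding its own count; the only way to touch the state
# is to send the process a message.
counter =
  spawn(fn ->
    loop = fn loop, count ->
      receive do
        {:increment, by} ->
          loop.(loop, count + by)

        {:get, caller} ->
          send(caller, {:count, count})
          loop.(loop, count)
      end
    end

    loop.(loop, 0)
  end)

send(counter, {:increment, 5})
send(counter, {:get, self()})

receive do
  {:count, value} -> IO.puts("count: #{value}") # count: 5
end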
Setting Up the Project
First, let’s create a new Elixir project:
mix new distributed_counter --sup
cd distributed_counter
Add the following dependencies to mix.exs: libcluster handles node discovery and clustering, and phoenix_pubsub gives us cross-node messaging.
defp deps do
  [
    {:libcluster, "~> 3.3"},
    {:phoenix_pubsub, "~> 2.1"}
  ]
end
Core Counter GenServer
Let’s start with our main counter module, built on GenServer:
defmodule DistributedCounter.Counter do
  use GenServer
  require Logger

  @registry DistributedCounter.Registry
  @pubsub DistributedCounter.PubSub

  defstruct [:name, :value, :node_values]

  ## Client API

  def start_link(name) do
    GenServer.start_link(__MODULE__, name, name: via_tuple(name))
  end

  def increment(name, amount \\ 1) do
    GenServer.call(via_tuple(name), {:increment, amount})
  end

  def decrement(name, amount \\ 1) do
    GenServer.call(via_tuple(name), {:decrement, amount})
  end

  def get_value(name) do
    GenServer.call(via_tuple(name), :get_value)
  end

  def get_distributed_value(name) do
    GenServer.call(via_tuple(name), :get_distributed_value)
  end

  ## Server Callbacks

  @impl true
  def init(name) do
    Phoenix.PubSub.subscribe(@pubsub, "counter:#{name}")

    state = %__MODULE__{
      name: name,
      value: 0,
      node_values: %{Node.self() => 0}
    }

    # Ask the other nodes for their current values
    broadcast_sync_request(name)

    {:ok, state}
  end

  @impl true
  def handle_call({:increment, amount}, _from, state) do
    new_value = state.value + amount

    # Update our node's value in the distributed map
    node_values = Map.put(state.node_values, Node.self(), new_value)
    new_state = %{state | value: new_value, node_values: node_values}

    # Broadcast the change to the other nodes
    broadcast_change(state.name, Node.self(), new_value)

    {:reply, new_value, new_state}
  end

  @impl true
  def handle_call({:decrement, amount}, _from, state) do
    new_value = state.value - amount

    node_values = Map.put(state.node_values, Node.self(), new_value)
    new_state = %{state | value: new_value, node_values: node_values}

    broadcast_change(state.name, Node.self(), new_value)

    {:reply, new_value, new_state}
  end

  @impl true
  def handle_call(:get_value, _from, state) do
    {:reply, state.value, state}
  end

  @impl true
  def handle_call(:get_distributed_value, _from, state) do
    total = state.node_values |> Map.values() |> Enum.sum()
    {:reply, total, state}
  end

  @impl true
  def handle_info({:counter_change, node, value}, state) do
    node_values = Map.put(state.node_values, node, value)
    {:noreply, %{state | node_values: node_values}}
  end

  @impl true
  def handle_info({:sync_request, requesting_node}, state) do
    # Answer every other node's sync request with our current value.
    # PubSub also delivers our own request back to us, so skip that one.
    unless requesting_node == Node.self() do
      Phoenix.PubSub.broadcast(
        @pubsub,
        "counter:#{state.name}",
        {:sync_response, Node.self(), state.value}
      )
    end

    {:noreply, state}
  end

  @impl true
  def handle_info({:sync_response, node, value}, state) do
    node_values = Map.put(state.node_values, node, value)
    {:noreply, %{state | node_values: node_values}}
  end

  @impl true
  def handle_info({:nodeup, node}, state) do
    Logger.info("Node #{node} joined - syncing counter #{state.name}")
    broadcast_sync_request(state.name)
    {:noreply, state}
  end

  @impl true
  def handle_info({:nodedown, node}, state) do
    Logger.info("Node #{node} left - removing from counter #{state.name}")
    node_values = Map.delete(state.node_values, node)
    {:noreply, %{state | node_values: node_values}}
  end

  ## Private Functions

  defp via_tuple(name) do
    {:via, Registry, {@registry, name}}
  end

  defp broadcast_change(counter_name, node, value) do
    Phoenix.PubSub.broadcast(@pubsub, "counter:#{counter_name}", {:counter_change, node, value})
  end

  defp broadcast_sync_request(counter_name) do
    Phoenix.PubSub.broadcast(@pubsub, "counter:#{counter_name}", {:sync_request, Node.self()})
  end
end
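One subtlety worth calling out: Registry is node-local, so every node in the cluster runs its own counter process for a given name. get_value/1 returns only the local node’s slice, while get_distributed_value/1 sums the latest value we have heard from each node, which makes the total eventually consistent. Assuming the counter has been created on both nodes:

# On node1:
DistributedCounter.Counter.increment("my_counter")             #=> 1

# On node2, which has its own process for "my_counter":
DistributedCounter.Counter.get_value("my_counter")             #=> 0 (local slice)
DistributedCounter.Counter.get_distributed_value("my_counter") #=> 1, once the broadcast arrives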
Dynamic Supervisor for Counter Management
Now let’s create a dynamic supervisor to manage multiple counters:
defmodule DistributedCounter.CounterSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def start_counter(name) do
    child_spec = %{
      id: DistributedCounter.Counter,
      start: {DistributedCounter.Counter, :start_link, [name]},
      # :transient restarts a counter only if it crashes,
      # not when it is stopped deliberately
      restart: :transient
    }

    DynamicSupervisor.start_child(__MODULE__, child_spec)
  end

  def stop_counter(name) do
    case Registry.lookup(DistributedCounter.Registry, name) do
      [{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
      [] -> {:error, :not_found}
    end
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end
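With the application tree from the next section running, counters can be started and stopped on demand:

{:ok, _pid} = DistributedCounter.CounterSupervisor.start_counter("page_views")
DistributedCounter.Counter.increment("page_views") #=> 1
DistributedCounter.CounterSupervisor.stop_counter("page_views")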
Main Application Module
Let’s set up the main application:
defmodule DistributedCounter.Application do
  use Application

  @impl true
  def start(_type, _args) do
    # Our config stores topologies under the :libcluster key,
    # so unwrap them before handing them to Cluster.Supervisor
    topologies =
      :distributed_counter
      |> Application.get_env(:libcluster, [])
      |> Keyword.get(:topologies, [])

    children = [
      # Registry and PubSub must start before any counter registers or subscribes
      {Registry, keys: :unique, name: DistributedCounter.Registry},
      {Phoenix.PubSub, name: DistributedCounter.PubSub},
      {Cluster.Supervisor, [topologies, [name: DistributedCounter.ClusterSupervisor]]},
      {DistributedCounter.CounterSupervisor, []},
      {DistributedCounter.NodeMonitor, []}
    ]

    opts = [strategy: :one_for_one, name: DistributedCounter.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
Node Monitoring
To handle node joins and leaves:
defmodule DistributedCounter.NodeMonitor do
  use GenServer
  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_) do
    # Ask the kernel to send us {:nodeup, node, info} / {:nodedown, node, info}
    :net_kernel.monitor_nodes(true, node_type: :visible)
    {:ok, %{}}
  end

  @impl true
  def handle_info({:nodeup, node, _info}, state) do
    Logger.info("Node #{node} connected")
    notify_counters({:nodeup, node})
    {:noreply, state}
  end

  @impl true
  def handle_info({:nodedown, node, _info}, state) do
    Logger.info("Node #{node} disconnected")
    notify_counters({:nodedown, node})
    {:noreply, state}
  end

  # Counters are registered under their own names, not a shared key, so we
  # enumerate every pid in the Registry with a select and message each one
  defp notify_counters(message) do
    DistributedCounter.Registry
    |> Registry.select([{{:"$1", :"$2", :"$3"}, [], [:"$2"]}])
    |> Enum.each(&send(&1, message))
  end
end
High-Level API
Finally, let’s create a convenient API:
defmodule DistributedCounter do
  alias DistributedCounter.{Counter, CounterSupervisor}

  def create_counter(name) do
    CounterSupervisor.start_counter(name)
  end

  def increment(name, amount \\ 1) do
    ensure_counter_exists(name)
    Counter.increment(name, amount)
  end

  def decrement(name, amount \\ 1) do
    ensure_counter_exists(name)
    Counter.decrement(name, amount)
  end

  def get_local_value(name) do
    case Registry.lookup(DistributedCounter.Registry, name) do
      [{_pid, _}] -> {:ok, Counter.get_value(name)}
      [] -> {:error, :not_found}
    end
  end

  def get_total_value(name) do
    case Registry.lookup(DistributedCounter.Registry, name) do
      [{_pid, _}] -> {:ok, Counter.get_distributed_value(name)}
      [] -> {:error, :not_found}
    end
  end

  defp ensure_counter_exists(name) do
    case Registry.lookup(DistributedCounter.Registry, name) do
      [] -> create_counter(name)
      _ -> :ok
    end
  end
end
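A quick smoke test in iex; note that increment/2 creates the counter on first use thanks to ensure_counter_exists/1:

iex> DistributedCounter.increment("api_calls")
1
iex> DistributedCounter.increment("api_calls", 4)
5
iex> DistributedCounter.get_local_value("api_calls")
{:ok, 5}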
Configuration
Add to config/config.exs:
import Config

config :distributed_counter, :libcluster,
  topologies: [
    gossip: [
      strategy: Cluster.Strategy.Gossip,
      config: [
        port: 45892,
        if_addr: "0.0.0.0",
        multicast_addr: "230.1.1.251",
        multicast_ttl: 1,
        secret: "someSecret"
      ]
    ]
  ]

config :logger, :console,
  format: "[$level] $message\n"
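The Gossip strategy relies on UDP multicast, which many cloud networks block. If that’s your situation, libcluster’s Epmd strategy with an explicit host list is a drop-in alternative (the hosts below are illustrative):

config :distributed_counter, :libcluster,
  topologies: [
    epmd: [
      strategy: Cluster.Strategy.Epmd,
      config: [hosts: [:"node1@127.0.0.1", :"node2@127.0.0.1"]]
    ]
  ]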
Testing the Implementation
Let’s create a simple test:
defmodule DistributedCounterTest do
  use ExUnit.Case

  test "basic counter operations" do
    name = "test_counter_#{:rand.uniform(1000)}"

    assert {:ok, _pid} = DistributedCounter.create_counter(name)
    assert 5 = DistributedCounter.increment(name, 5)
    assert 8 = DistributedCounter.increment(name, 3)
    assert 5 = DistributedCounter.decrement(name, 3)
    assert {:ok, 5} = DistributedCounter.get_local_value(name)
    assert {:ok, 5} = DistributedCounter.get_total_value(name)
  end
end
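Because the GenServer serializes all calls, concurrent increments cannot race each other. Here’s a sketch of an extra test for the same module that demonstrates this:

test "concurrent increments are serialized by the GenServer" do
  name = "concurrent_#{:rand.uniform(1000)}"
  assert {:ok, _pid} = DistributedCounter.create_counter(name)

  # 100 concurrent callers; the counter process handles them one at a time
  1..100
  |> Task.async_stream(fn _ -> DistributedCounter.increment(name) end)
  |> Stream.run()

  assert {:ok, 100} = DistributedCounter.get_local_value(name)
end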
Running the System
To test with multiple nodes:
# Terminal 1
iex --name node1@127.0.0.1 -S mix

# Terminal 2
iex --name node2@127.0.0.1 -S mix

# In each terminal:
iex> DistributedCounter.create_counter("my_counter")
iex> DistributedCounter.increment("my_counter", 10)
iex> DistributedCounter.get_total_value("my_counter")
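Gossip discovery should connect the two nodes within a few seconds; you can confirm it before running the counter commands:

iex> Node.list()
[:"node2@127.0.0.1"]

Once both nodes have incremented by 10, get_total_value("my_counter") returns {:ok, 20} on either node as soon as the broadcasts have propagated.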
Key Benefits of the Elixir Approach
The Elixir implementation showcases several powerful features:
Fault Tolerance: Each counter runs in its own process. If one fails, the others keep working, and the DynamicSupervisor automatically restarts counters that crash (we used restart: :transient, so deliberate stops are left alone).
Concurrency: Thousands of counters can run simultaneously without blocking each other, thanks to the actor model.
Distribution: Built-in clustering support makes it easy to connect nodes and share state.
Consistency: Phoenix PubSub ensures all nodes stay synchronized with counter changes.
Hot Code Updates: You can update counter logic without stopping the system.
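You can watch the fault tolerance from iex. Because counter state lives in memory, the restarted process begins again at 0, which is exactly why the persistence improvement below matters:

iex> [{pid, _}] = Registry.lookup(DistributedCounter.Registry, "my_counter")
iex> Process.exit(pid, :kill)
true
# The DynamicSupervisor restarts the crashed counter shortly afterwards
iex> DistributedCounter.get_local_value("my_counter")
{:ok, 0}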
Potential Improvements
While our implementation works well, there are areas for enhancement:
- Add persistence with ETS or a database (a minimal ETS sketch follows this list)
- Implement conflict resolution for network partitions
- Add rate limiting and back-pressure
- Create more sophisticated clustering strategies
- Add metrics and monitoring
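To make the first item concrete, here is a minimal sketch of the ETS idea, assuming a public named table created once at application start; the module and table names are illustrative, not part of the project above:

# In Application.start/2, before building the children list:
#   :ets.new(:counter_backup, [:named_table, :public, :set])
#
# Counters would call save/2 after every change and load/1 in init/1,
# so a crashed-and-restarted counter recovers its last local value.
defmodule DistributedCounter.Backup do
  @table :counter_backup

  def save(name, value), do: :ets.insert(@table, {name, value})

  def load(name) do
    case :ets.lookup(@table, name) do
      [{^name, value}] -> value
      [] -> 0
    end
  end
end

Because the table lives in the VM’s memory, this survives counter crashes but not node restarts; DETS or a database would cover the latter.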
The beauty of Elixir’s approach lies in its simplicity and reliability. The actor model naturally handles the complexities of distributed systems, making our code both readable and robust. In our next post, we’ll see how Go approaches the same problem with a very different philosophy.
