Building a Distributed Counter in Elixir: Leveraging the Actor Model for Fault-Tolerant Concurrency

I recently came across a blog post about distributed counters and what it would take to build one in Java, and I thought: let's see what the same thing looks like in Elixir ❤ and Go 🙂.

Distributed systems are inherently complex, but Elixir’s actor model and OTP (Open Telecom Platform) framework make building robust distributed applications surprisingly elegant. Today, let’s see how to build a distributed counter that can handle failures gracefully while maintaining consistency across multiple nodes.

Why Distributed Counters Matter

Before diving into the implementation, let’s understand why we need distributed counters. In modern applications, we often need to track metrics like page views, API calls, or user actions across multiple servers. A simple in-memory counter won’t work because:

  • Single point of failure
  • Limited by single machine resources
  • No persistence across restarts
  • No coordination between instances

Our distributed counter will solve these problems by spreading the load across multiple nodes while maintaining consistency.
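For contrast, the naive single-node version is just a few lines with Elixir's built-in Agent; its state lives in one VM's memory, is invisible to other instances, and vanishes on restart:

```elixir
# A single-process counter: trivial to write, but local to this VM
# and gone the moment it stops.
{:ok, counter} = Agent.start_link(fn -> 0 end)

Agent.update(counter, &(&1 + 1))
Agent.update(counter, &(&1 + 1))

Agent.get(counter, & &1)
# => 2
```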

The Elixir Approach: Embracing the Actor Model

Elixir’s strength lies in its actor model implementation through lightweight processes. Each process is isolated, communicates via message passing, and can fail without affecting others. This makes it perfect for distributed systems.
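As a quick refresher, here is the actor model in miniature: an isolated process that shares no memory and reacts only to the messages it receives.

```elixir
# Spawn an isolated process; the only way to interact with it is
# message passing via send/receive.
pid =
  spawn(fn ->
    receive do
      {:ping, caller} -> send(caller, :pong)
    end
  end)

send(pid, {:ping, self()})

receive do
  :pong -> IO.puts("pong received")
after
  1_000 -> IO.puts("no reply")
end
```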

Setting Up the Project

First, let’s create a new Elixir project:

mix new distributed_counter --sup
cd distributed_counter

Add the following dependencies to mix.exs to enable clustering and distributed pub/sub, then run mix deps.get:

defp deps do
  [
    {:libcluster, "~> 3.3"},
    {:phoenix_pubsub, "~> 2.1"}
  ]
end

Core Counter GenServer

Let's start with the main counter module, built on GenServer:

defmodule DistributedCounter.Counter do
  use GenServer
  require Logger

  @registry DistributedCounter.Registry
  @pubsub DistributedCounter.PubSub

  defstruct [:name, :value, :node_values]

  ## Client API
  def start_link(name) do
    GenServer.start_link(__MODULE__, name, name: via_tuple(name))
  end

  def increment(name, amount \\ 1) do
    GenServer.call(via_tuple(name), {:increment, amount})
  end

  def decrement(name, amount \\ 1) do
    GenServer.call(via_tuple(name), {:decrement, amount})
  end

  def get_value(name) do
    GenServer.call(via_tuple(name), :get_value)
  end

  def get_distributed_value(name) do
    GenServer.call(via_tuple(name), :get_distributed_value)
  end

  ## Server Callbacks
  @impl true
  def init(name) do
    Phoenix.PubSub.subscribe(@pubsub, "counter:#{name}")

    state = %__MODULE__{
      name: name,
      value: 0,
      node_values: %{Node.self() => 0}
    }

    # Sync with other nodes
    broadcast_sync_request(name)

    {:ok, state}
  end

  @impl true
  def handle_call({:increment, amount}, _from, state) do
    new_value = state.value + amount
    new_state = %{state | value: new_value}

    # Update our node's value in the distributed map
    node_values = Map.put(state.node_values, Node.self(), new_value)
    final_state = %{new_state | node_values: node_values}

    # Broadcast the change
    broadcast_change(state.name, Node.self(), new_value)

    {:reply, new_value, final_state}
  end

  @impl true
  def handle_call({:decrement, amount}, _from, state) do
    new_value = state.value - amount
    new_state = %{state | value: new_value}

    node_values = Map.put(state.node_values, Node.self(), new_value)
    final_state = %{new_state | node_values: node_values}

    broadcast_change(state.name, Node.self(), new_value)

    {:reply, new_value, final_state}
  end

  @impl true
  def handle_call(:get_value, _from, state) do
    {:reply, state.value, state}
  end

  @impl true
  def handle_call(:get_distributed_value, _from, state) do
    total = state.node_values |> Map.values() |> Enum.sum()
    {:reply, total, state}
  end

  @impl true
  def handle_info({:counter_change, node, value}, state) do
    node_values = Map.put(state.node_values, node, value)
    new_state = %{state | node_values: node_values}
    {:noreply, new_state}
  end

  @impl true
  def handle_info({:sync_request, _requesting_node}, state) do
    # Broadcast our current value; the requesting node (and everyone else
    # on the topic) picks it up as a :sync_response
    Phoenix.PubSub.broadcast(@pubsub, "counter:#{state.name}", {:sync_response, Node.self(), state.value})
    {:noreply, state}
  end

  @impl true
  def handle_info({:sync_response, node, value}, state) do
    node_values = Map.put(state.node_values, node, value)
    new_state = %{state | node_values: node_values}
    {:noreply, new_state}
  end

  @impl true
  def handle_info({:nodeup, node}, state) do
    Logger.info("Node #{node} joined - syncing counter #{state.name}")
    broadcast_sync_request(state.name)
    {:noreply, state}
  end

  @impl true
  def handle_info({:nodedown, node}, state) do
    Logger.info("Node #{node} left - removing from counter #{state.name}")
    node_values = Map.delete(state.node_values, node)
    new_state = %{state | node_values: node_values}
    {:noreply, new_state}
  end

  ## Private Functions
  defp via_tuple(name) do
    {:via, Registry, {@registry, name}}
  end

  defp broadcast_change(counter_name, node, value) do
    Phoenix.PubSub.broadcast(@pubsub, "counter:#{counter_name}", {:counter_change, node, value})
  end

  defp broadcast_sync_request(counter_name) do
    Phoenix.PubSub.broadcast(@pubsub, "counter:#{counter_name}", {:sync_request, Node.self()})
  end
end

Dynamic Supervisor for Counter Management

Now let’s create a dynamic supervisor to manage multiple counters:

defmodule DistributedCounter.CounterSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def start_counter(name) do
    child_spec = %{
      id: DistributedCounter.Counter,
      start: {DistributedCounter.Counter, :start_link, [name]},
      restart: :transient
    }
    
    DynamicSupervisor.start_child(__MODULE__, child_spec)
  end

  def stop_counter(name) do
    case Registry.lookup(DistributedCounter.Registry, name) do
      [{pid, _}] ->
        DynamicSupervisor.terminate_child(__MODULE__, pid)
      [] ->
        {:error, :not_found}
    end
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

Main Application Module

Let’s set up the main application:

defmodule DistributedCounter.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Registry, keys: :unique, name: DistributedCounter.Registry},
      {Phoenix.PubSub, name: DistributedCounter.PubSub},
      {Cluster.Supervisor,
       [
         Application.get_env(:distributed_counter, :libcluster, [])[:topologies] || [],
         [name: DistributedCounter.ClusterSupervisor]
       ]},
      {DistributedCounter.CounterSupervisor, []},
      {DistributedCounter.NodeMonitor, []}
    ]
    opts = [strategy: :one_for_one, name: DistributedCounter.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Node Monitoring

To handle node joins and leaves:

defmodule DistributedCounter.NodeMonitor do
  use GenServer

  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_) do
    :net_kernel.monitor_nodes(true, node_type: :visible)
    {:ok, %{}}
  end

  @impl true
  def handle_info({:nodeup, node, _info}, state) do
    Logger.info("Node #{node} connected")
    notify_counters({:nodeup, node})
    {:noreply, state}
  end

  @impl true
  def handle_info({:nodedown, node, _info}, state) do
    Logger.info("Node #{node} disconnected")
    notify_counters({:nodedown, node})
    {:noreply, state}
  end

  # Registry.dispatch/3 only matches a single key, and each counter is
  # registered under its own name, so we enumerate every registered pid
  # with a match spec instead.
  defp notify_counters(message) do
    DistributedCounter.Registry
    |> Registry.select([{{:_, :"$1", :_}, [], [:"$1"]}])
    |> Enum.each(&send(&1, message))
  end
end

High-Level API

Finally, let’s create a convenient API:

defmodule DistributedCounter do
  alias DistributedCounter.{Counter, CounterSupervisor}

  def create_counter(name) do
    CounterSupervisor.start_counter(name)
  end

  def increment(name, amount \\ 1) do
    ensure_counter_exists(name)
    Counter.increment(name, amount)
  end

  def decrement(name, amount \\ 1) do
    ensure_counter_exists(name)
    Counter.decrement(name, amount)
  end

  def get_local_value(name) do
    case Registry.lookup(DistributedCounter.Registry, name) do
      [{_pid, _}] -> {:ok, Counter.get_value(name)}
      [] -> {:error, :not_found}
    end
  end

  def get_total_value(name) do
    case Registry.lookup(DistributedCounter.Registry, name) do
      [{_pid, _}] -> {:ok, Counter.get_distributed_value(name)}
      [] -> {:error, :not_found}
    end
  end

  defp ensure_counter_exists(name) do
    case Registry.lookup(DistributedCounter.Registry, name) do
      [] -> create_counter(name)
      _ -> :ok
    end
  end
end

Configuration

Add to config/config.exs:

import Config

config :distributed_counter, :libcluster,
  topologies: [
    gossip: [
      strategy: Cluster.Strategy.Gossip,
      config: [
        port: 45892,
        if_addr: "0.0.0.0",
        multicast_addr: "230.1.1.251",
        multicast_ttl: 1,
        secret: "someSecret"
      ]
    ]
  ]
config :logger, :console,
  format: "[$level] $message\n"
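The gossip strategy relies on UDP multicast, which many cloud networks block. In that situation, libcluster ships strategies that don't need it; for example, Cluster.Strategy.Epmd with a static host list (the node names below are placeholders for your own):

```elixir
config :distributed_counter, :libcluster,
  topologies: [
    epmd: [
      strategy: Cluster.Strategy.Epmd,
      config: [hosts: [:"node1@127.0.0.1", :"node2@127.0.0.1"]]
    ]
  ]
```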

Testing the Implementation

Let’s create a simple test:

defmodule DistributedCounterTest do
  use ExUnit.Case

  test "basic counter operations" do
    name = "test_counter_#{:rand.uniform(1000)}"
    
    assert {:ok, _pid} = DistributedCounter.create_counter(name)
    
    assert 5 = DistributedCounter.increment(name, 5)
    assert 8 = DistributedCounter.increment(name, 3)
    assert 5 = DistributedCounter.decrement(name, 3)
    
    assert {:ok, 5} = DistributedCounter.get_local_value(name)
    assert {:ok, 5} = DistributedCounter.get_total_value(name)
  end
end

Running the System

To test with multiple nodes:

# Terminal 1
iex --name node1@127.0.0.1 -S mix

# Terminal 2  
iex --name node2@127.0.0.1 -S mix

# In each terminal:
iex> DistributedCounter.create_counter("my_counter")
iex> DistributedCounter.increment("my_counter", 10)
iex> DistributedCounter.get_total_value("my_counter")
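Once both shells are up, gossip discovery usually takes a few seconds. Before exercising the counter, you can confirm the cluster actually formed; from node1 you should see something like:

```elixir
iex> Node.list()
[:"node2@127.0.0.1"]
```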

Key Benefits of the Elixir Approach

The Elixir implementation showcases several powerful features:

Fault Tolerance: Each counter runs in its own process. If one fails, others continue working, and the supervisor restarts failed processes automatically.
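A quick way to see this from iex (a sketch; note the restarted counter starts back at zero, which is exactly the persistence gap listed under the improvements further down):

```elixir
{:ok, _pid} = DistributedCounter.create_counter("crashy")
DistributedCounter.increment("crashy", 3)

# Kill the counter process; since its child spec uses restart: :transient,
# the DynamicSupervisor restarts it after this abnormal exit.
[{pid, _}] = Registry.lookup(DistributedCounter.Registry, "crashy")
Process.exit(pid, :kill)

# Moments later a fresh process has re-registered under the same name,
# and other counters were never affected.
DistributedCounter.increment("crashy", 1)
```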

Concurrency: Thousands of counters can run simultaneously without blocking each other, thanks to the actor model.

Distribution: Built-in clustering support makes it easy to connect nodes and share state.

Consistency: Phoenix PubSub ensures all nodes stay synchronized with counter changes.

Hot Code Updates: You can update counter logic without stopping the system.

Potential Improvements

While our implementation works well, there are areas for enhancement:

  • Add persistence with ETS or a database
  • Implement conflict resolution for network partitions
  • Add rate limiting and back-pressure
  • Create more sophisticated clustering strategies
  • Add metrics and monitoring
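As a taste of the first item, here is a minimal persistence sketch (the Persistence module and table name are hypothetical, not part of the code above). One caveat: an ETS table dies with the process that created it, so to survive counter restarts the table must be owned by a long-lived process; surviving full VM restarts would need DETS or a database.

```elixir
defmodule DistributedCounter.Persistence do
  @moduledoc "Hypothetical write-through backup for local counter values."
  @table :counter_backup

  # Call once from a long-lived process (e.g. during Application.start/2),
  # since an ETS table is destroyed when its owner exits.
  def setup do
    :ets.new(@table, [:named_table, :public, :set])
  end

  # Mirror each counter write into the table.
  def save(name, value), do: :ets.insert(@table, {name, value})

  # The counter's init/1 could call this instead of starting from 0.
  def load(name) do
    case :ets.lookup(@table, name) do
      [{^name, value}] -> value
      [] -> 0
    end
  end
end
```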

The beauty of Elixir’s approach lies in its simplicity and reliability. The actor model naturally handles the complexities of distributed systems, making our code both readable and robust. In our next post, we’ll see how Go approaches the same problem with a very different philosophy.
