Concurrency — Task, Agent, and ETS

Elixir concurrency primitives — Task for async work, Agent for simple state, ETS for in-memory key-value storage

Task

Tasks are for one-off concurrent computations. They wrap spawning a process and retrieving the result.

Task.async / Task.await

# Single task
task = Task.async(fn -> expensive_computation() end)
result = Task.await(task, 5000)  # 5s timeout, returns result or raises

# Multiple tasks (parallel map)
tasks = Enum.map(urls, fn url ->
  Task.async(fn -> HTTP.get(url) end)
end)
results = Task.await_many(tasks, 10_000)  # 10s timeout per task

Task.async_stream

Process a collection in parallel with bounded concurrency:

# Process items with max 10 concurrent tasks
results = Task.async_stream(1..100, &process/1, max_concurrency: 10)
|> Enum.to_list()

# With timeout and on_timeout option
results = Task.async_stream(urls, &fetch/1,
  max_concurrency: 4,
  timeout: 5000,
  on_timeout: :kill_task
)
|> Enum.to_list()

Fire-and-forget tasks (no result collection):

# Supervised (restarts on crash)
{:ok, pid} = Task.start_link(fn -> send_email(user) end)

# Unsupervised (for side effects)
{:ok, pid} = Task.start(fn -> log_event(event) end)

Yield (non-blocking check)

task = Task.async(fn -> slow_computation() end)

case Task.yield(task, 1000) || Task.shutdown(task) do
  {:ok, result} -> result
  nil -> :timeout
end

Agent

Agents are a simplified GenServer for holding state. Good for simple state; use GenServer for complex logic.

# Start an agent
{:ok, pid} = Agent.start_link(fn -> %{} end)
{:ok, _} = Agent.start_link(fn -> %{} end, name: :cache)

# Read state
Agent.get(:cache, fn state -> state end)            # => %{}
Agent.get(:cache, fn state -> state[:key] end)      # => nil
Agent.get(:cache, & &1)                              # => %{}

# Update state
Agent.update(:cache, fn state -> Map.put(state, :key, "value") end)

# Get and update in one call
Agent.get_and_update(:cache, fn state ->
  {state[:count], Map.update(state, :count, 1, &(&1 + 1))}
end)

# Stop an agent
Agent.stop(:cache)

When to use Agent vs GenServer: Use Agent for simple get/update operations. Use GenServer when you need to handle multiple message types, complex state transitions, or timeout logic.

ETS

ETS (Erlang Term Storage) is an in-memory key-value store. Fast O(1) lookups, shared across processes, lives as long as the owning process.

Creating a table

# Table types: :set (unique keys), :ordered_set (sorted), :bag (duplicates ok), :duplicate_bag
:ets.new(:users, [:set, :public, :named_table])

# Access levels: :public (any process), :protected (owner writes, any reads), :private (owner only)
:ets.new(:sessions, [:set, :protected, :named_table])

CRUD operations

# Insert
:ets.insert(:users, {:alice, 30, "[email protected]"})
:ets.insert(:users, {:bob, 25, "[email protected]"})

# Lookup (returns a list)
:ets.lookup(:users, :alice)    # => [{:alice, 30, "[email protected]"}]

# Update element at position (0-indexed)
:ets.update_element(:users, :alice, [{2, 31}])  # update age to 31

# Delete
:ets.delete(:users, :bob)

# Delete all entries
:ets.delete_all_objects(:users)

# Delete the table
:ets.delete(:users)

Querying

# Match spec: replaces _ with :"$n" positional variables
:ets.match(:users, {:"$1", :_, :_})          # => [[:alice], [:bob]]
:ets.match_object(:users, {:_, 30, :"_"})    # => [{:alice, 30, "..."}]

# match_delete
:ets.match_delete(:users, {:_, :_, :_})       # delete all entries

# Select with complex queries
:ets.select(:users, [
  {{:"$1", :"$2", :_}, [{:>, :"$2", 25}], [:"$1"]}   # names where age > 25
])
# => [:alice]

# Info
:ets.info(:users)             # => table info
:ets.info(:users, :size)      # => number of entries
:ets.info(:users, :memory)    # => memory in words

ETS as a cache

defmodule Cache do
  use Agent

  def start_link(_opts) do
    Agent.start_link(fn ->
      :ets.new(__MODULE__, [:set, :public, :named_table])
    end, name: __MODULE__)
  end

  def get(key) do
    case :ets.lookup(__MODULE__, key) do
      [{^key, value}] -> {:ok, value}
      [] -> :not_found
    end
  end

  def put(key, value) do
    :ets.insert(__MODULE__, {key, value})
    :ok
  end

  def put(key, value, ttl_ms) do
    :ets.insert(__MODULE__, {key, value, System.monotonic_time(:millisecond) + ttl_ms})
    :ok
  end
end

Task.Supervisor

For properly supervised tasks:

# In your supervision tree:
children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

# Start supervised tasks
Task.Supervisor.async(MyApp.TaskSupervisor, fn -> do_work() end)
Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> do_work() end)

# The async_nolink variant won't crash the caller if the task fails

Comparison

Tool Best for State Concurrency Persistence
Task One-off computations None Per-task Process dies → gone
Agent Simple shared state Single value Per-call Process dies → gone
GenServer Complex state/logic Structured Per-call Process dies → gone
ETS Fast key-value lookups Table of records Read-parallel Owning process dies → gone
DETS Disk-based key-value Table of records Read-parallel Survives restarts