Concurrency — Task, Agent, and ETS
Elixir concurrency primitives — Task for async work, Agent for simple state, ETS for in-memory key-value storage
Task
Tasks are for one-off concurrent computations. They wrap spawning a process and retrieving the result.
Task.async / Task.await
# Single task
task = Task.async(fn -> expensive_computation() end)
result = Task.await(task, 5000) # 5s timeout; returns the result or exits the caller on timeout
# Multiple tasks (parallel map)
tasks = Enum.map(urls, fn url ->
Task.async(fn -> HTTP.get(url) end)
end)
results = Task.await_many(tasks, 10_000) # 10s timeout for the whole batch, not per task
Task.async_stream
Process a collection in parallel with bounded concurrency:
# Process items with max 10 concurrent tasks
results = Task.async_stream(1..100, &process/1, max_concurrency: 10)
|> Enum.to_list()
# With timeout and on_timeout option
results = Task.async_stream(urls, &fetch/1,
max_concurrency: 4,
timeout: 5000,
on_timeout: :kill_task
)
|> Enum.to_list()
Task.start / Task.start_link
Fire-and-forget tasks (no result collection):
# Linked to the caller (a crash propagates to the caller; no automatic restart)
{:ok, pid} = Task.start_link(fn -> send_email(user) end)
# Not linked to the caller (for side effects)
{:ok, pid} = Task.start(fn -> log_event(event) end)
Yield (non-blocking check)
task = Task.async(fn -> slow_computation() end)
case Task.yield(task, 1000) || Task.shutdown(task) do
{:ok, result} -> result
nil -> :timeout
end
Agent
Agents are a simplified GenServer for holding state. Good for simple state; use GenServer for complex logic.
# Start an agent
{:ok, pid} = Agent.start_link(fn -> %{} end)
{:ok, _} = Agent.start_link(fn -> %{} end, name: :cache)
# Read state
Agent.get(:cache, fn state -> state end) # => %{}
Agent.get(:cache, fn state -> state[:key] end) # => nil
Agent.get(:cache, & &1) # => %{}
# Update state
Agent.update(:cache, fn state -> Map.put(state, :key, "value") end)
# Get and update in one call
Agent.get_and_update(:cache, fn state ->
{state[:count], Map.update(state, :count, 1, &(&1 + 1))}
end)
# Stop an agent
Agent.stop(:cache)
When to use Agent vs GenServer: Use Agent for simple get/update operations. Use GenServer when you need to handle multiple message types, complex state transitions, or timeout logic.
ETS
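ETS (Erlang Term Storage) is an in-memory key-value store. Lookups are fast (O(1) for :set, :bag, and :duplicate_bag tables; O(log n) for :ordered_set), tables can be shared across processes, and a table lives as long as its owning process.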
Creating a table
# Table types: :set (unique keys), :ordered_set (unique keys, sorted), :bag (multiple objects per key, no identical duplicates), :duplicate_bag (identical duplicates allowed)
:ets.new(:users, [:set, :public, :named_table])
# Access levels: :public (any process), :protected (owner writes, any reads), :private (owner only)
:ets.new(:sessions, [:set, :protected, :named_table])
CRUD operations
# Insert
:ets.insert(:users, {:alice, 30, "[email protected]"})
:ets.insert(:users, {:bob, 25, "[email protected]"})
# Lookup (returns a list)
:ets.lookup(:users, :alice) # => [{:alice, 30, "[email protected]"}]
# Update element at position (1-indexed; position 1 is the key)
:ets.update_element(:users, :alice, [{2, 31}]) # update age to 31
# Delete
:ets.delete(:users, :bob)
# Delete all entries
:ets.delete_all_objects(:users)
# Delete the table
:ets.delete(:users)
Querying
# Match pattern: :"$n" binds a positional variable, :_ matches anything
:ets.match(:users, {:"$1", :_, :_}) # => [[:alice], [:bob]]
:ets.match_object(:users, {:_, 30, :"_"}) # => [{:alice, 30, "..."}]
# match_delete
:ets.match_delete(:users, {:_, :_, :_}) # delete all entries
# Select with complex queries
:ets.select(:users, [
{{:"$1", :"$2", :_}, [{:>, :"$2", 25}], [:"$1"]} # names where age > 25
])
# => [:alice]
# Info
:ets.info(:users) # => table info
:ets.info(:users, :size) # => number of entries
:ets.info(:users, :memory) # => memory in words
ETS as a cache
defmodule Cache do
  @moduledoc """
  A small ETS-backed cache.

  An `Agent` is started only to own the named, public ETS table; reads and
  writes go straight to ETS from the calling process. Entries are stored either
  as `{key, value}` (no expiry) or as `{key, value, expires_at}`, where
  `expires_at` is a `System.monotonic_time(:millisecond)` deadline.
  """
  use Agent

  @doc """
  Starts the agent that creates and owns the ETS table named after this module.
  """
  def start_link(_opts) do
    Agent.start_link(
      fn -> :ets.new(__MODULE__, [:set, :public, :named_table]) end,
      name: __MODULE__
    )
  end

  @doc """
  Looks up `key`.

  Returns `{:ok, value}` when the key is present and not expired, otherwise
  `:not_found`. An expired entry is removed as a side effect of the lookup.
  """
  def get(key) do
    case :ets.lookup(__MODULE__, key) do
      [{^key, value}] ->
        {:ok, value}

      # Entries written by put/3 carry a deadline as the third element.
      [{^key, value, expires_at} = entry] ->
        if System.monotonic_time(:millisecond) < expires_at do
          {:ok, value}
        else
          # delete_object removes only this exact stale tuple, so a fresh value
          # written concurrently under the same key is left untouched.
          :ets.delete_object(__MODULE__, entry)
          :not_found
        end

      [] ->
        :not_found
    end
  end

  @doc """
  Stores `value` under `key` with no expiry. Returns `:ok`.
  """
  def put(key, value) do
    :ets.insert(__MODULE__, {key, value})
    :ok
  end

  @doc """
  Stores `value` under `key`, expiring `ttl_ms` milliseconds from now. Returns `:ok`.
  """
  def put(key, value, ttl_ms) do
    expires_at = System.monotonic_time(:millisecond) + ttl_ms
    :ets.insert(__MODULE__, {key, value, expires_at})
    :ok
  end
end
Task.Supervisor
For properly supervised tasks:
# In your supervision tree:
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
# Start supervised tasks
Task.Supervisor.async(MyApp.TaskSupervisor, fn -> do_work() end)
Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> do_work() end)
# The async_nolink variant won't crash the caller if the task fails
Comparison
| Tool | Best for | State | Concurrency | Persistence |
|---|---|---|---|---|
| Task | One-off computations | None | Per-task | Process dies → gone |
| Agent | Simple shared state | Single value | Per-call | Process dies → gone |
| GenServer | Complex state/logic | Structured | Per-call | Process dies → gone |
| ETS | Fast key-value lookups | Table of records | Read-parallel | Owning process dies → gone |
| DETS | Disk-based key-value | Table of records | Read-parallel | Survives restarts |