Channels
Phoenix Channels — real-time pub/sub communication with topics, sockets, and broadcasts
Channels provide soft real-time communication over WebSockets (with long-polling fallback). They follow a pub/sub model organized by topics.
Channel architecture
Client ←→ Socket ←→ Channel (topic) ←→ PubSub ←→ Other processes
- Socket — The transport connection (WebSocket or long-poll)
- Channel — A topic-based conversation
- Topic — A string like
"rooms:42"or"notifications:123" - PubSub — The broadcast backbone
Setting up channels
1. Define a socket
# lib/my_app_web/channels/user_socket.ex
defmodule MyAppWeb.UserSocket do
use Phoenix.Socket
channel "rooms:*", MyAppWeb.RoomChannel
channel "notifications:*", MyAppWeb.NotificationChannel
@impl true
def connect(%{"token" => token}, socket, _connect_info) do
case Phoenix.Token.verify(socket, "user socket", token, max_age: 86_400) do
{:ok, user_id} ->
{:ok, assign(socket, :user_id, user_id)}
{:error, _} ->
:error
end
end
@impl true
def id(socket), do: "user_socket:#{socket.assigns.user_id}"
end2. Mount the socket in the endpoint
# lib/my_app_web/endpoint.ex
socket "/socket", MyAppWeb.UserSocket,
websocket: true,
longpoll: false3. Define a channel
defmodule MyAppWeb.RoomChannel do
use Phoenix.Channel
@impl true
def join("rooms:" <> room_id, _payload, socket) do
send(self(), :after_join)
{:ok, assign(socket, :room_id, room_id)}
end
@impl true
def handle_in("new_msg", %{"body" => body}, socket) do
broadcast!(socket, "new_msg", %{body: body, user: socket.assigns.user_id})
{:noreply, socket}
end
@impl true
def handle_info(:after_join, socket) do
push(socket, "welcome", %{msg: "Welcome to room #{socket.assigns.room_id}!"})
{:noreply, socket}
end
endJoining a channel
# JavaScript (assets/js/app.js)
import {Socket} from "phoenix"
import {LiveSocket} from "phoenix_live_view"
let socket = new Socket("/socket", {params: {token: userToken}})
socket.connect()
let channel = socket.channel("rooms:42", {})
channel.join()
.receive("ok", resp => { console.log("Joined!", resp) })
.receive("error", resp => { console.log("Unable to join", resp) })
channel.on("new_msg", payload => {
console.log(payload.body)
})Sending messages
From client to server
channel.push("new_msg", {body: "Hello!"})
// With timeout and reply handling
channel.push("new_msg", {body: "Hello!"}, 10000)
.receive("ok", resp => console.log("Sent!", resp))
.receive("error", resp => console.log("Error!", resp))
.receive("timeout", () => console.log("Timed out"))From server to client
# Push to the current socket
push(socket, "notification", %{msg: "You have a new message"})
# Broadcast to all subscribers on the channel
broadcast!(socket, "new_msg", %{body: body})
# Broadcast from anywhere (not in a channel)
MyAppWeb.Endpoint.broadcast("rooms:42", "new_msg", %{body: "System message"})
# Broadcast to a user's socket
MyAppWeb.Endpoint.broadcast("user_socket:#{user_id}", "notification", %{msg: "Hi"})Intercepting broadcasts
When you need to authorize or transform broadcasts before they reach clients:
defmodule MyAppWeb.RoomChannel do
use Phoenix.Channel
intercept ["new_msg"]
@impl true
def handle_out("new_msg", payload, socket) do
if authorized?(socket.assigns.user_id, payload) do
push(socket, "new_msg", payload)
end
{:noreply, socket}
end
endChannel callbacks
| Callback | Purpose |
|---|---|
join/3 |
Authorize and initialize channel subscription |
handle_in/3 |
Handle messages from the client |
handle_out/3 |
Intercept/transform broadcasts to clients |
handle_info/2 |
Handle Elixir messages (from send/2, PubSub, etc.) |
terminate/2 |
Cleanup when channel process exits |
PubSub directly
Use PubSub for server-side broadcasting without defining a channel:
# Subscribe a process
Phoenix.PubSub.subscribe(MyApp.PubSub, "updates:#{user_id}")
# Broadcast a message
Phoenix.PubSub.broadcast(MyApp.PubSub, "updates:#{user_id}", {:user_updated, user})
# Handle the message (in LiveView)
@impl true
def handle_info({:user_updated, user}, socket) do
{:noreply, assign(socket, :user, user)}
endPresence
Track which users are online in a topic:
defmodule MyAppWeb.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
def fetch(_topic, presences) do
# Enrich presence data (e.g., add user names)
presences
end
endServer-side usage
# Track a user in a topic
{:ok, _} = MyAppWeb.Presence.track(socket, "rooms:42", socket.assigns.user_id, %{
name: socket.assigns.username,
online_at: inspect(System.system_time(:second))
})
# List presences
MyAppWeb.Presence.list("rooms:42")
# => %{"1" => [%{metas: [%{name: "Alice", online_at: "1234"}]}]}Client-side usage
let presence = new Presence(channel)
presence.onSync(() => {
renderUsers(presence.list())
})
presence.onJoin((id, current, leavePres) => {
console.log(`${id} joined`)
})
presence.onLeave((id, current, leavePres) => {
console.log(`${id} left`)
})Channel error handling
# Reject a join
def join("rooms:" <> _room_id, _payload, socket) do
if authorized?(socket) do
{:ok, socket}
else
{:error, %{reason: "unauthorized"}}
end
end
# Stop the channel
def handle_in("leave", _payload, socket) do
{:stop, :normal, socket}
end
# Reply to a push
def handle_in("ping", payload, socket) do
{:reply, {:ok, payload}, socket}
end
def handle_in("bad_request", _payload, _socket) do
{:reply, {:error, %{reason: "invalid"}}, socket}
end