Channels

Phoenix Channels — real-time pub/sub communication with topics, sockets, and broadcasts

Channels provide soft real-time communication over WebSockets (with long-polling fallback). They follow a pub/sub model organized by topics.

Channel architecture

Client ←→ Socket ←→ Channel (topic) ←→ PubSub ←→ Other processes

Setting up channels

1. Define a socket

# lib/my_app_web/channels/user_socket.ex
defmodule MyAppWeb.UserSocket do
  @moduledoc """
  Socket entry point for authenticated clients.

  Routes the `rooms:*` and `notifications:*` topics to their channel modules
  and authenticates each connection with a signed `Phoenix.Token`.
  """
  use Phoenix.Socket

  channel "rooms:*", MyAppWeb.RoomChannel
  channel "notifications:*", MyAppWeb.NotificationChannel

  # Tokens older than one day (in seconds) are rejected.
  @token_max_age 86_400

  # Verifies the "token" connection param and stores the user id it carries in
  # the socket assigns. Returns `:error` to refuse the connection.
  @impl true
  def connect(%{"token" => token}, socket, _connect_info) do
    case Phoenix.Token.verify(socket, "user socket", token, max_age: @token_max_age) do
      {:ok, user_id} -> {:ok, assign(socket, :user_id, user_id)}
      {:error, _reason} -> :error
    end
  end

  # Without this clause a client that connects without a token would raise a
  # FunctionClauseError instead of being refused cleanly.
  def connect(_params, _socket, _connect_info), do: :error

  # Identifies all sockets of one user under a shared id derived from the
  # `:user_id` assign.
  @impl true
  def id(socket), do: "user_socket:#{socket.assigns.user_id}"
end

2. Mount the socket in the endpoint

# lib/my_app_web/endpoint.ex
# Mounts MyAppWeb.UserSocket at "/socket" with the WebSocket transport enabled
# and the long-polling transport disabled.
socket "/socket", MyAppWeb.UserSocket,
  websocket: true,
  longpoll: false

3. Define a channel

defmodule MyAppWeb.RoomChannel do
  @moduledoc """
  Channel for `rooms:<room_id>` topics: relays chat messages to every
  subscriber and greets each client after it joins.
  """
  use Phoenix.Channel

  # Accepts every join, remembers the room id taken from the topic, and
  # schedules the welcome message for after the join has completed.
  @impl true
  def join("rooms:" <> room_id, _payload, socket) do
    send(self(), :after_join)
    joined_socket = assign(socket, :room_id, room_id)
    {:ok, joined_socket}
  end

  # Relays a client's message to everyone subscribed to the topic, tagged
  # with the sender's user id.
  @impl true
  def handle_in("new_msg", %{"body" => body}, socket) do
    message = %{body: body, user: socket.assigns.user_id}
    broadcast!(socket, "new_msg", message)
    {:noreply, socket}
  end

  # Sends the welcome message scheduled by join/3 to this client only.
  @impl true
  def handle_info(:after_join, socket) do
    room_id = socket.assigns.room_id
    push(socket, "welcome", %{msg: "Welcome to room #{room_id}!"})
    {:noreply, socket}
  end
end

Joining a channel

// JavaScript (assets/js/app.js)
import {Socket} from "phoenix"
import {LiveSocket} from "phoenix_live_view"

// Open the socket connection, authenticating with the user's token
const socketParams = {params: {token: userToken}}
let socket = new Socket("/socket", socketParams)
socket.connect()

// Join the "rooms:42" topic and report the outcome
let channel = socket.channel("rooms:42", {})
const joinPush = channel.join()
joinPush.receive("ok", resp => { console.log("Joined!", resp) })
joinPush.receive("error", resp => { console.log("Unable to join", resp) })

// Log the body of every "new_msg" event received on the channel
channel.on("new_msg", ({body}) => {
  console.log(body)
})

Sending messages

From client to server

// Fire-and-forget push
channel.push("new_msg", {body: "Hello!"})

// Push with a 10000 ms timeout and a handler for each possible outcome
const pendingPush = channel.push("new_msg", {body: "Hello!"}, 10000)
pendingPush.receive("ok", resp => console.log("Sent!", resp))
pendingPush.receive("error", resp => console.log("Error!", resp))
pendingPush.receive("timeout", () => console.log("Timed out"))

From server to client

# Push to the current socket only (called from inside the channel process)
push(socket, "notification", %{msg: "You have a new message"})

# Broadcast to all subscribers of the socket's topic
broadcast!(socket, "new_msg", %{body: body})

# Broadcast from anywhere (not in a channel), addressing the topic by name
MyAppWeb.Endpoint.broadcast("rooms:42", "new_msg", %{body: "System message"})

# Notify a single user through a channel topic their client has joined
MyAppWeb.Endpoint.broadcast("notifications:#{user_id}", "notification", %{msg: "Hi"})

# The socket id topic is not forwarded to the client: the socket process only
# reacts to "disconnect" there, which closes all of that user's sockets
MyAppWeb.Endpoint.broadcast("user_socket:#{user_id}", "disconnect", %{})

Intercepting broadcasts

When you need to authorize or transform broadcasts before they reach clients:

defmodule MyAppWeb.RoomChannel do
  use Phoenix.Channel

  # Route outgoing "new_msg" broadcasts through handle_out/3 instead of
  # delivering them to the client directly.
  intercept ["new_msg"]

  # Forwards the broadcast to this client only when the socket's user passes
  # the authorized?/2 check; otherwise the message is dropped.
  @impl true
  def handle_out("new_msg", payload, socket) do
    current_user_id = socket.assigns.user_id

    if authorized?(current_user_id, payload), do: push(socket, "new_msg", payload)

    {:noreply, socket}
  end
end

Channel callbacks

| Callback | Purpose |
|----------|---------|
| join/3 | Authorize and initialize channel subscription |
| handle_in/3 | Handle messages from the client |
| handle_out/3 | Intercept/transform broadcasts to clients |
| handle_info/2 | Handle Elixir messages (from send/2, PubSub, etc.) |
| terminate/2 | Cleanup when channel process exits |

PubSub directly

Use PubSub for server-side broadcasting without defining a channel:

# Subscribe the calling process to the user's "updates" topic
Phoenix.PubSub.subscribe(MyApp.PubSub, "updates:#{user_id}")

# Broadcast a message to every process subscribed to that topic
Phoenix.PubSub.broadcast(MyApp.PubSub, "updates:#{user_id}", {:user_updated, user})

# Handle the message (in LiveView)
@impl true
def handle_info({:user_updated, user}, socket) do
  # Store the updated user in the assigns.
  socket = assign(socket, :user, user)
  {:noreply, socket}
end

Presence

Track which users are online in a topic:

defmodule MyAppWeb.Presence do
  @moduledoc """
  Presence tracker for the application, backed by `MyApp.PubSub`.
  """
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub

  # Hook for enriching presence data (e.g., adding user names) before it is
  # sent to clients. Currently returns the presences unchanged.
  @impl true
  def fetch(_topic, presences) do
    presences
  end
end

Server-side usage

# Track the user on the current channel's topic. track/3 takes the socket and
# uses its topic; outside a channel use track/4 with a pid and an explicit
# topic: track(pid, "rooms:42", key, meta).
{:ok, _} = MyAppWeb.Presence.track(socket, socket.assigns.user_id, %{
  name: socket.assigns.username,
  online_at: inspect(System.system_time(:second))
})

# List presences for a topic, keyed by the tracked key
MyAppWeb.Presence.list("rooms:42")
# => %{"1" => %{metas: [%{name: "Alice", online_at: "1234", phx_ref: "..."}]}}

Client-side usage

import {Presence} from "phoenix"

// Keeps a client-side copy of the channel's presence state in sync
let presence = new Presence(channel)

// Re-render whenever the presence state changes
presence.onSync(() => {
  renderUsers(presence.list())
})

// newPres holds the presence entry that just joined
presence.onJoin((id, current, newPres) => {
  console.log(`${id} joined`)
})

// leftPres holds the presence entry that just left
presence.onLeave((id, current, leftPres) => {
  console.log(`${id} left`)
})

Channel error handling

# Reject a join
# Allows the join only when authorized?/1 approves the socket; otherwise the
# client receives an error payload with reason "unauthorized".
def join("rooms:" <> _room_id, _payload, socket) do
  if authorized?(socket), do: {:ok, socket}, else: {:error, %{reason: "unauthorized"}}
end

# Stop the channel
# A "leave" message shuts the channel process down with reason :normal.
def handle_in("leave", _payload, socket), do: {:stop, :normal, socket}

# Reply to a push
# Echoes the received payload back to the sender as an :ok reply.
def handle_in("ping", payload, socket), do: {:reply, {:ok, payload}, socket}

# Replies to the sender with an error payload. The socket must be bound
# without an underscore because it is returned in the reply tuple.
def handle_in("bad_request", _payload, socket) do
  {:reply, {:error, %{reason: "invalid"}}, socket}
end