Playing Poker with Elixir (pt. 5)

In this series of posts, we've been working through implementing a poker application using Elixir. So far, the focus has been on the backend - we've implemented a set of GenServers to manage poker hands and tables.

Today, we'll transition up the stack. We'll add the Phoenix framework to our project and use its channels to allow web clients to communicate with our existing code.

Going real-time

For me, one of Phoenix's big draws is its real-time capabilities. As a developer who frequently writes applications in Rails, I've found real-time features to be a challenge. Typically, I've added this type of functionality by offloading it to a third-party service like Pusher. Rails 5 promises a built-in solution via ActionCable, but given Ruby's performance, I'm skeptical that it will be a viable solution for more demanding use cases.

In Elixir, concurrency is built into the programming language. As a result, real-time systems with large numbers of connections are within reach. Before we implement any functionality, let's dive into the two main abstractions Phoenix provides to help us: sockets and channels.

Sockets

A Phoenix socket manages a single connection between a client and server. It's declared within a Phoenix endpoint. For example:

defmodule GenPoker.Endpoint do
  use Phoenix.Endpoint, otp_app: :gen_poker

  socket "/socket", GenPoker.PlayerSocket
end

The module we specify needs to implement the connect/2 and id/1 functions. When a client establishes a connection to the socket defined in the endpoint, these callbacks provide a means of authenticating and identifying the connecting client.

The socket also serves as a place to maintain persistent state for the connection. For example, a typical connect callback might want to keep track of the user id of the connected user. Much like Plug.Conn, the socket struct has an assigns map for this purpose.

It's worth noting that Phoenix's sockets are not WebSocket-specific. Each socket can define a set of transport modules to handle the network connection details. WebSocket and long-polling based modules are available out of the box.

Channels

Sockets themselves are only part of the picture. To send and receive messages, a client must join a channel. Within a socket module, a channel can be declared as follows:

defmodule GenPoker.PlayerSocket do
  use Phoenix.Socket

  channel "tables:*", GenPoker.TableChannel
end

The first argument to the channel/3 macro defines the "topic pattern" of the channel. Using a wildcard allows multiple topics to match a single channel (for example, imagine a chat service with multiple rooms). The matched topic will be available in our channel module's functions.
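
To make the wildcard idea concrete, here's a minimal sketch in plain Elixir (no Phoenix required) of how a function head can pull the table name out of a matched topic using binary pattern matching. The TopicSketch module and its table_name/1 function are names I made up for illustration; they aren't part of Phoenix or of our app.

```elixir
defmodule TopicSketch do
  # Hypothetical helper: extracts whatever follows the "tables:" prefix.
  def table_name("tables:" <> table), do: {:ok, table}

  # Any topic that doesn't start with "tables:" is rejected.
  def table_name(_other), do: :error
end

result = TopicSketch.table_name("tables:table_one")
```

A channel's join function can use the same kind of pattern in its first argument, so only topics with the expected prefix ever reach the function body.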

As with sockets, we need to implement a callback module for the channel behaviour. Its join/3 function is the only required one (although our channel won't do much without implementing handle_in/3). The join function lets us perform channel-specific authorization, and handle_in is called whenever a message arrives from a client.

If you look at the other callbacks in the channel behaviour, you'll notice that terminate/2 and handle_info/2 seem a bit familiar. It turns out that a channel runs in a separate process, within a Phoenix.Channel.Server GenServer. Each time a user joins a channel, a new server is spawned to handle that user's messages. These separate processes are key to how Phoenix can remain responsive with massive numbers of concurrent users.

Enough talk. Let's code!

There's a lot more to learn about Phoenix, but let's pause for now and use what we've learned for our app. We'll start with the socket. Here's the code:

defmodule GenPoker.PlayerSocket do
  use Phoenix.Socket

  transport :websocket, Phoenix.Transports.WebSocket

  def connect(%{"playerId" => player_id}, socket) do
    {:ok, assign(socket, :player_id, player_id)}
  end

  def id(socket) do 
    "players_socket:#{socket.assigns.player_id}"
  end
end

The connect callback receives any parameters the client passed when creating the socket. We've skipped authentication, but typically we would perform it here by having our backend generate a signed token for the client to pass back.

Registering processes

To be able to communicate with the GenServers from the previous posts, we'll need to create some channels. First, let's address the table server. Players will join a channel on a topic that matches a specific table. When they send messages, we'll route them to the appropriate GenServer and send the server's response back to the client. We'll also broadcast any updates on the table to all connected players.

Our channels need some way to locate the correct table process, so we'll need to register our processes. Here are the updates to the table server:

defmodule Poker.Table do
  use GenServer

  def start_link(table_name, sup, storage, num_seats) do
    GenServer.start_link(
      __MODULE__, 
      [table_name, sup, storage, num_seats], 
      name: via_tuple(table_name)
    )
  end

  defp via_tuple(table) do 
    {:via, :gproc, {:n, :l, {:table, table}}}
  end

  def whereis(table) do
    :gproc.whereis_name({:n, :l, {:table, table}})
  end
end

We're using the gproc library to register our process, which gives us a bit more flexibility than Erlang's built-in process registration. It allows us to use any term to register a process, rather than a simple atom.

The n atom tells gproc we're registering a name for the current process, and the l atom makes the registration local to the node (as opposed to globally across a cluster). For the name of the process, we're using a tuple, the second element of which is a string (for example, "table_one"). We'll map the topic of the joined channel ("tables:table_one") to this string.
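
If you'd like to try the via-tuple pattern without adding a dependency, here's a rough sketch that swaps gproc for Elixir's built-in Registry (available since Elixir 1.4). This is a different tool from the one our app uses, but it accepts the same kind of arbitrary-term key. The RegistrySketch names are made up for illustration, and the server is stripped down to a single piece of state.

```elixir
defmodule RegistrySketch.Table do
  use GenServer

  # Register the process under {:table, name} instead of an atom.
  def start_link(table_name) do
    GenServer.start_link(__MODULE__, table_name, name: via_tuple(table_name))
  end

  # Look up the pid for a table, mirroring gproc's :undefined on a miss.
  def whereis(table_name) do
    case Registry.lookup(RegistrySketch.Registry, {:table, table_name}) do
      [{pid, _value}] -> pid
      [] -> :undefined
    end
  end

  # Calls can be addressed with the via tuple directly.
  def name(table_name), do: GenServer.call(via_tuple(table_name), :name)

  defp via_tuple(table_name) do
    {:via, Registry, {RegistrySketch.Registry, {:table, table_name}}}
  end

  @impl true
  def init(table_name), do: {:ok, table_name}

  @impl true
  def handle_call(:name, _from, table_name) do
    {:reply, table_name, table_name}
  end
end

# The registry must be running before any table registers with it.
{:ok, _registry} = Registry.start_link(keys: :unique, name: RegistrySketch.Registry)
{:ok, pid} = RegistrySketch.Table.start_link("table_one")
```

Either way, the point is the same: any part of the system that knows the table's name can find its process.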

There's a lot more to gproc than I'm covering here. For more information, here's a good introduction to it. Let's add the channel to make use of this code:

defmodule GenPoker.TableChannel do
  use GenPoker.Web, :channel
  alias Poker.Table

  def join("tables:" <> table, _payload, socket) do
    {:ok, assign(socket, :table, table)}
  end

  def handle_in(command, payload, socket)
      when command in ~w(sit leave buy_in cash_out deal) do
    table = Table.whereis(socket.assigns.table)
    arguments = [table, socket.assigns.player_id] ++ payload
    result = apply(Table, String.to_atom(command), arguments)
    if result == :ok do
      broadcast! socket, "update", Table.get_state(table)
    end
    {:reply, result, socket}
  end
end

When the player joins the channel, we save the table name in our assigns map. Upon receiving a message, we locate the table process and use apply/3 to call the appropriate function, passing the player_id we assigned in our socket. Note that calling String.to_atom on user-supplied input is normally unsafe, since the Erlang VM doesn't garbage collect atoms and a malicious client could exhaust the atom table. Here, the guard clause protects us by only admitting a fixed list of commands.
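
Here's a small standalone sketch of the same guard-plus-apply/3 idea, so you can see what happens to a command that isn't on the list. DispatchSketch and FakeTable are stand-ins I invented for this example; the real Poker.Table functions take different arguments and return different values.

```elixir
defmodule DispatchSketch.FakeTable do
  # Hypothetical stand-ins for the table server's public functions.
  def sit(player_id, seat), do: {:ok, {:seated, player_id, seat}}
  def leave(player_id), do: {:ok, {:left, player_id}}
end

defmodule DispatchSketch do
  @allowed ~w(sit leave)

  # Only whitelisted strings reach String.to_atom/1, so a client
  # can't force the VM to create arbitrary atoms.
  def dispatch(command, player_id, args)
      when command in @allowed and is_list(args) do
    apply(DispatchSketch.FakeTable, String.to_atom(command), [player_id | args])
  end

  # Everything else is rejected without creating an atom.
  def dispatch(_command, _player_id, _args), do: {:error, :unknown_command}
end

seated = DispatchSketch.dispatch("sit", "player_1", [3])
rejected = DispatchSketch.dispatch("drop_table", "player_1", [])
```

Our channel doesn't define a fallback clause, so an unexpected event there raises a FunctionClauseError in the channel process rather than returning an error tuple.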

If the command succeeds, we use Phoenix.Channel's broadcast function to send a message to all clients subscribed to the socket's topic. Either way, we respond to the client by returning a reply tuple.

Phoenix expects the reply itself (the second element of this tuple) to take one of two forms: either a status atom or a {status, map} tuple. We can use the status to register callbacks in the client. For example, in our JavaScript, we can do:

channel.push("message", arguments)
  .receive("ok", (msg) => console.log("Got OK!"))
  .receive("error", (msg) => console.log("Oops!"))

When we receive a reply from the server, the client will execute the callback with the matching status. If we passed a map in our reply tuple, it will be passed to the client's callback as the msg parameter.
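
If the server's return values don't already fit one of those two forms, a small conversion step helps. The sketch below assumes, purely for illustration, that a server function returns :ok, {:ok, map}, :error, or {:error, reason} where reason can be any term; ReplySketch and to_reply/1 are hypothetical names, not part of Phoenix.

```elixir
defmodule ReplySketch do
  # Status atoms pass through untouched.
  def to_reply(:ok), do: :ok
  def to_reply(:error), do: :error

  # A status paired with a map is already in the expected form.
  def to_reply({status, %{} = payload}) when status in [:ok, :error] do
    {status, payload}
  end

  # Any other error reason is wrapped in a map so it can be serialized.
  def to_reply({:error, reason}), do: {:error, %{reason: inspect(reason)}}
end

reply = ReplySketch.to_reply({:error, :seat_taken})
```

In a channel, the converted value would go in the middle of the reply tuple, as in {:reply, ReplySketch.to_reply(result), socket}.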

Sending the initial state

There's a problem with our code - when a client joins the channel, they don't get an update of the table's state until it changes later. We can attempt to solve the problem by pushing the client a message when joining:

def join("tables:" <> table, _payload, socket) do
  state = table |> Table.whereis() |> Table.get_state()
  push socket, "update", state
  {:ok, assign(socket, :table, table)}
end

However, this code won't work. Phoenix helpfully raises an error letting us know we can't push or broadcast to a socket until it's been joined successfully.

Thankfully, Phoenix offers the solution in its error message - it tells us to send a message to ourselves in the join callback. Since we're running in our own GenServer-like process, we'll be able to use the handle_info callback to push the initial state later:

def join("tables:" <> table, _payload, socket) do
  send(self(), :after_join)
  {:ok, assign(socket, :table, table)}
end

def handle_info(:after_join, socket) do
  state = socket.assigns.table |> Table.whereis() |> Table.get_state()
  push socket, "update", state
  {:noreply, socket}
end

def handle_info(_, socket) do
  {:noreply, socket}
end
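
The "message yourself, then finish the work later" trick isn't specific to channels. Here's a minimal sketch of it with a plain GenServer, where init/1 plays the role of join and a message to the calling process stands in for the push. AfterJoinSketch is a made-up module for illustration only.

```elixir
defmodule AfterJoinSketch do
  use GenServer

  def start_link(observer_pid) do
    GenServer.start_link(__MODULE__, observer_pid)
  end

  @impl true
  def init(observer_pid) do
    # Queue a message to ourselves; it is handled only after init returns.
    send(self(), :after_join)
    {:ok, %{observer: observer_pid, joined: false}}
  end

  @impl true
  def handle_info(:after_join, state) do
    # Setup is complete by now, so it's safe to do the deferred work.
    send(state.observer, {:pushed, "update"})
    {:noreply, %{state | joined: true}}
  end

  # Ignore any other stray messages.
  def handle_info(_other, state), do: {:noreply, state}
end

{:ok, _pid} = AfterJoinSketch.start_link(self())
```

Because a process handles its mailbox one message at a time, :after_join can't be processed until the setup callback has returned.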

Intercepting messages

We'll create a separate channel for hand-related messages. Separating the messages into two channels will simplify our code, making the channels a thin communication layer around the servers.

For the most part, the code will be identical, but there's one caveat. We need to take care to send each player's private hand only to them, so we can't broadcast the same message to every socket.

Luckily, Phoenix provides a means of intercepting outgoing broadcasts on a per-socket basis. First, we need to tell Phoenix which events we'd like to intercept by using the intercept macro in our channel:

intercept ["update"]

Phoenix requires we explicitly declare which events we want to intercept. It does this for performance reasons - if the event doesn't need to be intercepted, Phoenix can send it directly to the transport process, skipping the channel server entirely.

We also need to define a handle_out function:

def handle_out("update", state, socket) do
  push socket, "update", hide_other_hands(state, socket)
  {:noreply, socket}
end

defp hide_other_hands(state, socket) do
  player_id = socket.assigns.player_id

  # Keep the hand for the socket's own player; strip it from everyone else.
  hide_hand_unless_current_player = fn
    %{id: ^player_id} = player -> player
    player -> Map.delete(player, :hand)
  end

  update_in(state.players, fn players ->
    Enum.map(players, hide_hand_unless_current_player)
  end)
end

Since we're intercepting the event, we need to push a replacement message explicitly. We define a closure that we'll use to map over the hand's players. It uses pattern matching to leave the current socket's player unchanged, while stripping the hand from all other players.
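
To see the filtering on its own, here's a standalone sketch that runs the same logic against some sample data. The shape of the state (a map with a players list, each player having an id and a hand) is my assumption based on the code above, and HandFilterSketch takes the player id directly rather than a socket.

```elixir
defmodule HandFilterSketch do
  # Same idea as hide_other_hands/2, but keyed on a player id directly.
  def hide_other_hands(state, player_id) do
    update_in(state.players, fn players ->
      Enum.map(players, fn
        # The pin operator matches only the player who owns this socket.
        %{id: ^player_id} = player -> player
        player -> Map.delete(player, :hand)
      end)
    end)
  end
end

# Made-up sample state for illustration.
state = %{
  pot: 30,
  players: [
    %{id: "alice", hand: ["Ah", "Kd"]},
    %{id: "bob", hand: ["7c", "7s"]}
  ]
}

alice_view = HandFilterSketch.hide_other_hands(state, "alice")
```

In alice_view, Alice's entry still carries her hand, while Bob's entry has lost its :hand key. The rest of the state, such as the pot, is untouched.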

Wrapping up

It's no surprise that the Phoenix framework made adding real-time features remarkably easy - it was created with that in mind. If you're interested in reading through all the code, it's available on GitHub. Thanks for reading!