Single process pub-sub

The previous example was admittedly quite simple. Let’s build on that foundation (pun intended) to do something a bit more interesting. Suppose we have a workflow on our site like the following:

  1. Enter some information on page X, and submit.

  2. Submission starts a background job, and the user is redirected to a page to view the status of that job.

  3. That second page will subscribe to updates from the background job and display them to the user.

The core principle here is letting one thread publish updates and another thread subscribe to receive them. This is generally known as pub/sub, and fortunately it is very easy to achieve in Haskell via STM.
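
Before wiring this into Yesod, here's a minimal standalone sketch of that principle (the names and the three-message publisher are made up for illustration, not part of the application below): one thread publishes messages to a broadcast TChan, and another thread subscribes by duplicating the channel with dupTChan and blocking on readTChan.

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad (replicateM_)

main :: IO ()
main = do
    -- Writes to a broadcast channel are only visible through copies
    -- made with dupTChan, so nothing piles up when nobody is listening.
    chan <- newBroadcastTChanIO

    -- Subscriber end: duplicate the channel before publishing starts,
    -- so no messages are missed.
    sub <- atomically $ dupTChan chan

    -- Publisher thread: emit three updates, one per second.
    _ <- forkIO $ replicateM_ 3 $ do
        threadDelay 1000000
        atomically $ writeTChan chan "tick"

    -- Subscriber (here, the main thread): block until each update arrives.
    replicateM_ 3 $ do
        msg <- atomically $ readTChan sub
        putStrLn msg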

As in the previous chapter, let me start off with a caveat: this technique only works properly if you have a single web application process. If you have two different servers and a load balancer, you'd either need sticky sessions or some other solution to make sure that the requests from a single user go to the same machine. In those situations, you may want to consider using an external pub/sub solution, such as Redis.

With that caveat out of the way, let’s get started.

Foundation datatype

We’ll need two different mutable references in our foundation. The first will keep track of the next "job id" we’ll hand out. Each of these background jobs will be represented by a unique identifier, which will be used in our URLs. The second piece of data will be a map from the job ID to the broadcast channel used for publishing updates. In code:

data App = App
    { jobs    :: TVar (IntMap (TChan (Maybe Text)))
    , nextJob :: TVar Int
    }

Notice that our TChan contains Maybe Text values. The reason for the Maybe wrapper is so that we can indicate that the channel is complete by providing a Nothing value.

Allocating a job

In order to allocate a job, we need to:

  1. Get a job ID.

  2. Create a new broadcast channel.

  3. Add the channel to the channel map.

Due to the beauty of STM, this is pretty easy.

(jobId, chan) <- liftIO $ atomically $ do
    jobId <- readTVar nextJob
    writeTVar nextJob $! jobId + 1
    chan <- newBroadcastTChan
    m <- readTVar jobs
    writeTVar jobs $ IntMap.insert jobId chan m
    return (jobId, chan)

Fork our background job

There are many different ways we could go about this, and they depend entirely on what the background job is going to be. Here’s a minimal example of a background job that publishes a few messages, with a one-second delay between each message. Note how, after our final message, we broadcast a Nothing value and remove our channel from the map of channels.

liftIO $ forkIO $ do
    threadDelay 1000000
    atomically $ writeTChan chan $ Just "Did something\n"
    threadDelay 1000000
    atomically $ writeTChan chan $ Just "Did something else\n"
    threadDelay 1000000
    atomically $ do
        writeTChan chan $ Just "All done\n"
        writeTChan chan Nothing
        m <- readTVar jobs
        writeTVar jobs $ IntMap.delete jobId m

View progress

For this demonstration, I’ve opted for a very simple progress view: a plain-text page with a streaming response. There are a few other possibilities here: an HTML page that auto-refreshes every X seconds, or using EventSource or WebSockets. I encourage you to give those a shot as well, but here’s the simplest implementation I can think of:

getViewProgressR :: Int -> Handler TypedContent
getViewProgressR jobId = do
    App {..} <- getYesod
    mchan <- liftIO $ atomically $ do
        m <- readTVar jobs
        case IntMap.lookup jobId m of
            Nothing -> return Nothing
            Just chan -> fmap Just $ dupTChan chan
    case mchan of
        Nothing -> notFound
        Just chan -> respondSource typePlain $ do
            let loop = do
                    mtext <- liftIO $ atomically $ readTChan chan
                    case mtext of
                        Nothing -> return ()
                        Just text -> do
                            sendChunkText text
                            sendFlush
                            loop
            loop

We start off by looking up the channel in the map. If we can’t find it, it means the job either never existed or has already been completed. In either case, we return a 404. (Another possible enhancement would be to store some information on all previously completed jobs and let the user know if they’re done.)
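
As a rough sketch of that last enhancement, in the same fragment style as the snippets above (the completedJobs field, the Data.IntSet usage, and the response text are assumptions for illustration, not part of the application below), we could keep a set of finished job IDs in the foundation, record each job there in its final transaction, and consult it before returning a 404:

-- Hypothetical extra foundation field:
--     completedJobs :: TVar IntSet
-- along with: import Data.IntSet (IntSet); import qualified Data.IntSet as IntSet

-- In the background job's final transaction, record completion while
-- removing the channel:
atomically $ do
    writeTChan chan Nothing
    m <- readTVar jobs
    writeTVar jobs $ IntMap.delete jobId m
    modifyTVar' completedJobs $ IntSet.insert jobId

-- In getViewProgressR, when the channel lookup fails, distinguish a
-- finished job from one that never existed:
done <- liftIO $ readTVarIO completedJobs
if jobId `IntSet.member` done
    then respondSource typePlain $ sendChunkText "Job already completed\n"
    else notFound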

Assuming the channel exists, we use respondSource to start a streaming response. We then repeatedly call readTChan until we get a Nothing value, at which point we exit (via return ()). Notice that on each iteration, we call both sendChunkText and sendFlush. Without that second call, the user won’t receive any updates until the output buffer completely fills up, which is not what we want for a real-time update system.

Complete application

For completeness, here’s the full source code for this application:

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes       #-}
{-# LANGUAGE RecordWildCards   #-}
{-# LANGUAGE TemplateHaskell   #-}
{-# LANGUAGE TypeFamilies      #-}
import           Control.Concurrent     (forkIO, threadDelay)
import           Control.Concurrent.STM
import           Data.IntMap            (IntMap)
import qualified Data.IntMap            as IntMap
import           Data.Text              (Text)
import           Yesod

data App = App
    { jobs    :: TVar (IntMap (TChan (Maybe Text)))
    , nextJob :: TVar Int
    }

mkYesod "App" [parseRoutes|
/ HomeR GET POST
/view-progress/#Int ViewProgressR GET
|]

instance Yesod App

getHomeR :: Handler Html
getHomeR = defaultLayout $ do
    setTitle "PubSub example"
    [whamlet|
        <form method=post>
            <button>Start new background job
    |]

postHomeR :: Handler ()
postHomeR = do
    App {..} <- getYesod
    (jobId, chan) <- liftIO $ atomically $ do
        jobId <- readTVar nextJob
        writeTVar nextJob $! jobId + 1
        chan <- newBroadcastTChan
        m <- readTVar jobs
        writeTVar jobs $ IntMap.insert jobId chan m
        return (jobId, chan)
    liftIO $ forkIO $ do
        threadDelay 1000000
        atomically $ writeTChan chan $ Just "Did something\n"
        threadDelay 1000000
        atomically $ writeTChan chan $ Just "Did something else\n"
        threadDelay 1000000
        atomically $ do
            writeTChan chan $ Just "All done\n"
            writeTChan chan Nothing
            m <- readTVar jobs
            writeTVar jobs $ IntMap.delete jobId m
    redirect $ ViewProgressR jobId

getViewProgressR :: Int -> Handler TypedContent
getViewProgressR jobId = do
    App {..} <- getYesod
    mchan <- liftIO $ atomically $ do
        m <- readTVar jobs
        case IntMap.lookup jobId m of
            Nothing -> return Nothing
            Just chan -> fmap Just $ dupTChan chan
    case mchan of
        Nothing -> notFound
        Just chan -> respondSource typePlain $ do
            let loop = do
                    mtext <- liftIO $ atomically $ readTChan chan
                    case mtext of
                        Nothing -> return ()
                        Just text -> do
                            sendChunkText text
                            sendFlush
                            loop
            loop

main :: IO ()
main = do
    jobs <- newTVarIO IntMap.empty
    nextJob <- newTVarIO 1
    warp 3000 App {..}
