Overview
This is the fifth (and probably final) part in a series on conduits, an approach to handling streams of data. You can see the previous posts at:
This post is mostly about the concept of buffering, along with a few other miscellaneous topics.
Inversion of Control
Buffering was actually one of the main motivations in the creation of the `conduit` package. To see its importance, we need to consider the approach we've seen so far, which we'll call inversion of control, or IoC.
Suppose you want to count how many newline characters there are in a file. In the standard imperative approach, you would do something like this (a quick sketch in Haskell follows the list):
1. Open the file
2. Pull some data into a buffer
3. Loop over the values in the buffer, incrementing a counter on each newline character
4. Return to step 2
5. Close the file
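To make that concrete, here's a minimal sketch of that loop using the strict bytestring API; the `countNewlines` name and the 4096-byte buffer size are illustrative choices, nothing prescribed.

```haskell
import qualified Data.ByteString as S
import System.IO (IOMode (ReadMode), hClose, openFile)

countNewlines :: FilePath -> IO Int
countNewlines fp = do
    h <- openFile fp ReadMode                    -- 1. open the file
    let loop acc = do
            bs <- S.hGetSome h 4096              -- 2. pull some data into a buffer
            if S.null bs
                then return acc                  -- end of file
                else loop $! acc + S.count 10 bs -- 3. count newlines (10 == '\n'), 4. repeat
    total <- loop 0
    hClose h                                     -- 5. close the file
    return total
```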
Notice that your code is explicitly calling out to other code and that code is returning control back to your code. You have retained full control of the flow of execution of your program. The conduit approach we've seen so far does not work this way. Instead, you would:
- Write a sink that counts newlines and adds the result to an accumulator.
- Connect the sink to a source
There's no doubt in my mind that this is an easier approach. You don't have to worry about opening and closing files or pulling data from the file. Instead, the data you need to process is simply presented to you. This is the advantage of IoC: you can focus specifically on your piece of the code.
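As a rough sketch, the newline counter from above might look like this with conduits (assuming the `sourceFile` and `fold` helpers from `Data.Conduit.Binary` and `Data.Conduit.List`; `countNewlines` is again just an illustrative name):

```haskell
import qualified Data.ByteString as S
import           Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL

countNewlines :: FilePath -> IO Int
countNewlines fp = runResourceT $ CB.sourceFile fp $$ CL.fold step 0
  where
    -- the sink: add the number of newlines in each chunk to the accumulator
    step acc bs = acc + S.count 10 bs  -- 10 == '\n' as a Word8
```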
We use this IoC approach all over Haskell: for example, instead of `readMVar` and `putMVar`, you can use `withMVar`. Don't bother with `openFile` and `hClose`, just use `withFile` and pass in a function that uses the `Handle`. Even C has a version of this: why `malloc` and `free` when you could just `alloca`?
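For instance, with plain `System.IO` (the function names here are just for illustration):

```haskell
import System.IO

-- The manual pairing: easy to forget hClose, and it is skipped entirely if an
-- exception is thrown in between.
fileSizeManual :: FilePath -> IO Integer
fileSizeManual fp = do
    h <- openFile fp ReadMode
    size <- hFileSize h
    hClose h
    return size

-- The "with" style: pass in a function that uses the Handle, and the close is
-- guaranteed, even in the face of exceptions.
fileSizeWith :: FilePath -> IO Integer
fileSizeWith fp = withFile fp ReadMode hFileSize
```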
Actually, that last example, `alloca`, is a huge red herring. Of course you can't just use `alloca` for everything. `alloca` only allocates memory locally on the stack, not dynamically on the heap. There's no way to return your allocated memory outside the current function.
But actually, the same restriction applies to the whole family of `with` functions: you can never return an allocated resource outside of the "block". Usually this works out just fine, but we need to recognize that this is a change in how we structure our programs. Oftentimes, with simple examples, this is a minor change. However, in larger settings this can become very difficult to manage, bordering on impossible at times.
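To see the restriction in a tiny, deliberately broken example: the following compiles, but `withFile` has already closed the handle by the time we try to use it (`leaky` is a made-up name).

```haskell
import System.IO

leaky :: FilePath -> IO String
leaky fp = do
    h <- withFile fp ReadMode return  -- the Handle escapes the block...
    hGetLine h                        -- ...but withFile has already closed it, so
                                      -- this throws a "handle is closed" exception
```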
A web server
Let's say we're going to write a web server. We're going to use the following low-level operations:
```haskell
data Socket

recv    :: Socket -> Int -> IO ByteString -- returns empty when the socket is closed
sendAll :: Socket -> ByteString -> IO ()
```
We're up to the part where we need to implement the function `handleConn` that handles an individual connection. It will look something like this:
```haskell
data Request  -- request headers, HTTP version, etc
data Response -- status code, response headers, response body

type Application = Request -> IO Response

handleConn :: Application -> Socket -> IO ()
```
What does our `handleConn` need to do? In broad strokes:
1. Parse the request line
2. Parse the request headers
3. Construct the `Request` value
4. Pass the `Request` to the `Application` and get back a `Response`
5. Send the `Response` over the `Socket`
We start off by writing steps 1 and 2 manually, without using conduits. We'll do this very simply and just assume three space-separated strings. We end up with something that looks like:
```haskell
data RequestLine = RequestLine ByteString ByteString ByteString

parseRequestLine :: Socket -> IO RequestLine
parseRequestLine socket = do
    bs <- recv socket 4096
    let (method:path:version:ignored) = S8.words bs
    return $ RequestLine method path version
```
There are two issues here: it doesn't handle the case where there are fewer than three words in the chunk of data, and it throws away any extra data. We can definitely solve both of these issues manually, but it's very tedious. It's much easier to implement this in terms of conduits.
```haskell
import           Data.ByteString (ByteString)
import qualified Data.ByteString as S
import           Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL

data RequestLine = RequestLine ByteString ByteString ByteString

parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = do
    let space = toEnum $ fromEnum ' '
    let getWord = do
            CB.dropWhile (== space)
            bss <- CB.takeWhile (/= space) =$ CL.consume
            return $ S.concat bss
    method  <- getWord
    path    <- getWord
    version <- getWord
    return $ RequestLine method path version
```
This means that our code will automatically be supplied with more data as it comes in, and any extra data will automatically be buffered in the `Source`, ready for the next time it's used. Now we can easily structure our program together, demonstrating the power of the conduits approach:
```haskell
import           Data.ByteString (ByteString)
import           Data.Conduit
import           Data.Conduit.Network (sourceSocket)
import           Control.Monad.IO.Class (liftIO)
import           Network.Socket (Socket)

data RequestLine = RequestLine ByteString ByteString ByteString
type Headers = [(ByteString, ByteString)]
data Request = Request RequestLine Headers
data Response = Response
type Application = Request -> IO Response

parseRequestHeaders :: Sink ByteString IO Headers
parseRequestHeaders = undefined

parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = undefined

sendResponse :: Socket -> Response -> IO ()
sendResponse = undefined

handleConn :: Application -> Socket -> IO ()
handleConn app socket = do
    req <- runResourceT $ sourceSocket socket $$ do
        requestLine <- parseRequestLine
        headers <- parseRequestHeaders
        return $ Request requestLine headers
    res <- liftIO $ app req
    liftIO $ sendResponse socket res
```
Whither the request body?
This is all great, until we realize we can't read the request body. The `Application` is simply given the `Request`, and lives in the `IO` monad. It has no access whatsoever to the incoming stream of data.
There's an easy fix for this actually: have the `Application` live in the `Sink` monad. This is the very approach we took with enumerator-based WAI 0.4. However, there are two problems:
- People find it confusing. What people expect is that the `Request` value would have a `requestBody` value of type `Source`.
- This makes certain kinds of usage incredibly difficult. For example, trying to write an HTTP proxy combining WAI and `http-enumerator` proved to be almost impossible.
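For reference, that alternative amounts to a signature along these lines (a sketch in conduit terms; WAI 0.4 itself was built on the enumerator package's Iteratee type rather than on conduit):

```haskell
-- The application consumes the request body itself, because it *is* a Sink.
type Application = Request -> Sink ByteString IO Response
```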
This is the downside of inversion of control. Our code wants to be in control. It wants to be given something to pull from, something to push to, and run with it. We need some solution to the problem.
The simplest solution would be to just create a new `Source` and pass that to the `Application`. Unfortunately, this will cause problems with our buffering. You see, when we connected our source to the `parseRequestLine` and `parseRequestHeaders` sinks, it made a call to `recv`. If the data it received was not enough to cover all of the headers, it would issue another call. When it had enough data, it would stop. However, odds are that it didn't stop exactly at the end of the headers. It likely consumed a bit of the request body as well.

If we just create a new source and pass that to the application, it will be missing the beginning of the request body. We need some way to pass that buffered data along.
BufferedSource
And so we finally get to introduce the last data type in conduits: `BufferedSource`. This is an abstract data type, but all it really does is keep a mutable reference to a buffer and an underlying `Source`. In order to create one of these, you use the `bufferSource` function.
```haskell
bufferSource :: Resource m => Source m a -> ResourceT m (BufferedSource m a)
```
This one little change is what allows us to easily solve our web server dilemma. Instead of connecting a `Source` to our parsing `Sink`s, we use a `BufferedSource`. At the end of each connect operation, any leftover data is put back on the buffer. For our web server case, we can now create a `BufferedSource`, use that to read the request line and headers, and then pass that same `BufferedSource` to the application for reading the request body.
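On the application side, reading the body then looks something like the sketch below. The `readBody` helper is purely illustrative, and it assumes the `BufferedSource` handed over by `handleConn` (as in the recap at the end of this post); draining the whole body into memory is fine for a sketch, though real code would stream it.

```haskell
import           Data.ByteString (ByteString)
import qualified Data.ByteString as S
import           Data.Conduit
import qualified Data.Conduit.List as CL

readBody :: BufferedSource IO ByteString -> ResourceT IO ByteString
readBody bsrc = do
    chunks <- bsrc $$ CL.consume  -- connect the buffered source like any other source
    return $ S.concat chunks
```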
Typeclass
We want to be able to connect a buffered source to a sink, just like we would a regular source. We would also like to be able to fuse it to a conduit. In order to make this convenient, conduit has a typeclass, `IsSource`. There are instances provided for both `Source` and `BufferedSource`. Both the connect (`$$`) and left-fuse (`$=`) operators use this typeclass.
There's one "gotcha" in the BufferedSource
instance of this typeclass, so
let's explain it. Suppose we want to write a file copy function, without any buffering. This is a
fairly standard usage of conduits:
```haskell
sourceFile input $$ sinkFile output
```
When this line is run, both the input and output files are opened, the data is copied, and then both files are closed. Let's change this example slightly to use buffering:
```haskell
bsrc <- bufferSource $ sourceFile input
bsrc $$ isolate 50 =$ sinkFile output1
bsrc $$ sinkFile output2
```
When is the input file opened and closed? The opening occurs on the first line, when buffering the source. And if we followed the normal rules from sources, the file should be closed after the second line. However, if we did that, we couldn't reuse `bsrc` for line 3!
So instead, `$$` does not close the file. As a result, you can pass a buffered source to as many actions as you want, without concerns that the file handle has been closed out from under you.
This presents one caveat: when you're finished with a buffered source, you should manually call `bsourceClose` on it. However, as usual, this is merely an optimization, as the source will automatically be closed when `runResourceT` is called.
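Putting that together with the copy example above (a sketch; `copyTwice` is an illustrative name, with `sourceFile`, `sinkFile`, and `isolate` coming from `Data.Conduit.Binary` as before):

```haskell
import Data.Conduit
import Data.Conduit.Binary (isolate, sinkFile, sourceFile)

copyTwice :: FilePath -> FilePath -> FilePath -> IO ()
copyTwice input output1 output2 = runResourceT $ do
    bsrc <- bufferSource $ sourceFile input
    bsrc $$ isolate 50 =$ sinkFile output1
    bsrc $$ sinkFile output2
    bsourceClose bsrc  -- optional: runResourceT would close it for us anyway
```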
Recapping the web server
So what exactly does our web server look like now?
```haskell
import           Data.ByteString (ByteString)
import           Data.Conduit
import           Data.Conduit.Network (sourceSocket)
import           Control.Monad.IO.Class (liftIO)
import           Network.Socket (Socket)

data RequestLine = RequestLine ByteString ByteString ByteString
type Headers = [(ByteString, ByteString)]
data Request = Request RequestLine Headers (BufferedSource IO ByteString)
data Response = Response
type Application = Request -> ResourceT IO Response

parseRequestHeaders :: Sink ByteString IO Headers
parseRequestHeaders = undefined

parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = undefined

sendResponse :: Socket -> Response -> IO ()
sendResponse = undefined

handleConn :: Application -> Socket -> IO ()
handleConn app socket = runResourceT $ do
    bsrc <- bufferSource $ sourceSocket socket
    requestLine <- bsrc $$ parseRequestLine
    headers <- bsrc $$ parseRequestHeaders
    let req = Request requestLine headers bsrc
    res <- app req
    liftIO $ sendResponse socket res
```
We've made a few minor changes. Firstly, the `Application` now lives in the `ResourceT IO` monad. This isn't strictly necessary, but it's very convenient: the application can now register cleanup actions that will only take place after the response has been fully sent to the client.
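For example, an application that streams an upload to a temporary file could arrange for that file to be deleted only once the response has gone out, along these lines (a sketch building on the recap above; `register` is the cleanup-registration function from the ResourceT machinery, and the path is made up):

```haskell
import Control.Monad.Trans.Resource (register)
import System.Directory (removeFile)

app :: Application
app _req = do
    let tmpFile = "/tmp/request-body-scratch"  -- illustrative path
    _ <- register $ removeFile tmpFile         -- runs when handleConn's runResourceT finishes
    -- ... stream the request body to tmpFile and build a Response from it ...
    return Response
```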
But the major changes are in the `handleConn` function. We now start off by buffering our source. This buffered source is then used twice in our function, and then passed off to the application.
That's all folks!
Thanks for making it through this very long series of posts; I hope it's been informative. The next step is to dive into the conduit packages on Hackage. Also, stay tuned in the next few weeks for an all new, all conduit release of Yesod.
My hope is that the simplicity afforded by conduits will allow people not only to become more involved in playing around with code, but also to make even more interesting combinations of the existing packages. I'm looking forward to seeing the results.