Conduits
Conduits are a solution to the streaming data problem. They fit the same solution space as enumerators, in that we want to have deterministic resource handling, constant memory usage, and composability. However, conduits have been designed based on a huge amount of experience using enumerators in real-life projects. They intend to solve the same problems in a simpler, more robust way.
This post is not intended to be a detailed comparison between enumerators and conduits, though it will be impossible to avoid some comparison. Instead, the goal is to explain the design decisions and usage of conduits, and give enough information that a reader familiar with both concepts can draw his/her own conclusions. In addition, the README file of the conduit repo on GitHub provides some of the motivation for conduits.
Note that this article will assume a basic understanding of the Resource monad transformer, which was described in a previous article. Also, conduits have not yet been released to Hackage, though the code in question is mostly stable, and is in fact in production use already.
Goals
We often talk vaguely about having streaming, composable data. Let's give some concrete ideas of what we want out of this library:
- A unified interface for streaming data from and piping data into various places, whether they be files, sockets, or memory-based.
- Some technique for modifying this stream of data in some way, such as applying HTTP chunking to a stream of bytes or decoding a stream of bytes into a stream of characters.
- Deterministic resource management. If we read data from a file, we want the file to be closed immediately after an EOF is reached. We should not need to wait for any finalizer to be called. (This is a common complaint against lazy I/O.)
- Exception safety. All resources should be freed, even in the presence of exceptions, and even if those exceptions are asynchronous.
- Any part of the conduit pipeline, including the data consumer, should be able to safely acquire scarce resources and know that they will be released. (This is a downside of the enumerator approach, and one of our main design goals.)
- Conduits should interoperate well with monad transformer stacks. Almost all frameworks today use a monad stack in some way, and we want easy interoperability.
- The developer should be free to choose the control flow of the program. (This is another complaint against enumerator, which creates a form of "inversion of control.")
- Finally, conduits should be simple. The majority of this article should be understandable by almost any Haskell developer. If not, I've failed in either my writing or my coding.
Conduits in Five Minutes
While a good understanding of the lower-level mechanics of conduits is advisable, you can get very far without it. Let's start off with some high-level examples. Don't worry if some of the details seem a bit magical right now; we'll cover everything in the course of this series. First the terminology, then some sample code.
- Source: A producer of data. The data could be in a file, coming from a socket, or in memory as a list. To access this data, we pull from the source.
- Sink: A consumer of data. Basic examples would be a sum function (adding up a stream of numbers fed in), a file sink (which writes all incoming bytes to a file), or a socket. We push data into a sink. When the sink finishes processing (we'll explain that later), it returns some value.
- Conduit: A transformer of data. The simplest example is a map function, though there are many others. Like a sink, we push data into a conduit. But instead of returning a single value at the end, a conduit can produce any number of outputs (including none) every time it is pushed to.
- Fuse: (Thanks to David Mazières for the term.) A conduit can be fused with a source to produce a new, modified source (the $= operator). For example, you could have a source that reads bytes from a file, and a conduit that decodes bytes into text. If you fuse them together, you now have a source that reads text from a file. Likewise, a conduit and a sink can fuse into a new sink (=$), and two conduits can fuse into a new conduit (=$=).
- Connect: You can connect a source to a sink using the $$ operator. Doing so will pull data from the source and push it to the sink, until either the source or the sink signals that it is "done."
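To make these terms concrete before the real code, here is a toy, pure model of them. Every name in it (ToySource, ToySink, ToyConduit, connect, fuseSource and the small helpers) is hypothetical and not part of the conduit API; the real types, covered below, run inside a monad and can be closed explicitly. It only sketches how pulling, pushing, connecting, and fusing relate.

```haskell
-- Hypothetical toy types for illustration only; the real conduit types
-- run in a monad and have explicit close operations.

-- | A source: each pull gives Nothing (closed) or a value plus the rest.
newtype ToySource a = ToySource { toyPull :: Maybe (a, ToySource a) }

-- | A sink: each push either finishes with a result (Left) or returns an
-- updated sink that wants more input (Right). 'toyClose' gives the result
-- when the input runs out.
data ToySink i r = ToySink
  { toyPush  :: i -> Either r (ToySink i r)
  , toyClose :: r
  }

-- | A conduit: each push gives an updated conduit plus any number of
-- outputs; 'toyFlush' gives leftover outputs when the input runs out.
data ToyConduit i o = ToyConduit
  { toyFeed  :: i -> (ToyConduit i o, [o])
  , toyFlush :: [o]
  }

fromListS :: [a] -> ToySource a
fromListS []       = ToySource Nothing
fromListS (x : xs) = ToySource (Just (x, fromListS xs))

-- | Connect (the role of $$): pull from the source and push into the sink
-- until the source is exhausted or the sink is done.
connect :: ToySource a -> ToySink a r -> r
connect src snk = case toyPull src of
  Nothing        -> toyClose snk
  Just (x, src') -> case toyPush snk x of
    Left r     -> r
    Right snk' -> connect src' snk'

-- | Fuse a conduit onto a source (the role of $=), giving a new source.
-- Outputs the conduit produced but nobody has pulled yet wait in a buffer.
fuseSource :: ToySource a -> ToyConduit a b -> ToySource b
fuseSource src0 cnd0 = go src0 cnd0 []
  where
    go src cnd (o : os) = ToySource (Just (o, go src cnd os))
    go src cnd []       = ToySource $ case toyPull src of
      Nothing        -> toyPull (fromListS (toyFlush cnd))
      Just (x, src') ->
        let (cnd', os) = toyFeed cnd x
        in  toyPull (go src' cnd' os)

-- | A sink that adds up all of its input.
sumSink :: Num a => ToySink a a
sumSink = go 0
  where
    go acc = ToySink { toyPush = \i -> Right (go (acc + i)), toyClose = acc }

-- | A sink that finishes as soon as it receives one value.
headSink :: ToySink a (Maybe a)
headSink = ToySink { toyPush = Left . Just, toyClose = Nothing }

-- | A stateless conduit that applies a function to every input.
mapC :: (a -> b) -> ToyConduit a b
mapC f = c
  where
    c = ToyConduit { toyFeed = \i -> (c, [f i]), toyFlush = [] }
```

For example, connect (fuseSource (fromListS [1, 2, 3]) (mapC (* 10))) sumSink evaluates to 60, and connect (fromListS [1 ..]) headSink returns Just 1 even though the source is infinite, because the sink signals that it is done after the first value.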
Let's see some examples of conduit code.
```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit -- the core library
import qualified Data.Conduit.List as CL -- some list-like functions
import qualified Data.Conduit.Binary as CB -- bytes
import qualified Data.Conduit.Text as CT
import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified Data.Text as T
import Control.Monad.ST (runST)

-- Let's start with the basics: connecting a source to a sink. We'll use the
-- built-in file functions to implement efficient, constant-memory,
-- resource-friendly file copying.
--
-- Two things to note: we use $$ to connect our source to our sink, and then
-- use runResourceT.
copyFile :: FilePath -> FilePath -> IO ()
copyFile src dest = runResourceT $ CB.sourceFile src $$ CB.sinkFile dest

-- The Data.Conduit.List module provides a number of helper functions for
-- creating sources, sinks, and conduits. Let's look at a typical fold: summing
-- numbers.
sumSink :: Resource m => Sink Int m Int
sumSink = CL.fold (+) 0

-- If we want to go a little more low-level, we can code our sink with the
-- sinkState function. This function takes three parameters: an initial state,
-- a push function (receive some more data), and a close function.
sumSink2 :: Resource m => Sink Int m Int
sumSink2 = sinkState
    0 -- initial value
    -- update the state with the new input and
    -- indicate that we want more input
    (\accum i -> return $ StateProcessing (accum + i))
    (\accum -> return accum) -- return the current accum value on close

-- Another common helper function is sourceList. Let's see how we can combine
-- that function with our sumSink to reimplement the built-in sum function.
sum' :: [Int] -> Int
sum' input = runST $ runResourceT $ CL.sourceList input $$ sumSink

-- Since this is Haskell, let's write a source to generate all of the
-- Fibonacci numbers. We'll use sourceState. The state will contain the next
-- two numbers in the sequence. We also need to provide a pull function, which
-- will return the next number and update the state.
fibs :: Resource m => Source m Int
fibs = sourceState
    (0, 1) -- initial state
    (\(x, y) -> return $ StateOpen (y, x + y) x)

-- Suppose we want to get the sum of the first 10 Fibonacci numbers. We can use
-- the isolate conduit to make sure the sum sink only consumes 10 values.
sumTenFibs :: Int
sumTenFibs =
       runST -- runs fine in pure code
     $ runResourceT
     $ fibs
    $= CL.isolate 10 -- fuse the source and conduit into a source
    $$ sumSink

-- We can also fuse the conduit into the sink instead by swapping a few
-- operators.
sumTenFibs2 :: Int
sumTenFibs2 =
       runST
     $ runResourceT
     $ fibs
    $$ CL.isolate 10
    =$ sumSink

-- Alright, let's make some conduits. Let's turn our numbers into text. Sounds
-- like a job for a map...

intToText :: Int -> Text -- just a helper function
intToText = T.pack . show

textify :: Resource m => Conduit Int m Text
textify = CL.map intToText

-- Like previously, we can use a conduitState helper function. But here, we
-- don't even need state, so we provide a dummy state value.
textify2 :: Resource m => Conduit Int m Text
textify2 = conduitState
    ()
    (\() input -> return $ StateProducing () [intToText input])
    (\() -> return [])

-- Let's make the unlines conduit, which puts a newline on the end of each
-- piece of input. We'll just use CL.map; feel free to write it with
-- conduitState as well for practice.
unlines' :: Resource m => Conduit Text m Text
unlines' = CL.map $ \t -> t `T.append` "\n"

-- And let's write a function that writes the first N fibs to a file. We'll
-- use UTF8 encoding.
writeFibs :: Int -> FilePath -> IO ()
writeFibs count dest =
      runResourceT
    $ fibs
   $= CL.isolate count
   $= textify
   $= unlines'
   $= CT.encode CT.utf8
   $$ CB.sinkFile dest

-- We used the $= operator to fuse the conduits into the sources, producing a
-- single source. We can also do the opposite: fuse the conduits into the sink.
-- We can even combine the two.
writeFibs2 :: Int -> FilePath -> IO ()
writeFibs2 count dest =
      runResourceT
    $ fibs
   $= CL.isolate count
   $= textify
   $$ unlines'
   =$ CT.encode CT.utf8
   =$ CB.sinkFile dest

-- Or we could fuse all those inner conduits into a single conduit...
someIntLines :: ResourceThrow m -- encoding can throw an exception
             => Int
             -> Conduit Int m ByteString
someIntLines count =
      CL.isolate count
  =$= textify
  =$= unlines'
  =$= CT.encode CT.utf8

-- and then use that conduit
writeFibs3 :: Int -> FilePath -> IO ()
writeFibs3 count dest =
      runResourceT
    $ fibs
   $= someIntLines count
   $$ CB.sinkFile dest

main :: IO ()
main = do
    putStrLn $ "Sum of first ten fibs: " ++ show sumTenFibs
    writeFibs 20 "fibs.txt"
    copyFile "fibs.txt" "fibs2.txt"
```
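If you want to check what this program computes without running the pipeline, the pure parts have a straightforward lazy-list equivalent. The sketch below uses hypothetical names (fibList, sumTenFibsList, fibsFileText) and plain lists, which are fine for pure data but offer none of the resource guarantees listed under Goals.

```haskell
-- A reference version using plain lazy lists, not conduits. The step
-- function mirrors the state update used by the fibs source above.
fibList :: [Int]
fibList = go (0, 1)
  where
    go (x, y) = x : go (y, x + y)

-- What sumTenFibs (and sumTenFibs2) should evaluate to.
sumTenFibsList :: Int
sumTenFibsList = sum (take 10 fibList)

-- The text that writeFibs n should write, before UTF-8 encoding:
-- each number followed by a newline, as unlines' does.
fibsFileText :: Int -> String
fibsFileText n = concatMap (\i -> show i ++ "\n") (take n fibList)
```

sumTenFibsList evaluates to 88 (0 + 1 + 1 + 2 + 3 + 5 + 8 + 13 + 21 + 34), which is the value sumTenFibs should produce.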
Source
I think it's simplest to understand sources by looking at the types:
```haskell
data SourceResult m a = Open (Source m a) a | Closed

data Source m a = Source
    { sourcePull :: ResourceT m (SourceResult m a)
    , sourceClose :: ResourceT m ()
    }
```
A source has just two operations on it: you can pull data from it, and you can close it (think of closing a file handle). When you pull, you either get some data and a new Source (the source is still open), or nothing (the source is closed). Let's look at some of the simplest sources:
```haskell
import Prelude hiding (repeat)
import Data.Conduit

-- | Never give any data
eof :: Monad m => Source m a
eof = Source
    { sourcePull = return Closed
    , sourceClose = return ()
    }

-- | Always give the same value
repeat :: Monad m => a -> Source m a
repeat a = Source
    { sourcePull = return $ Open (repeat a) a
    , sourceClose = return ()
    }
```
These sources are very straightforward, since they always return the same results. Additionally, their close records don't do anything. You might think that this is a bug: shouldn't a call to sourcePull return Closed after the source has been closed? This isn't required, since one of the rules of sources is that they can never be reused. In other words:
- If a Source returns Open, it has provided you with a new Source which you should use in place of the original one.
- If it returns Closed, then you cannot perform any more operations on it.
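To see why the first rule matters, here is a small sketch using simplified copies of the types above, with a plain monad m standing in for ResourceT m so that only base is needed (an assumption for illustration). pullTwo and pullTwoWrong are hypothetical helpers.

```haskell
import Data.Functor.Identity (Identity (..))

-- Simplified copies of the article's types; a plain monad m stands in for
-- ResourceT m (an assumption so the sketch needs only base).
data SourceResult m a = Open (Source m a) a | Closed

data Source m a = Source
  { sourcePull  :: m (SourceResult m a)
  , sourceClose :: m ()
  }

-- Same shape as the article's sourceList, with Monad m in place of Resource m.
sourceList :: Monad m => [a] -> Source m a
sourceList list = Source
  { sourcePull = return $ case list of
      []       -> Closed
      (x : xs) -> Open (sourceList xs) x
  , sourceClose = return ()
  }

-- | Follows the rule: the second pull uses the Source returned by Open.
pullTwo :: Monad m => Source m a -> m [a]
pullTwo src = do
  r1 <- sourcePull src
  case r1 of
    Closed      -> return []
    Open src' x -> do
      r2 <- sourcePull src'
      -- A real consumer would go on to use or close the Source in r2.
      return $ case r2 of
        Closed   -> [x]
        Open _ y -> [x, y]

-- | Breaks the rule: pulls from the original Source twice.
pullTwoWrong :: Monad m => Source m a -> m [a]
pullTwoWrong src = do
  r1 <- sourcePull src
  r2 <- sourcePull src
  return [x | Open _ x <- [r1, r2]]
```

runIdentity (pullTwo (sourceList "ab")) gives "ab", while pullTwoWrong gives "aa": a pure source simply replays its old state. For a source wrapping a file handle, pulling from a stale Source could read from a handle that has already advanced or been closed, and the rule lets source implementations assume that never happens.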
Don't worry too much about the invariant. In practice, you will almost never call sourcePull or sourceClose yourself. In fact, you will rarely even write them yourself (that's what sourceState and sourceIO are for). The point is that we can make some assumptions when we implement our sources.
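For the rare case where you do drive a source by hand, the sketch below shows the two usual patterns: pulling until Closed, and stopping early, in which case the consumer is responsible for calling sourceClose. It uses simplified copies of the types with a plain monad in place of ResourceT (an assumption), and drain, takeN, sourceList and countFrom are hypothetical helpers.

```haskell
import Data.IORef

-- Simplified copies of the article's types; a plain monad m stands in for
-- ResourceT m (an assumption so the sketch needs only base).
data SourceResult m a = Open (Source m a) a | Closed

data Source m a = Source
  { sourcePull  :: m (SourceResult m a)
  , sourceClose :: m ()
  }

-- | Pull everything, always continuing with the Source returned by Open.
-- Once the source says Closed, it has cleaned up after itself.
drain :: Monad m => Source m a -> m [a]
drain src = do
  res <- sourcePull src
  case res of
    Closed      -> return []
    Open src' x -> (x :) <$> drain src'

-- | Pull at most n values. Stopping early leaves the source open, so the
-- consumer must close it.
takeN :: Monad m => Int -> Source m a -> m [a]
takeN n src
  | n <= 0    = sourceClose src >> return []
  | otherwise = do
      res <- sourcePull src
      case res of
        Closed      -> return []
        Open src' x -> (x :) <$> takeN (n - 1) src'

-- | A finite source over a list (Monad m in place of Resource m).
sourceList :: Monad m => [a] -> Source m a
sourceList list = Source
  { sourcePull = return $ case list of
      []       -> Closed
      (x : xs) -> Open (sourceList xs) x
  , sourceClose = return ()
  }

-- | Hypothetical infinite source counting up from n. Its close action logs a
-- message so we can observe when closing happens.
countFrom :: IORef [String] -> Int -> Source IO Int
countFrom logRef n = Source
  { sourcePull  = return (Open (countFrom logRef (n + 1)) n)
  , sourceClose = modifyIORef logRef (("closed at " ++ show n) :)
  }
```

takeN 3 (countFrom logRef 1) returns [1, 2, 3] and leaves exactly one entry, "closed at 4", in the log. Calling drain on countFrom would never finish, since that source never returns Closed.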
State
There is something similar about the two sources mentioned above: they never change. They always return the same value. In other words, they have no state. For almost all serious sources, we'll need some kind of state.
We store state in a source by returning an updated Source value inside the Open constructor. This is best seen with an example.
```haskell
import Data.Conduit
import Control.Monad.Trans.Resource

-- | Provide data from the list, one element at a time.
sourceList :: Resource m => [a] -> Source m a
sourceList list = Source
    { sourcePull =
        case list of
            [] -> return Closed -- no more data

            -- This is where we store our state: by returning a new
            -- Source with the rest of the list
            x:xs -> return $ Open (sourceList xs) x
    , sourceClose = return ()
    }
```
Each time we pull from the source, it checks the input list. If the list is empty, pulling returns Closed, which makes sense. If the list is not empty, pulling returns Open with both the next value in the list and a new Source value containing the rest of the input list.
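The same technique works for wrappers around other sources. As a sketch, here is a hypothetical isolateSource (loosely in the spirit of CL.isolate, but written as a source wrapper rather than a conduit) whose state is a remaining count. Again the types are simplified copies with a plain monad in place of ResourceT, and countFrom and drain are hypothetical helpers.

```haskell
import Data.IORef

-- Simplified copies of the article's types; a plain monad m stands in for
-- ResourceT m (an assumption so the sketch needs only base).
data SourceResult m a = Open (Source m a) a | Closed

data Source m a = Source
  { sourcePull  :: m (SourceResult m a)
  , sourceClose :: m ()
  }

-- | Hypothetical wrapper: yield at most n values from an inner source. The
-- remaining count is our state; each Open carries a new wrapper holding the
-- decremented count and the inner source's replacement.
isolateSource :: Monad m => Int -> Source m a -> Source m a
isolateSource n inner
  | n <= 0 = Source
      { sourcePull  = sourceClose inner >> return Closed -- quota used: close inner
      , sourceClose = sourceClose inner
      }
  | otherwise = Source
      { sourcePull = do
          res <- sourcePull inner
          return $ case res of
            Closed        -> Closed -- inner source already closed itself
            Open inner' x -> Open (isolateSource (n - 1) inner') x
      , sourceClose = sourceClose inner
      }

-- | Hypothetical infinite source counting up from n; closing it logs a message.
countFrom :: IORef [String] -> Int -> Source IO Int
countFrom logRef n = Source
  { sourcePull  = return (Open (countFrom logRef (n + 1)) n)
  , sourceClose = modifyIORef logRef (("closed at " ++ show n) :)
  }

-- | Pull until Closed, always continuing with the new Source.
drain :: Monad m => Source m a -> m [a]
drain src = do
  res <- sourcePull src
  case res of
    Closed      -> return []
    Open src' x -> (x :) <$> drain src'
```

Draining isolateSource 3 (countFrom logRef 10) yields [10, 11, 12] and logs "closed at 13": once the count reaches zero, the wrapper closes the inner source instead of pulling from it.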
sourceState and sourceIO
In addition to being able to manually create Sources, we also have a few convenience functions that allow us to create most sources in a more high-level fashion. sourceState lets you write code similar to how you would use the State monad. You provide an initial state, your pull function is provided with the current state, and it returns a new state and a return value. Let's use this to reimplement sourceList.
```haskell
import Data.Conduit
import Control.Monad.Trans.Resource

-- | Provide data from the list, one element at a time.
sourceList :: Resource m => [a] -> Source m a
sourceList state0 = sourceState
    state0
    pull
  where
    pull [] = return StateClosed
    pull (x:xs) = return $ StateOpen xs x
```
Notice the usage of the StateClosed and StateOpen constructors. These are very similar to Closed and Open, except that instead of specifying the next Source to be used, you provide the next state (here, the remainder of the list).
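One way to see how sourceState relates to the manual version is to build a sourceState-like helper directly on the Source record. This is a sketch under stated assumptions, not the library's definition: sourceStateSketch and takeN are hypothetical names, SourceStateResult stands in for the type holding StateOpen and StateClosed, and a plain monad replaces ResourceT.

```haskell
import Data.Functor.Identity (Identity (..))

-- Simplified copies of the article's types; a plain monad m stands in for
-- ResourceT m (an assumption so the sketch needs only base).
data SourceResult m a = Open (Source m a) a | Closed

data Source m a = Source
  { sourcePull  :: m (SourceResult m a)
  , sourceClose :: m ()
  }

-- Stand-in for the article's StateOpen/StateClosed result type.
data SourceStateResult s a = StateOpen s a | StateClosed

-- | Sketch of a sourceState-like helper (hypothetical name). Each pull runs
-- the user's function on the current state and wraps the new state in a
-- fresh Source.
sourceStateSketch :: Monad m => s -> (s -> m (SourceStateResult s a)) -> Source m a
sourceStateSketch s pull = Source
  { sourcePull = do
      res <- pull s
      return $ case res of
        StateClosed    -> Closed
        StateOpen s' a -> Open (sourceStateSketch s' pull) a
  , sourceClose = return () -- pure state: nothing to release
  }

-- The article's sourceList, written against the sketch.
sourceList :: Monad m => [a] -> Source m a
sourceList state0 = sourceStateSketch state0 pull
  where
    pull []       = return StateClosed
    pull (x : xs) = return (StateOpen xs x)

-- The article's fibs source, written against the sketch.
fibs :: Monad m => Source m Int
fibs = sourceStateSketch (0, 1) (\(x, y) -> return (StateOpen (y, x + y) x))

-- | Pull at most n values, closing the source if we stop early.
takeN :: Monad m => Int -> Source m a -> m [a]
takeN n src
  | n <= 0    = sourceClose src >> return []
  | otherwise = do
      res <- sourcePull src
      case res of
        Closed      -> return []
        Open src' x -> (x :) <$> takeN (n - 1) src'
```

With these definitions, runIdentity (takeN 10 fibs) is [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]: the helper does the same bookkeeping as the hand-written sourceList, storing each new state in a fresh Source.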
The other common activity is to perform some I/O allocation (like opening a file), register some cleanup action (closing that file), and provide a function for pulling data from that resource. conduit comes built-in with a sourceFile function that gives a stream of ByteStrings. Let's write a wildly inefficient alternative that returns a stream of characters.
```haskell
import Data.Conduit
import Control.Monad.Trans.Resource
import System.IO
import Control.Monad.IO.Class (liftIO)

sourceFile :: ResourceIO m => FilePath -> Source m Char
sourceFile fp = sourceIO
    (openFile fp ReadMode)
    hClose
    (\h -> liftIO $ do
        eof <- hIsEOF h
        if eof
            then return IOClosed
            else fmap IOOpen $ hGetChar h)
```
Like sourceState, it uses a variant on the Open and Closed constructors. sourceIO does a number of things for us:
- It registers the cleanup function with the ResourceT transformer, ensuring it gets called even in the presence of exceptions.
- It sets up the sourceClose record to release the resource immediately.
- As soon as you return IOClosed, it will release the resource.
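The resource-handling part of that list can be sketched in a few lines. The hypothetical sourceIOSketch below works in plain IO, so it covers only the second and third points: it does not register anything with ResourceT and offers no exception safety. It also allocates eagerly, which may differ from the real sourceIO. listResource is a made-up resource (an IORef of items) whose cleanup writes to a log so that the release can be observed.

```haskell
import Data.IORef

-- Simplified copies of the article's types, specialised to plain IO
-- (an assumption: no ResourceT, so no exception safety here).
data SourceResult m a = Open (Source m a) a | Closed

data Source m a = Source
  { sourcePull  :: m (SourceResult m a)
  , sourceClose :: m ()
  }

-- Stand-in for the article's IOOpen/IOClosed result type.
data IOResult a = IOOpen a | IOClosed

-- | Sketch of a sourceIO-like helper (hypothetical name). It allocates
-- eagerly and guards the cleanup with a flag so it runs at most once.
sourceIOSketch :: IO st -> (st -> IO ()) -> (st -> IO (IOResult a)) -> IO (Source IO a)
sourceIOSketch alloc cleanup pull = do
  st <- alloc
  releasedRef <- newIORef False
  let release = do
        already <- atomicModifyIORef' releasedRef (\r -> (True, r))
        if already then return () else cleanup st
      -- The mutable state lives in the resource itself, so each Open can
      -- hand back this same Source value.
      src = Source
        { sourcePull = do
            res <- pull st
            case res of
              IOClosed -> release >> return Closed -- release as soon as we hit the end
              IOOpen a -> return (Open src a)
        , sourceClose = release -- an early close releases immediately
        }
  return src

-- | Hypothetical "resource": an IORef holding the items still to be read.
-- Releasing it appends "released" to a log.
listResource :: IORef [String] -> [a] -> IO (Source IO a)
listResource logRef items = sourceIOSketch
  (newIORef items)
  (\_ -> modifyIORef logRef ("released" :))
  (\ref -> do
      xs <- readIORef ref
      case xs of
        []       -> return IOClosed
        (y : ys) -> writeIORef ref ys >> return (IOOpen y))

-- | Pull until Closed, always continuing with the new Source.
drain :: Source IO a -> IO [a]
drain src = do
  res <- sourcePull src
  case res of
    Closed      -> return []
    Open src' x -> (x :) <$> drain src'
```

Draining a source from listResource logs "released" exactly once, and so does pulling one value and then calling sourceClose. Because the state lives in the resource, the sketch can return the same Source from every Open; the flag guarding the cleanup is a cheap extra safety net.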
Next time
Everyone loves cliffhangers, right? Well, there's too much information to stick into a single blog post, so here are the topics I'm hoping to cover in the rest of this series:
- Sinks
- Connecting sources to sinks
- Conduits
- The fuse operators
- The rules of data loss
- Buffering/resumable sources
- Using conduits in pure code
- Building up conduits from sinks
- Some real-life examples
Remember that conduit has not yet been officially released, and some of the information in this article may change over time. However, the core concepts are mostly solidified now.