Conduits released!
Since the last blog post, I've uploaded the conduit packages to Hackage so that everyone can play along at home. Obviously, this code is very new, and user feedback is highly appreciated. In the same vein, it's entirely possible that there will be changes to the API. That said, a huge amount of code has already been migrated over to conduit (besides the packages below, all of WAI, http-conduit, and a number of XML libraries have been ported), so I feel pretty confident that things are mostly stable.
The code is split into a few different packages:
- conduit: Base package, including the ResourceT transformer.
- attoparsec-conduit: Turn attoparsec parsers into Sinks.
- blaze-builder-conduit: Convert a stream of Builders into a stream of ByteStrings.
- filesystem-conduit: Traverse folders, plus convenience adapters for the system-filepath package.
- zlib-conduit: Compress and decompress streams of bytes.
Quick Review
This is part 3 in the conduits series. The first two parts were:
To give a basic overview: you use runResourceT to run a ResourceT block, which ensures that all allocated resources are freed. Everything in the conduit package lives inside such a block. Sources produce data, sinks consume data, and conduits transform data. You can fuse conduits into sources, into sinks, or with other conduits. And finally, you can connect a source to a sink to produce a result.
That should be enough information to cover this post, though reading the previous posts, especially the second one, is highly recommended.
Sinks
A sink consumes a stream of data, and produces a result. A sink must always produce a result, and must always produce a single result. This is encoded in the types themselves.
There is a Monad instance for Sink, making it simple to compose multiple sinks together into a larger sink. You can also use the built-in sink functions to do most of your work. As with sources, you'll rarely need to dive into the inner workings. Let's start off with an example: getting lines from a stream of Chars (we'll assume Unix line endings for simplicity).
```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Get a single line from the stream.
sinkLine :: Resource m => Sink Char m String
sinkLine = sinkState
    id -- initial state, nothing at the beginning of the line
    push
    close
  where
    -- On a new line, return the contents up until here
    push front '\n' =
        return $ StateDone Nothing $ front []

    -- Just another character, add it to the front and keep going
    push front char =
        return $ StateProcessing $ front . (char:)

    -- Got an EOF before hitting a newline, just give what we have so far
    close front = return $ front []

-- Get all the lines from the stream, until we hit a blank line or EOF.
sinkLines :: Resource m => Sink Char m [String]
sinkLines = do
    line <- sinkLine
    if null line
        then return []
        else do
            lines <- sinkLines
            return $ line : lines

content :: String
content = unlines
    [ "This is the first line."
    , "Here's the second."
    , ""
    , "After the blank."
    ]

main :: IO ()
main = do
    lines <- runResourceT $ CL.sourceList content $$ sinkLines
    mapM_ putStrLn lines
```
Running this sample produces the expected output:
```
This is the first line.
Here's the second.
```
sinkLine demonstrates usage of the sinkState function, which is very similar to the sourceState function we saw for sources. It takes three arguments:

- an initial state;
- a push function, which takes the current state and the next input and either finishes with a result (StateDone) or continues with a new state (StateProcessing);
- a close function, which takes the current state and returns an output.

sourceState doesn't need a close function. A sink, however, must always return a result, so it needs a way to produce one even when the input runs out.
Our push function has two clauses. When it gets a newline character, it indicates that processing is complete via StateDone. The Nothing indicates that there is no leftover input (we'll discuss that later). It also gives an output of all the characters it has received. The second clause simply appends the new character to the existing state (a difference list, so appending is cheap) and indicates that we are still working via StateProcessing. The close function returns all the characters received so far.
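It can help to see the push/close protocol on its own, without the library. Below is a small, pure model of what a state-based sink does with its three arguments. This is a sketch under simplifying assumptions, not the library code: it drops ResourceT and the monad entirely. StateResult, runStateSink and lineSink are hypothetical names. Only the constructor names mirror the post's StateDone and StateProcessing.

```haskell
-- Sketch only: a pure, hypothetical model of a state-based sink.
-- No ResourceT and no monad; these are not the conduit library's definitions.

-- Mirrors StateDone/StateProcessing from the post.
data StateResult s i o
  = StateDone (Maybe i) o -- finished: optional leftover input, plus the output
  | StateProcessing s -- not finished yet: the new state

-- Drive a state-based sink over a list. Returns the output and the input
-- that was not consumed (with any leftover put back at the front).
runStateSink :: s -> (s -> i -> StateResult s i o) -> (s -> o) -> [i] -> (o, [i])
runStateSink s0 push close = go s0
  where
    go s [] = (close s, []) -- end of input: close must still give a result
    go s (x : xs) =
      case push s x of
        StateDone leftover o -> (o, maybe xs (: xs) leftover)
        StateProcessing s' -> go s' xs

-- The sinkLine logic from above, with a difference list as the state.
lineSink :: String -> (String, String)
lineSink = runStateSink id push close
  where
    push front '\n' = StateDone Nothing (front [])
    push front c = StateProcessing (front . (c :))
    close front = front []
```

For example, lineSink "ab\ncd" gives ("ab", "cd"). The newline ends the line, and the rest of the input is left for whoever reads next. Without a newline, as in lineSink "xy", the close function supplies the result ("xy", "").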
sinkLines shows how we can use the monadic interface to produce new sinks. If you replace sinkLine with getLine, this would look like standard code to pull lines from standard input. This familiar interface should make it easy to get up and running quickly.
Types
The types for sinks are just a bit more involved than sources. Let's have a look:
```haskell
type SinkPush input m output = input -> ResourceT m (SinkResult input m output)
type SinkClose m output = ResourceT m output

data SinkResult input m output
    = Processing (SinkPush input m output) (SinkClose m output)
    | Done (Maybe input) output

data Sink input m output
    = SinkNoData output
    | SinkData
        { sinkPush :: SinkPush input m output
        , sinkClose :: SinkClose m output
        }
    | SinkLift (ResourceT m (Sink input m output))
```
Whenever a sink is pushed to, it can either say it needs more data (Processing) or say it's all done. When still processing, it must provide updated push and close functions. When done, it returns any leftover input and the output. Fairly straightforward.
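Here is how these pieces fit together in a simplified model of the types above. ResourceT m is replaced by a plain monad m, and the type synonyms are inlined. It also includes a hypothetical feedList that drives a sink from a list. This is a sketch under those assumptions, not the library's implementation. takeNonNegative is a made-up example sink. It stops at the first negative number and hands that number back as leftover.

```haskell
import Data.Functor.Identity (Identity (..))

-- Sketch only: a simplified, hypothetical model of the types above.
-- ResourceT m is replaced by a plain monad m; SinkPush/SinkClose are inlined.
data SinkResult i m o
  = Processing (i -> m (SinkResult i m o)) (m o) -- need more: new push and close
  | Done (Maybe i) o -- finished: leftover input and the output

data Sink i m o
  = SinkNoData o
  | SinkData (i -> m (SinkResult i m o)) (m o)
  | SinkLift (m (Sink i m o))

-- Hypothetical driver: feed a list to a sink. Returns the output plus the
-- input that was not consumed, with any leftover from Done put back in front.
feedList :: Monad m => [i] -> Sink i m o -> m (o, [i])
feedList xs (SinkNoData o) = return (o, xs)
feedList xs (SinkLift msink) = msink >>= feedList xs
feedList xs0 (SinkData push0 close0) = loop xs0 push0 close0
  where
    loop [] _ close = do
      o <- close
      return (o, [])
    loop (x : xs) push close = do
      res <- push x
      case res of
        Done leftover o -> return (o, maybe xs (: xs) leftover)
        Processing push' close' -> loop xs push' close'

-- Hypothetical example sink: collect numbers up to the first negative one,
-- which is handed back as leftover instead of being consumed.
takeNonNegative :: Monad m => Sink Int m [Int]
takeNonNegative = SinkData (push []) (close [])
  where
    push acc x
      | x < 0 = return (Done (Just x) (reverse acc))
      | otherwise = return (Processing (push (x : acc)) (close (x : acc)))
    close acc = return (reverse acc)
```

Feeding [1, 2, -3, 4] to takeNonNegative produces [1, 2]. The -3 comes back as leftover, so the unconsumed input is [-3, 4].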
The first real "gotcha" is the three constructors for Sink. Why do we need SinkNoData: aren't sinks all about consuming data? The answer is that we need it to efficiently implement our Monad instance. When we use return, we're giving back a value that requires no data in order to compute it.
We could model this with the SinkData constructor, with something like:

```haskell
myReturn a = SinkData (\input -> return (Done (Just input) a)) (return a)
```

But doing so would force reading in an extra bit of input that we don't need right now, and possibly will never need. (Have a look again at the sinkLines example.) So instead, we have an extra constructor to indicate that no input is required. Likewise, SinkLift is provided in order to implement an efficient MonadTrans instance.
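Here is one way a Monad instance could use SinkNoData and the leftover from Done, again on a simplified model without ResourceT. bindSink, closeSink, feedList, takeOne, peek and peekThenTake are hypothetical names for this sketch, and the library's actual instance may differ in detail. Two things to notice: return never asks for input, and any leftover from the first sink is pushed straight into the second.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (Identity (..))

-- Sketch only: a simplified, hypothetical model (ResourceT m replaced by m).
data SinkResult i m o
  = Processing (i -> m (SinkResult i m o)) (m o)
  | Done (Maybe i) o

data Sink i m o
  = SinkNoData o
  | SinkData (i -> m (SinkResult i m o)) (m o)
  | SinkLift (m (Sink i m o))

-- Produce a sink's output when there is no more input.
closeSink :: Monad m => Sink i m o -> m o
closeSink (SinkNoData o) = return o
closeSink (SinkData _ close) = close
closeSink (SinkLift msink) = msink >>= closeSink

-- Sequence two sinks, handing the first one's leftover to the second.
bindSink :: Monad m => Sink i m a -> (a -> Sink i m b) -> Sink i m b
bindSink (SinkNoData a) f = f a -- no input needed: move straight on
bindSink (SinkLift msink) f = SinkLift (liftM (`bindSink` f) msink)
bindSink (SinkData push0 close0) f = SinkData (goPush push0) (goClose close0)
  where
    goPush push x = do
      res <- push x
      case res of
        Processing push' close' -> return (Processing (goPush push') (goClose close'))
        Done leftover a -> continue leftover (f a)
    -- Input ran out while the first sink was running: close both in turn.
    goClose close = close >>= closeSink . f
    -- Start the second sink, feeding it the leftover if there is one.
    continue leftover (SinkNoData b) = return (Done leftover b)
    continue leftover (SinkLift msink) = msink >>= continue leftover
    continue Nothing (SinkData push close) = return (Processing push close)
    continue (Just x) (SinkData push _) = push x

instance Monad m => Functor (Sink i m) where
  fmap = liftM

instance Monad m => Applicative (Sink i m) where
  pure = SinkNoData -- return: an answer that needs no input
  (<*>) = ap

instance Monad m => Monad (Sink i m) where
  (>>=) = bindSink

-- Hypothetical driver: returns the output and the unconsumed input.
feedList :: Monad m => [i] -> Sink i m o -> m (o, [i])
feedList xs (SinkNoData o) = return (o, xs)
feedList xs (SinkLift msink) = msink >>= feedList xs
feedList xs0 (SinkData push0 close0) = loop xs0 push0 close0
  where
    loop [] _ close = do
      o <- close
      return (o, [])
    loop (x : xs) push close = do
      res <- push x
      case res of
        Done leftover o -> return (o, maybe xs (: xs) leftover)
        Processing push' close' -> loop xs push' close'

-- Hypothetical example sinks.
takeOne :: Monad m => Sink i m (Maybe i)
takeOne = SinkData (\x -> return (Done Nothing (Just x))) (return Nothing)

-- Look at the next element without consuming it: it comes back as leftover.
peek :: Monad m => Sink i m (Maybe i)
peek = SinkData (\x -> return (Done (Just x) (Just x))) (return Nothing)

peekThenTake :: Monad m => Sink i m (Maybe i, Maybe i)
peekThenTake = do
  p <- peek
  t <- takeOne
  return (p, t)
```

With input [1, 2, 3], peekThenTake returns (Just 1, Just 1). peek hands the 1 back as leftover, and the bind passes it on to takeOne, so [2, 3] remains unconsumed. Running return 'x' on [7] consumes nothing and leaves [7] untouched.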
Sinks: no helpers
Let's try to implement some sinks on the "bare metal", without any helper functions.
```haskell
import Data.Conduit
import System.IO
import Control.Monad.Trans.Resource
import Control.Monad.IO.Class (liftIO)

-- Consume all input and discard it.
sinkNull :: Resource m => Sink a m ()
sinkNull =
    SinkData push close
  where
    push _ignored = return $ Processing push close
    close = return ()

-- Let's stream characters to a file. Here we do need some kind of
-- initialization. We do this by initializing in a push function,
-- and then returning a different push function for subsequent
-- calls. By using withIO, we know that the handle will be closed even
-- if there's an exception.
sinkFile :: ResourceIO m => FilePath -> Sink Char m ()
sinkFile fp =
    SinkData pushInit closeInit
  where
    pushInit char = do
        (releaseKey, handle) <- withIO (openFile fp WriteMode) hClose
        push releaseKey handle char

    closeInit = do
        -- Never opened a file, so nothing to do here
        return ()

    push releaseKey handle char = do
        liftIO $ hPutChar handle char
        return $ Processing (push releaseKey handle) (close releaseKey handle)

    close releaseKey _ = do
        -- Close the file handle as soon as possible.
        release releaseKey

-- And we'll count how many values were in the stream.
count :: Resource m => Sink a m Int
count =
    SinkData (push 0) (close 0)
  where
    push count _ignored =
        return $ Processing (push count') (close count')
      where
        count' = count + 1

    close count = return count
```
Nothing is particularly complicated to implement. You should notice a common pattern here: declaring your push and close functions in a where clause, and then using them twice, once for the initial SinkData and once for the Processing constructor. This can become a bit tedious; that's why we have helper functions.
Sinks: with helpers
Let's rewrite sinkFile and count to take advantage of the helper functions sinkIO and sinkState, respectively.
```haskell
import Data.Conduit
import System.IO
import Control.Monad.IO.Class (liftIO)

-- We never have to touch the release key directly, sinkIO automatically
-- releases our resource as soon as we return IODone from our push function,
-- or sinkClose is called.
sinkFile :: ResourceIO m => FilePath -> Sink Char m ()
sinkFile fp = sinkIO
    (openFile fp WriteMode)
    hClose
    -- push: notice that we are given the handle and the input
    (\handle char -> do
        liftIO $ hPutChar handle char
        return IOProcessing)
    -- close: we're also given the handle, but we don't use it
    (\_handle -> return ())

-- And we'll count how many values were in the stream.
count :: Resource m => Sink a m Int
count = sinkState
    0
    -- The push function gets both the current state and the next input...
    (\state _ignored ->
        -- and it returns the new state
        return $ StateProcessing $ state + 1)
    -- The close function gets the final state and returns the output.
    (\state -> return state)
```
Nothing dramatic, just slightly shorter, less error-prone code. Using these two helper functions is highly recommended, as it ensures proper resource management and state updating.
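To show what a resource-managing helper like sinkIO takes off your hands, here is a toy version on the simplified model, with ResourceT replaced by plain IO. ioSink, IOResult, logSink and the IORef-based log are hypothetical. This version acquires the resource when the sink is started (through SinkLift), not on the first push like the hand-written sinkFile. It is also not exception safe: release only runs on the normal Done and end-of-input paths, whereas the real code relies on ResourceT for cleanup.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Sketch only: a simplified, hypothetical model (ResourceT m replaced by m).
data SinkResult i m o
  = Processing (i -> m (SinkResult i m o)) (m o)
  | Done (Maybe i) o

data Sink i m o
  = SinkNoData o
  | SinkData (i -> m (SinkResult i m o)) (m o)
  | SinkLift (m (Sink i m o))

-- Push results for the helper, mirroring IODone/IOProcessing in the post.
data IOResult i o = IODone (Maybe i) o | IOProcessing

-- Hypothetical sinkIO-like helper: acquire when the sink starts, then release
-- once, either when push returns IODone or when the input ends.
-- NOT exception safe: there is no ResourceT in this model.
ioSink :: IO r -> (r -> IO ()) -> (r -> i -> IO (IOResult i o)) -> (r -> IO o) -> Sink i IO o
ioSink acquire release push close = SinkLift $ do
  r <- acquire
  let step x = do
        res <- push r x
        case res of
          IODone leftover o -> do
            release r
            return (Done leftover o)
          IOProcessing -> return (Processing step finish)
      finish = do
        o <- close r
        release r
        return o
  return (SinkData step finish)

-- Hypothetical driver: returns the output and the unconsumed input.
feedList :: Monad m => [i] -> Sink i m o -> m (o, [i])
feedList xs (SinkNoData o) = return (o, xs)
feedList xs (SinkLift msink) = msink >>= feedList xs
feedList xs0 (SinkData push0 close0) = loop xs0 push0 close0
  where
    loop [] _ close = do
      o <- close
      return (o, [])
    loop (x : xs) push close = do
      res <- push x
      case res of
        Done leftover o -> return (o, maybe xs (: xs) leftover)
        Processing push' close' -> loop xs push' close'

-- Hypothetical stand-in for sinkFile: "writes" characters into an IORef
-- buffer and records open/close events in a log.
logSink :: IORef [String] -> Sink Char IO String
logSink logRef = ioSink open cleanup push close
  where
    open = do
      modifyIORef logRef (++ ["open"])
      newIORef ""
    cleanup _ = modifyIORef logRef (++ ["close"])
    push buf c = do
      modifyIORef buf (++ [c])
      return IOProcessing
    close buf = readIORef buf
```

Feeding "hi" to logSink records exactly one "open" and one "close". The release happens when the input runs out, and the user-supplied push and close never mention releasing anything.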
List functions
As easy as it is to write your own sinks, you'll likely want to take advantage of the built-in sinks available in the Data.Conduit.List module. These provide analogues to common list functions, like folding. (The module also has some Conduits, like map.)
If you're looking for a way to practice with conduits, reimplementing the functions in the List module, both with and without the helper functions, would be a good start.
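As a starting point for that exercise, here is a fold written both ways on the simplified model (no ResourceT). There is also a take that uses SinkNoData when there is nothing to take. stateSink is a hypothetical stand-in for sinkState with the same three-argument shape, and foldBare, foldHelper and takeBare are made-up names. None of this is the actual Data.Conduit.List source.

```haskell
import Data.Functor.Identity (Identity (..))

-- Sketch only: a simplified, hypothetical model (ResourceT m replaced by m).
data SinkResult i m o
  = Processing (i -> m (SinkResult i m o)) (m o)
  | Done (Maybe i) o

data Sink i m o
  = SinkNoData o
  | SinkData (i -> m (SinkResult i m o)) (m o)
  | SinkLift (m (Sink i m o))

-- Hypothetical driver: returns the output and the unconsumed input.
feedList :: Monad m => [i] -> Sink i m o -> m (o, [i])
feedList xs (SinkNoData o) = return (o, xs)
feedList xs (SinkLift msink) = msink >>= feedList xs
feedList xs0 (SinkData push0 close0) = loop xs0 push0 close0
  where
    loop [] _ close = do
      o <- close
      return (o, [])
    loop (x : xs) push close = do
      res <- push x
      case res of
        Done leftover o -> return (o, maybe xs (: xs) leftover)
        Processing push' close' -> loop xs push' close'

-- Hypothetical stand-in for sinkState, built from the bare constructors.
data StateResult s i o = StateDone (Maybe i) o | StateProcessing s

stateSink :: Monad m => s -> (s -> i -> m (StateResult s i o)) -> (s -> m o) -> Sink i m o
stateSink s0 push close = SinkData (go s0) (close s0)
  where
    go s x = do
      res <- push s x
      case res of
        StateDone leftover o -> return (Done leftover o)
        StateProcessing s' -> return (Processing (go s') (close s'))

-- fold on the bare metal: push and close each appear twice, as in count.
foldBare :: Monad m => (b -> a -> b) -> b -> Sink a m b
foldBare f z = SinkData (push z) (close z)
  where
    push acc x =
      let acc' = f acc x
       in acc' `seq` return (Processing (push acc') (close acc')) -- strict accumulator
    close acc = return acc

-- The same fold via the helper: only the state transition is left to write.
foldHelper :: Monad m => (b -> a -> b) -> b -> Sink a m b
foldHelper f z = stateSink z (\acc x -> return (StateProcessing (f acc x))) return

-- take: with nothing to take, SinkNoData answers without reading any input.
takeBare :: Monad m => Int -> Sink a m [a]
takeBare n0
  | n0 <= 0 = SinkNoData []
  | otherwise = SinkData (push n0 id) (close id)
  where
    push n front x
      | n <= 1 = return (Done Nothing (front [x]))
      | otherwise =
          let front' = front . (x :)
           in return (Processing (push (n - 1) front') (close front'))
    close front = return (front [])
```

Both folds give 55 for [1 .. 10]. The helper version is noticeably shorter, which is the point of having sinkState in the first place.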
Let's look at some simple things we can make out of the built-in sinks.
```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Control.Monad.IO.Class (liftIO)

-- A sum function.
sum' :: Resource m => Sink Int m Int
sum' = CL.fold (+) 0

-- Print every input value to standard output.
printer :: (Show a, ResourceIO m) => Sink a m ()
printer = CL.mapM_ (liftIO . print)

-- Sum up all the values in a stream after the first five.
sumSkipFive :: Resource m => Sink Int m Int
sumSkipFive = do
    CL.drop 5
    CL.fold (+) 0

-- Print each input number and sum the total
printSum :: ResourceIO m => Sink Int m Int
printSum = do
    total <- CL.foldM go 0
    liftIO $ putStrLn $ "Sum: " ++ show total
    return total
  where
    go accum int = do
        liftIO $ putStrLn $ "New input: " ++ show int
        return $ accum + int
```
Connecting
At the end of the day, we're actually going to want to use our sinks. While we could manually call sinkPush and sinkClose, it's tedious. For example, reusing printSum from above:
```haskell
main :: IO ()
main = runResourceT $ do
    res <- start printSum
    liftIO $ putStrLn $ "Got a result: " ++ show res
  where
    -- Handle all three Sink constructors
    start (SinkData push close) = loop [1..10] push close
    start (SinkNoData res) = return res
    start (SinkLift msink) = msink >>= start

    loop [] _push close = close
    loop (x:xs) push close = do
        mres <- push x
        case mres of
            Done _leftover res -> return res
            Processing push' close' -> loop xs push' close'
```
Instead, the recommended approach is to connect your sink to a source. This is simpler and less error-prone, and it gives you a lot of flexibility in where your data comes from. To rewrite the example above:
```haskell
main :: IO ()
main = runResourceT $ do
    res <- CL.sourceList [1..10] $$ printSum
    liftIO $ putStrLn $ "Got a result: " ++ show res
```
Connecting takes care of testing for the sink constructor (SinkData versus SinkNoData versus SinkLift), pulling from the source, and pushing to/closing the sink.
However, there is one thing I wanted to point out from the long-winded example. On the second-to-last line, we ignore the leftover value of Done. This brings up the issue of data loss, an important topic that has had a lot of thought put into it. Unfortunately, we can't fully cover it yet, as we haven't discussed the main culprit in the drama: Conduits (the type, not the package).
But as a quick note here, the leftover value from the Done constructor is not always ignored. The Monad instance, for example, uses it to pass data from one sink to the next in a binding. And in fact, the real connect operator doesn't always throw away the leftovers. When we cover resumable sources later, we'll see that the leftover value is put back on the buffer, so that later sinks reusing the same source can still pull it.
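The difference between dropping and keeping leftovers is easy to see with a list standing in for the source's buffer. This sketch uses the simplified model with no ResourceT. connectWith, takeWhileSink, consumeAll and twoSinks are all hypothetical. connectWith True pushes a Done leftover back onto the remaining input, roughly in the spirit of the resumable sources mentioned above. connectWith False drops the leftover, as the long-winded example does. Neither is the library's $$.

```haskell
import Data.Functor.Identity (Identity (..))

-- Sketch only: a simplified, hypothetical model (ResourceT m replaced by m).
data SinkResult i m o
  = Processing (i -> m (SinkResult i m o)) (m o)
  | Done (Maybe i) o

data Sink i m o
  = SinkNoData o
  | SinkData (i -> m (SinkResult i m o)) (m o)
  | SinkLift (m (Sink i m o))

-- Connect a list "source" to a sink, returning the remaining input and the
-- output. With keep = True a Done leftover is put back; otherwise it is lost.
connectWith :: Monad m => Bool -> [i] -> Sink i m o -> m ([i], o)
connectWith _ xs (SinkNoData o) = return (xs, o)
connectWith keep xs (SinkLift msink) = msink >>= connectWith keep xs
connectWith keep xs0 (SinkData push0 close0) = loop xs0 push0 close0
  where
    loop [] _ close = do
      o <- close
      return ([], o)
    loop (x : xs) push close = do
      res <- push x
      case res of
        Processing push' close' -> loop xs push' close'
        Done (Just l) o | keep -> return (l : xs, o)
        Done _ o -> return (xs, o)

-- Hypothetical sink: take elements while the predicate holds; the first
-- failing element is handed back as leftover.
takeWhileSink :: Monad m => (i -> Bool) -> Sink i m [i]
takeWhileSink p = SinkData (push id) (close id)
  where
    push front x
      | p x = return (Processing (push (front . (x :))) (close (front . (x :))))
      | otherwise = return (Done (Just x) (front []))
    close front = return (front [])

-- Hypothetical sink: consume everything that is left.
consumeAll :: Monad m => Sink i m [i]
consumeAll = takeWhileSink (const True)

-- Run two sinks one after the other over the same input.
twoSinks :: Bool -> [Int] -> ([Int], [Int])
twoSinks keep input = runIdentity $ do
  (rest, firstPart) <- connectWith keep input (takeWhileSink (/= 0))
  (_, secondPart) <- connectWith keep rest consumeAll
  return (firstPart, secondPart)
```

With input [1, 2, 0, 3, 4], the first sink stops at the 0. Keeping the leftover lets the second sink see [0, 3, 4]. Discarding it silently loses the 0 and leaves only [3, 4].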
To be continued...
We still have a lot to cover in conduits, though at this point you likely have enough information to get started using them. The next big topic is Conduits. We'll see what they are, and how they combine with sources and sinks. Finally, we'll try to cover the larger design decisions behind conduits, and some more advanced usages.