Review and Status
This is part 4 in the conduits series. You can see the previous posts at:
This part covers the final major datatype in our package, conduits. While sources produce a stream of data and sinks consume a stream, conduits transform a stream.
Also, I just wanted to give an update on conduit activity. A few of the major enumerator-based libraries have been converted to conduits (as http-conduit and xml-conduit) and released to Hackage. In addition, the GitHub versions of WAI (including Warp), Persistent and Yesod have been converted as well.
Not only does the code all work, it's already allowing enhancements we hadn't even thought of. All in all, the change to conduits has been a very pleasant one.
Types
As we did previously, let's start off by looking at the types involved.
```haskell
data ConduitResult input m output
    = Producing (Conduit input m output) [output]
    | Finished (Maybe input) [output]

data Conduit input m output = Conduit
    { conduitPush :: input -> ResourceT m (ConduitResult input m output)
    , conduitClose :: ResourceT m [output]
    }
```
This should look very similar to what we've seen with sinks. A conduit can be pushed to, in which case it returns a result. A result either indicates that it is still producing data, or that it is finished. When a conduit is closed, it returns some more output.
But let's examine the idiosyncrasies a bit. Like sinks, we can only push one piece of input at a time, and leftover data may be 0 or 1 pieces. However, there are a few changes:
- When producing (the equivalent of processing for a sink), we can return output. This is because a conduit produces a new stream of output instead of a single output value at the end of processing.
- A sink always returns a single output value, while a conduit returns 0 or more outputs (a list). To understand why, consider conduits such as concatMap (produces multiple outputs for one input) and filter (returns 0 or 1 output for each input).
- We have no special constructor like SinkNoData. That's because we provide no Monad instance for conduits. We'll see later how you can still use a familiar monadic approach to creating conduits.
Overall conduits should seem very similar to what we've covered so far.
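To make the push/close protocol concrete, here is a minimal pure sketch of the same two types with the ResourceT m layer dropped. This is a simplified model for experimentation, not the package's API. The names runConduitList and takeWhileC are hypothetical. The runner pushes list elements one at a time and only closes the conduit if it never finished. The example conduit shows Finished handing back a leftover.

```haskell
-- A pure model of the types above with the ResourceT m layer removed.
-- Simplified for illustration; this is not the conduit package's API.
data ConduitResult input output
  = Producing (Conduit input output) [output] -- still running, plus output
  | Finished (Maybe input) [output]           -- done, plus 0 or 1 leftover

data Conduit input output = Conduit
  { conduitPush  :: input -> ConduitResult input output
  , conduitClose :: [output] -- final output when the input stream ends
  }

-- Hypothetical runner: push list elements one at a time, closing the conduit
-- only if it never finished. If it finishes early, the leftover and the rest
-- of the list are simply dropped here.
runConduitList :: Conduit input output -> [input] -> [output]
runConduitList c [] = conduitClose c
runConduitList c (x : xs) =
  case conduitPush c x of
    Producing c' outs -> outs ++ runConduitList c' xs
    Finished _ outs   -> outs

-- Hypothetical conduit: pass values through while the predicate holds, then
-- finish and hand back the first failing value as leftover.
takeWhileC :: (a -> Bool) -> Conduit a a
takeWhileC p = Conduit
  { conduitPush = \x ->
      if p x then Producing (takeWhileC p) [x] else Finished (Just x) []
  , conduitClose = []
  }
```

For example, `runConduitList (takeWhileC (< 3)) [1 .. 5]` evaluates to `[1,2]`. The value 3 comes back as leftover, which this simple runner discards.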
Simple conduits
We'll start off by defining some simple conduits that don't have any state.
```haskell
import Prelude hiding (map, concatMap)
import Data.Conduit

-- A simple conduit that just passes on the data as-is.
passThrough :: Monad m => Conduit input m input
passThrough = Conduit
    { conduitPush = \input -> return $ Producing passThrough [input]
    , conduitClose = return []
    }

-- map values in a stream
map :: Monad m => (input -> output) -> Conduit input m output
map f = Conduit
    { conduitPush = \input -> return $ Producing (map f) [f input]
    , conduitClose = return []
    }

-- map and concatenate
concatMap :: Monad m => (input -> [output]) -> Conduit input m output
concatMap f = Conduit
    { conduitPush = \input -> return $ Producing (concatMap f) $ f input
    , conduitClose = return []
    }
```
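The same stateless conduits can be tried out in a pure model of the types, again assuming we drop ResourceT. The C suffixes and runConduitList are hypothetical names chosen to avoid clashing with Prelude. filterC is added because filter was mentioned earlier as a conduit that yields 0 or 1 outputs per input.

```haskell
-- Pure model of the Conduit type (ResourceT layer removed; illustration only).
data ConduitResult i o = Producing (Conduit i o) [o] | Finished (Maybe i) [o]

data Conduit i o = Conduit
  { conduitPush  :: i -> ConduitResult i o
  , conduitClose :: [o]
  }

-- Hypothetical runner: push each list element in turn, then close.
runConduitList :: Conduit i o -> [i] -> [o]
runConduitList c [] = conduitClose c
runConduitList c (x : xs) = case conduitPush c x of
  Producing c' outs -> outs ++ runConduitList c' xs
  Finished _ outs   -> outs

-- Each stateless conduit returns itself after every push and has nothing
-- left to emit on close.
passThroughC :: Conduit a a
passThroughC = Conduit (\x -> Producing passThroughC [x]) []

mapC :: (a -> b) -> Conduit a b
mapC f = Conduit (\x -> Producing (mapC f) [f x]) []

concatMapC :: (a -> [b]) -> Conduit a b
concatMapC f = Conduit (\x -> Producing (concatMapC f) (f x)) []

-- filter yields 0 or 1 outputs per input.
filterC :: (a -> Bool) -> Conduit a a
filterC p = Conduit (\x -> Producing (filterC p) [x | p x]) []
```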
Stateful conduits
Of course, not all conduits can be declared without state. Writing a stateful conduit on the bare metal is not too difficult.
```haskell
import Prelude hiding (reverse)
import qualified Data.List
import Data.Conduit
import Control.Monad.Trans.Resource

-- Reverse the elements in the stream. Note that this has the same downside as
-- the standard reverse function: you have to read the entire stream into
-- memory before producing any output.
reverse :: Resource m => Conduit input m input
reverse =
    mkConduit []
  where
    mkConduit state = Conduit (push state) (close state)
    push state input = return $ Producing (mkConduit $ input : state) []
    close state = return state

-- Same thing with sort: it will pull everything into memory
sort :: (Ord input, Resource m) => Conduit input m input
sort =
    mkConduit []
  where
    mkConduit state = Conduit (push state) (close state)
    push state input = return $ Producing (mkConduit $ input : state) []
    close state = return $ Data.List.sort state
```
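reverse and sort only emit output on close. A stateful conduit can just as well emit on every push. Below is a sketch in a pure model of the types, assuming ResourceT is dropped. runningSum and runConduitList are hypothetical names. It uses the same mkConduit pattern: the state lives in a closure, and a fresh conduit is built after each push.

```haskell
-- Pure model of the Conduit type (ResourceT layer removed; illustration only).
data ConduitResult i o = Producing (Conduit i o) [o] | Finished (Maybe i) [o]

data Conduit i o = Conduit
  { conduitPush  :: i -> ConduitResult i o
  , conduitClose :: [o]
  }

-- Hypothetical runner: push each list element in turn, then close.
runConduitList :: Conduit i o -> [i] -> [o]
runConduitList c [] = conduitClose c
runConduitList c (x : xs) = case conduitPush c x of
  Producing c' outs -> outs ++ runConduitList c' xs
  Finished _ outs   -> outs

-- Hypothetical stateful conduit in the same mkConduit style as reverse above.
-- Unlike reverse and sort, it emits a value on every push.
runningSum :: Conduit Int Int
runningSum = mkConduit 0
  where
    mkConduit total = Conduit (push total) []
    push total input =
      let total' = total + input
      in Producing (mkConduit total') [total']
```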
But we can do better. Just like sourceState and sinkState, we have conduitState to simplify things.
```haskell
import Prelude hiding (reverse)
import qualified Data.List
import Data.Conduit
import Control.Monad.Trans.Resource

-- Reverse the elements in the stream. Note that this has the same downside as
-- the standard reverse function: you have to read the entire stream into
-- memory before producing any output.
reverse :: Resource m => Conduit input m input
reverse =
    conduitState [] push close
  where
    push state input = return $ StateProducing (input : state) []
    close state = return state

-- Same thing with sort: it will pull everything into memory
sort :: (Ord input, Resource m) => Conduit input m input
sort =
    conduitState [] push close
  where
    push state input = return $ StateProducing (input : state) []
    close state = return $ Data.List.sort state
```
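One way to see what conduitState buys us is to build a helper like it on top of the raw interface. The sketch below works in a pure model, assuming ResourceT is dropped. conduitStateP is a hypothetical name, and StateFinished is a name chosen for this sketch, so neither should be taken as the package's definitions. The helper rebuilds the conduit with the new state after every push, which is exactly the mkConduit pattern from the previous example.

```haskell
import qualified Data.List

-- Pure model of the Conduit type (ResourceT layer removed; illustration only).
data ConduitResult i o = Producing (Conduit i o) [o] | Finished (Maybe i) [o]

data Conduit i o = Conduit
  { conduitPush  :: i -> ConduitResult i o
  , conduitClose :: [o]
  }

runConduitList :: Conduit i o -> [i] -> [o]
runConduitList c [] = conduitClose c
runConduitList c (x : xs) = case conduitPush c x of
  Producing c' outs -> outs ++ runConduitList c' xs
  Finished _ outs   -> outs

-- Hypothetical state-passing result type for this sketch.
data ConduitStateResult s i o
  = StateProducing s [o]        -- new state plus output
  | StateFinished (Maybe i) [o] -- done, with an optional leftover

-- Thread the state through the raw interface by building a new conduit
-- around the updated state after every push.
conduitStateP :: s -> (s -> i -> ConduitStateResult s i o) -> (s -> [o]) -> Conduit i o
conduitStateP state push close = Conduit
  { conduitPush = \input -> case push state input of
      StateProducing state' outs  -> Producing (conduitStateP state' push close) outs
      StateFinished leftover outs -> Finished leftover outs
  , conduitClose = close state
  }

reverseC :: Conduit a a
reverseC = conduitStateP [] (\state x -> StateProducing (x : state) []) id

sortC :: Ord a => Conduit a a
sortC = conduitStateP [] (\state x -> StateProducing (x : state) []) Data.List.sort
```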
Using conduits
The way Conduits interact with the rest of the package is via fusing. A conduit can be fused into a source, producing a new source; fused into a sink to produce a new sink; or fused with another conduit to produce a new conduit. It's best to just look at the fusion operators.
```haskell
-- Left fusion: source + conduit = source
($=) :: (Resource m, IsSource src) => src m a -> Conduit a m b -> Source m b

-- Right fusion: conduit + sink = sink
(=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c

-- Middle fusion: conduit + conduit = conduit
(=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c
```
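To get a feel for what middle fusion has to do, here is a rough pure model of it, assuming ResourceT is dropped. This is a sketch, not the real implementation of =$=. fuse, feed and drain are hypothetical names. Every output of the upstream conduit is pushed into the downstream conduit one at a time. Closing the fused conduit closes upstream first and then downstream. In this model, left fusion with a list source amounts to runConduitList.

```haskell
-- Pure model of the Conduit type (ResourceT layer removed; illustration only).
data ConduitResult i o = Producing (Conduit i o) [o] | Finished (Maybe i) [o]

data Conduit i o = Conduit
  { conduitPush  :: i -> ConduitResult i o
  , conduitClose :: [o]
  }

-- Models left fusion of a list source with a conduit.
runConduitList :: Conduit i o -> [i] -> [o]
runConduitList c [] = conduitClose c
runConduitList c (x : xs) = case conduitPush c x of
  Producing c' outs -> outs ++ runConduitList c' xs
  Finished _ outs   -> outs

-- Push upstream's outputs into downstream one at a time.
-- Left: downstream is still running. Right: downstream finished.
feed :: Conduit b c -> [b] -> Either (Conduit b c, [c]) [c]
feed down [] = Left (down, [])
feed down (b : bs) = case conduitPush down b of
  Producing down' cs -> case feed down' bs of
    Left (down'', cs') -> Left (down'', cs ++ cs')
    Right cs'          -> Right (cs ++ cs')
  -- Downstream is done: its leftover and the remaining bs are discarded.
  Finished _ cs -> Right cs

-- Feed upstream's final outputs, then close downstream if it is still open.
drain :: Conduit b c -> [b] -> [c]
drain d bs = case feed d bs of
  Left (d', cs) -> cs ++ conduitClose d'
  Right cs      -> cs

-- Middle fusion, modelled purely.
fuse :: Conduit a b -> Conduit b c -> Conduit a c
fuse up down = Conduit push close
  where
    push a = case conduitPush up a of
      Producing up' bs -> case feed down bs of
        Left (down', cs) -> Producing (fuse up' down') cs
        Right cs         -> Finished Nothing cs
      Finished leftover bs -> Finished leftover (drain down bs)
    close = drain down (conduitClose up)

mapC :: (a -> b) -> Conduit a b
mapC f = Conduit (\x -> Producing (mapC f) [f x]) []

reverseC :: Conduit a a
reverseC = go []
  where
    go acc = Conduit (\x -> Producing (go (x : acc)) []) acc
```

With these, `runConduitList (fuse reverseC (mapC show)) [1 .. 10]` mirrors the `reverse =$= CL.map show` pipeline in the next example, and grouping the fusions either way gives the same result.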
Using these operators is straightforward.
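```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
-- reverse is the conduit defined earlier (with Prelude's reverse hidden).

useConduits = do
    runResourceT $ CL.sourceList [1..10] $= reverse $= CL.map show $$ CL.consume
    -- equivalent to
    runResourceT $ CL.sourceList [1..10] $$ reverse =$ CL.map show =$ CL.consume
    -- and equivalent to
    runResourceT $ CL.sourceList [1..10] $$ (reverse =$= CL.map show) =$ CL.consume
```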
There is in fact one last way of expressing the same idea. I'll leave it as an exercise to the reader to discover it.
It may seem like all these different approaches are redundant. While occasionally you can in fact choose whichever approach you feel like using, in many cases you will need a specific approach. For example:
- If you have a stream of numbers, and you want to apply a conduit (e.g., map show) to only some of the stream that will be passed to a specific sink, you'll want to use the right fusion operator.
- If you're reading a file, and want to parse the entire file as textual data, you'll want to use left fusion to convert the entire stream.
- If you want to create reusable conduits that combine together individual, smaller conduits, you'll use middle fusion.
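The first case is the least obvious, so here is a rough pure model of right fusion, assuming ResourceT is dropped and sinks are plain data. The names Sink, runSink, fuseSink, takeS and the other helpers are hypothetical, and this is not the package's implementation. The point is that only the values the sink actually asks for pass through the conduit. The rest of the stream comes back untouched.

```haskell
-- Pure models of conduits and sinks (ResourceT removed; illustration only).
data ConduitResult i o = Producing (Conduit i o) [o] | Finished (Maybe i) [o]
data Conduit i o = Conduit { conduitPush :: i -> ConduitResult i o, conduitClose :: [o] }

data SinkResult i r = Processing (Sink i r) | Done (Maybe i) r
data Sink i r = Sink { sinkPush :: i -> SinkResult i r, sinkClose :: r }

-- Hypothetical runner: returns the sink's result and the unconsumed input.
runSink :: Sink i r -> [i] -> (r, [i])
runSink s [] = (sinkClose s, [])
runSink s (x : xs) = case sinkPush s x of
  Processing s'   -> runSink s' xs
  Done leftover r -> (r, maybe xs (: xs) leftover)

-- Push a batch into a sink. Right r: the sink finished, and any values still
-- left in the batch are lost.
feedSink :: Sink b r -> [b] -> Either (Sink b r) r
feedSink s [] = Left s
feedSink s (b : bs) = case sinkPush s b of
  Processing s' -> feedSink s' bs
  Done _ r      -> Right r

-- Feed a final batch, then close the sink if it is still open.
finish :: Sink b r -> [b] -> r
finish s bs = either sinkClose id (feedSink s bs)

-- Right fusion, modelled purely: conduit + sink = sink.
fuseSink :: Conduit a b -> Sink b r -> Sink a r
fuseSink c s = Sink push (finish s (conduitClose c))
  where
    push a = case conduitPush c a of
      Producing c' bs      -> either (Processing . fuseSink c') (Done Nothing) (feedSink s bs)
      Finished leftover bs -> Done leftover (finish s bs)

mapC :: (a -> b) -> Conduit a b
mapC f = Conduit (\x -> Producing (mapC f) [f x]) []

-- Hypothetical sink: collect the next n values (assumes n >= 1).
takeS :: Int -> Sink a [a]
takeS n = go n []
  where
    go k acc = Sink (push k acc) (reverse acc)
    push k acc x
      | k <= 1    = Done Nothing (reverse (x : acc))
      | otherwise = Processing (go (k - 1) (x : acc))
```

Here `runSink (fuseSink (mapC show) (takeS 2)) [1 .. 5]` gives `(["1","2"],[3,4,5])`. Only the first two numbers were shown, and the remaining numbers are still plain values that another sink could sum.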
Data loss
Let's forget about conduits for a moment. Instead, suppose we want to write a program, using plain old lists, that will take a list of numbers, apply some kind of transformation to them, take the first five transformed values and do something with them, and then do something else with the remaining non-transformed values. For example, we want something like:
```haskell
main = do
    let list = [1..10]
        transformed = map show list
        (begin, end) = splitAt 5 transformed
        untransformed = map read end
    mapM_ putStrLn begin
    print $ sum untransformed
```
But clearly this isn't a good general solution, since we don't want to have to transform and then untransform every element in the list. For one thing, we may not always have an inverse function. Another issue is efficiency. In this case, we can write something more efficient:
```haskell
main = do
    let list = [1..10]
        (begin, end) = splitAt 5 list
        transformed = map show begin
    mapM_ putStrLn transformed
    print $ sum end
```
Note the change: we perform our split before transforming any elements. This works because, with map, we have a 1-to-1 correspondence between the input and output elements. So splitting at 5 before or after mapping show is the same thing. But what happens if we replace map show with something more devious?
```haskell
deviousTransform = concatMap go
  where
    go 1 = [show 1]
    go 2 = [show 2, "two"]
    go 3 = replicate 5 "three"
    go x = [show x]
```
We no longer have the 1-to-1 correspondence. As a result, we can't use the second method. But it's even worse: we can't use the first method either, since there's no inverse of our deviousTransform.
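The broken correspondence can be checked directly. The sketch below compares "split after transforming" with "split before transforming" for both map show and deviousTransform. commutesWithSplit is a hypothetical helper name. deviousTransform is rewritten with a case expression but returns the same lists as the version above.

```haskell
-- The concatMap-based transform from the example above.
deviousTransform :: [Int] -> [String]
deviousTransform = concatMap go
  where
    go x = case x of
      2 -> [show x, "two"]
      3 -> replicate 5 "three"
      _ -> [show x]

-- Hypothetical helper: True when taking the first n outputs gives the same
-- result as transforming only the first n inputs.
commutesWithSplit :: Int -> ([Int] -> [String]) -> [Int] -> Bool
commutesWithSplit n f xs = fst (splitAt n (f xs)) == f (fst (splitAt n xs))
```

For `map show` the check holds. For `deviousTransform` it fails: the first five outputs are `["1","2","two","three","three"]`, while transforming the first five inputs yields ten strings.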
There's only one solution to the problem that I'm aware of: transform elements one at a time. The final program looks like this:
```haskell
deviousTransform 1 = [show 1]
deviousTransform 2 = [show 2, "two"]
deviousTransform 3 = replicate 5 "three"
deviousTransform x = [show x]

transform5 :: [Int] -> ([String], [Int])
transform5 list =
    go [] list
  where
    go output (x:xs)
        | newLen >= 5 = (take 5 output', xs)
        | otherwise = go output' xs
      where
        output' = output ++ deviousTransform x
        newLen = length output'
    -- Degenerate case: not enough input to make 5 outputs
    go output [] = (output, [])

main = do
    let list = [1..10]
        (begin, end) = transform5 list
    mapM_ putStrLn begin
    print $ sum end
```
The final output of this program is:

```
1
2
two
three
three
49
```

What's important to note is that the number 3 is converted into five copies of the word "three", yet only two of them show up in the output. The rest are discarded in the take 5 call.
This whole exercise is just to demonstrate the issue of data loss in conduits. By forcing conduits to accept only one input at a time, we avoid the issue of transforming too many elements at once. That doesn't mean we don't lose any data: if a conduit produces too much output for the receiving sink to handle, some of it may be lost.
To put all this another way: conduits avoid chunking in order to limit data loss. This is not an issue unique to conduits. If you look at the implementation of concatMapM for enumerator, you'll see that it forces elements to be handled one at a time. In conduits, we opted to force the issue at the type level.
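To see why handling one element at a time matters, the sketch below generalizes transform5 with a chunk-size parameter. takeOutputsChunked is a hypothetical name, and the chunked behaviour is an illustration rather than anything from enumerator or conduits. With a chunk size of 1, it behaves like transform5. With larger chunks, inputs that were transformed but never used are lost entirely.

```haskell
-- Per-element transform from the transform5 example above.
deviousTransform :: Int -> [String]
deviousTransform x = case x of
  2 -> [show x, "two"]
  3 -> replicate 5 "three"
  _ -> [show x]

-- Hypothetical generalization of transform5: transform `chunk` inputs at a
-- time (assumes chunk >= 1) until at least n outputs exist. Returns the first
-- n outputs and the inputs that were never transformed.
takeOutputsChunked :: Int -> Int -> [Int] -> ([String], [Int])
takeOutputsChunked chunk n = go []
  where
    go output [] = (output, []) -- not enough input to make n outputs
    go output xs
      | length output' >= n = (take n output', rest)
      | otherwise           = go output' rest
      where
        (now, rest) = splitAt chunk xs
        output' = output ++ concatMap deviousTransform now
```

With `takeOutputsChunked 1 5 [1 .. 10]`, the remaining inputs are `[4..10]` (sum 49, as in the program above). With a chunk size of 4 they are `[5..10]` (sum 45): the 4 was transformed, never used, and never returned.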
SequencedSink
Suppose we want to combine existing conduits and sinks to produce a new, more powerful conduit. For example, we want to write a conduit that takes a stream of numbers and sums up every five. In other words, for the input [1..50], it should result in the sequence [15,40,65,90,115,140,165,190,215,240]. We can definitely do this with the low-level conduit interface.
```haskell
sum5Raw :: Resource m => Conduit Int m Int
sum5Raw =
    conduitState (0, 0) push close
  where
    push (total, count) input
        | newCount == 5 = return $ StateProducing (0, 0) [newTotal]
        | otherwise = return $ StateProducing (newTotal, newCount) []
      where
        newTotal = total + input
        newCount = count + 1
    close (total, count)
        | count == 0 = return []
        | otherwise = return [total]
```
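As a reference for what sum5Raw should produce, here is the same behaviour on a plain list. sumEvery is a hypothetical helper, not part of the package. It sums each group of n values and, like the close function above, emits a final partial sum when the input length is not a multiple of n.

```haskell
-- Hypothetical list-level specification of sum5Raw (assumes n >= 1).
sumEvery :: Int -> [Int] -> [Int]
sumEvery _ [] = []
sumEvery n xs = sum chunk : sumEvery n rest
  where
    (chunk, rest) = splitAt n xs
```

`sumEvery 5 [1 .. 50]` gives the ten sums listed above, and `sumEvery 5 [1 .. 7]` gives `[15,13]`, matching the partial sum that close emits.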
But this is frustrating, since we already have all the tools we need to do this at a high level! There's the fold sink for adding up the numbers, and the isolate conduit, which will only allow up to a certain number of elements to be passed to a sink. Can't we combine these somehow?
The answer is a SequencedSink. The idea is to create a normal Sink, except it returns a special output called a SequencedSinkResponse. This value can emit new output, stop processing data, or transfer control to a new conduit. (See the Haddocks for more information.) Then we can turn this into a Conduit using the sequenceSink function. This function also takes some state value that gets passed through to the sink.
So we can rewrite sum5Raw at a much higher level.
```haskell
sum5 :: Resource m => Conduit Int m Int
sum5 = sequenceSink () $ \() -> do
    nextSum <- CL.isolate 5 =$ CL.fold (+) 0
    return $ Emit () [nextSum]
```
All of the () values in there are simply the unused state variable being passed around; they can be ignored. Otherwise, we're doing exactly what we want. We fuse isolate to fold to get the sum of the next five elements from the stream. We then emit that value, and start all over again.
Let's say we want to modify this slightly. We want to get the first 8 sums, and then pass through the remaining values, multiplied by 2. We can keep track of how many values we've returned in our state, and then use the StartConduit constructor to pass control to the multiply-by-2 conduit next.
```haskell
sum5Pass :: Resource m => Conduit Int m Int
sum5Pass = sequenceSink 0 $ \count -> do
    if count == 8
        then return $ StartConduit $ CL.map (* 2)
        else do
            nextSum <- CL.isolate 5 =$ CL.fold (+) 0
            return $ Emit (count + 1) [nextSum]
```
These are obviously very contrived examples, but I hope they make clear the power and simplicity available from this approach.
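For readers who want to trace how sequenceSink might drive these sinks, here is a rough pure model. It is a sketch, not the package's implementation. ResourceT is gone and sinks are plain data. SinkNoData comes from the post, while SinkData, sequenceSinkP, sumNext, mapSink and the other helpers are hypothetical names. One rule is my own assumption: a sink that never received any input is not closed at end of stream. I chose it so that sum5 yields exactly the ten sums quoted earlier. The model also assumes every sink eventually consumes input; otherwise it would loop.

```haskell
-- Pure models (ResourceT removed; illustration only).
data ConduitResult i o = Producing (Conduit i o) [o] | Finished (Maybe i) [o]
data Conduit i o = Conduit { conduitPush :: i -> ConduitResult i o, conduitClose :: [o] }

data SinkResult i r = Processing (Sink i r) | Done (Maybe i) r
data Sink i r
  = SinkNoData r                     -- result available without reading input
  | SinkData (i -> SinkResult i r) r -- push function, result on close

data SequencedSinkResponse s i o = Emit s [o] | Stop | StartConduit (Conduit i o)

runConduitList :: Conduit i o -> [i] -> [o]
runConduitList c [] = conduitClose c
runConduitList c (x : xs) = case conduitPush c x of
  Producing c' outs -> outs ++ runConduitList c' xs
  Finished _ outs   -> outs

-- Hypothetical model of sequenceSink.
sequenceSinkP :: s -> (s -> Sink i (SequencedSinkResponse s i o)) -> Conduit i o
sequenceSinkP s0 f = fromSink False (f s0)
  where
    -- 'seen' is True once the current sink has consumed some input.
    fromSink _ (SinkNoData resp) = let (outs, next) = respond resp in withPending outs next
    fromSink seen (SinkData push close) = Conduit
      { conduitPush = \i -> case push i of
          Processing sink' -> Producing (fromSink True sink') []
          Done Nothing resp -> let (outs, next) = respond resp in Producing next outs
          Done (Just l) resp ->
            let (outs, next) = respond resp in prependResult outs (conduitPush next l)
      , conduitClose = if seen then closeResponse close else []
      }
    respond (Emit s outs)     = (outs, sequenceSinkP s f)
    respond Stop              = ([], stopped)
    respond (StartConduit c)  = ([], c)

closeResponse :: SequencedSinkResponse s i o -> [o]
closeResponse (Emit _ outs)    = outs
closeResponse Stop             = []
closeResponse (StartConduit c) = conduitClose c

-- A conduit that accepts no input, handing each value back as leftover.
stopped :: Conduit i o
stopped = Conduit (\i -> Finished (Just i) []) []

prependResult :: [o] -> ConduitResult i o -> ConduitResult i o
prependResult outs (Producing c outs') = Producing c (outs ++ outs')
prependResult outs (Finished l outs')  = Finished l (outs ++ outs')

-- Emit some outputs ahead of whatever the conduit does next.
withPending :: [o] -> Conduit i o -> Conduit i o
withPending [] c   = c
withPending outs c = Conduit (prependResult outs . conduitPush c) (outs ++ conduitClose c)

mapSink :: (r -> r') -> Sink i r -> Sink i r'
mapSink g (SinkNoData r) = SinkNoData (g r)
mapSink g (SinkData push close) = SinkData (step . push) (g close)
  where
    step (Processing s) = Processing (mapSink g s)
    step (Done l r)     = Done l (g r)

-- Hypothetical stand-in for CL.isolate n =$ CL.fold (+) 0 (assumes n >= 1).
sumNext :: Int -> Sink Int Int
sumNext n = go n 0
  where
    go k acc = SinkData (push k acc) acc
    push k acc x
      | k <= 1    = Done Nothing (acc + x)
      | otherwise = Processing (go (k - 1) (acc + x))

mapC :: (a -> b) -> Conduit a b
mapC f = Conduit (\x -> Producing (mapC f) [f x]) []

sum5 :: Conduit Int Int
sum5 = sequenceSinkP () $ \() -> mapSink (\total -> Emit () [total]) (sumNext 5)

sum5Pass :: Conduit Int Int
sum5Pass = sequenceSinkP (0 :: Int) $ \count ->
  if count == 8
    then SinkNoData (StartConduit (mapC (* 2)))
    else mapSink (\total -> Emit (count + 1) [total]) (sumNext 5)
```

In this model, `runConduitList sum5Pass [1 .. 50]` yields the first eight sums followed by `[82,84..100]`.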
Summary
We're nearing the end of our conduits series. The last remaining major point is buffering and resumable sources. (I would have included it here, but (a) it doesn't exactly fit with the rest of the material and (b) it's 10:30 at night and I want to go to sleep.) In addition, we'll try to cover some real-life use cases for conduits, and give examples of where libraries like http-conduit and the upcoming conduit-based wai can be used together.