Introducing conduit-extra
Michael Snoyman recently added a new package called conduit-extra into the conduit framework. It's aimed on incubating new ideas around conduit, testing them and seeing if they're worth adding into the main package.
In this post I'd like to introduce one of its first additions.
ZipSink
Motivating example
Our task will be to compute several different power sums from a sequence of numbers. We would like to solve the task by constructing a Sink
that will read numbers on its input and compute first n power sums of its input:
powerSums :: (Monad m, Num a) => Int -> Sink a m [a]
The first obvious step is to create a sink that computes the k-th power sum:
import Control.Applicative
import Control.Seq
import Data.Conduit
import Data.Conduit.Extra
import Data.Conduit.List (fold, sourceList)
import Data.Traversable (traverse)
powerSum :: (Monad m, Num a) => Int -> Sink a m a
powerSum k = fold (\s x -> s + x^k) 0
Now we'd like to generalize it to a list of power sums. Unfortunately, if we want to compute a list of sums at once, we'll have to keep the list of sums as the state of fold
:
powerSumsAttempt1 :: (Monad m, Num a) => Int -> Sink a m [a]
powerSumsAttempt1 n = fold f (repeat 0)
where
f ss x = zipWith (+) ss (map (x^) [1..n])
This involves mapping and zipping and the code is somewhat obscure, clearly much less readable than our original powerSum
.
Moreover, while it produces the correct result, it's still problematic. We soon realize that it leaks memory for large inputs, because even though fold
is strict, thunks accumulate inside the list. So we have to force the evaluation of the list during folding (using Control.Seq
):
powerSumsAttempt2 :: (Monad m, Num a) => Int -> Sink a m [a]
powerSumsAttempt2 n = fold f (repeat 0)
where
f ss x = withStrategy (seqList rseq) $ zipWith (+) ss (map (x^) [1..n])
Clearly, this solution has several major drawbacks:
- The original simple idea is completely lost within auxiliary code.
- The code isn't composable. We couldn't use
powerSum
as a building block. Instead, we had to reimplement a more complex variant from scratch. - In this case we're dealing with a collection of parallel folds, which can be expressed as a fold over a list. But what if our building blocks were more complex sinks, not simple folds? Then there would be no way how to combine them together, even manually.
- We don't want to deal with strictness and evaluation, we want conduit to handle it for us.
ZipSinks applicative functor
What we need is a way how to parallelize sinks, combine them so that the input is fed to all of them, and combine the results at the end.
Recall that ConduitM
is a Monad
and an Applicative
and allows to compose conduits using their operations. But the semantic of these instances is different. It's sequential instead of parallel. Expression condF <*> condX
runs condF
first, when it finishes it continues with condX
and then applies the result of condF
to the result of condX
.
Therefore conduit-extra has introduced a simple newtype wrapper for Sink
s
newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r }
which implements Applicative
with the parallel semantics:
pure
creates a sink that doesn't consume any input, just returns a given value;<*>
runs two sinks in parallel until both finish, and then applies the result of the first one to the second one.
Now we can run powerSum
s in parallel as
powerSumZ :: (Monad m, Num a) => Int -> ZipSink a m a
powerSumZ = ZipSink . powerSum
powerSumTuple :: (Monad m, Num a) => Int -> Int -> Sink a m (a, a)
powerSumTuple k l = getZipSink $ (,) <$> powerSumZ k <*> powerSumZ l
Not just that, we can use all the existing functions for Applicative
s. Here we had two sinks and created one sink that returned a tuple. Our final aim is to create a sink that returns a list, provided we can create a sink using powerSumZ
for each member of [0..n]
. This sounds like a job for traverse
:
traverse :: (Applicative f, Traversable t) => (a -> f b) -> t a -> f (t b)
which, specialized to our case, is typed as
traverse :: (Int -> ZipSink a m b) -> [Int] -> ZipSink a m [b]
It solves our problem completely and very elegantly:
powerSums :: (Monad m, Num a) => Int -> Sink a m [a]
powerSums n = getZipSink $ traverse powerSumZ [0..n]
Let's print some test results:
main :: IO ()
main = (sourceList [1..100] $$ powerSums 4) >>= print
which produces
[100,5050,338350,25502500,2050333330]
Notes
The above scenario is quite often, therefore conduit-extra already provides a helper function for
Sink
s that usesZipSink
s only internally:broadcast :: (Monad m, Traversable t) => t (Sink i m r) -> Sink i m (t r)
So an alternative way how to implement
powerSums
would bepowerSums' n = broadcast $ map powerSum [0..n]
ZipSink
's<*>
is implemented usingzipSinks
fromData.Conduit.Util
.