Some ideas for pipes-parse

February 11, 2014

GravatarBy Michael Snoyman

I haven't mentioned them before, but many times in the past, I've played around with from-scratch implementations of conduit to test out new theories. I've tried having versions without automatic termination of any sort, different orderings of finalizers, various different ways of handling leftovers, and strict adherence to the category laws. Each one of them has taught me something, but since the 1.0 release, none of them were enough of an improvement on what exists already to warrant the pain in switching. And frankly, most of them were far worse than what we already have.

A few days ago, Gabriel Gonzalez wrote a blog post about the newest release of pipes-parse. Given that we've had many conversations over the years- both publicly and privately- about the directions of our libraries, I wasn't surprised that the core idea behind this library is something we'd discussed once before, and in fact was an idea I'd even tried out for conduit once. Based on that experience, I'd like to share some ideas on how Gabriel (or others) could approach the problem a bit differently, and possibly avoid some user-facing issues.

Proliferation of APIs

Before discussing the details of leftovers themselves, let mention what I consider the primary issue. It seems that each new feature added to pipes is introducing a completely separate API. Consider, for example, a simple question: how do you get the next value in a stream? In the conduit world, the answer is await. There are also some convenience functions built on top of await: awaitForever, peek, mapM_, etc. But there is just one primitive for awaiting values, and it has the same type everywhere:

await :: Monad m => Consumer i m (Maybe i)

In the pipes world, there are now (to my knowledge) three different "get the next value" primitives:

await :: Monad m => Consumer' a m a
next :: Monad m => Producer a m r -> m (Either r (a, Producer a m r))
draw :: Monad m => Parser a m (Maybe a)

This in turn means that utility functions need to be written multiple times, in different ways, depending on the context in which they're needed. For example, takeWhile in Pipes.Prelude works on a "normal" Pipe, but will silently discard one extra value from the stream since a normal Pipe does not support leftovers. span from Pipes.Parse performs essentially the same functionality, but works in the Parser/leftover aware realm.

One of the biggest changes to come to the conduit library was the unification of the Source, Sink, and Conduit datatypes into a single datatype, called Pipe. And the reason it's called Pipe is, as you might guess, because it was inspired from the pipes world (via Twan van Laarhoven). Though I was skeptical at first about the confusion in error messages and type signatures which may have ensued, the net result was, in my mind, undeniably positive.

It seems to me like pipes is now at that same kind of crossroads. There are a large number of different data types and type synonyms, different ways of composing things together, and different functionality depending on what type you're using. I'd recommend standardizing on one as the canonical entry point to pipes, and make the standard libraries all use that API.

It seems like the Parser API is the best suited for this task. Unless I'm mistaken, all of the folding functions in Pipes.Prelude (e.g., toList) can be implemented in terms of Parser, and it adds the capability of leftovers. If this change happened, then functions like takeWhile would no longer have to silently discard data from the data stream either.

Side note: you might think that conduit has lots of different kinds of composition due to the three different fusion operators, $=, =$, and =$=. In reality, they're all type-restricted aliases for the same function. The question of whether they should be type restricted at all is a good debate to have, but that's not my topic here.

Overview of approaches

With that more general comment out of the way, let's get into the details of Parser. Leftovers may seem like a really complicated concept, and (at least to me) lens-based parsing sounds pretty intimidating. However, the core concepts here are pretty trivial, and I think most people faced with the same constraints would end up with similar solutions to what the major streaming data libraries have. To motivate this, let's consider the simplest of all streaming data: pure lists.

In our case, we'll store our "data producer" (i.e., a list) in a State monad. Getting another value from the stream (a.k.a., awaiting, drawing) means getting the list, popping an element off the top, putting back the smaller list, and returning the popped element. Putting a value back in the stream (a.k.a., undrawing, leftover) means getting the list, sticking an element at the beginning, and putting it back. This can all be embodied in very little Haskell code:

import Control.Monad.Trans.State.Strict

type Parser a r = State [a] r

-- In conduit, this is await
draw :: Parser a (Maybe a)
draw = do
    list <- get
    case list of
        [] -> return Nothing
        x:xs -> do
            put xs
            return (Just x)

-- In conduit, this is leftover
unDraw :: a -> Parser a ()
unDraw a = do
    list <- get
    put $ a:list

At its core, this is what pipes-parse is doing. Instead of a pure list, it's using a Producer, which is really just a list transformer. But there's another, slightly less obvious approach that we could take instead. Right now, we're sticking leftovers right back on the same stack, making it impossible to distinguish between values taken from the original stream, and values leftover from the Parser. Instead, we could store a tuple in the State monad: the original list, and the leftovers. This is also pretty easy to code:

import Control.Monad.Trans.State.Strict

type Parser a r = State ([a], [a]) r

-- In conduit, this is await
draw :: Parser a (Maybe a)
draw = do
    (list, leftovers) <- get
    case leftovers of
        x:leftovers' -> do
            put (list, leftovers')
            return (Just x)
        [] ->
            case list of
                [] -> return Nothing
                x:list' -> do
                    put (list', leftovers)
                    return (Just x)

-- In conduit, this is leftover
unDraw :: a -> Parser a ()
unDraw a = do
    (list, leftovers) <- get
    put (list, a:leftovers)

While this certainly works, it seems a little overkill: what possible benefit is there in having this separation? Well, it would allow us to distinguish between "completely unparsed values" and "parsed and leftovered" values. In our discussion so far, and in the documentation for pipes-parse, I see absolutely no reason why this feature would be relevant. However, let me introduce a non-trivial parsing example to motivate things a bit further.

An archive file format

Let's say for some (probably bad) reason we've decided that the tar file format is unsuitable for our purposes. Instead, we've come up with a new file format that works as follows:

  • Each file consists of a textual filename and binary contents.
  • The filename will be UTF8 encoded.
  • We will encode lengths using a variation on netstrings: the decimal representation of the length followed by a colon.
  • Each file will be encoded as the length of the textual filename, its UTF-8 representation, the length of its binary contents, and the contents.

Yes, this example is ridiculous, but I wanted to find something that would demonstrate pipes-parse's ability to handle leftover preserving. To make the above description a bit easier to understand, here's the Haskell code to encode a list of these files:

data File = File
    { fileName     :: !Text
    , fileContents :: !ByteString
    }
    deriving Show

encodeFile :: File -> Builder
encodeFile (File name contents) =
    tellLen (T.length name) <>
    fromByteString (TEE.encodeUtf8 name) <>
    tellLen (S.length contents) <>
    fromByteString contents
  where
    tellLen i = fromByteString $ TEE.encodeUtf8 $ T.pack $ shows i ":"

encodeFiles :: [File] -> Builder
encodeFiles = mconcat . map encodeFile

What's important for the parsing is that we will need to switch back and forth between a binary and a textual stream of data in order to handle this correctly. (Note: in reality, if for some terrible reason you decide to actually implement this format, you should encode the filename length using the byte count, not the character count. I specifically used the character count to force this awkward kind of stream switching.)

I've implemented the parser in conduit if anyone's interested in checking it out. The magic leftover-preserving occurs in the withUtf8 function:

withUtf8 :: MonadThrow m
         => ConduitM Text o m r
         -> ConduitM ByteString o m r
withUtf8 =
    fuseLeftovers toBS (CT.decode CT.utf8)
  where
    toBS = L.toChunks . TLE.encodeUtf8 . TL.fromChunks

We're saying to convert the stream to text assuming a UTF8 encoding. We'll generate chunks of text on demand (i.e., lazily), and will stop once we hit an invalid UTF8 sequence (that's the behavior of Data.Conduit.Text). Then, after downstream is done, collect all of the leftovers that it generated, and convert them back to their binary representation.

Obviously, in order to do this, we need to be able to distinguish between the consumed upstream and the leftovers from downstream. If we cannot make that distinction, we'd be forced to encode the entire stream into text, take out the text that we need, and then convert the rest of the stream back to binary. Moreover, we'd have to perform the conversion back and forth for each time we call withUtf8. And even worse than the performance hit is the fact that it may not work: if the byte stream contains invalid UTF8 character sequences, this may break, depending on how your parser handles such invalid sequences.

I'm fairly certain that pipes-parse falls prey to this issue. (If I've misunderstood the library, please correct me, and I'll include the correction here.) conduit handles the issue differently: the "parser" (a.k.a., Sink) has an explicit command for reporting leftovers, and it's up to the composition operator to decide how to handle the leftovers. The standard operators- $=, =$ and =$=- use a similar trick to pipes-parse, and stick the leftovers onto the upstream Source. And that's precisely why they have the behavior of discarding downstream leftovers. However, this is just a sensible default, not a requirement of conduit. It took me under five minutes to write an alternative composition function that had leftover preserving behavior instead.

Simpler example

I tried to make the above example somewhat realistic, but the details may be a bit overwhelming. Instead, let's consider something much simpler: an isomorphism between two data types. We want to convert the stream from type A to type B, perform some peeking, and then deal with the rest of the stream. An example of doing this with conduit would be:

import           Control.Applicative ((<$>), (<*>))
import           Data.Conduit        (yield, ($$), (=$=))
import           Data.Conduit.Extra  (fuseLeftovers)
import qualified Data.Conduit.List   as CL
import           Debug.Trace         (traceShow)

newtype A = A Int
    deriving Show
newtype B = B Int
    deriving Show

atob (A i) = traceShow ("atob", i) (B i)
btoa (B i) = traceShow ("btoa", i) (A i)

main :: IO ()
main = do
    let src = mapM_ (yield . A) [1..10]
    res <- src $$ (,,,)
        <$> fuseLeftovers (map btoa) (CL.map atob) CL.peek
        <*> CL.take 3
        <*> (CL.map atob =$= CL.take 3)
        <*> CL.consume
    print res

We have the numbers 1 through 10 as type A. In our Sink, we first convert the stream to type B, peek, and then return the leftovers upstream. Then we take three As, again convert to B and take three more elements, and finally consume the remainder of the stream. I've added trace statements to demonstrate exactly how many conversions are performed:

("atob",1)
("btoa",1)
("atob",4)
("atob",5)
("atob",6)
(Just (B 1),[A 1,A 2,A 3],[B 4,B 5,B 6],[A 7,A 8,A 9,A 10])

The conversions that occur are the absolute bare minimum than could occur: the first element has to be converted to B in order to be peeked at, and then converted back to A to return to the original stream. Then, when we later take three more elements of type B, they obviously need to be converted.

Let's look at the equivalent in pipes-parse, courtesy of Joseph Abrahamson:

{-# LANGUAGE RankNTypes #-}
import           Control.Applicative
import           Control.Lens               (Iso', from, iso, view, zoom)
import           Control.Monad.State.Strict (evalState)
import           Debug.Trace
import           Pipes
import           Pipes.Core                 as Pc
import qualified Pipes.Parse                as Pp
import qualified Pipes.Prelude              as P

newtype A = A Int
    deriving Show
newtype B = B Int
    deriving Show

atob (A i) = traceShow ("atob", i) (B i)
btoa (B i) = traceShow ("btoa", i) (A i)

ab :: Iso' A B
ab = iso atob btoa

piso :: Monad m => Iso' a b -> Iso' (Producer a m r) (Producer b m r)
piso i = iso (P.map (view i) <-<) (>-> P.map (view $ from i))

main :: IO ()
main = do
  let src = P.map A <-< each [1..10]
  let parser = (,,,) <$> zoom (piso ab) Pp.peek
                     <*> zoom (Pp.splitAt 3) Pp.drawAll
                     <*> zoom (Pp.splitAt 3 . piso ab) Pp.drawAll
                     <*> Pp.drawAll
  let res = evalState parser src
  print res

The result is the same, but look at the traces:

("atob",1)
("btoa",1)
("atob",2)
("btoa",2)
("atob",3)
("btoa",3)
("atob",4)
("btoa",4)
("atob",4)
("atob",5)
("btoa",5)
("atob",5)
("atob",6)
("btoa",6)
("atob",6)
("atob",7)
("btoa",7)
("atob",8)
("btoa",8)
("atob",9)
("btoa",9)
("atob",10)
("btoa",10)
(Just (B 1),[A 1,A 2,A 3],[B 4,B 5,B 6],[A 7,A 8,A 9,A 10])

As described above, in pipes-parse you need to convert the entire stream. In our example, the conversion is trivial, and therefore not too worrying. But in the case of either an expensive conversion, or a possibly-failing conversion, this behavior would be incredibly problematic.

UPDATE: Davorak on Reddit helped me come up with a better example which demonstrates not just doubled encoding, but a program not completing correctly. The conduit/pipes comparison code is available as a Gist.

So my second recommendation would be to tweak Parser to have a stack of leftovers in addition to a Producer, which would allow for more powerful leftover preserving

Drop lenses

If you take the two recommendations above, you'll end up with a large collection of pipes-parse-ready functions, like take and takeWhile. And each of these functions will maintain a stack of leftovers, either to be used by the next monadically-composed Parser, or to perhaps propagate upstream. At this point, the lens-based approach to leftovers is overkill.

The problem with the lens approach is twofold. Firstly, it's difficult for people to understand. The core concept is not complex, but the machinery around it obscures its simplicity. But more importantly, it does not seem to compose well. I don't mean that in the sense of lens or function composition: clearly those work as expected. What I mean is that you won't be able to take the existing utility functions I mentioned above and automatically use them in a leftover-propagating manner.

I'd recommend borrowing from the conduit solution here directly: just have a separate composition function or operator that returns the downstream leftovers. It's a simple concept, it's easy to show in a type signature, and it's easy to build more complex solutions on top of it.

I'll even argue that Gabriel's announcement post is in line with this recommendation: as pointed out, the lens laws are being bent by pipes-parse. If there's a separate solution that requires no law bending, why not use that?

Let me be clear: I'm not at all implying that lenses themselves are the problem here. I think the problem is treating leftover propagation as an isomorphism between two producers. I think lenses can actually play very nicely with a streaming data package. I've been working on some conduit data analysis libraries that make heavy usage of lens to let users write much simpler code (things like filterField stockDate (< today)).

So to reiterate my third recommendation: provide a composition operator for dealing with leftovers explicitly, let users reuse your existing library of functions, and don't force them to try to write isomorphisms on entire streams where a simple predicate function will suffice.

I hope these ideas come across the right way: ideas that I think would improve the pipes ecosystem. These ideas should be taken with a large grain of salt. They are strongly inspired by my experience with conduit, and with the kinds of work I tend to see conduit applied to.

Comments

comments powered by Disqus

Archives