pipes resource problems

October 7, 2013

By Michael Snoyman

I promised a blog post elaborating on my concerns with the pipes 4.0 release. What you're reading now is not that blog post; this is an introduction to it. Right now, I'm simply trying to establish that there's a serious problem. I've been having a private conversation with Gabriel about this specific concern, and I don't believe my concerns have made much of an impact, so I'm raising them here. A quick summary:

  • pipes has known cases in which it will not clean up resources promptly.
  • Worse yet, the cleanup behavior is in many cases unreliable, in non-obvious ways.
  • The problem can easily lead to programs crashing.

The crashing program

Let me jump right into a concrete example of that third point, which is IMO the most troubling. Let's create a directory with lots of files:

{-# LANGUAGE OverloadedStrings #-}
import           Control.Monad             (forM_)
import           Filesystem                (createTree)
import           Filesystem.Path.CurrentOS (decodeString, directory,
                                            encodeString, (</>))

-- Create 20 * 20 * 20 = 8000 small files under the "input" directory.
main = do
    let files = do
            let pieces = map (decodeString . show) [1..20 :: Int]
            p1 <- pieces
            p2 <- pieces
            p3 <- pieces
            return $ "input" </> p1 </> p2 </> p3
    forM_ (zip files [1 :: Int ..]) $ \(fp, num) -> do
        createTree $ directory fp
        writeFile (encodeString fp) (show num)

I'd like to write a program to create a clone of this "input" directory, into a directory called "output". Using filesystem-conduit, this is pretty easy:

{-# LANGUAGE OverloadedStrings #-}
import           Control.Monad.IO.Class
import           Data.Conduit
import           Data.Conduit.Filesystem
import           Filesystem                (createTree)
import           Filesystem.Path.CurrentOS

main = runResourceT $ traverse False "input" $$ awaitForever (\infile -> do
    Just suffix <- return $ stripPrefix "input/" infile
    let outfile = "output" </> suffix
    liftIO $ createTree $ directory outfile
    sourceFile infile =$ sinkFile outfile
    )

traverse creates a stream of all of the files found in the input path, traversing subdirectories. For each file, we create the output filename, create the directory for the output file, and then connect sourceFile to sinkFile to perform the actual copy. Each of these functions guarantees prompt cleanup of the file handles it holds, and the enclosing runResourceT ensures that resources will be cleaned up even in the event of an exception.

Let's translate this code to use pipes-safe, which claims to support prompt finalization of resources. (Note that I've included an implementation of traverse here, based on the conduit implementation.)

{-# LANGUAGE OverloadedStrings #-}
import           Control.Monad
import           Control.Monad.IO.Class
import           Filesystem                (createTree, isDirectory, isFile,
                                            listDirectory)
import           Filesystem.Path.CurrentOS
import           Pipes
import qualified Pipes.Prelude             as P
import           Pipes.Safe
import           Pipes.Safe.Prelude
import           Prelude                   hiding (FilePath, readFile,
                                            writeFile)

main = runSafeT $ runEffect $ traverse "input" >-> forever (await >>= \infile -> do
    Just suffix <- return $ stripPrefix "input/" infile
    let outfile = "output" </> suffix
    liftIO $ createTree $ directory outfile
    readFile (encodeString infile) >-> writeFile (encodeString outfile)
    )

traverse :: MonadIO m => FilePath -> Producer FilePath m ()
traverse root =
    liftIO (listDirectory root) >>= pull
  where
    pull [] = return ()
    pull (p:ps) = do
        isFile' <- liftIO $ isFile p
        if isFile'
            then yield p >> pull ps
            else do
                follow' <- liftIO $ isDirectory p
                if follow'
                    then do
                        ps' <- liftIO $ listDirectory p
                        pull ps
                        pull ps'
                    else pull ps

Go ahead and run that program. You should get output that looks something like:

copy-pipes.hs: input/13/1/12: openFile: resource exhausted (Too many open files)

The exact file it crashes on may be different, and if you've increased your ulimits, the program may succeed. But the core problem is that pipes provides no means of guaranteeing that a resource is cleaned up promptly. What's even more troubling is that the behavior of pipes is worse than that of lazy I/O: since pipes continues to hold on to open file handles after they are no longer needed, the garbage collector has no chance of helping us. By contrast, the following lazy I/O version of the program generally runs without crashing:

{-# LANGUAGE OverloadedStrings #-}
import           Control.Monad
import           Control.Monad.IO.Class
import           Filesystem                (createTree, isDirectory, isFile,
                                            listDirectory)
import           Filesystem.Path.CurrentOS
import           Prelude                   hiding (FilePath)

main = traverse "input" $ \infile -> do
    Just suffix <- return $ stripPrefix "input/" infile
    let outfile = "output" </> suffix
    createTree $ directory outfile
    readFile (encodeString infile) >>= writeFile (encodeString outfile)

traverse :: MonadIO m => FilePath -> (FilePath -> m a) -> m ()
traverse root f =
    liftIO (listDirectory root) >>= pull
  where
    pull [] = return ()
    pull (p:ps) = do
        isFile' <- liftIO $ isFile p
        if isFile'
            then f p >> pull ps
            else do
                follow' <- liftIO $ isDirectory p
                if follow'
                    then do
                        ps' <- liftIO $ listDirectory p
                        pull ps
                        pull ps'
                    else pull ps

Why this is a problem

For many of us, the primary goal of a streaming data library is to provide for deterministic resource handling. While not the only issue with lazy I/O, its non-determinism was high on the list. The issue is that, based on various environmental factors, cleanup of resources could be delayed until the next garbage collection, whose timing cannot be guaranteed.
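
As a quick illustration of that non-determinism, consider a contrived snippet along these lines:

import System.Mem (performGC)

main :: IO ()
main = do
    s <- readFile "input.dat" -- lazy I/O: the handle is opened here
    putStrLn (take 10 s)      -- only a first chunk is actually read
    -- The handle stays open until the runtime notices it is
    -- unreachable; a garbage collection makes that likely, but nothing
    -- in the program dictates *when* it happens.
    performGC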

There are three different aspects to resource handling in a streaming library:

  • On demand acquisition (a.k.a., laziness).
  • Prompt finalization.
  • Exception safety.

One of the beauties of the iteratee pattern is that it allows all three of these to be addressed on the data producer side. However, exception safety cannot be guaranteed on the data consumer side. When I started work on conduit, I wanted to ensure that both the producer and consumer could reliably allocate resources on demand in an exception safe manner. This pattern is enabled by the resourcet package; conduit itself then provides on demand acquisition and prompt finalization.
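
To make the three aspects concrete, here's a minimal sketch (mine, not from any package's docs) of how resourcet exposes them directly:

import Control.Monad.IO.Class      (liftIO)
import Control.Monad.Trans.Resource (allocate, release, runResourceT)
import System.IO                   (IOMode (ReadMode), hClose, hGetLine,
                                    openFile)

main :: IO ()
main = runResourceT $ do
    -- On demand acquisition: the file is opened only when we ask for it.
    (key, h) <- allocate (openFile "input.dat" ReadMode) hClose
    liftIO $ hGetLine h >>= putStrLn
    -- Prompt finalization: release the handle as soon as we're done,
    -- without waiting for the end of the runResourceT block.
    release key
    -- Exception safety: had anything above thrown, runResourceT would
    -- still have called hClose on our behalf.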

pipes-safe includes a SafeT transformer which is almost identical to ResourceT. And this transformer guarantees that, short of a program crash, a cleanup action is always called. However, just like ResourceT, it can give no guarantees about promptness. I'll get into the details of why in my next blog post, but pipes is unable to guarantee that it will run code at a specific point.

Let's look at one of the examples from the pipes-safe docs:

runSafeT $ runEffect $ readFile "readFile.hs" >-> P.take 4 >-> P.stdoutLn

Running this will in fact promptly close readFile.hs. But that's only because the example is so small. What actually happens is that readFile opens the file; after four lines have been read, the pipeline terminates, and then runSafeT closes the file. This reliance on SafeT to do normal resource finalization is the problem. The first example I gave demonstrates that, in simple real-world programs, there may be many operations between resource acquisition and exiting SafeT.

It's true that the example I started off with could be rewritten to embed the SafeT block inside, running a separate pipeline for each file instead of one big one. But that's only because of the simplicity of the example. For concreteness, here's roughly what that rework might look like (an untested sketch, reusing the imports and the traverse definition from the pipes example above):
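
main = runEffect $ traverse "input" >-> forever (await >>= \infile -> do
    Just suffix <- return $ stripPrefix "input/" infile
    let outfile = "output" </> suffix
    liftIO $ createTree $ directory outfile
    -- Each copy runs in its own SafeT block, so both file handles are
    -- closed before we move on to the next file.
    liftIO $ runSafeT $ runEffect $
        readFile (encodeString infile) >-> writeFile (encodeString outfile)
    )

A more difficult example to work around would be one where the data source needs to be shared amongst the consumers, and therefore you can't get away with running multiple pipelines. The following example in conduit opens an input file and splits it into 50-byte chunks: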

import Data.Conduit
import Data.Conduit.List (peek)
import Data.Conduit.Binary

main =
    runResourceT $ sourceFile "input.dat" $$ loop 0
  where
    loop i = do
        mx <- peek
        case mx of
            Nothing -> return ()
            Just _ -> do
                let fp = "out-conduit/" ++ show i
                isolate 50 =$ sinkFile fp
                loop $ i + 1

Given a large enough input file (I used /usr/share/dict/words), the following pipes version will crash:

import Pipes
import qualified Pipes.Prelude as P
import Pipes.Safe
import qualified Pipes.Safe.Prelude as P

main =
    runSafeT $ runEffect $ P.readFile "input.dat" >-> loop 0
  where
    loop i = do
        let fp = "out-pipes/" ++ show i
        P.take 50 >-> P.writeFile fp
        loop $ i + 1

Note that, due to differences between sourceFile in conduit and readFile in pipes-safe, these programs are doing slightly different things: conduit deals with 50-byte chunks, while pipes deals with 50-line chunks. This distinction is irrelevant for the current discussion; I'm just trying to keep the examples concise by using functions built into the libraries.

Ignoring any issues of which programs can or cannot be rewritten to work with pipes, the more glaring issue is that pipes makes it easy and natural to write programs which have very detrimental behavior regarding resources.

UPDATE: There's a more sophisticated example that better demonstrates the problem at the end of this blog post.

It's unreliable

One last point is that this behavior is unreliable. Consider this example again:

runSafeT $ runEffect $ readFile "readFile.hs" >-> P.take 4 >-> P.stdoutLn

Above, I claimed that readFile would not end up closing the file handle. That wasn't strictly accurate: if the file contains fewer than four lines, readFile will close the file handle itself. This isn't quite non-deterministic behavior, since we can state exactly how the program will behave on different inputs. However, it's pretty close: depending on the size of a file, the program will do either the right thing or the wrong thing, and we have no way to restructure our program to change this.

Given the fact that, for many of us, the entire attraction of getting away from lazy I/O is to take back deterministic, prompt resource management, this behavior is unsettling. What concerns me even more is that the pipes libraries claim to support proper behavior, but on basic analysis clearly don't.

In our conversations, Gabriel told me that pipes is about much more than just a lazy I/O replacement. I have no objection to that, and pipes can and should continue to research those directions. But in its current form, pipes is not performing the bare minimum functionality to be considered a replacement for iteratees or conduit.

Again, my point in this blog post is simply to establish the fact that there's a problem. I'll get into more details about the cause in the following blog post, and a solution to that problem in the one after that.

Update: a more complicated problem

After publishing this blog post, there was a discussion on Reddit which pointed out the presence of withFile in pipes-safe, of which I was previously unaware. That allows the two examples I gave above to be implemented; the per-file copy, for instance, could presumably be restructured along these lines (an untested sketch; fromHandle and toHandle are the String-based helpers from Pipes.Prelude):
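
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Safe
import Pipes.Safe.Prelude (withFile)
import System.IO           (IOMode (..))

-- A sketch: copy one file, with both handles released as soon as the
-- inner pipeline finishes rather than at the end of the whole SafeT
-- block.
copyOne :: MonadSafe m => FilePath -> FilePath -> m ()
copyOne infile outfile =
    withFile infile ReadMode $ \hin ->
    withFile outfile WriteMode $ \hout ->
    runEffect $ P.fromHandle hin >-> P.toHandle hout

That doesn't, however, solve the core problem that finalizers within a pipeline cannot be run promptly. Here's a more complicated example to demonstrate this.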

The following snippet of conduit code loops over all of the files in an input folder, and spits each 50-byte chunk into a separate file in the output folder. At no point is more than one file handle open for reading and one for writing.

{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit
import Data.Conduit.List (peek)
import Data.Conduit.Filesystem
import Data.Conduit.Binary (isolate)
import Data.String (fromString)

main =
    runResourceT $ src $$ loop 0
  where
    src = traverse False "input" $= awaitForever sourceFile
    loop i = do
        mx <- peek
        case mx of
            Nothing -> return ()
            Just _ -> do
                let fp = "out-conduit/" ++ show i
                isolate 50 =$ sinkFile (fromString fp)
                loop $ i + 1

To me, this demonstrates the beauty of composition that conduit provides. Our src above never has to be structured specially to deal with resource allocation or finalization; sourceFile automatically handles it correctly.

I'm not aware of any solution to this problem in pipes.

Update 2: Gabriel has provided a solution for this in pipes:

{-# LANGUAGE OverloadedStrings #-}

import Control.Monad
import Data.DirStream
import Pipes
import Pipes.Safe
import Pipes.ByteString
import qualified Pipes.ByteString.Parse as P
import Pipes.Parse

import qualified Filesystem as F
import qualified Filesystem.Path.CurrentOS as F
import System.IO

main =
    runSafeT $ runEffect $ (`evalStateT` src) (loop 0)
  where
    src = for (every (childFileOf "/usr/bin")) readFile'
    loop i = do
        eof <- isEndOfBytes
        unless eof $ do
            let fp = F.decodeString ("out/" ++ show i)
            runSafeT $ runEffect $
                hoist lift (input >-> P.take 50) >-> writeFile' fp
            loop (i + 1)

-- `childOf` returns all children, including directories.  This is just a quick filter to get only files
childFileOf :: (MonadSafe m) => F.FilePath -> ListT m F.FilePath
childFileOf file = do
    path <- childOf file
    isDir <- liftIO $ isDirectory path
    guard (not isDir)
    return path

-- Work around `FilePath` mismatch.  See comments below
readFile' :: (MonadSafe m) => F.FilePath -> Producer ByteString m ()
readFile' file =
    bracket (liftIO $ F.openFile file ReadMode) (liftIO . hClose) fromHandle

writeFile' :: (MonadSafe m) => F.FilePath -> Consumer ByteString m r
writeFile' file =
    bracket (liftIO $ F.openFile file WriteMode) (liftIO . hClose) toHandle

I think this is a good demonstration of the fact that having proper resource handling in the core is (1) more composable, (2) much safer and (3) far easier to use.
