# Example of streaming data from the database using Persistent and Conduit libraries in Haskell

At work, I have to deal with increasingly large datasets. And by large, I mean a PostgreSQL table with over 7 million rows of data. If I really wanted to, I could probably load it all into RAM…maybe. I haven’t even tried, as it just sounds silly, especially since I have more than one monstrous table like that to deal with. At the same time, I would also like to do as much processing in parallel as possible (you know, put those multi-core CPUs and/or GPUs into action). In short, a big data problem.

This problem has made me look into Haskell yet again. After all, Haskell is meant to make it much easier for the programmer to parallelise their algorithms. Simon Marlow’s book, Parallel and Concurrent Programming in Haskell, is well worth a look when it comes to parallel programming in Haskell, by the way.

There are quite a few packages that make it easy (or easier) to interface with a database in Haskell. Personally, I’ve decided to give Persistent a shot. Persistent is the default database interface used by Yesod Web Framework, works with the majority of popular databases (SQLite, PostgreSQL, MySQL, etc.), and is quite well documented.

In this post, I will provide a (hopefully informative) example of how to use Persistent together with the Conduit library. Conduit is a Haskell library that allows streaming data (not necessarily from a database) in constant memory. In other words, with Conduit, it is possible to pull into memory just enough data for processing and no more, thus minimising unnecessary memory use.

## The problem

Suppose we are given a database with one massive table of integers, and our objective is to print the records as pairwise tuples to screen. That is, if xs :: [Int] is the list of all the records, we want to zip it with its tail and print it to the screen:
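In plain Haskell terms, the transformation we are after is just the following (`pairwise` is a name I am introducing here for illustration):

```haskell
-- Pair each element with its successor:
-- zipping a list with its own tail drops nothing but the
-- unpairable final element.
pairwise :: [Int] -> [(Int, Int)]
pairwise xs = zip xs (tail xs)

-- pairwise [1, 2, 3, 4] == [(1,2),(2,3),(3,4)]
```

The whole challenge of the post is doing exactly this without ever holding the full list in memory.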

## Create test SQLite database

Firstly, let’s define the structure of the table using Persistent. Persistent relies on Template Haskell to specify the structure of the database tables. For example, the following code (the source code for all the snippets in this post can be found on GitHub):
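The original snippet is not preserved here, but a minimal definition along these lines (the entity name `MyRecord` and field `value` are reconstructions, consistent with the `myRecordValue` accessor used later in the post; depending on your persistent version you may need a few extra `LANGUAGE` pragmas) would be:

```haskell
{-# LANGUAGE GADTs           #-}
{-# LANGUAGE QuasiQuotes     #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies    #-}

import Database.Persist.TH

-- One table, "my_record", with a single Int column named "value".
share [mkPersist sqlSettings, mkMigrate "migrateAll"] [persistLowerCase|
MyRecord
    value Int
    deriving Show
|]
```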

generates the following SQL:
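Approximately this (the exact DDL varies between Persistent versions and backends; this is a sketch of what the SQLite migration produces):

```sql
CREATE TABLE "my_record" (
    "id" INTEGER PRIMARY KEY,
    "value" INTEGER NOT NULL
);
```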

Having specified the table structure, let’s go ahead and create an SQLite database with a single table with 100000 integers starting from 1 and incrementing by 1:
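A self-contained sketch of such a program (the database file name `test.sqlite` comes from the post; the entity definition is the reconstruction assumed above):

```haskell
{-# LANGUAGE GADTs             #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes       #-}
{-# LANGUAGE TemplateHaskell   #-}
{-# LANGUAGE TypeFamilies      #-}

import           Control.Monad           (forM_)
import           Database.Persist
import           Database.Persist.Sqlite
import           Database.Persist.TH

share [mkPersist sqlSettings, mkMigrate "migrateAll"] [persistLowerCase|
MyRecord
    value Int
    deriving Show
|]

main :: IO ()
main = runSqlite "test.sqlite" $ do
    -- Create the table if it does not exist yet.
    runMigration migrateAll
    -- Insert the integers 1..100000, one row each
    -- (the returned keys are discarded).
    forM_ [1 .. 100000] $ \i ->
        insert $ MyRecord i
```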

When compiled and executed, this code creates a dummy SQLite database “test.sqlite” with a table consisting of 100000 rows of integers.

In order to query the database for all the records, Persistent provides a helper function, selectList :: [Filter val] -> [SelectOpt val] -> m [Entity val]. This function takes two lists of search criteria as arguments: a list of SQL filter constraints (WHERE), and a list of other SELECT options (e.g., Asc or LimitTo). The result is a list, enclosed in a monad m, of all the records matching the search criteria. Therefore, in order to get all the records from our dummy database, it suffices to call the function with two empty lists:
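In our program, that call looks like this (a fragment that lives inside the `runSqlite` block; `records` ends up with type `[Entity MyRecord]` once later uses pin down the entity type):

```haskell
-- Empty filter list, empty options list: fetch everything.
records <- selectList [] []
```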

All that remains to be done is to extract the integer values from the list of records into a new list, zip the resultant list with its tail, and print the result to screen. That is,
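Sketched as a fragment of the same `runSqlite` block (using the `myRecordValue` and `entityVal` accessors explained just below):

```haskell
-- Unwrap each Entity to its MyRecord, then to its Int.
let values = map (myRecordValue . entityVal) records
-- Pair each value with its successor and print the lot.
liftIO $ print $ zip values (tail values)
```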

You’ll notice three unfamiliar functions used here: myRecordValue, entityVal and liftIO. The first one, myRecordValue, is generated by Template Haskell and is the record field accessor of the MyRecord data type. If you recall the definition of the SQL table used in this example:
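The entity was declared via Template Haskell roughly like this (a reconstruction; the original snippet is not preserved here):

```haskell
share [mkPersist sqlSettings, mkMigrate "migrateAll"] [persistLowerCase|
MyRecord
    value Int
    deriving Show
|]
```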

Persistent, behind the scenes, translates this template into a Haskell data type of the following structure:
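Roughly this (the generated type is more elaborate in practice, with strict fields and extra instances, but this is the shape that matters here):

```haskell
-- Field accessors are named by prefixing the field with the
-- entity name: value -> myRecordValue.
data MyRecord = MyRecord
    { myRecordValue :: Int
    }
    deriving Show
```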

Thus, we can use myRecordValue to access the underlying Int value of a MyRecord.

The entityVal function, on the other hand, is a field accessor of the Entity data type, and can be used to extract the record stored within it. That is, Persistent wraps every record received from the database in the Entity data type. Therefore, in our example, since selectList returns a list [Entity MyRecord] (wrapped in a monad), it is necessary to first recover MyRecord from the Entity, and that can be accomplished via the entityVal function.

Lastly, the liftIO function lifts a computation from the IO monad into a surrounding monad. In our example, Persistent runs its queries in a monad transformer stack built on top of IO, so we use liftIO to run an ordinary IO action, such as print, inside that stack.

The full code looks like this:
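A self-contained sketch of the whole list-based program (the entity definition and file name are the assumptions carried over from earlier):

```haskell
{-# LANGUAGE GADTs             #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes       #-}
{-# LANGUAGE TemplateHaskell   #-}
{-# LANGUAGE TypeFamilies      #-}

import           Control.Monad.IO.Class  (liftIO)
import           Database.Persist
import           Database.Persist.Sqlite
import           Database.Persist.TH

share [mkPersist sqlSettings, mkMigrate "migrateAll"] [persistLowerCase|
MyRecord
    value Int
    deriving Show
|]

main :: IO ()
main = runSqlite "test.sqlite" $ do
    -- Load every row into memory at once.
    records <- selectList [] []
    let values = map (myRecordValue . entityVal) records
    -- Pair each value with its successor and print the result.
    liftIO $ print $ zip values (tail values)
```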

If we now run the compiled version of the code with the runtime statistics flag (`+RTS -s`), the runtime reports, among other statistics, the total memory in use: about 35 MB.

Now, let’s do the same, but this time, let’s use the Conduit library to stream the data from the database. In order to stream the data, we need a Conduit Source. Persistent provides a helper function that is much like selectList but returns a Source rather than a list. It’s not difficult to guess its name: selectSource. The function has the signature selectSource :: [Filter val] -> [SelectOpt val] -> Source m (Entity val).

Having acquired the Source of our data, we now need to extract the underlying Int from the input data type Entity MyRecord, combine the extracted integers into pairwise tuples, convert the tuples into their String representation, and finally, print them to screen.

In order to extract Ints from the upstream, we can use Data.Conduit.List.map helper function. This function applies the supplied function to the data provided by the upstream. In our case, selectSource returns data of type Entity MyRecord, therefore, we can use a composition of myRecordValue with entityVal to extract the underlying Int from input. That is,
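Assuming conduit 1.x-era type aliases (where `Conduit i m o` is a stream transformer from inputs `i` to outputs `o`), the extraction step can be written as:

```haskell
import           Data.Conduit
import qualified Data.Conduit.List as CL
import           Database.Persist  (Entity, entityVal)

-- Map each Entity MyRecord flowing downstream to its Int payload.
entityToValue :: Monad m => Conduit (Entity MyRecord) m Int
entityToValue = CL.map (myRecordValue . entityVal)
```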

Note that the type of the entityToValue function is Conduit (Entity MyRecord) m Int. This can be read as: for every input of type Entity MyRecord, the Conduit will do some processing and pass an Int forward to the downstream (for other Conduits or a Sink to receive).

The next thing to do is to combine the Ints into pairs, and cast them into Strings. In order to achieve this, we can use the await, yield and leftover helper functions provided by the Data.Conduit package. The idea is thus to await two consecutive Ints, wrap them in a tuple, convert it to a String, yield it to the downstream, and push the second of the Ints back to the upstream as a leftover, so that it can become the first element of the next pair. In code, that would go like this:
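A sketch, with `valuesToPairs` as my (hypothetical) name for this conduit:

```haskell
valuesToPairs :: Monad m => Conduit Int m String
valuesToPairs = do
    mx <- await
    my <- await
    case (mx, my) of
        (Just x, Just y) -> do
            -- Emit the pair downstream as a String…
            yield $ show (x, y)
            -- …and push the second element back upstream so it
            -- starts the next pair.
            leftover y
            valuesToPairs
        -- Fewer than two values left: the stream is done.
        _ -> return ()
```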

Finally, all that remains is to print each tuple to screen, and terminate the flow of the data stream. To accomplish this, we’ll use Data.Conduit.List.mapM_, which applies a monadic action to every value in the stream. It’s not difficult to guess that the action we are going to apply is printing the tuples to screen. That is,
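Again a sketch, with `printString` as an assumed name for the sink:

```haskell
import           Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.Conduit.List      as CL

-- Consume every String in the stream by printing it.
printString :: MonadIO m => Sink String m ()
printString = CL.mapM_ (liftIO . putStrLn)
```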

Having all the components defined, we can link them into the following data stream which will result in the desired outcome:
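Using conduit 1.x’s `$$` (connect) and `=$=` (fuse) operators, and with `valuesToPairs` and `printString` being my names for the pairing conduit and the printing sink from the previous steps, the pipeline reads:

```haskell
main :: IO ()
main = runSqlite "test.sqlite" $
    selectSource [] [] $$ entityToValue =$= valuesToPairs =$= printString
```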

The full code looks like this:
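A self-contained sketch of the streaming program, assuming conduit 1.x and the entity definition reconstructed earlier:

```haskell
{-# LANGUAGE GADTs             #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes       #-}
{-# LANGUAGE TemplateHaskell   #-}
{-# LANGUAGE TypeFamilies      #-}

import           Control.Monad.IO.Class  (MonadIO, liftIO)
import           Data.Conduit
import qualified Data.Conduit.List       as CL
import           Database.Persist
import           Database.Persist.Sqlite
import           Database.Persist.TH

share [mkPersist sqlSettings, mkMigrate "migrateAll"] [persistLowerCase|
MyRecord
    value Int
    deriving Show
|]

-- Unwrap each streamed Entity to its Int payload.
entityToValue :: Monad m => Conduit (Entity MyRecord) m Int
entityToValue = CL.map (myRecordValue . entityVal)

-- Turn consecutive Ints into pairwise tuples, rendered as Strings.
valuesToPairs :: Monad m => Conduit Int m String
valuesToPairs = do
    mx <- await
    my <- await
    case (mx, my) of
        (Just x, Just y) -> do
            yield $ show (x, y)
            leftover y
            valuesToPairs
        _ -> return ()

-- Print every String that reaches the end of the pipeline.
printString :: MonadIO m => Sink String m ()
printString = CL.mapM_ (liftIO . putStrLn)

main :: IO ()
main = runSqlite "test.sqlite" $
    selectSource [] [] $$ entityToValue =$= valuesToPairs =$= printString
```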

If we now run the compiled version of this code with the same runtime statistics flag (`+RTS -s`), the reported total memory in use is only 1 MB, compared to 35 MB with the list-based approach! I don’t know about you, but I’m impressed.