Friday, 27 September 2013

Asynchronous iteratee processing in Scalaz

I've been using Scalaz 7 iteratees to process a large (i.e., unbounded)
stream of data in constant heap space. Now I'd like to perform the
processing in parallel, working on P chunks of data at a time. I still
have to limit heap space, but it's reasonable to assume that there's
enough heap to store P chunks of data and the accumulated results of the
computation.
In code, it looks something like this:
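    import scalaz._, Scalaz._
    import scalaz.effect.IO
    import scalaz.iteratee.{EnumeratorT, Iteratee, IterateeT}

    // M[_] rather than M[+_]: IO's type parameter is not covariant
    type ErrorOrT[M[_], A] = EitherT[M, Throwable, A]
    type ErrorOr[A] = ErrorOrT[IO, A]

    def processChunk(c: Chunk): Result

    // fold, not foldM: the step function returns a plain List[Result], not ErrorOr[List[Result]]
    def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Chunk, ErrorOr, List[Result]] =
      Iteratee.fold[Chunk, ErrorOr, List[Result]](Nil) { (rs, c) =>
        processChunk(c) :: rs
      } &= data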
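For readers without Scalaz at hand, the same sequential fold can be sketched over a plain Scala Iterator. Chunk, Result, SequentialFold and the summing processChunk below are hypothetical stand-ins, not the post's real types; the point is only that each chunk is pulled, processed and dropped in turn, so the heap holds the current chunk plus the accumulated results.

```scala
// Hypothetical stand-ins for the post's Chunk and Result types.
final case class Chunk(data: Vector[Int])
final case class Result(sum: Long)

object SequentialFold {
  // Hypothetical processing step: sums the values in a chunk.
  def processChunk(c: Chunk): Result = Result(c.data.map(_.toLong).sum)

  // Consumes the iterator one chunk at a time, prepending each result,
  // just like the iteratee fold (so results come out in reverse order).
  def process(data: Iterator[Chunk]): List[Result] =
    data.foldLeft(List.empty[Result])((rs, c) => processChunk(c) :: rs)
}
```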
I'm aware of Scalaz's Task class (scalaz.concurrent.Task) and thought of
mapping over the enumerator to create a stream of tasks:

    data map (c => Task.delay(processChunk(c)))
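Mapping alone does not introduce any parallelism: like Task.delay, it only wraps each chunk in a description of work. The sketch below uses plain Scala thunks (`() => Int`) in place of Scalaz's Task, with a hypothetical processChunk that counts its calls, to show that nothing runs when the stream is mapped, and that forcing the tasks inside a fold still runs them strictly one after another.

```scala
import java.util.concurrent.atomic.AtomicInteger

object DeferredTasks {
  // Counts how many times the (hypothetical) processing step has actually run.
  val evaluated = new AtomicInteger(0)

  // Hypothetical stand-in for processChunk: squares its input.
  def processChunk(c: Int): Int = {
    evaluated.incrementAndGet()
    c * c
  }

  // Analogue of `data map (c => Task.delay(processChunk(c)))`: each element
  // becomes a thunk describing the work; mapping runs none of them.
  def toTasks(data: Iterator[Int]): Iterator[() => Int] =
    data.map(c => () => processChunk(c))

  // Forcing the thunks inside a fold runs them sequentially, one per step.
  def runSequentially(tasks: Iterator[() => Int]): List[Int] =
    tasks.foldLeft(List.empty[Int])((rs, t) => t() :: rs)
}
```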
But I'm still not sure how to manage the non-determinism. While consuming
the stream, how do I ensure that P tasks are running whenever possible?
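One possible approach, sketched with the standard library's Future instead of Scalaz's Task (so it is a swapped-in technique, and the names BoundedParallel, processParallel and p are hypothetical): a java.util.concurrent.Semaphore with P permits gates submission. The consuming loop blocks whenever P tasks are in flight and resumes as soon as any one of them finishes, not just the oldest, so P tasks stay busy whenever input remains. At most P + 1 chunks are held at once (P in flight plus the one waiting for a slot), and the non-determinism is confined to the order of the results, which follows completion order.

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object BoundedParallel {
  // Applies f to every element of `data` with at most `p` calls in flight.
  // Results are collected in completion order (non-deterministic).
  // The first failure stops further submission and is rethrown at the end.
  def processParallel[A, B](data: Iterator[A], p: Int)(f: A => B)(
      implicit ec: ExecutionContext): List[B] = {
    require(p > 0, "p must be positive")
    val slots   = new Semaphore(p)
    val lock    = new Object
    var acc     = List.empty[B]
    val failure = new AtomicReference[Throwable](null)

    while (failure.get == null && data.hasNext) {
      val chunk = data.next()  // pull the next chunk from the stream
      slots.acquire()          // block until one of the p slots is free
      Future {
        try {
          val r = f(chunk)
          lock.synchronized { acc = r :: acc }
        } catch {
          case NonFatal(e) => failure.compareAndSet(null, e)
        } finally slots.release() // free the slot whether f succeeded or not
      }
    }

    slots.acquire(p)           // all permits back => every task has finished
    slots.release(p)
    val e = failure.get
    if (e != null) throw e
    lock.synchronized(acc)
  }
}
```

Blocking the consuming thread on the semaphore is what provides back-pressure here; a fully non-blocking, iteratee-based version would need a different mechanism to pause the enumerator.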
