CHICKEN extension to manage a pool of worker processes: Home
poule

Poule

Introduction

A poule manages a pool of worker processes, ready to execute a worker function. This function takes a single argument. Jobs are submitted to the poule by specifying an argument for the worker function. The value produced by the application of the worker function to this argument is the result of a job.

Communication between the poule and the workers happens via pipes. Job arguments and results are serialized via (read) and (write), and thus must be readable/writable scheme forms.

Example

Here's a sample program that computes the first 50 powers of two using 10 parallel processes.

(import (poule) (scheme) (chicken base) (srfi-1))
(let* ((p (poule-create (cut expt 2 <>) 10))
       (j (list-tabulate 50 (cut poule-submit p <>)))
       (r (map (cut poule-result p <>) j))
       (_ (poule-destroy p)))
  (display r)
  (newline))

(1 2 4 8 16 32 64 128 256 512 1024 2048 4096 8192 16384 32768 65536 131072 262144 524288 1048576 2097152 4194304 8388608 16777216 33554432 67108864 134217728 268435456 536870912 1073741824 2147483648 4294967296 8589934592 17179869184 34359738368 68719476736 137438953472 274877906944 549755813888 1099511627776 2199023255552 4398046511104 8796093022208 17592186044416 35184372088832 70368744177664 140737488355328 281474976710656 562949953421312)

Author

Pietro Cerutti

Repository

https://code.ptrcrt.ch/poule

API

[procedure] (: poule-create (('a -> 'b) fixnum #!optional fixnum -> poule))

(poule-create fn max-workers idle-timeout) returns a poule that manages up to max-workers worker processes that can execute the function fn. It is an error if max-workers is not a positive number. Workers are created on-demand and are kept in a wait state until a job is submitted to the poule. After idle-timeout (default: 15) seconds without receiving any work to do, a worker quits.

[procedure] (: poule-submit (poule 'a -> fixnum))

(poule-submit poule arg) submits a job to the poule by placing it in a submission queue and returns a jobid. The function returns immediately and the job is eventually realized in an available worker process by calling (fn arg), where fn is the function passed to (poule-create). The jobid can later be used to retrieve the result.

[procedure] (: poule-result (poule fixnum #!optional boolean -> (or 'b false)))

(poule-result poule jobid wait?) returns the result of the job jobid. If the result is not ready yet and wait? is #t (the default), the call waits, otherwise it returns #f. If jobid is invalid (e.g., it has been disposed), the call returns #f. If the job submission resulted in an error condition being raised, (poule-result) will propagate (i.e., (signal)) the condition.

[procedure] (: poule-dispose-results (poule -> undefined))

(poule-dispose-results poule) makes all ready results irretrievable.

[procedure] (: poule-wait (poule -> undefined))

(poule-wait poule) waits until all submitted jobs are ready.

[procedure] (: poule-destroy (poule #!optional boolean -> undefined))

(poule-destroy poule wait?) terminates all workers and releases the poule. If wait? is #t (the default), the function calls (poule-wait) before asking the workers to terminate. A finalizer is set to call this function when a poule is about to be GC'd. However, because there is no guarantee that such a finalizer would be called on program exit, it is advised to always destroy poules explicitely when they are no longer needed. This makes sure worker processes are informed and can quit as soon as possible.

[procedure] (: poule-stats (poule -> (list-of (pair symbol undefined))))

(poule-stats poule) reports statistics about the poule in form of an alist. The following keys are defined:

  • submitted-jobs (fixnum): number of jobs submitted since the creation of the poule
  • pending-jobs (fixnum): number of jobs in the submission queue
  • busy-workers (fixnum): number of busy workers
  • idle-workers (fixnum): number of idle workers
  • ready-results (fixnum): number of results ready for retrieval
  • pending-results (fixnum): number of results not yet ready for retrieval

License

 Copyright (c) Pietro Cerutti
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
 modification, are permitted provided that the following conditions
 are met:
 
 1. Redistributions of source code must retain the above copyright
    notice, this list of conditions and the following disclaimer.
 2. Redistributions in binary form must reproduce the above copyright
    notice, this list of conditions and the following disclaimer in the
    documentation and/or other materials provided with the distribution.
 
 THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 SUCH DAMAGE.

Version History

  • 0.1.0 - initial release