CHICKEN extension to manage a pool of worker processes: Artifact [68c10b9084]

Artifact 68c10b90844d0835ac7a1a9a598bf70d718cabaa42cda6fe172eeff91ce71a6f:


(module (poule)
  
  ;
  ; exports
  ;
  (
   poule?
   poule-create
   poule-submit
   poule-result
   poule-dispose-results
   poule-wait
   poule-destroy
   poule-stats
   poule-trace)

  ;
  ; imports
  ;
  (import
    (scheme)
    (chicken base)
    (chicken condition)
    (chicken file posix)
    (chicken foreign)
    (chicken gc)
    (chicken port)
    (chicken process)
    (chicken process signal)
    (chicken process-context posix)
    (chicken string)
    (chicken time)
    (chicken type)
    (datatype)
    (mailbox)
    (matchable)
    (srfi-1)
    (srfi-18)
    (typed-records))

  (define-syntax record
    (syntax-rules ()
      ((_ name body ...)
       (begin
         (defstruct name body ...)
         (define-type name (struct name))))))

  ;
  ; datatypes
  ;
  (define-datatype result result?
                   (result-value (val (constantly #t)))
                   (result-error (err (constantly #t)))) 
  (define-type result (struct result))

  ;
  ; records
  ;
  (record job
          (num         : fixnum)            ; job number, incrementally allocated
          (arg         : *)                 ; argument of the worker function
          ((result #f) : (or result false)) ; result from the worker function
          ((ready? #f) : boolean)           ; is the result ready?
          )


  (record worker
          (pid      : fixnum)            ; pid of the child process
          (out      : output-port)       ; parent -> child output port
          (in       : input-port)        ; child -> parent input port
          ((job #f) : (or fixnum false)) ; currently assigned job number, or #f if worker is free
          )

  (record poule
          ((active? #t)          : boolean)           ; is this poule usable?
          (fn                    : procedure)         ; worker function
          (max-workers           : fixnum)            ; maximum number of workers to create
          (workers               : (list-of worker))  ; list of workers
          (idle-timeout          : fixnum)            ; let workers die after so many idle seconds
          (submission-thread     : thread)            ; thread to submit jobs to workers
          ((mbox (make-mailbox)) : (struct mailbox))  ; mailbox for incoming jobs, for the submission thread to pick up
          ((jobs '())            : (list-of job))     ; list of submitted jobs
          ((job-count 0)         : fixnum)            ; incremental number to assign each job a unique id
          ((mutex (make-mutex))  : (struct mutex))    ; mutuate access between main process and submission thread
          )

  (: poule-create          (('a -> 'b) fixnum #!optional fixnum -> poule))
  (: poule-submit          (poule 'a -> fixnum))
  (: poule-result          (poule fixnum #!optional boolean -> (or 'b false)))
  (: poule-dispose-results (poule -> undefined))
  (: poule-wait            (poule -> undefined))
  (: poule-destroy         (poule #!optional boolean -> undefined))
  (: poule-stats           (poule -> (list-of (pair symbol undefined))))

  (define poule-trace (make-parameter #f))

  ;
  ; a free worker has a #f job
  ;
  (define worker-free?         (complement worker-job))
  (define (worker-assign! w n) (worker-job-set! w n))
  (define (worker-unassign! w) (worker-job-set! w #f))

  ; debug print
  (define (dp . args)
    (if (poule-trace)
      (apply print
             (current-seconds)
             " [" (current-process-id) "." (thread-name (current-thread)) "] "
             args)))

  ; guard some expressions with the poule mutex
  (define-syntax guarded-p
    (syntax-rules ()
      ((_ body ...)
       (dynamic-wind
         (lambda ()
           (dp "mutex-lock!")
           (mutex-lock! (poule-mutex p)))
         (lambda () body ...)
         (lambda ()
           (dp "mutex-unlock!")
           (mutex-unlock! (poule-mutex p)))))))

  ;
  ; helpers
  ;
  (define (write/flush obj port)
    (write obj port)
    (flush-output port))

  (define (make-backoff initial multiplier)
    (lambda ()
      (dp "Sleeping for " initial)
      (thread-sleep! initial)
      (set! initial (exact->inexact (* initial multiplier)))))

  (define (exn->string e)
    (->string (if (condition? e)
                (condition->list e)
                e)))

  (define (elapsed? last-checkpoint-ms seconds)
    (< (+ last-checkpoint-ms (* 1000 seconds)) (current-process-milliseconds)))

  (define (next-job-number p)
    (let ((n (add1 (poule-job-count p))))
      (poule-job-count-set! p n)
      n))

  ;
  ; pipes are unidirectional on Linux, use socketpair(2)
  ;
  (foreign-declare "#include <sys/socket.h>")
  (define create-bidirectional-pipe
    (foreign-primitive ()
      "
      int ends[2] = { -1, -1 };
      socketpair(PF_LOCAL, SOCK_STREAM, 0, ends);
      C_word av[4] = { C_SCHEME_UNDEFINED, C_k, C_fix(ends[0]), C_fix(ends[1]) };
      C_values(4, av);
      "))

  ;
  ; arguments checkers
  ;
  (define (check-poule p loc)
    (unless (and (poule? p) (poule-active? p))
      (signal (condition `(exn location ,loc message "invalid poule")))))

  (define (check-number n loc)
    (unless (and (fixnum? n) (positive? n))
      (signal (condition `(exn location ,loc message "invalid num")))))

  (define (check-procedure p loc)
    (unless (procedure? p)
      (signal (condition `(exn location ,loc message "invalid procedure")))))

  (define (check-boolean b loc)
    (unless (boolean? b)
      (signal (condition `(exn location ,loc message "invalid boolean")))))

  ; fork a new worker process
  (define (spawn-worker fn idle-timeout)
    (dp "spawn-worker")

    (let-values (((p c) (create-bidirectional-pipe)))
      (define (child)
        (let ((out (open-output-file* c))
              (in  (open-input-file* c))
              (last-idle (current-process-milliseconds)))

          (set-signal-handler!
            signal/alrm
            (lambda (_)
              (when (elapsed? last-idle idle-timeout)
                (dp "idle timeout, exiting...")
                (exit 0))))

          (let loop ()
            (set! last-idle (current-process-milliseconds))
            (set-alarm! idle-timeout) 

            (match (read in)
              (('work x)
               (set-alarm! 0)
               (handle-exceptions exn
                 (write/flush (cons #f (exn->string exn)) out)
                 (write/flush (cons #t (fn x)) out))
               (loop))
              (('exit)
               (dp "got 'exit")
               (write/flush (cons #t #t) out))))))

      (match (process-fork child #t)
        (0
         (exit))
        (n
          (dp "spawned worker pid " n)
          (make-worker pid: n
                       out: (open-output-file* p)
                       in:  (open-input-file* p))))))

  ; gather results from ready workers, kill dead workers, make sure we always
  ; have at least one worker alive
  ; FIXME - this does a bunch of things, it would be best to refactor it a bit
  (define (scan-workers p)
    (dp "scan-workers")

    (define (reap-ready)
      (dp "reap-ready")
      (pair?
        (filter-map
          (lambda (w)
            (and-let* ((n (worker-job w))
                       ((char-ready? (worker-in w)))
                       (j (find (lambda (j) (eq? (job-num j) n)) (poule-jobs p)))
                       (r (handle-exceptions exn
                            (result-error exn)
                            (match (read (worker-in w))
                              ((#t . val) (result-value val))
                              ((#f . err) (result-error err))))))
              ; TODO - convert to condition here, so we don't have
              ; to handle a cons with #t or #f later on?
              (worker-unassign! w)
              (job-ready?-set! j #t)
              (job-arg-set! j #f) ; let it be GC'd
              (job-result-set! j r)))
          (poule-workers p))))

    (define (gc-dead)
      (define (alive? w)
        (let-values (((pid _ _ ) (process-wait (worker-pid w) #t)))
          (zero? pid)))
      (let-values (((alive dead) (partition alive? (poule-workers p))))
        (poule-workers-set!
          p
          (if (pair? alive)
            alive
            (list (spawn-worker (poule-fn p) (poule-idle-timeout p)))))))

    (let ((any-reaped? (reap-ready)))
      (gc-dead)
      any-reaped?))

  ; submission thread
  (define (submission p)
    (dp "submission")

    (define (continue?)
      (thread-specific (current-thread)))

    (let mbox-loop ()
      (dp "mbox-loop")
      (let ((j (mailbox-receive! (poule-mbox p) 1.0 #f)))
        (cond
          ((not (continue?))
           (dp "submit: done"))
          (j
            (let ((backoff (make-backoff 0.1 1.05)))
              (dp "submit: got job " (job-num j))
              (let worker-loop ()
                (guarded-p (scan-workers p))
                (cond
                  ((guarded-p
                     (and-let* ((w (find worker-free? (poule-workers p))))
                       (dp "submit: assigned job " (job-num j) " to worker " (worker-pid w))
                       (worker-assign! w (job-num j))
                       (write/flush `(work ,(job-arg j)) (worker-out w))
                       #t))
                   (mbox-loop))
                  ((guarded-p
                     (if (> (poule-max-workers p) (length (poule-workers p)))
                       (begin
                         (dp "submit: no worker, spawning a new one")
                         (poule-workers-set!
                           p
                           (cons (spawn-worker (poule-fn p)
                                               (poule-idle-timeout p))
                                 (poule-workers p)))
                         #t)
                       #f))
                   (worker-loop))
                  (else
                    (dp "submit: no worker, backing off")
                    (backoff)
                    (worker-loop))))))
          (else
            (thread-yield!)
            (mbox-loop))))))

  ;
  ; POULE API
  ;
  (define (poule-create fn num #!optional (idle-timeout 15))
    (check-procedure fn 'poule-create)
    (check-number num 'poule-create)

    (letrec* ((t (make-thread (cut submission p)))
              (p (make-poule fn:                fn
                             max-workers:       num
                             workers:           (list-tabulate num (lambda (_) (spawn-worker fn idle-timeout)))
                             idle-timeout:      idle-timeout
                             submission-thread: t)))
      (thread-specific-set! t #t)
      (set-finalizer! p poule-destroy)
      (thread-start! t)
      p))

  (define (poule-submit p arg)
    (check-poule p 'poule-submit)

    (dp "poule-submit " arg)

    (guarded-p
      (let* ((n (next-job-number p))
             (j (make-job num: n arg: arg)))
        (poule-job-count-set! p n)
        (poule-jobs-set! p (cons j (poule-jobs p)))
        (mailbox-send! (poule-mbox p) j)
        n)))

  (define (poule-result p num #!optional (wait? #t))
    (check-poule p 'poule-result)
    (check-number num 'poule-result)
    (check-boolean wait? 'poule-result)

    (define backoff (make-backoff 0.1 1.05))

    (define-datatype try-result
                     (try-done (val (constantly #t)))
                     (try-retry-now)
                     (try-retry-later))
    (define (try)
      (guarded-p
        (let ((j (find (lambda (j) (eq? (job-num j) num)) (poule-jobs p))))
          (if (or (not j) (null? (poule-workers p)))
            (try-done #f)
            (cond
              ((job-ready? j)
               (dp "poule-result " num " is ready")
               (cases result (job-result j)
                 (result-value (v) (try-done v))
                 (result-error (e) (signal (if (condition? e)
                                             e
                                             (condition `(exn
                                                           location worker
                                                           message ,e)))))))
              ((scan-workers p)
               (dp "poule-result " num ": some worker is done, trying again...")
               (try-retry-now))
              (wait?
                (dp "poule-result " num ": worker is not done, backing off...")
                (try-retry-later))
              (else
                (dp "poule-result " num ": worker is not done, returning")
                (try-done #f)))))))

    (let loop ()
      (dp "looping")
      (let ((t (try)))
        (cases try-result t
          (try-done (val) val)
          (try-retry-now () (loop))
          (try-retry-later () (backoff) (loop))))))

  (define (poule-dispose-results p)
    (check-poule p 'poule-dispose-results)

    (guarded-p
      (poule-jobs-set! p (remove job-ready? (poule-jobs p)))))

  (define (poule-wait p)
    (check-poule p 'poule-wait)

    (dp "poule-wait")
    (define backoff (make-backoff 0.1 1.05))
    (let loop ()
      (when
        (guarded-p
          (scan-workers p)
          (and (any (complement job-ready?) (poule-jobs p))
               (not (null? (poule-workers p)))))
        (thread-yield!)
        (backoff)
        (loop))))

  (define (poule-destroy p #!optional (wait? #t))
    #;(check-poule p 'poule-destroy) ; when called from the finalizer, the poule might not be active anymore
    (check-boolean wait? 'poule-destroy)

    (dp "poule-destroy")
    (when (poule-active? p)
      (if wait? (poule-wait p))
      (guarded-p
        (thread-specific-set! (poule-submission-thread p) #f)
        (thread-yield!)
        (let ((w (poule-workers p)))
          (for-each
            (lambda (w)
              (write/flush '(exit) (worker-out w))
              (let-values (((pid succ rc) (process-wait (worker-pid w)))) '()))
            w)
          (poule-active?-set! p #f)))))

  (define (poule-stats p)
    (guarded-p
      (let* ((w (poule-workers p))
             (j (poule-jobs p))
             (idle (length (filter worker-free? w)))
             (busy (- (length w) idle))
             (ready (length (filter job-ready? j)))
             (pending (- (length j) ready)))
        `((submitted-jobs ,(poule-job-count p))
          (pending-jobs ,(mailbox-count (poule-mbox p)))
          (busy-workers ,busy)
          (idle-workers ,idle)
          (ready-results ,ready)
          (pending-results ,pending)))))

  )