franz inc logo  
  download techcorner franz inc franz inc store search franz inc          

products
services
support
  Latest Info
  Tech Corner
  Patches
     Info
  Documentation
  FAQs
  White Papers
  Tutorials
  Examples
  Archives
about
success
resources

RSS Feeds

AllegroServe at opensource.franz.com

Simulating Symmetric Multi-Processing with fork(): source code

;; -*- Mode: common-lisp; Package: common-lisp-user -*-

(in-package :user)

;;;;;;;;;;;;;;; DEBUGGING AND TIMING CODE ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defmacro my-time (&body body)
  ;; Evaluate BODY as an implicit PROGN, print the elapsed real (wall
  ;; clock) time in seconds as "REAL TIME: <seconds>", and return the
  ;; primary value of the last form of BODY.
  ;;
  ;; BODY is wrapped in PROGN so that more than one form is accepted; the
  ;; previous expansion spliced BODY directly into a LET* binding, which is
  ;; malformed whenever BODY has more than one form.  Elapsed ticks are
  ;; converted with INTERNAL-TIME-UNITS-PER-SECOND rather than a hard-coded
  ;; 1000, which is only correct where that constant happens to be 1000.
  (let ((start (gensym "START"))
	(res (gensym "RES"))
	(elapsed (gensym "ELAPSED")))
    `(let* ((,start (get-internal-real-time))
	    (,res (progn ,@body))
	    (,elapsed (- (get-internal-real-time) ,start)))
       (format t "REAL TIME: ~s~%"
	       (float (/ ,elapsed internal-time-units-per-second)))
       (force-output t)
       ,res)))

(eval-when (:compile-toplevel :load-toplevel :execute)
  ;; Changing the initial value of this variable to `t' will result in a
  ;; LOT of output.  Only needed when debugging problems.
  ;;
  ;; The .DEBUG macro reads this variable at macroexpansion time, so it
  ;; must be bound whenever code using .DEBUG is compiled or evaluated.
  ;; That includes after this file has only been loaded from a compiled
  ;; fasl, a case the old (compile eval) situations did not cover.
  (defparameter *debug* nil))

(defmacro .debug (format-string &rest format-arguments)
  ;; Debug-logging macro.  *DEBUG* is consulted while the macro is being
  ;; expanded, not when the expansion runs.  When it is true the call
  ;; becomes (..DEBUG FORMAT-STRING . FORMAT-ARGUMENTS); otherwise it
  ;; expands to NIL, so disabled debugging calls cost nothing at run time.
  (if *debug*
      (list* '..debug format-string format-arguments)
      nil))

;; Seconds value (primary value of EXCL::CL-INTERNAL-REAL-TIME) from which
;; the timestamps printed by ..DEBUG are measured.
(defvar *debug-time-base* nil)

(defun ..debug-reset-time-base ()
  ;; Restart the ..DEBUG clock: later timestamps count from this moment.
  ;; Only the primary value of EXCL::CL-INTERNAL-REAL-TIME is stored.
  (setf *debug-time-base* (excl::cl-internal-real-time)))

;; Establish an initial time base as soon as the file is loaded.
(..debug-reset-time-base)

(defun ..debug (format-string &rest format-arguments)
  ;; Print one debugging line to *STANDARD-OUTPUT* and flush it.  The line
  ;; is a "SS:MMM " timestamp relative to *DEBUG-TIME-BASE*, then
  ;; FORMAT-STRING applied to FORMAT-ARGUMENTS, then a fresh line.
  ;;
  ;; EXCL::CL-INTERNAL-REAL-TIME is an undocumented Allegro CL function.
  ;; Per the original author, its first value counts seconds from a fixed
  ;; epoch, chosen so the value stays a fixnum in 32-bit Lisps, and its
  ;; second value is milliseconds.
  (multiple-value-bind (seconds mseconds)
      (excl::cl-internal-real-time)
    (format t "~&~2,'0d:~3,'0d " (- seconds *debug-time-base*) mseconds))
  ;; APPLY spreads the &REST list into FORMAT.  Passing FORMAT-ARGUMENTS as
  ;; one argument printed it as a list, e.g. "(foo: 50)" instead of
  ;; "foo: 50", and failed with too few arguments for any format string
  ;; containing more than one directive.
  (apply #'format t format-string format-arguments)
  (format t "~&")
  (force-output t))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; General framework for High-level task and processor management
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; Holds the pool of currently free processors.  Instances print as
;; #<TM processor-list>.
(defstruct (task-manager
	    (:print-object
	     (lambda (tm stream)
	       (format stream "#<TM ~s>" (task-manager-processors tm)))))
  ;; List of PROCESSOR objects that are free; a task may be scheduled on
  ;; any of them.
  processors)

;; One forked child "processor" as seen from the parent.  Printed as
;; #<PROC TO: ..., FROM: ..., PID ...> using Allegro-internal stream
;; accessors (EXCL::STREAM-OUTPUT-FN / EXCL::STREAM-INPUT-FN).
(defstruct (processor
	    (:print-object
	     (lambda (proc stream)
	       (format stream "#<PROC TO: ~s, FROM: ~s, PID ~s>"
		       (excl::stream-output-fn (processor-to-stream proc))
		       (excl::stream-input-fn (processor-from-stream proc))
		       (processor-pid proc)))))
  ;; Parent-side stream used to send requests to the child.
  to-stream
  ;; Parent-side stream used to read replies from the child.
  from-stream
  ;; Operating-system process ID of the child; used to terminate and reap
  ;; it.
  pid)

;; The TASK-MANAGER created by INITIALIZE-PROCESSORS, or NIL when no
;; processors have been set up (END-PROCESSORS resets it to NIL).
(defvar *task-manager* nil)

(defun sigterm-handler-for-child (&rest ignored)
  ;; Signal 15 (SIGTERM) handler installed in each forked child.  It ignores
  ;; its arguments and calls EXIT with status 1, :quiet t and :no-unwind t.
  (declare (ignore ignored))
  (exit 1 :no-unwind t :quiet t))

(defun initialize-processors (child-worker &optional (n 4))
  ;; Fork N child processes ("processors") and store them in a fresh
  ;; *TASK-MANAGER*, first shutting down any existing set.
  ;;
  ;; Each processor is connected to the parent by two pipes, one per
  ;; direction.  In the child:
  ;;   - a SIGTERM handler is installed;
  ;;   - CHILD-WORKER is called with (stream-to-read-requests-from
  ;;     stream-to-write-replies-to);
  ;;   - when CHILD-WORKER returns or is unwound, both streams are closed
  ;;     and the child calls (exit 0 :quiet t :no-unwind t).
  ;; The parent keeps the opposite pipe ends plus the child's PID in a
  ;; PROCESSOR structure.
  ;;
  ;; NOTE(review): each child is forked while the parent still holds the
  ;; pipe ends of earlier processors, and the child does not close them.
  ;; Later children therefore probably keep earlier children's write ends
  ;; open, so closing the parent's end alone may not deliver EOF.
  ;; END-PROCESSORS also sends SIGTERM, which covers shutdown.  The
  ;; DOTIMES variable I below is unused.
  (when *task-manager* (end-processors))
  (flet ((processor ()
	   ;; One pipe for child -> parent, one for parent -> child.
	   (multiple-value-bind (from-child-read from-child-write)
	       (excl.osi:pipe)
	     (multiple-value-bind (to-child-read to-child-write)
		 (excl.osi:pipe)
	       (let ((pid (excl.osi:fork)))
		 (cond
		  ((= pid 0)
		   ;; Child
		   ;; Exit via SIGTERM-HANDLER-FOR-CHILD on signal 15.
		   (excl::add-signal-handler 15 'sigterm-handler-for-child)

		   ;; These are `parent' only
		   (close from-child-read)
		   (close to-child-write)

		   ;; Run the worker; whatever happens, close our pipe ends
		   ;; and exit so the child never returns into the caller's
		   ;; code.
		   (unwind-protect
		       (funcall child-worker to-child-read
				from-child-write)
		     (ignore-errors (close to-child-read))
		     (ignore-errors (close from-child-write))
		     (exit 0 :quiet t :no-unwind t)))
		  (t
		   ;; Parent
		
		   ;; These are `child' only
		   (close from-child-write)
		   (close to-child-read)
		
		   (make-processor :to-stream to-child-write
				   :from-stream from-child-read
				   :pid pid))))))))
    ;; Create N processors; PUSH leaves the most recently forked first.
    (setq *task-manager*
      (make-task-manager
       :processors (let ((c '())) (dotimes (i n c) (push (processor) c)))))))

;; Protects the free-processor list (TASK-MANAGER-PROCESSORS of
;; *TASK-MANAGER*).  FIND-FREE-PROCESSOR and RELEASE-PROCESSOR take it
;; before modifying that list.
(defvar *processor-lock* (mp:make-process-lock :name "free processor lock"))

(defun find-free-processor ()
  ;; In the controlling process, allocate a processor for a new task.
  ;; This removes and returns the next free PROCESSOR from *TASK-MANAGER*,
  ;; blocking (MP:PROCESS-WAIT) until one is released if none is free.
  ;; It is called once per task, so it is important that it be efficient.
  ;; Signals an error if INITIALIZE-PROCESSORS has not been called.
  (declare (optimize (speed 3)))
  (unless *task-manager* (error "You must call initialize-processors first."))
  (.debug "FIND FREE PROCESSOR:")
  (let ((tm *task-manager*))
    (loop
      ;; Test and pop under the same lock.  Checking for a free processor
      ;; outside the lock and popping inside it lets another Lisp process
      ;; take the last one in between; POP would then return NIL and the
      ;; caller would get NIL instead of a processor.
      (let ((processor (mp:with-process-lock (*processor-lock*)
			 (pop (task-manager-processors tm)))))
	(when processor
	  (return processor)))
      (mp:process-wait "waiting for processor"
		       (lambda (tm)
			 (task-manager-processors tm))
		       tm))))

(defun release-processor (processor)
  ;; Return PROCESSOR to the pool of free processors so that
  ;; FIND-FREE-PROCESSOR can hand it out for another task.  Signals an
  ;; error if INITIALIZE-PROCESSORS has not been called.
  (when (null *task-manager*)
    (error "You must call initialize-processors first."))
  (.debug "RELEASE-PROCESSOR: ~s" processor)
  (mp:with-process-lock (*processor-lock*)
    ;; Prepend to the free list while holding the lock.
    (let ((tm *task-manager*))
      (setf (task-manager-processors tm)
	    (cons processor (task-manager-processors tm))))))

(defun end-processors ()
  ;; Shut down the child processors, then set *TASK-MANAGER* to NIL.  For
  ;; each processor on the free list it closes both pipe streams, sends
  ;; SIGTERM to the child and reaps it.  Signals an error if
  ;; INITIALIZE-PROCESSORS has not been called.
  ;;
  ;; NOTE(review): only processors currently on the free list are shut
  ;; down; one that is checked out at this moment is not visited.
  (when (null *task-manager*)
    (error "You must call initialize-processors first."))
  (loop for proc in (task-manager-processors *task-manager*)
	do (.debug "shutting down processor: ~s" proc)
	   (close (processor-to-stream proc))
	   (close (processor-from-stream proc))
	   (let ((pid (processor-pid proc)))
	     (excl.osi:kill pid excl.osi:*sigterm*)
	     (system:reap-os-subprocess :pid pid)))
  (setf *task-manager* nil))

(defun wait-for-processor (function &rest arguments)
  ;; Run (FUNCTION . ARGUMENTS) in a free child processor and return the
  ;; value the child sends back.  This is the function that talks to the
  ;; child subprocesses and implements the `PROCESSOR' abstraction.
  ;;
  ;; FUNCTION must be a symbol.  ARGUMENTS must be symbols, strings or
  ;; numbers.  Symbols are sent by SYMBOL-NAME only, so the child reads
  ;; them in its own *PACKAGE*.  Strings and numbers are sent with PRIN1,
  ;; so strings are quoted and any embedded #\" or #\\ is escaped.
  ;;
  ;; Efficiency matters here too.  MP:WAIT-FOR-INPUT-AVAILABLE is used to
  ;; wait until the child has finished its task and a reply can be read.
  ;; Signals an error if the child's stream reaches EOF before a reply.
  ;; The processor is always returned to the free pool afterwards.
  (assert (symbolp function))
  (let ((processor (find-free-processor)))
    (unwind-protect
	(let ((to-stream (processor-to-stream processor))
	      (from-stream (processor-from-stream processor)))
	  (write-char #\( to-stream)
	  (write-string (symbol-name function) to-stream)
	  (write-char #\space to-stream)
	  (dolist (argument arguments)
	    (.debug "WAIT-FOR-PROCESSOR: arg: ~s" argument)
	    ;; Write every delimiter to TO-STREAM.  The previous code wrote
	    ;; string quotes to an unbound variable STREAM and sent string
	    ;; contents unescaped.
	    (if* (or (stringp argument) (numberp argument))
	       then (prin1 argument to-stream)
	     elseif (symbolp argument)
	       then (write-string (symbol-name argument) to-stream)
	       else (error "Cannot convert this argument: ~s." argument))
	    (write-char #\space to-stream))
	  (write-char #\) to-stream)
	  (force-output to-stream)

	  (mp:wait-for-input-available from-stream)

	  ;; NOTE(review): this assumes the child prints its result readably.
	  ;; An error inside the child is reported as a cons whose car is
	  ;; *ERROR-MARKER* (see CHILD-SUBPROCESS) and is returned to the
	  ;; caller unchanged.  The old comparison against a literal #:ERROR
	  ;; symbol could never succeed and has been dropped.
	  (let ((result (read from-stream nil from-stream)))
	    (when (eq result from-stream)
	      (error "Processor did not complete for ~s." function))
	    result))
      (release-processor processor))))

;; Allegro CL's default SIGPIPE handler is noisy.  This replacement ignores
;; its arguments and returns true ("I handled the SIGPIPE") without
;; printing anything; in other words, SIGPIPE is ignored.
(defun quiet-sigpipe-handler (sig continuable)
  (declare (ignore continuable sig))
  (values t))

;; Install it for signal 13 (SIGPIPE).  The handler is registered by
;; symbol, so redefining the function later takes effect.
(excl::add-signal-handler 13 'quiet-sigpipe-handler)

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Example using framework
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; Tag symbol for error replies: CHILD-SUBPROCESS sends (marker . message)
;; when evaluating a form signals an error, and CREATE-WORK recognizes such
;; a reply by comparing its car with this value.
(defvar *error-marker* 'sys::error-marker)

(defun child-subprocess (input-stream output-stream)
  ;; Child-side request loop.  It reads forms from INPUT-STREAM, EVALs
  ;; each one, and writes the result to OUTPUT-STREAM followed by a
  ;; newline, flushing after every reply.
  ;;
  ;; - If evaluation signals an ERROR, the condition is logged with
  ;;   ..DEBUG and the reply is (*ERROR-MARKER* . "<condition text>").
  ;; - At end of input it writes "#:eof", flushes, and returns NIL.
  ;;
  ;; NOTE: this EVALs whatever arrives on INPUT-STREAM, so it must only be
  ;; connected to a trusted parent process.
  (loop
    (let ((form (read input-stream nil input-stream)))
      (.debug "CHILD-SUBPROCESS: form is ~s" form)
      (when (eq form input-stream)
	(write-string "#:eof" output-stream)
	(force-output output-stream)
	(return))
      (let ((result
	     (handler-case (eval form)
	       (error (condition)
		 (..debug "CHILD ERROR: ~a" condition)
		 (cons *error-marker* (format nil "~a" condition))))))
	(.debug "CHILD-SUBPROCESS: result is ~s" result)
	(write result :stream output-stream)
	(terpri output-stream)
	(force-output output-stream)))))

(defun number-of-cpus (&optional (default 4))
  ;; Return the number of CPUs, or DEFAULT (4 unless supplied) when it
  ;; cannot be determined.
  ;;
  ;; On Linux this counts the lines of /proc/cpuinfo that begin with
  ;; "processor".  The file is read directly rather than through
  ;; `cat` in a subprocess.  DEFAULT is returned if the file is missing
  ;; or contains no such line, so a count of 0 is never returned.  On
  ;; other platforms DEFAULT is returned.
  #+linux
  (let ((processors 0))
    (with-open-file (in "/proc/cpuinfo" :if-does-not-exist nil)
      (when in
	(loop for line = (read-line in nil nil)
	      while line
	      when (and (>= (length line) 9)
			(string= "processor" line :end2 9))
		do (incf processors))))
    (if (plusp processors) processors default))
  #-linux default)

(defun create-work (child-worker &key iterations
				      work-estimate-seconds
				      processors)
  ;; Run CHILD-WORKER through a pool of PROCESSORS forked child processors
  ;; and print the elapsed real time (via MY-TIME).
  ;;
  ;; One Lisp process per processor is started.  Each calls
  ;; (CHILD-WORKER j) through WAIT-FOR-PROCESSOR for j from 0 below
  ;; ITERATIONS, and uses ..DEBUG to log any error reply or any reply
  ;; other than j + 1.  When WORK-ESTIMATE-SECONDS is supplied, the
  ;; estimated total work (ITERATIONS x PROCESSORS x WORK-ESTIMATE-SECONDS)
  ;; is printed first.  The child processors are shut down on exit.
  (assert iterations)
  (assert processors)

  (when work-estimate-seconds
    (format t "WORK: ~s seconds, "
	    (* iterations processors work-estimate-seconds))
    (force-output t))

  (initialize-processors #'child-subprocess processors)

  (unwind-protect
      (let ((gates '()))
	(my-time
	 (progn
	   ;; For each processor, start a Lisp process that will do the
	   ;; given amount of work.  A gate per processor is used to
	   ;; determine when all the work is done, so we can shut down.
	   (dotimes (i processors)
	     (let ((gate (mp:make-gate nil)))
	       (mp:process-run-function (format nil "foo~d" i)
		 (lambda (gate i)
		   ;; Open the gate even if this process is unwound part way
		   ;; through (for example, after an error aborts it).
		   ;; Otherwise the PROCESS-WAIT below would never return.
		   (unwind-protect
		       (dotimes (j iterations)
			 (let ((result (wait-for-processor child-worker j)))
			   (if* (and (consp result)
				     (eq *error-marker* (car result)))
			      then (..debug "PARENT: Got error: ~a" (cdr result))
			    ;; Check NUMBERP before comparing.  A non-numeric
			    ;; reply is logged as invalid instead of making
			    ;; `=' signal a type error in this process.
			    elseif (not (and (numberp result)
					     (= (1+ j) result)))
			      then (..debug
				    "PARENT: foo~d: invalid return value: ~s"
				    i result))))
		     (mp:open-gate gate)))
		 gate
		 i)
	       (push gate gates)))
	   ;; This is a very efficient way to wait.
	   (dolist (gate gates)
	     (mp:process-wait "wait for processor to finish"
			      #'mp:gate-open-p gate)))))

    (end-processors)))

;;;; timing loop, for getting a certain amount of "work"
;;;;
(defun dummy (i)
  ;; Busy-work unit: return I raised to the 100th power.
  (let ((exponent 100))
    (expt i exponent)))
(defun timing-loop (&key seconds)
  ;; Call DUMMY on 0, 1, 2, ... until MP:WITH-TIMEOUT cuts the loop off
  ;; after SECONDS, then return the counter: roughly how many DUMMY calls
  ;; fit in SECONDS.
  (let ((count 0))
    (mp:with-timeout (seconds)
      (loop
	(dummy count)
	(incf count)))
    count))
(defun loopit (n)
  ;; Call DUMMY on each integer from 0 below N; returns NIL.
  (loop for k from 0 below n
	do (dummy k)))
;; (setq n (timing-loop :seconds 1))
;; (loopit n) => should take 1 second

;; `defvar' instead of `defparameter' so it does not change in a Lisp
;; session, if we reload this file.  That will make comparing times between
;; runs a rational thing to do.
;;
;; The value is the largest of ten 0.1-second TIMING-LOOP samples, so
;; (loopit *iterations*) should take roughly a tenth of a second.
(defvar *iterations*
    (loop repeat 10
	  maximize (timing-loop :seconds .1)))

(defun example-child-worker (i)
  ;; Do about 0.1 s of busy-work (LOOPIT over *ITERATIONS*), then return
  ;; I + 1.  The caller checks that value.
  (loopit *iterations*)
  (1+ i))

(defun example-application (&key processors iterations)
  ;; Benchmark EXAMPLE-CHILD-WORKER on PROCESSORS processors with
  ;; ITERATIONS tasks per processor, using a work estimate of 0.1 s per
  ;; task.
  (create-work 'example-child-worker
	       :processors processors
	       :iterations iterations
	       :work-estimate-seconds .1))

;; CPU count computed once, when this file is loaded, by NUMBER-OF-CPUS.
;; RUN and RUN2 print it, and RUN2 uses it as its processor count.
(defvar *number-of-cpus* (number-of-cpus))

(defun run (&key (iterations 40) (min-processors 2) (max-processors 12))
  ;; Benchmark EXAMPLE-APPLICATION with ITERATIONS iterations for every
  ;; processor count from MIN-PROCESSORS to MAX-PROCESSORS inclusive.
  ;; Before each run it prints "Iterations N, processors P, ".  The
  ;; defaults reproduce the original fixed sweep: 40 iterations, 2 through
  ;; 12 processors.
  (format t "Detected ~d CPUs~%" *number-of-cpus*)
  (..debug-reset-time-base)
  (loop for processors from min-processors to max-processors
	do (format t "Iterations ~d, processors ~d, "
		   iterations processors)
	   (force-output t)
	   (example-application :iterations iterations
				:processors processors)))

;;;;;;;;;;;;;;; Error handling example ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun example-child-worker2 (i)
  ;; Return I + 1, except that I eql to 50 signals an error, so the error
  ;; path through the child processors can be exercised.
  (when (eql i 50)
    (error "foo: ~a" i))
  (1+ i))

(defun example-application2 (&key processors iterations)
  ;; Run EXAMPLE-CHILD-WORKER2 (which errors for input 50) on PROCESSORS
  ;; processors with ITERATIONS tasks each.  No work estimate is printed.
  (create-work 'example-child-worker2
	       :processors processors
	       :iterations iterations))

(defun run2 ()
  ;; Error-handling demo: report the CPU count, restart the debug clock,
  ;; then run EXAMPLE-APPLICATION2 for 100 iterations on *NUMBER-OF-CPUS*
  ;; processors.
  (format t "Detected ~d CPUs~%" *number-of-cpus*)
  (..debug-reset-time-base)
  (let ((processor-count *number-of-cpus*))
    (example-application2 :iterations 100 :processors processor-count)))

#|
cl-user(6): (run2)
Detected 4 CPUs
00:037 CHILD ERROR: (foo: 50)
00:037 PARENT: Got error: (foo: 50)
00:038 CHILD ERROR: (foo: 50)
00:039 PARENT: Got error: (foo: 50)
00:039 CHILD ERROR: (foo: 50)
00:040 PARENT: Got error: (foo: 50)
00:041 CHILD ERROR: (foo: 50)
00:042 PARENT: Got error: (foo: 50)
REAL TIME: 0.033
nil
cl-user(7): 
|#

 

© 2008 Franz Inc - Privacy Statement
[ Consulting Services | Packages/Pricing | Allegro NFS | Certification Program ]