-
-
Notifications
You must be signed in to change notification settings - Fork 61
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
565fe03
commit 9e8226f
Showing
22 changed files
with
1,202 additions
and
1,151 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,52 +1,67 @@ | ||
(require-builtin steel/time) | ||
(require "steel/result") | ||
(provide make-thread-pool | ||
lock! | ||
submit-task | ||
block-on-task | ||
task-done? | ||
task-err) | ||
|
||
(provide spawn-cancellable-thread-looping) | ||
;;@doc | ||
;; Lock the given lock during the duration | ||
;; of the thunk. | ||
(define (lock! lock thunk) | ||
(lock-acquire! lock) | ||
(dynamic-wind (lambda () void) (lambda () (thunk)) (lambda () (lock-release! lock)))) | ||
|
||
;; Spawns a thread, returning a handle to the sender to that thread. | ||
; (define (message-passing) | ||
; (define channels (make-channels)) | ||
; (define sender (list-ref channels 0)) | ||
; (define receiver (list-ref channels 1)) | ||
(struct ThreadPool (task-sender capacity thread-handles)) | ||
|
||
; ;; Worker thread, listen to requests | ||
; (spawn-thread! (lambda () | ||
; ;; Process incoming requests. | ||
; (while #true (displayln (channel->recv receiver))) | ||
; (loop))) | ||
(struct Task (lock done func-or-result err) #:mutable) | ||
|
||
; sender) | ||
;;@doc | ||
;; Check if the given task is done | ||
(define task-done? Task-done) | ||
|
||
(define *CHILD_THREADS* '()) | ||
;;@doc | ||
;; Get the err object (if any) from the given task | ||
(define task-err Task-err) | ||
|
||
;;@doc | ||
;; Create a thread pool with the given capacity | ||
(define (make-thread-pool capacity) | ||
(define channels (channels/new)) | ||
(define sender (channels-sender channels)) | ||
(define receiver (channels-receiver channels)) | ||
|
||
;; Keep track of all of the threads currently running | ||
(define (record-thread-handle handle) | ||
(set! *CHILD_THREADS* (cons handle *CHILD_THREADS*))) | ||
(define (listen-for-tasks) | ||
(define next-task (channel/recv receiver)) | ||
(define func (Task-func-or-result next-task)) | ||
|
||
(struct CancellableThreadHandle (sender handle)) | ||
;; Does this work? | ||
(with-handler (lambda (err) (set-Task-err! next-task err)) | ||
;; Capture exception, if it exists. Store it in the task | ||
(lock! (Task-lock next-task) | ||
(lambda () | ||
;; This should be fine, we're updating the task to be finished, | ||
;; so we can check the progress of it | ||
(set-Task-func-or-result! next-task (func)) | ||
(set-Task-done! next-task #t)))) | ||
|
||
(listen-for-tasks)) | ||
|
||
;; Give me back a thread pool to do some work | ||
(ThreadPool sender | ||
capacity | ||
(map (lambda (_) (spawn-native-thread listen-for-tasks)) (range 0 capacity)))) | ||
|
||
;;@doc | ||
;; Submit task to the thread pool | ||
(define (submit-task tp func) | ||
;; Create the task. We'll update this to done, and replace | ||
;; the func with the proper value afterwards | ||
(define task (Task (mutex) #f func #f)) | ||
(channel/send (ThreadPool-task-sender tp) task) | ||
task) | ||
|
||
;;@doc | ||
;; Spawn a function, func, that runs on a background thread, running at the interval `delay-ms` | ||
(define (spawn-cancellable-thread-looping func delay-ms) | ||
;; Create the channels. We're going to cancel the thread using | ||
;; the sender here to interrupt the receiver | ||
(define channels (make-channels)) | ||
(define sender (list-ref channels 0)) | ||
(define receiver (list-ref channels 1)) | ||
|
||
(CancellableThreadHandle sender | ||
(spawn-thread! (lambda () | ||
(while (not (~> (channel->try-recv receiver) (unwrap-ok))) | ||
(begin | ||
(func) | ||
(time/sleep-ms delay-ms))) | ||
(stdout-simple-displayln "Shutting down thread: " | ||
(thread::current/id)))))) | ||
|
||
; (let ([tasks (map (lambda (_) | ||
; (spawn-thread! (lambda () | ||
; (time/sleep-ms 2000) | ||
; (displayln (thread::current/id)) | ||
; 1))) | ||
; (range 0 10))]) | ||
; (displayln (map thread-join! tasks))) | ||
;; Block the current thread on this task until it is finished. | ||
(define (block-on-task task) | ||
(lock! (Task-lock task) (lambda () (Task-func-or-result task)))) |
Oops, something went wrong.