diff --git a/source/tristanable/queue/queue.d b/source/tristanable/queue/queue.d index 8db3aa8..c170646 100644 --- a/source/tristanable/queue/queue.d +++ b/source/tristanable/queue/queue.d @@ -62,6 +62,13 @@ public class Queue */ private Duration wakeInterval; + /** + * Reason for a `dequeue()` + * to have failed + */ + private ErrorType exitReason; + private bool alive; + /** * Constructs a new Queue and immediately sets up the notification * sub-system for the calling thread (the thread constructing this @@ -85,6 +92,9 @@ public class Queue /* Set the slumber interval */ this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value + + /* Set status to alive */ + this.alive = true; } /** @@ -157,6 +167,25 @@ public class Queue } } + + public void shutdownQueue(ErrorType reason) + { + // Set running state and reason + this.alive = false; + this.exitReason = reason; + + // Wakeup sleeping dequeue() + + // Lock the mutex + this.mutex.lock(); + + // Awake all condition variable sleepers + this.signal.notifyAll(); + + // Unlock the mutex + this.mutex.unlock(); + } + // TODO: Make a version of this which can time out /** @@ -188,6 +217,13 @@ public class Queue /* Block till we dequeue a message successfully */ while(dequeuedMessage is null) { + /* Check if this queue is still alive */ + if(!this.alive) + { + // Throw an exception to unblock the calling `dequeue()` + throw new TristanableException(this.exitReason); + } + scope(exit) { // Unlock the mutex @@ -207,7 +243,6 @@ public class Queue throw new TristanableException(ErrorType.DEQUEUE_FAILED); } - /* Lock the item queue */ queueLock.lock();