- Added an `exitReason` and an `alive` (set to `true` on construction)
- Calling `shutdownQueue(ErrorType)` will set the exit reason, will also set the aliveness to `false` and wake up ALL `dequeue()`'s blocking
- `dequeue()` first check in wakeup routine duty cycle is to check if we are alive
This commit is contained in:
Tristan B. Velloza Kildaire 2023-11-26 18:58:53 +02:00
parent 198cb52342
commit 713c102da5
1 changed files with 36 additions and 1 deletions

View File

@ -62,6 +62,13 @@ public class Queue
*/ */
private Duration wakeInterval; private Duration wakeInterval;
/**
* Reason for a `dequeue()`
* to have failed
*/
private ErrorType exitReason;
private bool alive;
/** /**
* Constructs a new Queue and immediately sets up the notification * Constructs a new Queue and immediately sets up the notification
* sub-system for the calling thread (the thread constructing this * sub-system for the calling thread (the thread constructing this
@ -85,6 +92,9 @@ public class Queue
/* Set the slumber interval */ /* Set the slumber interval */
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
/* Set status to alive */
this.alive = true;
} }
/** /**
@ -157,6 +167,25 @@ public class Queue
} }
} }
public void shutdownQueue(ErrorType reason)
{
// Set running state and reason
this.alive = false;
this.exitReason = reason;
// Wakeup sleeping dequeue()
// Lock the mutex
this.mutex.lock();
// Awake all condition variable sleepers
this.signal.notifyAll();
// Unlock the mutex
this.mutex.unlock();
}
// TODO: Make a version of this which can time out // TODO: Make a version of this which can time out
/** /**
@ -188,6 +217,13 @@ public class Queue
/* Block till we dequeue a message successfully */ /* Block till we dequeue a message successfully */
while(dequeuedMessage is null) while(dequeuedMessage is null)
{ {
/* Check if this queue is still alive */
if(!this.alive)
{
// Throw an exception to unblock the calling `dequeue()`
throw new TristanableException(this.exitReason);
}
scope(exit) scope(exit)
{ {
// Unlock the mutex // Unlock the mutex
@ -207,7 +243,6 @@ public class Queue
throw new TristanableException(ErrorType.DEQUEUE_FAILED); throw new TristanableException(ErrorType.DEQUEUE_FAILED);
} }
/* Lock the item queue */ /* Lock the item queue */
queueLock.lock(); queueLock.lock();