Implemented invalidation

This commit is contained in:
Tristan B. Velloza Kildaire 2022-05-17 14:42:29 +02:00
parent 4c7f6f5ea2
commit 76552350a0
2 changed files with 35 additions and 4 deletions

View File

@ -17,6 +17,8 @@ import bmessage : bSendMessage = sendMessage;
import core.thread : Thread;
import std.container.dlist;
import std.range : walkLength;
import tristanable.manager : Manager;
import tristanable.exceptions : ManagerError;
public enum QueuePolicy : ubyte
{
@ -25,6 +27,9 @@ public enum QueuePolicy : ubyte
public final class Queue
{
/* Associated Manager */
private Manager manager;
/* This queue's tag */
private ulong tag;
@ -38,14 +43,14 @@ public final class Queue
* Construct a new queue with the given
* tag
*/
this(ulong tag, QueuePolicy flags = cast(QueuePolicy)0)
this(Manager manager, ulong tag, QueuePolicy flags = cast(QueuePolicy)0)
{
this.manager = manager;
this.tag = tag;
this.flags = flags;
/* Initialize the mutex */
queueLock = new Mutex();
this.flags = flags;
}
public void setLengthCap(ulong lengthCap)
@ -65,6 +70,11 @@ public final class Queue
private QueuePolicy flags;
/**
* FIXME: User should not be able to call this
* Might need to put these all within the same
* package to obtain such security
*/
public void enqueue(QueueItem item)
{
/* Lock the queue */
@ -99,6 +109,13 @@ public final class Queue
*/
public bool poll()
{
/* Throw an exception if the Manager endpoint is dead */
if(manager.isInvalid())
{
throw new ManagerError(manager, "Manager session is dead");
}
/* Status */
bool status;
@ -123,6 +140,13 @@ public final class Queue
*/
public QueueItem dequeue()
{
/* Throw an exception if the Manager endpoint is dead */
if(manager.isInvalid())
{
throw new ManagerError(manager, "Manager session is dead");
}
/* The head of the queue */
QueueItem queueHead;

View File

@ -202,11 +202,18 @@ public final class Watcher : Thread
/* Check if we had an error */
if(running)
{
throw new TristanableException(manager, "bformat socket error");
/* Unblock all current Queue operations and prevent future ones */
manager.invalidate();
/* TODO: Remove this */
// throw new TristanableException(manager, "bformat socket error");
}
else
{
/* Actual shut down, do nothing */
/* Unblock all current Queue operations and prevent future ones */
manager.invalidate();
}
}
}