From 76552350a0790a50a24834b7f0b551895f838b67 Mon Sep 17 00:00:00 2001 From: "Tristan B. Kildaire" Date: Tue, 17 May 2022 14:42:29 +0200 Subject: [PATCH] Implemented invalidation --- source/tristanable/queue.d | 30 +++++++++++++++++++++++++++--- source/tristanable/watcher.d | 9 ++++++++- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/source/tristanable/queue.d b/source/tristanable/queue.d index ecde08b..5b7a1ef 100644 --- a/source/tristanable/queue.d +++ b/source/tristanable/queue.d @@ -17,6 +17,8 @@ import bmessage : bSendMessage = sendMessage; import core.thread : Thread; import std.container.dlist; import std.range : walkLength; +import tristanable.manager : Manager; +import tristanable.exceptions : ManagerError; public enum QueuePolicy : ubyte { @@ -25,6 +27,9 @@ public enum QueuePolicy : ubyte public final class Queue { + /* Associated Manager */ + private Manager manager; + /* This queue's tag */ private ulong tag; @@ -38,14 +43,14 @@ public final class Queue * Construct a new queue with the given * tag */ - this(ulong tag, QueuePolicy flags = cast(QueuePolicy)0) + this(Manager manager, ulong tag, QueuePolicy flags = cast(QueuePolicy)0) { + this.manager = manager; this.tag = tag; + this.flags = flags; /* Initialize the mutex */ queueLock = new Mutex(); - - this.flags = flags; } public void setLengthCap(ulong lengthCap) @@ -65,6 +70,11 @@ public final class Queue private QueuePolicy flags; + /** + * FIXME: User should not be able to call this + * Might need to put these all within the same + * package to obtain such security + */ public void enqueue(QueueItem item) { /* Lock the queue */ @@ -99,6 +109,13 @@ public final class Queue */ public bool poll() { + /* Throw an exception if the Manager endpoint is dead */ + if(manager.isInvalid()) + { + throw new ManagerError(manager, "Manager session is dead"); + } + + /* Status */ bool status; @@ -123,6 +140,13 @@ public final class Queue */ public QueueItem dequeue() { + /* Throw an exception if the Manager endpoint is dead */ + if(manager.isInvalid()) + { + throw new ManagerError(manager, "Manager session is dead"); + } + + /* The head of the queue */ QueueItem queueHead; diff --git a/source/tristanable/watcher.d b/source/tristanable/watcher.d index a1a6b16..3e85f0c 100644 --- a/source/tristanable/watcher.d +++ b/source/tristanable/watcher.d @@ -202,11 +202,18 @@ public final class Watcher : Thread /* Check if we had an error */ if(running) { - throw new TristanableException(manager, "bformat socket error"); + /* Unblock all current Queue operations and prevent future ones */ + manager.invalidate(); + + /* TODO: Remove this */ + // throw new TristanableException(manager, "bformat socket error"); } else { /* Actual shut down, do nothing */ + + /* Unblock all current Queue operations and prevent future ones */ + manager.invalidate(); } } } \ No newline at end of file