From be91f5ffc50b5b0372395f671793495bb89b4648 Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Mon, 2 Oct 2023 14:40:44 +0200 Subject: [PATCH 1/2] Queue - Removed TODOs which are irrevevant for now --- source/tristanable/queue/queue.d | 252 +++++++++++++++++++++++++++++++ 1 file changed, 252 insertions(+) create mode 100644 source/tristanable/queue/queue.d diff --git a/source/tristanable/queue/queue.d b/source/tristanable/queue/queue.d new file mode 100644 index 0000000..6a51351 --- /dev/null +++ b/source/tristanable/queue/queue.d @@ -0,0 +1,252 @@ +/** + * A queue of queue items all of the same tag + */ +module tristanable.queue.queue; + +import tristanable.queue.listener : TListener; + +import core.sync.mutex : Mutex; +import core.sync.condition : Condition; +import core.sync.exception : SyncError; +import std.container.slist : SList; +import tristanable.encoding; +import core.time : Duration, dur; +import tristanable.exceptions; + +version(unittest) +{ + import std.stdio; + import std.conv : to; +} + +/** + * Represents a queue whereby messages of a certain tag/id + * can be enqueued to (by the `Watcher`) and dequeued from + * (by the user application) + */ +public class Queue +{ + /** + * This queue's unique ID + */ + private ulong queueID; + + /** + * The libsnooze event used to sleep/wake + * on queue events + * Mutex for the condition variable + */ + private Mutex mutex; + + /** + * The condition variable used to sleep/wake + * on queue of events + */ + private Condition signal; + + /** + * The queue of messages + */ + private SList!(TaggedMessage) queue; + + /** + * The lock for the message queue + */ + private Mutex queueLock; + + /** + * Attached queue listeners + */ + private SList!(TListener) listeners; + + /** + * Lock for the listeners queue + */ + private Mutex listenersLock; + + /** + * If a message is enqueued prior + * to us sleeping then we won't + * wake up and return for it. + * + * Therefore a periodic wakeup + * is required. + */ + private Duration wakeInterval; + + /** + * Constructs a new Queue and immediately sets up the notification + * sub-system for the calling thread (the thread constructing this + * object) which ensures that a call to dequeue will immediately + * unblock on the first message received under this tag + * + * Params: + * queueID = the id to use for this queue + */ + this(ulong queueID) + { + /* Initialize the queue lock */ + this.queueLock = new Mutex(); + + /* Initialize the condition variable */ + this.mutex = new Mutex(); + this.signal = new Condition(this.mutex); + + /* Set the queue id */ + this.queueID = queueID; + + /* Set the slumber interval */ + this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value + } + + /** + * Returns the current wake interval + * for the queue checker + * + * Returns: the `Duration` + */ + public Duration getWakeInterval() + { + return this.wakeInterval; + } + + /** + * Sets the wake up interval + * + * Params: + * interval = the new interval + */ + public void setWakeInterval(Duration interval) + { + this.wakeInterval = interval; + } + + /** + * Enqueues the provided tagged message onto this queue + * and then wakes up any thread that has called dequeue + * on this queue as well + * + * On error enqueueing a `TristanableException` will be + * thrown. + * + * Params: + * message = the TaggedMessage to enqueue + */ + public void enqueue(TaggedMessage message) + { + version(unittest) + { + writeln("queue["~to!(string)(queueID)~"]: Enqueuing '"~to!(string)(message)~"'..."); + } + + scope(exit) + { + version(unittest) + { + writeln("queue["~to!(string)(queueID)~"]: Enqueued '"~to!(string)(message)~"'!"); + } + + /* Unlock the item queue */ + queueLock.unlock(); + } + + /* Lock the item queue */ + queueLock.lock(); + + /* Add the item to the queue */ + queue.insertAfter(queue[], message); + + /* Wake up anyone wanting to dequeue from us */ + try + { + // TODO: Make us wait on the event (optional with a time-out) + signal.notifyAll(); + } + catch(SyncError snozErr) + { + // Throw an exception on a fatal exception + throw new TristanableException(ErrorType.ENQUEUE_FAILED); + } + } + + // TODO: Make a version of this which can time out + + /** + * Blocks till a message can be dequeued from this queue + * + * On error dequeueing a `TristanableException` will be + * thrown. + * + * Returns: the dequeued TaggedMessage + */ + public TaggedMessage dequeue() + { + version(unittest) + { + writeln("queue["~to!(string)(queueID)~"]: Dequeueing..."); + } + + /* The dequeued message */ + TaggedMessage dequeuedMessage; + + scope(exit) + { + version(unittest) + { + writeln("queue["~to!(string)(queueID)~"]: Dequeued '"~to!(string)(dequeuedMessage)~"'!"); + } + } + + /* Block till we dequeue a message successfully */ + while(dequeuedMessage is null) + { + scope(exit) + { + // Unlock the mutex + this.mutex.unlock(); + } + + // Lock the mutex + this.mutex.lock(); + + try + { + this.signal.wait(this.wakeInterval); + } + catch(SyncError e) + { + // Throw an exception on a fatal exception + throw new TristanableException(ErrorType.DEQUEUE_FAILED); + } + + + /* Lock the item queue */ + queueLock.lock(); + + /* Consume the front of the queue (if non-empty) */ + if(!queue.empty()) + { + /* Pop the front item off */ + dequeuedMessage = queue.front(); + + /* Remove the front item from the queue */ + queue.linearRemoveElement(dequeuedMessage); + } + + /* Unlock the item queue */ + queueLock.unlock(); + } + + return dequeuedMessage; + } + + /** + * Get the id/tag of this queue + * + * Returns: the queue's id + */ + public ulong getID() + { + return queueID; + } +} \ No newline at end of file From 5cafbb8130f748d606c75b9656d6f1253f6f099f Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Mon, 2 Oct 2023 14:43:43 +0200 Subject: [PATCH 2/2] Queue - Removed `TListener` references Everything else - Removed reference to old/duplicate `queue.d` module --- source/tristanable/manager/manager.d | 2 +- source/tristanable/manager/watcher.d | 2 +- source/tristanable/package.d | 2 +- source/tristanable/queue.d | 238 --------------------------- source/tristanable/queue/queue.d | 12 -- 5 files changed, 3 insertions(+), 253 deletions(-) delete mode 100644 source/tristanable/queue.d diff --git a/source/tristanable/manager/manager.d b/source/tristanable/manager/manager.d index 28ecee1..014e9db 100644 --- a/source/tristanable/manager/manager.d +++ b/source/tristanable/manager/manager.d @@ -4,7 +4,7 @@ module tristanable.manager.manager; import std.socket; -import tristanable.queue : Queue; +import tristanable.queue.queue : Queue; import core.sync.mutex : Mutex; import tristanable.manager.watcher : Watcher; import tristanable.encoding : TaggedMessage; diff --git a/source/tristanable/manager/watcher.d b/source/tristanable/manager/watcher.d index 4c9ba38..b766482 100644 --- a/source/tristanable/manager/watcher.d +++ b/source/tristanable/manager/watcher.d @@ -11,7 +11,7 @@ import std.socket; import bformat; import tristanable.encoding; import tristanable.exceptions; -import tristanable.queue; +import tristanable.queue.queue; import bformat.client; /** diff --git a/source/tristanable/package.d b/source/tristanable/package.d index 1b11fb4..7e0fbf8 100644 --- a/source/tristanable/package.d +++ b/source/tristanable/package.d @@ -12,7 +12,7 @@ public import tristanable.manager; /** * A queue of queue items all of the same tag */ -public import tristanable.queue : Queue; +public import tristanable.queue.queue : Queue; /** * Error handling type definitions diff --git a/source/tristanable/queue.d b/source/tristanable/queue.d deleted file mode 100644 index 00cddd9..0000000 --- a/source/tristanable/queue.d +++ /dev/null @@ -1,238 +0,0 @@ -/** - * A queue of queue items all of the same tag - */ -module tristanable.queue; - -import core.sync.mutex : Mutex; -import core.sync.condition : Condition; -import core.sync.exception : SyncError; -import std.container.slist : SList; -import tristanable.encoding; -import core.time : Duration, dur; -import tristanable.exceptions; - -version(unittest) -{ - import std.stdio; - import std.conv : to; -} - -/** - * Represents a queue whereby messages of a certain tag/id - * can be enqueued to (by the `Watcher`) and dequeued from - * (by the user application) - */ -public class Queue -{ - /** - * Mutex for the condition variable - */ - private Mutex mutex; - - /** - * The condition variable used to sleep/wake - * on queue of events - */ - private Condition signal; - - /** - * The queue of messages - */ - private SList!(TaggedMessage) queue; - - /** - * The lock for the message queue - */ - private Mutex queueLock; - - /** - * This queue's unique ID - */ - private ulong queueID; - - /** - * If a message is enqueued prior - * to us sleeping then we won't - * wake up and return for it. - * - * Therefore a periodic wakeup - * is required. - */ - private Duration wakeInterval; - - /** - * Constructs a new Queue and immediately sets up the notification - * sub-system for the calling thread (the thread constructing this - * object) which ensures that a call to dequeue will immediately - * unblock on the first message received under this tag - * - * Params: - * queueID = the id to use for this queue - */ - this(ulong queueID) - { - /* Initialize the queue lock */ - this.queueLock = new Mutex(); - - /* Initialize the condition variable */ - this.mutex = new Mutex(); - this.signal = new Condition(this.mutex); - - /* Set the queue id */ - this.queueID = queueID; - - /* Set the slumber interval */ - this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value - } - - /** - * Returns the current wake interval - * for the queue checker - * - * Returns: the `Duration` - */ - public Duration getWakeInterval() - { - return this.wakeInterval; - } - - /** - * Sets the wake up interval - * - * Params: - * interval = the new interval - */ - public void setWakeInterval(Duration interval) - { - this.wakeInterval = interval; - } - - /** - * Enqueues the provided tagged message onto this queue - * and then wakes up any thread that has called dequeue - * on this queue as well - * - * On error enqueueing a `TristanableException` will be - * thrown. - * - * Params: - * message = the TaggedMessage to enqueue - */ - public void enqueue(TaggedMessage message) - { - version(unittest) - { - writeln("queue["~to!(string)(queueID)~"]: Enqueuing '"~to!(string)(message)~"'..."); - } - - scope(exit) - { - version(unittest) - { - writeln("queue["~to!(string)(queueID)~"]: Enqueued '"~to!(string)(message)~"'!"); - } - - /* Unlock the item queue */ - queueLock.unlock(); - } - - /* Lock the item queue */ - queueLock.lock(); - - /* Add the item to the queue */ - queue.insertAfter(queue[], message); - - /* Wake up anyone wanting to dequeue from us */ - try - { - // TODO: Make us wait on the event (optional with a time-out) - signal.notifyAll(); - } - catch(SyncError snozErr) - { - // Throw an exception on a fatal exception - throw new TristanableException(ErrorType.ENQUEUE_FAILED); - } - } - - // TODO: Make a version of this which can time out - - /** - * Blocks till a message can be dequeued from this queue - * - * On error dequeueing a `TristanableException` will be - * thrown. - * - * Returns: the dequeued TaggedMessage - */ - public TaggedMessage dequeue() - { - version(unittest) - { - writeln("queue["~to!(string)(queueID)~"]: Dequeueing..."); - } - - /* The dequeued message */ - TaggedMessage dequeuedMessage; - - scope(exit) - { - version(unittest) - { - writeln("queue["~to!(string)(queueID)~"]: Dequeued '"~to!(string)(dequeuedMessage)~"'!"); - } - } - - /* Block till we dequeue a message successfully */ - while(dequeuedMessage is null) - { - scope(exit) - { - // Unlock the mutex - this.mutex.unlock(); - } - - // Lock the mutex - this.mutex.lock(); - - try - { - this.signal.wait(this.wakeInterval); - } - catch(SyncError e) - { - // Throw an exception on a fatal exception - throw new TristanableException(ErrorType.DEQUEUE_FAILED); - } - - - /* Lock the item queue */ - queueLock.lock(); - - /* Consume the front of the queue (if non-empty) */ - if(!queue.empty()) - { - /* Pop the front item off */ - dequeuedMessage = queue.front(); - - /* Remove the front item from the queue */ - queue.linearRemoveElement(dequeuedMessage); - } - - /* Unlock the item queue */ - queueLock.unlock(); - } - - return dequeuedMessage; - } - - /** - * Get the id/tag of this queue - * - * Returns: the queue's id - */ - public ulong getID() - { - return queueID; - } -} \ No newline at end of file diff --git a/source/tristanable/queue/queue.d b/source/tristanable/queue/queue.d index 6a51351..8db3aa8 100644 --- a/source/tristanable/queue/queue.d +++ b/source/tristanable/queue/queue.d @@ -3,8 +3,6 @@ */ module tristanable.queue.queue; -import tristanable.queue.listener : TListener; - import core.sync.mutex : Mutex; import core.sync.condition : Condition; import core.sync.exception : SyncError; @@ -54,16 +52,6 @@ public class Queue */ private Mutex queueLock; - /** - * Attached queue listeners - */ - private SList!(TListener) listeners; - - /** - * Lock for the listeners queue - */ - private Mutex listenersLock; - /** * If a message is enqueued prior * to us sleeping then we won't