From 42c6b111c8353c723ac4fb67779cf9018745e522 Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Sun, 1 Oct 2023 20:51:08 +0200 Subject: [PATCH] Migrate from libsnooze (#8) * Dub - Removed `libsnooze` dependency * Queue - Removed `libsnooze` imports * Queue - Added mutex+condition variable * Queue - Removed old `ensure()` call * Queue - Switched one thing over to mutex+condvar * Queue - Switched to using condition variable - Added configurable slumber interval --- dub.json | 3 +- source/tristanable/queue.d | 117 +++++++++++++++++++++---------------- 2 files changed, 67 insertions(+), 53 deletions(-) diff --git a/dub.json b/dub.json index fd61123..1f2fe98 100644 --- a/dub.json +++ b/dub.json @@ -4,8 +4,7 @@ ], "copyright": "Copyright © 2023, Tristan B. Kildaire", "dependencies": { - "bformat": ">=4.1.1", - "libsnooze": ">=1.3.0-beta" + "bformat": ">=4.1.1" }, "description": "Tristanable network message queuing framework", "homepage": "https://deavmi.assigned.network/projects/tristanable", diff --git a/source/tristanable/queue.d b/source/tristanable/queue.d index 0271934..00cddd9 100644 --- a/source/tristanable/queue.d +++ b/source/tristanable/queue.d @@ -3,14 +3,12 @@ */ module tristanable.queue; -// TODO: Examine the below import which seemingly fixes stuff for libsnooze -import libsnooze.clib; -import libsnooze; - import core.sync.mutex : Mutex; +import core.sync.condition : Condition; +import core.sync.exception : SyncError; import std.container.slist : SList; import tristanable.encoding; -import core.thread : dur; +import core.time : Duration, dur; import tristanable.exceptions; version(unittest) @@ -27,10 +25,15 @@ version(unittest) public class Queue { /** - * The libsnooze event used to sleep/wake - * on queue events + * Mutex for the condition variable */ - private Event event; + private Mutex mutex; + + /** + * The condition variable used to sleep/wake + * on queue of events + */ + private Condition signal; /** * The queue of messages @@ -47,6 +50,15 @@ public class Queue */ private ulong queueID; + /** + * If a message is enqueued prior + * to us sleeping then we won't + * wake up and return for it. + * + * Therefore a periodic wakeup + * is required. + */ + private Duration wakeInterval; /** * Constructs a new Queue and immediately sets up the notification @@ -62,14 +74,37 @@ public class Queue /* Initialize the queue lock */ this.queueLock = new Mutex(); - /* Initialize the event */ - this.event = new Event(); + /* Initialize the condition variable */ + this.mutex = new Mutex(); + this.signal = new Condition(this.mutex); /* Set the queue id */ this.queueID = queueID; - /* Ensure pipe existence (see https://deavmi.assigned.network/git/deavmi/tristanable/issues/5) */ - event.wait(dur!("seconds")(0)); + /* Set the slumber interval */ + this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value + } + + /** + * Returns the current wake interval + * for the queue checker + * + * Returns: the `Duration` + */ + public Duration getWakeInterval() + { + return this.wakeInterval; + } + + /** + * Sets the wake up interval + * + * Params: + * interval = the new interval + */ + public void setWakeInterval(Duration interval) + { + this.wakeInterval = interval; } /** @@ -111,9 +146,9 @@ public class Queue try { // TODO: Make us wait on the event (optional with a time-out) - event.notifyAll(); + signal.notifyAll(); } - catch(FatalException snozErr) + catch(SyncError snozErr) { // Throw an exception on a fatal exception throw new TristanableException(ErrorType.ENQUEUE_FAILED); @@ -151,45 +186,25 @@ public class Queue /* Block till we dequeue a message successfully */ while(dequeuedMessage is null) { - /** - * Call `wait()` and catch any interrupts - * in which case loop back and call `wait()` - * again - */ - while(true) + scope(exit) { - try - { - // TODO: Make us wait on the event (optional with a time-out) - event.wait(); - } - catch(InterruptedException e) - { - version(unittest) - { - import std.stdio; - writeln("dequeue() had libsnooze wait() get interrupted!"); - } - - // Retry the wait() - continue; - } - catch(FatalException fatalErr) - { - version(unittest) - { - import std.stdio; - writeln("dequeue() had libsnooze wait() get FATALLY fail! Exception will now throw..."); - } - - // Throw an exception on a fatal exception - throw new TristanableException(ErrorType.DEQUEUE_FAILED); - } - - // On successful wait() wake-up exit this wait()-retry loop - break; + // Unlock the mutex + this.mutex.unlock(); } - + + // Lock the mutex + this.mutex.lock(); + + try + { + this.signal.wait(this.wakeInterval); + } + catch(SyncError e) + { + // Throw an exception on a fatal exception + throw new TristanableException(ErrorType.DEQUEUE_FAILED); + } + /* Lock the item queue */ queueLock.lock();