Queue
- Removed `TListener` references Everything else - Removed reference to old/duplicate `queue.d` module
This commit is contained in:
parent
be91f5ffc5
commit
5cafbb8130
|
@ -4,7 +4,7 @@
|
|||
module tristanable.manager.manager;
|
||||
|
||||
import std.socket;
|
||||
import tristanable.queue : Queue;
|
||||
import tristanable.queue.queue : Queue;
|
||||
import core.sync.mutex : Mutex;
|
||||
import tristanable.manager.watcher : Watcher;
|
||||
import tristanable.encoding : TaggedMessage;
|
||||
|
|
|
@ -11,7 +11,7 @@ import std.socket;
|
|||
import bformat;
|
||||
import tristanable.encoding;
|
||||
import tristanable.exceptions;
|
||||
import tristanable.queue;
|
||||
import tristanable.queue.queue;
|
||||
import bformat.client;
|
||||
|
||||
/**
|
||||
|
|
|
@ -12,7 +12,7 @@ public import tristanable.manager;
|
|||
/**
|
||||
* A queue of queue items all of the same tag
|
||||
*/
|
||||
public import tristanable.queue : Queue;
|
||||
public import tristanable.queue.queue : Queue;
|
||||
|
||||
/**
|
||||
* Error handling type definitions
|
||||
|
|
|
@ -1,238 +0,0 @@
|
|||
/**
|
||||
* A queue of queue items all of the same tag
|
||||
*/
|
||||
module tristanable.queue;
|
||||
|
||||
import core.sync.mutex : Mutex;
|
||||
import core.sync.condition : Condition;
|
||||
import core.sync.exception : SyncError;
|
||||
import std.container.slist : SList;
|
||||
import tristanable.encoding;
|
||||
import core.time : Duration, dur;
|
||||
import tristanable.exceptions;
|
||||
|
||||
version(unittest)
|
||||
{
|
||||
import std.stdio;
|
||||
import std.conv : to;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a queue whereby messages of a certain tag/id
|
||||
* can be enqueued to (by the `Watcher`) and dequeued from
|
||||
* (by the user application)
|
||||
*/
|
||||
public class Queue
|
||||
{
|
||||
/**
|
||||
* Mutex for the condition variable
|
||||
*/
|
||||
private Mutex mutex;
|
||||
|
||||
/**
|
||||
* The condition variable used to sleep/wake
|
||||
* on queue of events
|
||||
*/
|
||||
private Condition signal;
|
||||
|
||||
/**
|
||||
* The queue of messages
|
||||
*/
|
||||
private SList!(TaggedMessage) queue;
|
||||
|
||||
/**
|
||||
* The lock for the message queue
|
||||
*/
|
||||
private Mutex queueLock;
|
||||
|
||||
/**
|
||||
* This queue's unique ID
|
||||
*/
|
||||
private ulong queueID;
|
||||
|
||||
/**
|
||||
* If a message is enqueued prior
|
||||
* to us sleeping then we won't
|
||||
* wake up and return for it.
|
||||
*
|
||||
* Therefore a periodic wakeup
|
||||
* is required.
|
||||
*/
|
||||
private Duration wakeInterval;
|
||||
|
||||
/**
|
||||
* Constructs a new Queue and immediately sets up the notification
|
||||
* sub-system for the calling thread (the thread constructing this
|
||||
* object) which ensures that a call to dequeue will immediately
|
||||
* unblock on the first message received under this tag
|
||||
*
|
||||
* Params:
|
||||
* queueID = the id to use for this queue
|
||||
*/
|
||||
this(ulong queueID)
|
||||
{
|
||||
/* Initialize the queue lock */
|
||||
this.queueLock = new Mutex();
|
||||
|
||||
/* Initialize the condition variable */
|
||||
this.mutex = new Mutex();
|
||||
this.signal = new Condition(this.mutex);
|
||||
|
||||
/* Set the queue id */
|
||||
this.queueID = queueID;
|
||||
|
||||
/* Set the slumber interval */
|
||||
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current wake interval
|
||||
* for the queue checker
|
||||
*
|
||||
* Returns: the `Duration`
|
||||
*/
|
||||
public Duration getWakeInterval()
|
||||
{
|
||||
return this.wakeInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the wake up interval
|
||||
*
|
||||
* Params:
|
||||
* interval = the new interval
|
||||
*/
|
||||
public void setWakeInterval(Duration interval)
|
||||
{
|
||||
this.wakeInterval = interval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueues the provided tagged message onto this queue
|
||||
* and then wakes up any thread that has called dequeue
|
||||
* on this queue as well
|
||||
*
|
||||
* On error enqueueing a `TristanableException` will be
|
||||
* thrown.
|
||||
*
|
||||
* Params:
|
||||
* message = the TaggedMessage to enqueue
|
||||
*/
|
||||
public void enqueue(TaggedMessage message)
|
||||
{
|
||||
version(unittest)
|
||||
{
|
||||
writeln("queue["~to!(string)(queueID)~"]: Enqueuing '"~to!(string)(message)~"'...");
|
||||
}
|
||||
|
||||
scope(exit)
|
||||
{
|
||||
version(unittest)
|
||||
{
|
||||
writeln("queue["~to!(string)(queueID)~"]: Enqueued '"~to!(string)(message)~"'!");
|
||||
}
|
||||
|
||||
/* Unlock the item queue */
|
||||
queueLock.unlock();
|
||||
}
|
||||
|
||||
/* Lock the item queue */
|
||||
queueLock.lock();
|
||||
|
||||
/* Add the item to the queue */
|
||||
queue.insertAfter(queue[], message);
|
||||
|
||||
/* Wake up anyone wanting to dequeue from us */
|
||||
try
|
||||
{
|
||||
// TODO: Make us wait on the event (optional with a time-out)
|
||||
signal.notifyAll();
|
||||
}
|
||||
catch(SyncError snozErr)
|
||||
{
|
||||
// Throw an exception on a fatal exception
|
||||
throw new TristanableException(ErrorType.ENQUEUE_FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Make a version of this which can time out
|
||||
|
||||
/**
|
||||
* Blocks till a message can be dequeued from this queue
|
||||
*
|
||||
* On error dequeueing a `TristanableException` will be
|
||||
* thrown.
|
||||
*
|
||||
* Returns: the dequeued TaggedMessage
|
||||
*/
|
||||
public TaggedMessage dequeue()
|
||||
{
|
||||
version(unittest)
|
||||
{
|
||||
writeln("queue["~to!(string)(queueID)~"]: Dequeueing...");
|
||||
}
|
||||
|
||||
/* The dequeued message */
|
||||
TaggedMessage dequeuedMessage;
|
||||
|
||||
scope(exit)
|
||||
{
|
||||
version(unittest)
|
||||
{
|
||||
writeln("queue["~to!(string)(queueID)~"]: Dequeued '"~to!(string)(dequeuedMessage)~"'!");
|
||||
}
|
||||
}
|
||||
|
||||
/* Block till we dequeue a message successfully */
|
||||
while(dequeuedMessage is null)
|
||||
{
|
||||
scope(exit)
|
||||
{
|
||||
// Unlock the mutex
|
||||
this.mutex.unlock();
|
||||
}
|
||||
|
||||
// Lock the mutex
|
||||
this.mutex.lock();
|
||||
|
||||
try
|
||||
{
|
||||
this.signal.wait(this.wakeInterval);
|
||||
}
|
||||
catch(SyncError e)
|
||||
{
|
||||
// Throw an exception on a fatal exception
|
||||
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
|
||||
}
|
||||
|
||||
|
||||
/* Lock the item queue */
|
||||
queueLock.lock();
|
||||
|
||||
/* Consume the front of the queue (if non-empty) */
|
||||
if(!queue.empty())
|
||||
{
|
||||
/* Pop the front item off */
|
||||
dequeuedMessage = queue.front();
|
||||
|
||||
/* Remove the front item from the queue */
|
||||
queue.linearRemoveElement(dequeuedMessage);
|
||||
}
|
||||
|
||||
/* Unlock the item queue */
|
||||
queueLock.unlock();
|
||||
}
|
||||
|
||||
return dequeuedMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the id/tag of this queue
|
||||
*
|
||||
* Returns: the queue's id
|
||||
*/
|
||||
public ulong getID()
|
||||
{
|
||||
return queueID;
|
||||
}
|
||||
}
|
|
@ -3,8 +3,6 @@
|
|||
*/
|
||||
module tristanable.queue.queue;
|
||||
|
||||
import tristanable.queue.listener : TListener;
|
||||
|
||||
import core.sync.mutex : Mutex;
|
||||
import core.sync.condition : Condition;
|
||||
import core.sync.exception : SyncError;
|
||||
|
@ -54,16 +52,6 @@ public class Queue
|
|||
*/
|
||||
private Mutex queueLock;
|
||||
|
||||
/**
|
||||
* Attached queue listeners
|
||||
*/
|
||||
private SList!(TListener) listeners;
|
||||
|
||||
/**
|
||||
* Lock for the listeners queue
|
||||
*/
|
||||
private Mutex listenersLock;
|
||||
|
||||
/**
|
||||
* If a message is enqueued prior
|
||||
* to us sleeping then we won't
|
||||
|
|
Loading…
Reference in New Issue