tristanable/source/tristanable/queue.d

150 lines
2.5 KiB
D
Raw Normal View History

2020-09-29 09:57:25 +00:00
/**
* Queue
*
* Represents a queue with a tag.
*
* Any messages that are received with
* the matching tag (to this queue) are
* then enqueued to this queue
*/
module tristanable.queue;
import tristanable.queueitem : QueueItem;
2020-09-29 17:18:59 +00:00
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
2020-09-29 17:19:34 +00:00
import core.thread : Thread;
2021-09-08 09:14:03 +00:00
import std.container.dlist;
2021-09-08 12:07:56 +00:00
import std.range : walkLength;
2020-09-29 09:57:25 +00:00
/**
* Policy flags that can be attached to a Queue.
*
* The flags are tested with a bitwise AND, so every
* member must occupy its own bit.
*/
public enum QueuePolicy : ubyte
{
	/* Bit 0: the queue should enforce its length cap */
	LENGTH_CAP = 1 << 0
}
2020-09-29 09:57:25 +00:00
/**
* A tagged FIFO of QueueItems guarded by a mutex.
*
* Items are appended with `enqueue` and removed from the
* front with `dequeue`. When the queue was constructed with
* the `QueuePolicy.LENGTH_CAP` flag, `enqueue` silently drops
* items once the queue holds `lengthCap` items.
*/
public final class Queue
{
	/* This queue's tag */
	private ulong tag;

	/* The queue */
	private DList!(QueueItem) queue;

	/* The queue mutex (guards `queue`) */
	private Mutex queueLock;

	/**
	* Queue policy settings
	*/
	private ulong lengthCap = 1;
	private QueuePolicy flags;

	/**
	* Construct a new queue with the given
	* tag
	*
	* Params:
	*   tag = the tag this queue is associated with
	*   flags = policy flags for this queue (default: none)
	*/
	this(ulong tag, QueuePolicy flags = cast(QueuePolicy)0)
	{
		this.tag = tag;

		/* Remember the policy, `enqueue` consults it */
		this.flags = flags;

		/* Initialize the mutex */
		queueLock = new Mutex();
	}

	/**
	* Sets the maximum number of items the queue may
	* hold when the LENGTH_CAP policy is active
	*
	* Params:
	*   lengthCap = the new cap
	*/
	public void setLengthCap(ulong lengthCap)
	{
		this.lengthCap = lengthCap;
	}

	/**
	* Returns the currently configured length cap
	*
	* Params:
	*   lengthCap = ignored; only kept (now with a default)
	*               so that existing call sites which pass
	*               an argument continue to compile
	*/
	public ulong getLengthCap(ulong lengthCap = 0)
	{
		/* Qualify with `this`, the parameter shadows the field */
		return this.lengthCap;
	}

	/**
	* Appends the given item to the tail of the queue
	*
	* If the LENGTH_CAP policy is set and the queue already
	* holds `lengthCap` (or more) items then the item is
	* dropped and the queue is left untouched
	*
	* Params:
	*   item = the item to enqueue
	*/
	public void enqueue(QueueItem item)
	{
		/* Lock the queue, released on every exit path */
		queueLock.lock();
		scope(exit) queueLock.unlock();

		/**
		* Check to see if the queue has a length cap
		*
		* If so then determine whether to drop or
		* keep dependent on current capacity
		*/
		if(flags & QueuePolicy.LENGTH_CAP)
		{
			/**
			* `>=` rather than `==`: the cap may have been
			* lowered below the current length, in which
			* case the queue must still refuse new items
			*/
			if(walkLength(queue[]) >= lengthCap)
			{
				return;
			}
		}

		/* Add it to the queue */
		queue ~= item;
	}

	/**
	* Attempts to continuously dequeue the
	* head of the queue
	*
	* Spins (yielding between attempts) until a
	* non-null head could be removed from the queue,
	* hence this call does not return while the
	* queue stays empty.
	*
	* NOTE(review): a null item, if one is ever enqueued,
	* is removed but treated like "nothing dequeued" and
	* the loop carries on waiting (assumes QueueItem is a
	* reference type, confirm against its declaration)
	*
	* TODO: Add a timeout capability
	* TODO: Add tryLock, yield on failure (with loop for recheck ofc)
	* TODO: Possible multiple dequeue feature? Like .receive
	*
	* Returns: the item that was at the head of the queue
	*/
	public QueueItem dequeue()
	{
		/* The head of the queue */
		QueueItem queueHead;

		while(!queueHead)
		{
			{
				/* Lock the queue for the duration of this scope */
				queueLock.lock();
				scope(exit) queueLock.unlock();

				/* Check if we can dequeue anything */
				if(!queue.empty)
				{
					/* If we can then dequeue */
					queueHead = queue.front;

					/* Chop off the head */
					queue.removeFront();
				}
			}

			/**
			* Nothing available yet: move away from this
			* thread, let the watcher (presumably) try
			* access our queue (successfully) by getting
			* a lock on it
			*
			* Prevents us possibly racing back
			* and locking queue again hence
			* starving the system
			*/
			if(!queueHead)
			{
				Thread.yield();
			}
		}

		return queueHead;
	}

	/**
	* Returns the tag for this queue
	*/
	public ulong getTag()
	{
		return tag;
	}
}