diff --git a/dub.json b/dub.json index a3634d4..0d93ba3 100644 --- a/dub.json +++ b/dub.json @@ -2,13 +2,12 @@ "authors": [ "Tristan B. Kildaire" ], - "copyright": "Copyright © 2020, Tristan B. Kildaire", + "copyright": "Copyright © 2023, Tristan B. Kildaire", "dependencies": { - "bformat": "~>3.1.3" + "libsnooze": "0.2.5" }, "description": "Tag-based asynchronous messaging framework", "license": "LGPL-3.0", "name": "tristanable", - "targetType": "library", - "sourcePaths": ["source/tristanable"] + "targetType": "library" } \ No newline at end of file diff --git a/dub.selections.json b/dub.selections.json index b638810..1d9ad1c 100644 --- a/dub.selections.json +++ b/dub.selections.json @@ -1,6 +1,7 @@ { "fileVersion": 1, "versions": { - "bformat": "3.1.3" + "bformat": "3.1.3", + "libsnooze": "0.2.5" } } diff --git a/example/client/.gitignore b/example/client/.gitignore deleted file mode 100644 index 689dd76..0000000 --- a/example/client/.gitignore +++ /dev/null @@ -1,15 +0,0 @@ -.dub -docs.json -__dummy.html -docs/ -/example -example.so -example.dylib -example.dll -example.a -example.lib -example-test-* -*.exe -*.o -*.obj -*.lst diff --git a/example/client/dub.json b/example/client/dub.json deleted file mode 100644 index 9e1c483..0000000 --- a/example/client/dub.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "authors": [ - "Tristan B. Kildaire" - ], - "copyright": "Copyright © 2020, Tristan B. Kildaire", - "dependencies": { - "tristanable": "~>2.2.1" - }, - "description": "A minimal D application.", - "license": "proprietary", - "name": "example" -} \ No newline at end of file diff --git a/example/client/dub.selections.json b/example/client/dub.selections.json deleted file mode 100644 index 87e1076..0000000 --- a/example/client/dub.selections.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "fileVersion": 1, - "versions": { - "bformat": "1.0.8", - "tristanable": "2.2.1" - } -} diff --git a/example/client/source/app.d b/example/client/source/app.d deleted file mode 100644 index 1aec16c..0000000 --- a/example/client/source/app.d +++ /dev/null @@ -1,59 +0,0 @@ -import std.stdio; -import tristanable.manager : Manager; -import std.socket; -import core.thread; - -void main() -{ - writeln("Edit source/app.d to start your project."); - Socket socket = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP); - socket.connect(parseAddress("127.0.0.1",7777)); - Manager manager = new Manager(socket); - - class bruh : Thread - { - this() - { - super(&run); - } - - private void run() - { - while(true) - { - manager.lockQueue(); - writeln(manager.getQueue()); - manager.unlockQueue(); - import core.thread; - - Thread.sleep(dur!("seconds")(1)); - } - } - } - - new bruh().start(); - - manager.sendMessage(69, [77]); - manager.sendMessage(70, [78]); - - - byte[] receivedKaka = manager.receiveMessage(69); - writeln(receivedKaka); - - receivedKaka = manager.receiveMessage(70); - writeln(receivedKaka); - - manager.sendMessage(70, [78]); - - receivedKaka = manager.receiveMessage(70); - writeln(receivedKaka); - - - - - - - - - -} diff --git a/example/server.d b/example/server.d deleted file mode 100644 index e69de29..0000000 diff --git a/example/server/.gitignore b/example/server/.gitignore deleted file mode 100644 index 99ecae8..0000000 --- a/example/server/.gitignore +++ /dev/null @@ -1,15 +0,0 @@ -.dub -docs.json -__dummy.html -docs/ -/server -server.so -server.dylib -server.dll -server.a -server.lib -server-test-* -*.exe -*.o -*.obj -*.lst diff --git a/example/server/dub.json b/example/server/dub.json deleted file mode 100644 index a153999..0000000 --- a/example/server/dub.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "authors": [ - "Tristan B. Kildaire" - ], - "copyright": "Copyright © 2020, Tristan B. Kildaire", - "dependencies": { - "bformat": "~>1.0.8", - "tristanable": "~>2.2.1" - }, - "description": "A minimal D application.", - "license": "proprietary", - "name": "server" -} \ No newline at end of file diff --git a/example/server/dub.selections.json b/example/server/dub.selections.json deleted file mode 100644 index 87e1076..0000000 --- a/example/server/dub.selections.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "fileVersion": 1, - "versions": { - "bformat": "1.0.8", - "tristanable": "2.2.1" - } -} diff --git a/example/server/source/app.d b/example/server/source/app.d deleted file mode 100644 index ab239d4..0000000 --- a/example/server/source/app.d +++ /dev/null @@ -1,37 +0,0 @@ -import std.stdio; -import std.socket; -import tristanable.encoding : DataMessage; -import bmessage; -import core.thread; - -void main() -{ - writeln("Edit source/app.d to start your project."); - Socket socket = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP); - socket.bind(parseAddress("127.0.0.1",7777)); - socket.listen(1); - - Socket conn = socket.accept(); - byte[] receivedData; - - while(true) - { - receiveMessage(conn, receivedData); - - DataMessage message = DataMessage.decode(receivedData); - - writeln("Tag: ", message.tag); - writeln("Data: ", message.data); - - DataMessage d = new DataMessage(70, [2]); - sendMessage(conn, d.encode()); - - d = new DataMessage(69, [1]); - Thread.sleep(dur!("seconds")(5)); - sendMessage(conn, d.encode()); - } - - - - -} diff --git a/source/tristanable/encoding.d b/source/tristanable/encoding.d deleted file mode 100644 index c28bae4..0000000 --- a/source/tristanable/encoding.d +++ /dev/null @@ -1,67 +0,0 @@ -module tristanable.encoding; - -public final class DataMessage -{ - - private ulong tag; - private byte[] data; - - public static DataMessage decode(byte[] bytes) - { - /* Fetch the `tag` */ - ulong receivedTag = *(cast(ulong*)bytes.ptr); - - /* Fetch the `data` */ - byte[] receivedData = bytes[8..bytes.length]; - - return new DataMessage(receivedTag, receivedData); - } - - /** - * Constructs a new DataMessage with - * the give `tag` and bytes `data` - */ - this(ulong tag, byte[] data) - { - this.tag = tag; - this.data = data; - } - - public byte[] encode() - { - /* Construct the message array */ - byte[] messageData; - - /* Add the `tag` bytes */ - messageData ~= *(cast(byte*)&tag); - messageData ~= *(cast(byte*)&tag+1); - messageData ~= *(cast(byte*)&tag+2); - messageData ~= *(cast(byte*)&tag+3); - messageData ~= *(cast(byte*)&tag+4); - messageData ~= *(cast(byte*)&tag+5); - messageData ~= *(cast(byte*)&tag+6); - messageData ~= *(cast(byte*)&tag+7); - - /* Add the `data` bytes (the actual message) */ - messageData ~= data; - - return messageData; - } - - public byte[] getData() - { - return data; - } - - public ulong getTag() - { - return tag; - } -} - -public static byte[] encodeForSend(DataMessage message) -{ - import bmessage; - - return encodeBformat(message.encode()); -} \ No newline at end of file diff --git a/source/tristanable/exceptions.d b/source/tristanable/exceptions.d deleted file mode 100644 index c95e14e..0000000 --- a/source/tristanable/exceptions.d +++ /dev/null @@ -1,29 +0,0 @@ -module tristanable.exceptions; - -import tristanable.manager; -import tristanable.queue : Queue; - -public final class TristanableException : Exception -{ - this(Manager manager, string message) - { - super(generateMessage(message)); - } - - private string generateMessage(string errMesg) - { - string msg; - - // msg = "TRistanable failure: "~errMesg~"\n\n"; - // msg ~= "Queue stats:\n\n" - - // Queue[] queues = manager.getQueues(); - // foreach(Queue queue; queues) - // { - // msg ~= "Queue["~to!(string)(queue.getTag())~"]: "~ - // } - // msg ~= manager.getQueues() - - return msg; - } -} \ No newline at end of file diff --git a/source/tristanable/manager.d b/source/tristanable/manager.d index e071d4e..3222138 100644 --- a/source/tristanable/manager.d +++ b/source/tristanable/manager.d @@ -1,217 +1,10 @@ module tristanable.manager; -import std.socket : Socket; -import core.sync.mutex : Mutex; -import bmessage : bSendMessage = sendMessage; -import tristanable.queue : Queue; -import tristanable.watcher; -import std.container.dlist; -import tristanable.exceptions; - -/** -* Manager -* -* This is the core class that is to be instantiated -* that represents an instance of the tristanable -* framework. It is passed a Socket from which it -* reads from (using a bformat block reader). -* -* It contains a Watcher which does the reading and -* appending to respective queues (the user need not -* worry about this factum). -* -* The functions provided allow users to wait in a -* tight loop to dequeue ("receive" in a blcoking mannger) -* from a specified queue. -*/ -public final class Manager +/** + * Allows one to add new queues, control + * existing ones by waiting on them etc + */ +public class Manager { - /* All queues */ - private DList!(Queue) queues; - private Mutex queuesLock; - - /* TODO Add drop queue? */ - - /** - * The remote host - */ - private Socket socket; - - private Watcher watcher; - - - /** - * Constructs a new Manager with the given - * endpoint Socket - * - */ - this(Socket socket) - { - /* TODO: Make sure the socket is in STREAM mode */ - - /* Set the socket */ - this.socket = socket; - - /* Initialize the queues mutex */ - queuesLock = new Mutex(); - - /* Initialize the watcher */ - watcher = new Watcher(this, socket); - } - - public Queue getQueue(ulong tag) - { - Queue matchingQueue; - - queuesLock.lock(); - - foreach(Queue queue; queues) - { - if(queue.getTag() == tag) - { - matchingQueue = queue; - break; - } - } - - queuesLock.unlock(); - - return matchingQueue; - } - - /* TODO: Probably remove this or keep it */ - public bool isValidTag(ulong tag) - { - return !(getQueue(tag) is null); - } - - /** - * Returns a new queue with a new ID, - * if all IDs are used then it returns - * null - * - * Use this if you don't care about reserving - * queues IDs and just want a throwaway queue - * - * FIXME: All tags in use, this won't handle it - */ - public Queue generateQueue() - { - /* Newly generated queue */ - Queue newQueue; - - queuesLock.lock(); - - ulong curGuess = 0; - bool bad = true; - reguess: while(bad) - { - if(isValidTag(curGuess)) - { - curGuess++; - continue reguess; - } - - bad = false; - } - - /* Create the new queue with the free id found */ - newQueue = new Queue(curGuess); - - /* Add the queue (recursive mutex) */ - addQueue(newQueue); - - queuesLock.unlock(); - - - return newQueue; - } - - public Queue[] getQueues() - { - Queue[] queues; - queuesLock.lock(); - - foreach(Queue queue; this.queues) - { - queues ~= queue; - } - - queuesLock.unlock(); - - return queues; - } - - /** - * Removes the given Queue, `queue`, from the manager - * - * Throws a TristanableException if the id of the - * queue wanting to be removed is not in use by any - * queue already added - */ - public void removeQueue(Queue queue) - { - queuesLock.lock(); - - /* Make sure such a tag exists */ - if(isValidTag(queue.getTag())) - { - queues.linearRemoveElement(queue); - } - else - { - /* Unlock queue before throwing an exception */ - queuesLock.unlock(); - throw new TristanableException(this, "Cannot remove a queue with an id not in use"); - } - - queuesLock.unlock(); - } - - /** - * Adds the given Queue, `queue`, to the manager - * - * Throws a TristanableException if the id of the - * queue wanting to be added is already in use by - * another already added queue - */ - public void addQueue(Queue queue) - { - queuesLock.lock(); - - /* Make sure such a tag does not exist already */ - if(!isValidTag(queue.getTag())) - { - queues ~= queue; - } - else - { - /* Unlock queue before throwing an exception */ - queuesLock.unlock(); - throw new TristanableException(this, "Cannot add queue with id already in use"); - } - - queuesLock.unlock(); - } - - public Socket getSocket() - { - return socket; - } - - /** - * TODO: Comment - * TODO: Testing - */ - public void shutdown() - { - /* TODO: Implement me */ - - /* Make the loop stop whenever it does */ - watcher.shutdown(); - - /* Wait for the thread to end */ - watcher.join(); - } } \ No newline at end of file diff --git a/source/tristanable/package.d b/source/tristanable/package.d index af77314..1f5dead 100644 --- a/source/tristanable/package.d +++ b/source/tristanable/package.d @@ -1,7 +1,2 @@ module tristanable; -public import tristanable.encoding; -public import tristanable.manager; -public import tristanable.queue; -public import tristanable.queueitem; -public import tristanable.watcher; diff --git a/source/tristanable/queue.d b/source/tristanable/queue.d index ecde08b..c7613da 100644 --- a/source/tristanable/queue.d +++ b/source/tristanable/queue.d @@ -1,172 +1,18 @@ -/** -* Queue -* -* Represents a queue with a tag. -* -* Any messages that are received with -* the matching tag (to this queue) are -* then enqueued to this queue -*/ - module tristanable.queue; -import tristanable.queueitem : QueueItem; -import std.socket : Socket; -import core.sync.mutex : Mutex; -import bmessage : bSendMessage = sendMessage; -import core.thread : Thread; -import std.container.dlist; -import std.range : walkLength; - -public enum QueuePolicy : ubyte +public class Queue { - LENGTH_CAP = 1 -} + private this() + { -public final class Queue -{ - /* This queue's tag */ - private ulong tag; + } - /* The queue */ - private DList!(QueueItem) queue; + public static Queue newQueue(ulong queueID) + { + Queue queue; - /* The queue mutex */ - private Mutex queueLock; + // TODO: Implement me - /** - * Construct a new queue with the given - * tag - */ - this(ulong tag, QueuePolicy flags = cast(QueuePolicy)0) - { - this.tag = tag; - - /* Initialize the mutex */ - queueLock = new Mutex(); - - this.flags = flags; - } - - public void setLengthCap(ulong lengthCap) - { - this.lengthCap = lengthCap; - } - - public ulong getLengthCap(ulong lengthCap) - { - return lengthCap; - } - - /** - * Queue policy settings - */ - private ulong lengthCap = 1; - private QueuePolicy flags; - - - public void enqueue(QueueItem item) - { - /* Lock the queue */ - queueLock.lock(); - - /** - * Check to see if the queue has a length cap - * - * If so then determine whether to drop or - * keep dependent on current capacity - */ - if(flags & QueuePolicy.LENGTH_CAP) - { - if(walkLength(queue[]) == lengthCap) - { - goto unlock; - } - } - - /* Add it to the queue */ - queue ~= item; - - unlock: - - /* Unlock the queue */ - queueLock.unlock(); - } - - /** - * Returns true if this queue has items ready - * to be dequeued, false otherwise - */ - public bool poll() - { - /* Status */ - bool status; - - /* Lock the queue */ - queueLock.lock(); - - status = !queue.empty(); - - /* Unlock the queue */ - queueLock.unlock(); - - return status; - } - - /** - * Attempts to coninuously dequeue the - * head of the queue - * - * TODO: Add a timeout capability - * TODO: Add tryLock, yield on failure (with loop for recheck ofc) - * TODO: Possible multiple dequeue feature? Like .receive - */ - public QueueItem dequeue() - { - /* The head of the queue */ - QueueItem queueHead; - - while(!queueHead) - { - /* Lock the queue */ - queueLock.lock(); - - /* Check if we can dequeue anything */ - if(!queue.empty()) - { - /* If we can then dequeue */ - queueHead = queue.front(); - queue.removeFront(); - - /* Chop off the head */ - // offWithTheHead(); - } - - /* Unlock the queue */ - queueLock.unlock(); - - - /** - * Move away from this thread, let - * the watcher (presumably) try - * access our queue (successfully) - * by getting a lock on it - * - * Prevents us possibly racing back - * and locking queue again hence - * starving the system - */ - Thread.getThis().yield(); - } - - return queueHead; - } - - /** - * Returns the tag for this queue - */ - public ulong getTag() - { - return tag; - } + return queue; + } } \ No newline at end of file diff --git a/source/tristanable/queueitem.d b/source/tristanable/queueitem.d deleted file mode 100644 index f8b61e9..0000000 --- a/source/tristanable/queueitem.d +++ /dev/null @@ -1,18 +0,0 @@ -module tristanable.queueitem; - -public final class QueueItem -{ - /* This item's data */ - private byte[] data; - - /* TODO: */ - this(byte[] data) - { - this.data = data; - } - - public byte[] getData() - { - return data; - } -} \ No newline at end of file diff --git a/source/tristanable/watcher.d b/source/tristanable/watcher.d deleted file mode 100644 index 0e2d508..0000000 --- a/source/tristanable/watcher.d +++ /dev/null @@ -1,107 +0,0 @@ -module tristanable.watcher; - -import std.socket : Socket; -import core.sync.mutex : Mutex; -import bmessage : receiveMessage; -import tristanable.queue : Queue; -import tristanable.queueitem : QueueItem; -import tristanable.manager : Manager; -import core.thread : Thread; -import tristanable.encoding; -import tristanable.exceptions; - -public final class Watcher : Thread -{ - /* The manager */ - private Manager manager; - - /* The socket to read from */ - private Socket socket; - - private bool running; - - this(Manager manager, Socket endpoint) - { - super(&run); - this.manager = manager; - socket = endpoint; - - running = true; - start(); - } - - public void shutdown() - { - running=false; - - /* Close the socket, causing an error, breaking the event loop */ - socket.close(); - - } - - private void run() - { - /* Continuously dequeue tristanable packets from socket */ - while(true) - { - /* Receive payload (tag+data) */ - byte[] receivedPayload; - - /* Block for socket response */ - bool recvStatus = receiveMessage(socket, receivedPayload); - - /* If the receive was successful */ - if(recvStatus) - { - /* Decode the ttag-encoded message */ - DataMessage message = DataMessage.decode(receivedPayload); - - /* TODO: Remove isTag, improve later, oneshot */ - - /* The matching queue (if any) */ - Queue queue = manager.getQueue(message.getTag()); - - /* If the tag belongs to a queue */ - if(queue) - { - /* Add an item to this queue */ - queue.enqueue(new QueueItem(message.getData())); - } - /* If the tag is unknwon */ - else - { - /* TODO: Add to dropped queue? */ - - /* Do nothing */ - } - } - /* If the receive failed */ - else - { - /* TODO: depending on `running`, different error */ - - /* TODO: Stop everything */ - break; - } - - /** - * Like in `dequeue` we don't want the possibility - * of racing back to the top of the loop and locking - * the mutex again right before a thread switch, - * so we make sure that a switch occurs to a different - * thread - */ - Thread.getThis().yield(); - } - - /* Check if we had an error */ - if(running) - { - throw new TristanableException(manager, "bformat socket error"); - } - else - { - /* Actual shut down, do nothing */ - } - } -} \ No newline at end of file diff --git a/todo b/todo deleted file mode 100644 index 481aa86..0000000 --- a/todo +++ /dev/null @@ -1,10 +0,0 @@ -- [x] Use queues -- [x] Immediate head chop after dequeue -- [x] Thread.yields -- [ ] Sleep option -- [x] Use linked list for queues (increase performance) -- [ ] shutdown option -- [x] Queue policies - - [x] Length cap -- [ ] Exceptions -- [x] Queue deletion \ No newline at end of file diff --git a/tristanable b/tristanable deleted file mode 160000 index cb68e9f..0000000 --- a/tristanable +++ /dev/null @@ -1 +0,0 @@ -Subproject commit cb68e9f673dd7b2f490ee4ff2262e45b41a90a0f diff --git a/tristanable-test-library b/tristanable-test-library index 0f0ba67..e9c700c 100755 Binary files a/tristanable-test-library and b/tristanable-test-library differ