From 6e2a92a59bd1e8607a60bd096f3066306ec98ef3 Mon Sep 17 00:00:00 2001 From: "Tristan B. Kildaire" Date: Tue, 23 Jun 2020 00:33:01 +0200 Subject: [PATCH] Added all code needed to get it working. --- README.md | 2 +- source/tristanable/manager.d | 58 ++++++++++++++++++++++++++++++++++-- source/tristanable/request.d | 9 ++++-- source/tristanable/watcher.d | 51 ++++++++++++++++++++++++++++++- 4 files changed, 112 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 0004a48..71ab57e 100644 --- a/README.md +++ b/README.md @@ -6,5 +6,5 @@ Tag-based asynchronous messaging framework ## Format ``` -[4 bytes (size-2, little endian)][2 bytes - tag][(2-size) bytes - data] +[4 bytes (size-2, little endian)][8 bytes - tag][(2-size) bytes - data] ``` \ No newline at end of file diff --git a/source/tristanable/manager.d b/source/tristanable/manager.d index ec0cfa9..0f91bfc 100644 --- a/source/tristanable/manager.d +++ b/source/tristanable/manager.d @@ -70,7 +70,6 @@ public final class Manager /* Send the message */ bSendMessage(socket, messageData); - /* Create a new Request */ Request newRequest = new Request(tag); @@ -78,10 +77,53 @@ public final class Manager enqueue(newRequest); } + public bool isValidTag(ulong tag) + { + for(ulong i = 0; i < requestQueue.length; i++) + { + if(requestQueue[i].tag == tag) + { + return true; + } + } + return false; + } + + public ulong getTagPosition(ulong tag) + { + for(ulong i = 0; i < requestQueue.length; i++) + { + if(requestQueue[i].tag == tag) + { + return i; + } + } + return 0; + } + public byte[] receiveMessage(ulong tag) { - /* TODO: Implement me */ - return []; + /* The received data */ + byte[] receivedData; + + /* Loop till fulfilled */ + while(true) + { + /* Lock the queue for reading */ + lockQueue(); + + /* Check if the request has been fulfilled */ + if(requestQueue[getTagPosition(tag)].isFulfilled()) + { + receivedData = requestQueue[getTagPosition(tag)].dataReceived; + break; + } + + /* Unlock the queue */ + unlockQueue(); + } + + return receivedData; } public Request[] getQueue() @@ -94,4 +136,14 @@ public final class Manager { /* TODO: Implement me */ } + + public void lockQueue() + { + queueMutex.lock(); + } + + public void unlockQueue() + { + queueMutex.unlock(); + } } \ No newline at end of file diff --git a/source/tristanable/request.d b/source/tristanable/request.d index 7cc3391..19c0db1 100644 --- a/source/tristanable/request.d +++ b/source/tristanable/request.d @@ -12,7 +12,7 @@ public final class Request /** * The data received */ - private byte[] dataReceived; + public byte[] dataReceived; /** * Whether or not this request has been @@ -30,10 +30,13 @@ public final class Request this.tag = tag; } + public void fulfill(byte[] data) + { + dataReceived = data; + } public bool isFulfilled() { - /* TODO: Implement me */ - return true; + return fulfilled; } } \ No newline at end of file diff --git a/source/tristanable/watcher.d b/source/tristanable/watcher.d index e11e037..c166c22 100644 --- a/source/tristanable/watcher.d +++ b/source/tristanable/watcher.d @@ -1,6 +1,7 @@ module tristanable.watcher; import tristanable.manager : Manager; +import tristanable.request : Request; import std.socket : Socket; import core.thread : Thread; import bmessage : receiveMessage; @@ -32,7 +33,55 @@ public final class Watcher : Thread { while(true) { - /* TODO: Loop here */ + /* The received message (tag+data) */ + byte[] receivedPayload; + + /* The message's tag */ + ulong receivedTag; + + /* The message's data */ + byte[] receivedMessage; + + + /* Receive a message */ + bool recvStatus = receiveMessage(endpoint, receivedPayload); + + /* TODO: Status check */ + + /* Fetch the `tag` */ + receivedTag = *(cast(ulong*)receivedPayload.ptr); + + /* Fetch the `data` */ + receivedMessage = receivedPayload[8..receivedMessage.length]; + + /* Lock the queue for reading */ + manager.lockQueue(); + + /* Get the queue */ + Request[] currentQueue = manager.getQueue(); + + /* Check to see if this is a tag we are awaiting */ + bool foundTag = manager.isValidTag(receivedTag); + ulong requestPosition = manager.getTagPosition(receivedTag); + + + + if(foundTag) + { + + + /* Fulfill the request */ + currentQueue[requestPosition].fulfill(receivedMessage); + + + } + else + { + + } + + /* Unlock the queue */ + manager.unlockQueue(); } } } \ No newline at end of file