Added all code needed to get it working.

This commit is contained in:
Tristan B. Kildaire 2020-06-23 00:33:01 +02:00
parent 71879638c6
commit 6e2a92a59b
4 changed files with 112 additions and 8 deletions

View File

@ -6,5 +6,5 @@ Tag-based asynchronous messaging framework
## Format
```
[4 bytes (size-2, little endian)][2 bytes - tag][(2-size) bytes - data]
[4 bytes (size-2, little endian)][8 bytes - tag][(2-size) bytes - data]
```

View File

@ -70,7 +70,6 @@ public final class Manager
/* Send the message */
bSendMessage(socket, messageData);
/* Create a new Request */
Request newRequest = new Request(tag);
@ -78,10 +77,53 @@ public final class Manager
enqueue(newRequest);
}
public bool isValidTag(ulong tag)
{
for(ulong i = 0; i < requestQueue.length; i++)
{
if(requestQueue[i].tag == tag)
{
return true;
}
}
return false;
}
public ulong getTagPosition(ulong tag)
{
for(ulong i = 0; i < requestQueue.length; i++)
{
if(requestQueue[i].tag == tag)
{
return i;
}
}
return 0;
}
public byte[] receiveMessage(ulong tag)
{
/* TODO: Implement me */
return [];
/* The received data */
byte[] receivedData;
/* Loop till fulfilled */
while(true)
{
/* Lock the queue for reading */
lockQueue();
/* Check if the request has been fulfilled */
if(requestQueue[getTagPosition(tag)].isFulfilled())
{
receivedData = requestQueue[getTagPosition(tag)].dataReceived;
break;
}
/* Unlock the queue */
unlockQueue();
}
return receivedData;
}
public Request[] getQueue()
@ -94,4 +136,14 @@ public final class Manager
{
/* TODO: Implement me */
}
public void lockQueue()
{
queueMutex.lock();
}
public void unlockQueue()
{
queueMutex.unlock();
}
}

View File

@ -12,7 +12,7 @@ public final class Request
/**
* The data received
*/
private byte[] dataReceived;
public byte[] dataReceived;
/**
* Whether or not this request has been
@ -30,10 +30,13 @@ public final class Request
this.tag = tag;
}
public void fulfill(byte[] data)
{
dataReceived = data;
}
public bool isFulfilled()
{
/* TODO: Implement me */
return true;
return fulfilled;
}
}

View File

@ -1,6 +1,7 @@
module tristanable.watcher;
import tristanable.manager : Manager;
import tristanable.request : Request;
import std.socket : Socket;
import core.thread : Thread;
import bmessage : receiveMessage;
@ -32,7 +33,55 @@ public final class Watcher : Thread
{
while(true)
{
/* TODO: Loop here */
/* The received message (tag+data) */
byte[] receivedPayload;
/* The message's tag */
ulong receivedTag;
/* The message's data */
byte[] receivedMessage;
/* Receive a message */
bool recvStatus = receiveMessage(endpoint, receivedPayload);
/* TODO: Status check */
/* Fetch the `tag` */
receivedTag = *(cast(ulong*)receivedPayload.ptr);
/* Fetch the `data` */
receivedMessage = receivedPayload[8..receivedMessage.length];
/* Lock the queue for reading */
manager.lockQueue();
/* Get the queue */
Request[] currentQueue = manager.getQueue();
/* Check to see if this is a tag we are awaiting */
bool foundTag = manager.isValidTag(receivedTag);
ulong requestPosition = manager.getTagPosition(receivedTag);
if(foundTag)
{
/* Fulfill the request */
currentQueue[requestPosition].fulfill(receivedMessage);
}
else
{
}
/* Unlock the queue */
manager.unlockQueue();
}
}
}