Compare commits

..

6 Commits

32 changed files with 906 additions and 1562 deletions

View File

@ -1,41 +0,0 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
name: D
on:
push:
branches: [ "**" ]
pull_request:
branches: [ "**" ]
permissions:
contents: read
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: dlang-community/setup-dlang@4c99aa991ce7d19dd3064de0a4f2f6b2f152e2d7
- name: Install Doveralls (code coverage tool)
run: |
dub fetch doveralls
sudo apt install libcurl4-openssl-dev
- name: 'Build & Test'
run: |
# Build the project, with its main file included, without unittests
dub build --compiler=$DC
# Build and run tests, as defined by `unittest` configuration
# In this mode, `mainSourceFile` is excluded and `version (unittest)` are included
# See https://dub.pm/package-format-json.html#configurations
dub test --compiler=$DC --coverage
- name: Coverage upload
run: |
dub run doveralls -- -t ${{secrets.COVERALLS_REPO_TOKEN}}

3
.gitignore vendored
View File

@ -22,6 +22,3 @@ docs/
# Code coverage
*.lst
source/tristanable/queue.d
dub.selections.json
tristanable-test-library

192
README.md
View File

@ -1,175 +1,61 @@
![](branding/logo_small.png)
![](https://code.dlang.org/packages/tristanable/logo?s=5ef1c9f1250f57dd4c37efbf)
tristanable
===========
[![D](https://github.com/deavmi/tristanable/actions/workflows/d.yml/badge.svg)](https://github.com/deavmi/tristanable/actions/workflows/d.yml) ![DUB](https://img.shields.io/dub/v/tristanable?color=%23c10000ff%20&style=flat-square) ![DUB](https://img.shields.io/dub/dt/tristanable?style=flat-square) ![DUB](https://img.shields.io/dub/l/tristanable?style=flat-square) [![Coverage Status](https://coveralls.io/repos/github/deavmi/tristanable/badge.svg?branch=master)](https://coveralls.io/github/deavmi/tristanable?branch=master)
**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own respectively tagged queues indicated by their _"type"_ or `id`.
**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own resepctively tagged queues indicated by their _"type"_ or `id`.
## What problems does it solve?
### Human example
Say now you made a request to a server with a tag `1` and expect a reply with that same tag `1`. Now, for a moment, think about what would happen in a tagless system. You would be expecting a reply, say now the weather report for your area, but what if the server has another thread that writes an instant messenger notification to the server's socket before the weather message is sent? Now you will interpret those bytes as if they were a weather message.
Say now you made a request to a server with a tag `1` and expect a reply with that same tag `1`. Now, for a moment, think about what would happen in a tagless system. You would be expecting a reply, say now the weather report for your area, but what if the server has another thread that writes an instant messenger notification to the server's socket before the weather message is sent? Now you will inetrpret those bytes as if they were a weather message.
Tristanable provides a way for you to receive the "IM notification first" but block and dequeue (when it arrives in the queue) for the "weather report". Irrespective of wether (no pun intended) the weather report arrives before the "IM notification" or after.
Tristanable provides a way for you to receive the "IM notification first" but block and dequeue (when it arrives in the queue) for the "weather report". Irresepctoive of wether (no pun intended) the weather report arrives before the "IM notification" or after.
### Code example
Below is a fully-fledged example of the types of places (networking) where tristanable can be of help. The code is all explained in the comments:
If we wanted to implement the following we would do the following. One note is that instead of waiting on messages of a specific _"type"_ (or rather **tag**), tristanable provides not just a one-message lengthb uffer per tag but infact a full queue per tag, meaning any received message with tag `1` will be enqueued and not dropped after the first message of type `1` is buffered.
```d
import std.socket;
import std.stdio;
import core.thread;
import tristanable.manager;
import tristanable.queue;
import tristanable.queueitem;
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);
/* Create a manager to manage the socket for us */
Manager manager = new Manager(socket);
class ServerThread : Thread
{
this()
{
super(&worker);
}
/* Create a Queue for all "weather messages" */
Queue weatherQueue = new Queue(1);
private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);
/* Create a Queue for all "IM notifications" */
Queue instantNotification = new Queue(2);
Thread.sleep(dur!("seconds")(7));
writeln("Server start");
/* Tell the manager to look out for tagged messages `1` and `2` */
manager.addQueue(weatherQueue);
manager.addQueue(instantNotification);
/**
* Create a tagged message to send
*
* tag 42 payload Cucumber 😳️
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Hello
*/
message = new TaggedMessage(69, cast(byte[])"Hello");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Bye
*/
message = new TaggedMessage(69, cast(byte[])"Bye");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 100 payload Bye
*/
message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 200 payload Bye
*/
message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
}
}
ServerThread serverThread = new ServerThread();
serverThread.start();
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
writeln(server.localAddress);
Manager manager = new Manager(client);
Queue sixtyNine = new Queue(69);
Queue fortyTwo = new Queue(42);
manager.registerQueue(sixtyNine);
manager.registerQueue(fortyTwo);
// Register a default queue (tag ignored)
Queue defaultQueue = new Queue(2332);
manager.setDefaultQueue(defaultQueue);
/* Connect our socket to the server */
client.connect(server.localAddress);
/* Start the manager and let it manage the socket */
manager.start();
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
TaggedMessage dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Hello");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Bye");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = fortyTwo.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 42);
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
/* Dequeue two messages from the default queue */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 100);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1");
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 200);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2");
/* Stop the manager */
manager.stop();
/* Now we can block on this queue and return with its head */
QueueItem message = weatherQueue.dequeue();
```
And let tristanable handle it! We even handle the message lengths and everything using another great project [bformat](https://deavmi.assigned.network/projects/bformat).
Surely, there must be some sort of encoding mechanism too? The messages afterall need to be encoded. **No problem!**, we have that sorted:
```d
import tristanable.encoding;
/* Let's send it with tag 1 and data "Hello" */
ulong tag = 1;
byte[] data = cast(byte[])"Hello";
/* When sending a message */
DataMessage tristanEncoded = new DataMessage(tag, data);
/* Then send it */
socket.send(encodeForSend(tristanEncoded));
```
And let tristanable handle it! We even handle the message lengths and everything using another great project [bformat](http://deavmi.assigned.network/projects/bformat).
## Format
@ -178,9 +64,9 @@ And let tristanable handle it! We even handle the message lengths and everything
```
## Using tristanable in your D project
You can easily add the library (source-based) to your project by running the following command in your project's root:
You can easily add the library (source-based) to your project by running the following command in your
project's root:
```bash
dub add tristanable
```
```

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.5 KiB

Binary file not shown.

View File

@ -1,15 +1,14 @@
{
"authors": [
"Tristan B. Velloza Kildaire"
"Tristan B. Kildaire"
],
"copyright": "Copyright © 2023, Tristan B. Kildaire",
"copyright": "Copyright © 2020, Tristan B. Kildaire",
"dependencies": {
"bformat": ">=4.1.1",
"niknaks": ">=0.3.0"
"bformat": "~>3.1.3"
},
"description": "Tristanable network message queuing framework",
"homepage": "https://deavmi.assigned.network/projects/tristanable",
"description": "Tag-based asynchronous messaging framework",
"license": "LGPL-3.0",
"name": "tristanable",
"targetType": "library"
"targetType": "library",
"sourcePaths": ["source/tristanable"]
}

6
dub.selections.json Normal file
View File

@ -0,0 +1,6 @@
{
"fileVersion": 1,
"versions": {
"bformat": "3.1.3"
}
}

15
example/client/.gitignore vendored Normal file
View File

@ -0,0 +1,15 @@
.dub
docs.json
__dummy.html
docs/
/example
example.so
example.dylib
example.dll
example.a
example.lib
example-test-*
*.exe
*.o
*.obj
*.lst

12
example/client/dub.json Normal file
View File

@ -0,0 +1,12 @@
{
"authors": [
"Tristan B. Kildaire"
],
"copyright": "Copyright © 2020, Tristan B. Kildaire",
"dependencies": {
"tristanable": "~>2.2.1"
},
"description": "A minimal D application.",
"license": "proprietary",
"name": "example"
}

View File

@ -0,0 +1,7 @@
{
"fileVersion": 1,
"versions": {
"bformat": "1.0.8",
"tristanable": "2.2.1"
}
}

View File

@ -0,0 +1,59 @@
import std.stdio;
import tristanable.manager : Manager;
import std.socket;
import core.thread;
void main()
{
writeln("Edit source/app.d to start your project.");
Socket socket = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
socket.connect(parseAddress("127.0.0.1",7777));
Manager manager = new Manager(socket);
class bruh : Thread
{
this()
{
super(&run);
}
private void run()
{
while(true)
{
manager.lockQueue();
writeln(manager.getQueue());
manager.unlockQueue();
import core.thread;
Thread.sleep(dur!("seconds")(1));
}
}
}
new bruh().start();
manager.sendMessage(69, [77]);
manager.sendMessage(70, [78]);
byte[] receivedKaka = manager.receiveMessage(69);
writeln(receivedKaka);
receivedKaka = manager.receiveMessage(70);
writeln(receivedKaka);
manager.sendMessage(70, [78]);
receivedKaka = manager.receiveMessage(70);
writeln(receivedKaka);
}

0
example/server.d Normal file
View File

15
example/server/.gitignore vendored Normal file
View File

@ -0,0 +1,15 @@
.dub
docs.json
__dummy.html
docs/
/server
server.so
server.dylib
server.dll
server.a
server.lib
server-test-*
*.exe
*.o
*.obj
*.lst

13
example/server/dub.json Normal file
View File

@ -0,0 +1,13 @@
{
"authors": [
"Tristan B. Kildaire"
],
"copyright": "Copyright © 2020, Tristan B. Kildaire",
"dependencies": {
"bformat": "~>1.0.8",
"tristanable": "~>2.2.1"
},
"description": "A minimal D application.",
"license": "proprietary",
"name": "server"
}

View File

@ -0,0 +1,7 @@
{
"fileVersion": 1,
"versions": {
"bformat": "1.0.8",
"tristanable": "2.2.1"
}
}

View File

@ -0,0 +1,37 @@
import std.stdio;
import std.socket;
import tristanable.encoding : DataMessage;
import bmessage;
import core.thread;
void main()
{
writeln("Edit source/app.d to start your project.");
Socket socket = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
socket.bind(parseAddress("127.0.0.1",7777));
socket.listen(1);
Socket conn = socket.accept();
byte[] receivedData;
while(true)
{
receiveMessage(conn, receivedData);
DataMessage message = DataMessage.decode(receivedData);
writeln("Tag: ", message.tag);
writeln("Data: ", message.data);
DataMessage d = new DataMessage(70, [2]);
sendMessage(conn, d.encode());
d = new DataMessage(69, [1]);
Thread.sleep(dur!("seconds")(5));
sendMessage(conn, d.encode());
}
}

View File

@ -1,168 +1,67 @@
/**
* Encoding/decoding of the tristanable format
*/
module tristanable.encoding;
import std.conv : to;
import niknaks.bits : bytesToIntegral, Order, order, toBytes;
/**
* Represents a tagged message that has been decoded
* from its raw byte encoding, this is a tuple of
* a numeric tag and a byte array of payload data
*
* Also provides a static method to decode from such
* raw encoding and an instance method to do the reverse
*/
public final class TaggedMessage
public final class DataMessage
{
/**
* This message's tag
*/
private ulong tag;
/**
* The payload
*/
private ulong tag;
private byte[] data;
/**
* Constructs a new TaggedMessage with the given tag and payload
*
* Params:
* tag = the tag to use
* data = the payload
*/
public static DataMessage decode(byte[] bytes)
{
/* Fetch the `tag` */
ulong receivedTag = *(cast(ulong*)bytes.ptr);
/* Fetch the `data` */
byte[] receivedData = bytes[8..bytes.length];
return new DataMessage(receivedTag, receivedData);
}
/**
* Constructs a new DataMessage with
* the give `tag` and bytes `data`
*/
this(ulong tag, byte[] data)
{
this.tag = tag;
this.data = data;
}
/**
* Parameterless constructor used for decoder
*/
private this() {}
/**
* Decodes the wire-formatted tristanable bytes into an instance
* of TaggedMessage whereby the tag and data can be seperately
* accessed and manipulated
*
* Params:
* encodedMessage = the wire-format encoded bytes
* Returns: an instance of TaggedMessage
*/
public static TaggedMessage decode(byte[] encodedMessage)
{
/* The decoded message */
TaggedMessage decodedMessage = new TaggedMessage();
/* The decoded tag */
ulong decodedTag;
/* Take ulong-many bytes and only flip them to LE if not on LE host */
decodedTag = order(bytesToIntegral!(ushort)(cast(ubyte[])encodedMessage), Order.LE);
/* Set the tag */
decodedMessage.setTag(decodedTag);
/* Set the data *(9-th byte onwards) */
decodedMessage.setPayload(encodedMessage[8..$]);
return decodedMessage;
}
/**
* Encodes the tagged message into the tristanable
* wire format ready for transmission
*
* Returns: the encoded bytes
*/
public byte[] encode()
{
/* The encoded bytes */
byte[] encodedMessage;
/* Construct the message array */
byte[] messageData;
/* If on little endian then no re-order, if host is BE flip (the tag) */
encodedMessage ~= toBytes(order(tag, Order.LE));
/* Add the `tag` bytes */
messageData ~= *(cast(byte*)&tag);
messageData ~= *(cast(byte*)&tag+1);
messageData ~= *(cast(byte*)&tag+2);
messageData ~= *(cast(byte*)&tag+3);
messageData ~= *(cast(byte*)&tag+4);
messageData ~= *(cast(byte*)&tag+5);
messageData ~= *(cast(byte*)&tag+6);
messageData ~= *(cast(byte*)&tag+7);
/* Add the `data` bytes (the actual message) */
messageData ~= data;
/* Tack on the data */
encodedMessage ~= data;
return encodedMessage;
return messageData;
}
/**
* Get the message's payload
*
* Returns: the payload
*/
public byte[] getPayload()
public byte[] getData()
{
return data;
}
/**
* Get the message's tag
*
* Returns: the tag
*/
public ulong getTag()
{
return tag;
}
/**
* Set the message's payload
*
* Params:
* newPayload = the payload to use
*/
public void setPayload(byte[] newPayload)
{
this.data = newPayload;
}
/**
* Set the message's tag
*
* Params:
* newTag = the tag to use
*/
public void setTag(ulong newTag)
{
this.tag = newTag;
}
/**
* Returns a string representation of the TaggedMessage
*
* Returns: the string represenation
*/
public override string toString()
{
return "TMessage [Tag: "~to!(string)(tag)~", Payload: "~to!(string)(data)~"]";
}
}
/**
* Test encoding and decoding
*/
unittest
public static byte[] encodeForSend(DataMessage message)
{
/* Setup testing data */
TaggedMessage testData = new TaggedMessage(420, [1,2,3]);
/* Encode */
byte[] encoded = testData.encode();
/* Decode */
TaggedMessage decoded = TaggedMessage.decode(encoded);
/* Now ensure that `decoded` == original `testData` */
assert(decoded.getTag() == testData.getTag);
assert(decoded.getPayload() == testData.getPayload());
import bmessage;
return encodeBformat(message.encode());
}

View File

@ -1,73 +1,29 @@
/**
* Error handling type definitions
*/
module tristanable.exceptions;
import std.conv : to;
import tristanable.manager;
import tristanable.queue : Queue;
/**
* The type of sub-error of the `TristanableException`
*/
public enum ErrorType
public final class TristanableException : Exception
{
/**
* If the requested queue could not be found
*/
QUEUE_NOT_FOUND,
/**
* If the queue wanting to be registered has already
* been registered under the same tag
*/
QUEUE_ALREADY_EXISTS,
/**
* If no default queue is configured
*/
NO_DEFAULT_QUEUE,
/**
* The blocking call to `dequeue()`, somehow, failed
*/
DEQUEUE_FAILED,
/**
* The call to `enqueue()`, somehow, failed
*/
ENQUEUE_FAILED
}
/**
* Any sort of error that occurs during runtime of the tristanable
* engine
*/
public class TristanableException : Exception
{
/**
* The sub-error type
*/
private ErrorType err;
/**
* Constructs a new `TristanableException` with the provided
* sub-error type
*
* Params:
* err = the `ErrorType`
*/
this(ErrorType err)
this(Manager manager, string message)
{
super(this.classinfo.name~": "~to!(string)(err));
this.err = err;
super(generateMessage(message));
}
/**
* Retrieve the sub-error type
*
* Returns: the sub-error type as a `ErrorType`
*/
public ErrorType getError()
private string generateMessage(string errMesg)
{
return err;
string msg;
// msg = "TRistanable failure: "~errMesg~"\n\n";
// msg ~= "Queue stats:\n\n"
// Queue[] queues = manager.getQueues();
// foreach(Queue queue; queues)
// {
// msg ~= "Queue["~to!(string)(queue.getTag())~"]: "~
// }
// msg ~= manager.getQueues()
return msg;
}
}

View File

@ -0,0 +1,217 @@
module tristanable.manager;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
import tristanable.queue : Queue;
import tristanable.watcher;
import std.container.dlist;
import tristanable.exceptions;
/**
* Manager
*
* This is the core class that is to be instantiated
* that represents an instance of the tristanable
* framework. It is passed a Socket from which it
* reads from (using a bformat block reader).
*
* It contains a Watcher which does the reading and
* appending to respective queues (the user need not
* worry about this factum).
*
* The functions provided allow users to wait in a
* tight loop to dequeue ("receive" in a blcoking mannger)
* from a specified queue.
*/
public final class Manager
{
/* All queues */
private DList!(Queue) queues;
private Mutex queuesLock;
/* TODO Add drop queue? */
/**
* The remote host
*/
private Socket socket;
private Watcher watcher;
/**
* Constructs a new Manager with the given
* endpoint Socket
*
*/
this(Socket socket)
{
/* TODO: Make sure the socket is in STREAM mode */
/* Set the socket */
this.socket = socket;
/* Initialize the queues mutex */
queuesLock = new Mutex();
/* Initialize the watcher */
watcher = new Watcher(this, socket);
}
public Queue getQueue(ulong tag)
{
Queue matchingQueue;
queuesLock.lock();
foreach(Queue queue; queues)
{
if(queue.getTag() == tag)
{
matchingQueue = queue;
break;
}
}
queuesLock.unlock();
return matchingQueue;
}
/* TODO: Probably remove this or keep it */
public bool isValidTag(ulong tag)
{
return !(getQueue(tag) is null);
}
/**
* Returns a new queue with a new ID,
* if all IDs are used then it returns
* null
*
* Use this if you don't care about reserving
* queues IDs and just want a throwaway queue
*
* FIXME: All tags in use, this won't handle it
*/
public Queue generateQueue()
{
/* Newly generated queue */
Queue newQueue;
queuesLock.lock();
ulong curGuess = 0;
bool bad = true;
reguess: while(bad)
{
if(isValidTag(curGuess))
{
curGuess++;
continue reguess;
}
bad = false;
}
/* Create the new queue with the free id found */
newQueue = new Queue(curGuess);
/* Add the queue (recursive mutex) */
addQueue(newQueue);
queuesLock.unlock();
return newQueue;
}
public Queue[] getQueues()
{
Queue[] queues;
queuesLock.lock();
foreach(Queue queue; this.queues)
{
queues ~= queue;
}
queuesLock.unlock();
return queues;
}
/**
* Removes the given Queue, `queue`, from the manager
*
* Throws a TristanableException if the id of the
* queue wanting to be removed is not in use by any
* queue already added
*/
public void removeQueue(Queue queue)
{
queuesLock.lock();
/* Make sure such a tag exists */
if(isValidTag(queue.getTag()))
{
queues.linearRemoveElement(queue);
}
else
{
/* Unlock queue before throwing an exception */
queuesLock.unlock();
throw new TristanableException(this, "Cannot remove a queue with an id not in use");
}
queuesLock.unlock();
}
/**
* Adds the given Queue, `queue`, to the manager
*
* Throws a TristanableException if the id of the
* queue wanting to be added is already in use by
* another already added queue
*/
public void addQueue(Queue queue)
{
queuesLock.lock();
/* Make sure such a tag does not exist already */
if(!isValidTag(queue.getTag()))
{
queues ~= queue;
}
else
{
/* Unlock queue before throwing an exception */
queuesLock.unlock();
throw new TristanableException(this, "Cannot add queue with id already in use");
}
queuesLock.unlock();
}
public Socket getSocket()
{
return socket;
}
/**
* TODO: Comment
* TODO: Testing
*/
public void shutdown()
{
/* TODO: Implement me */
/* Make the loop stop whenever it does */
watcher.shutdown();
/* Wait for the thread to end */
watcher.join();
}
}

View File

@ -1,34 +0,0 @@
/**
* Configuration for the manager
*/
module tristanable.manager.config;
/**
* Manager parameters
*/
public struct Config
{
/**
* If set to true then when one uses
* `sendMessage(TaggedMessage)` the
* manager will check if a queue with
* said tag has not been registered
* and if so, then register it for
* us before encoding-and-sending
*/
public bool registerOnSend;
}
/**
* Generates the default configuration to use for
* the manager
*
* Returns: the Config
*/
public Config defaultConfig()
{
Config config;
config.registerOnSend = false;
return config;
}

View File

@ -1,549 +0,0 @@
/**
* Management of a tristanable instance
*/
module tristanable.manager.manager;
import std.socket;
import tristanable.queue.queue : Queue;
import core.sync.mutex : Mutex;
import tristanable.manager.watcher : Watcher;
import tristanable.encoding : TaggedMessage;
import tristanable.exceptions;
import std.container.slist : SList;
import tristanable.manager.config;
import river.core;
import river.impls.sock : SockStream;
import bformat.client;
/**
* Manages a provided socket by spawning
* a watcher thread to read from it and file
* mail into the corresponding queues.
*
* Queues are managed via this an instance
* of a manager.
*/
public class Manager
{
/**
* Configuration
*/
private Config config;
/**
* The bformat client to read and write from
*/
private BClient bClient;
/**
* Currently registered queues
*/
private SList!(Queue) queues;
/**
* Lock for currently registered queues
*/
private Mutex queuesLock;
/**
* Default queue
*/
private Queue defaultQueue;
/**
* Watcher which manages the socket and
* enqueues new messages into the respective
* quueue for us
*/
private Watcher watcher;
/**
* Constructs a new manager which will read from
* this socket and file mail for us
*
* Params:
* stream = the underlying stream to use
*/
this(Stream stream, Config config = defaultConfig())
{
this.bClient = new BClient(stream);
this.queuesLock = new Mutex();
this.config = config;
this.watcher = new Watcher(this, bClient);
}
// TODO: Comment this
// This is for backwards compatibility (whereby a `Socket` was taken in)
this(Socket socket, Config config = defaultConfig())
{
this(new SockStream(socket), config);
}
/**
* Starts the management of the socket,
* resulting in queues being updated upon
* reciving messages tagged for them
*/
public void start()
{
watcher.startWatcher();
}
/**
* Stops the management of the socket, resulting
* in ending the updating of queues and closing
* the underlying connection
*/
public void stop()
{
watcher.shutdown();
}
/**
* Retrieves the queue mathcing the provided id
*
* Params:
* id = the id to lookup by
* Returns: the Queue
* Throws: TristanableException if the queue is not found
*/
public Queue getQueue(ulong id)
{
/* The found queue */
Queue queue = getQueue_nothrow(id);
/* If no queue is found then throw an error */
if(queue is null)
{
throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
}
return queue;
}
/**
* Retrieves the queue mathcing the provided id
*
* This is the nothrow version
*
* Params:
* id = the id to lookup by
* Returns: the Queue if found, null otherwise
*/
public Queue getQueue_nothrow(ulong id)
{
/* The found queue */
Queue queue;
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
/* Search for the queue */
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == id)
{
queue = curQueue;
break;
}
}
return queue;
}
/**
* Get a new queue thatis unique in its tag
* (unused/not regustered yet), register it
* and then return it
*
* Returns: the newly registered Queue
*/
public Queue getUniqueQueue()
{
/* The newly created queue */
Queue uniqueQueue;
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
// TODO: Throw exception if all tags used
/* The unused tag */
ulong unusedTag = 0;
/* Try the current tag and ensure no queue uses it */
tagLoop: for(ulong curPotentialTag = 0; true; curPotentialTag++)
{
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == curPotentialTag)
{
continue tagLoop;
}
}
/* Then we have found a unique tag */
unusedTag = curPotentialTag;
break;
}
/* Create the queue */
uniqueQueue = new Queue(unusedTag);
/* Register it */
registerQueue(uniqueQueue);
return uniqueQueue;
}
/**
* Registers the given queue with the manager
*
* Params:
* queue = the queue to register
* Throws:
* TristanableException if a queue with the provided id already exists
*/
public void registerQueue(Queue queue)
{
/* Try to register the queue */
bool status = registerQueue_nothrow(queue);
/* If registration was not successful */
if(!status)
{
throw new TristanableException(ErrorType.QUEUE_ALREADY_EXISTS);
}
}
/**
* Registers the given queue with the manager
*
* Params:
* queue = the queue to register
* Returns: true if registration was successful, false otherwise
*/
public bool registerQueue_nothrow(Queue queue)
{
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
/* Search for the queue, throw an exception if it exists */
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == queue.getID())
{
/* Registration failed */
return false;
}
}
/* Insert the queue as it does not exist */
queues.insertAfter(queues[], queue);
/* Registration was a success */
return true;
}
/**
* De-registers the given queue from the manager
*
* Params:
* queue = the queue to de-register
* Throws:
* TristanableException if a queue with the provided id cannot be found
*/
public void releaseQueue(Queue queue)
{
/* Try to de-register the queue */
bool status = releaseQueue_nothrow(queue);
/* If de-registration was not successful */
if(!status)
{
throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
}
}
/**
* De-registers the given queue from the manager
*
* Params:
* queue = the queue to de-register
* Returns: true if de-registration was successful, false otherwise
*/
public bool releaseQueue_nothrow(Queue queue)
{
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
/* Search for the queue, return false if it does NOT exist */
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == queue.getID())
{
/* Remove the queue */
queues.linearRemoveElement(queue);
/* De-registration succeeded */
return true;
}
}
/* De-registration failed */
return false;
}
/**
* Sets the default queue
*
* The default queue, when set/enabled, is the queue that will
* be used to enqueue messages that have a tag which doesn't
* match any of the normally registered queues.
*
* Please note that the ID of the queue passed in here does not
* mean anything in this context; only the queuing facilities
* of the Queue object are used
*
* Params:
* queue = the default queue to use
*/
public void setDefaultQueue(Queue queue)
{
this.defaultQueue = queue;
}
/**
* Returns the default queue
*
* Returns: the default queue
* Throws:
* TristanableException if there is no default queue
*/
public Queue getDefaultQueue()
{
/* The potential default queue */
Queue potentialDefaultQueue = getDefaultQueue_nothrow();
if(potentialDefaultQueue is null)
{
throw new TristanableException(ErrorType.NO_DEFAULT_QUEUE);
}
return potentialDefaultQueue;
}
/**
* Returns the default queue
*
* This is the nothrow version
*
* Returns: the default queue if found, null otherwise
*/
public Queue getDefaultQueue_nothrow()
{
return defaultQueue;
}
/**
* Sends the provided message over the socket
*
* Params:
* message = the TaggedMessage to send
*/
public void sendMessage(TaggedMessage message)
{
/**
* If a queue with the tag of the message does
* not exist, then register it if the config
* option was enabled
*/
if(config.registerOnSend)
{
/* Create a Queue with the tag */
Queue createdQueue = new Queue(message.getTag());
/* Attempt to register the queue */
registerQueue_nothrow(createdQueue);
}
/* Encode the message */
byte[] encodedMessage = message.encode();
/* Send it using bformat (encode-and-send) */
bClient.sendMessage(encodedMessage);
}
}
// TODO: Fix this, write it in a nicer way
// ... or make a private constructor here that
// ... does not take it in
version(unittest)
{
Socket nullSock = null;
}
/**
* Test retrieving a queue which does not
* exist
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Shouldn't be found */
try
{
manager.getQueue(69);
assert(false);
}
catch(TristanableException e)
{
assert(e.getError() == ErrorType.QUEUE_NOT_FOUND);
}
/* Shouldn't be found */
assert(manager.getQueue_nothrow(69) is null);
}
/**
* Test registering a queue and then fetching it
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */
Queue queue = new Queue(69);
try
{
/* Register the queue */
manager.registerQueue(queue);
/* Fetch the queue */
Queue fetchedQueue = manager.getQueue(69);
/* Ensure the queue we fetched is the one we stored (the references would be equal) */
assert(fetchedQueue == queue);
}
catch(TristanableException e)
{
assert(false);
}
/* Should be found */
assert(manager.getQueue_nothrow(69) !is null);
}
/**
* Tests registering a queue and then registering
* another queue with the same id
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */
Queue queue = new Queue(69);
/* Register the queue */
manager.registerQueue(queue);
try
{
/* Register the queue (try again) */
manager.registerQueue(queue);
assert(false);
}
catch(TristanableException e)
{
assert(e.getError() == ErrorType.QUEUE_ALREADY_EXISTS);
}
}
/**
* Tests registering a queue, de-registering it and
* then registering it again
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */
Queue queue = new Queue(69);
/* Register the queue */
manager.registerQueue(queue);
/* Ensure it is registered */
assert(queue == manager.getQueue(69));
/* De-register the queue */
manager.releaseQueue(queue);
/* Ensure it is de-registered */
assert(manager.getQueue_nothrow(69) is null);
/* Register the queue (again) */
manager.registerQueue(queue);
/* Ensure it is registered (again) */
assert(queue == manager.getQueue(69));
}
/**
* Tests registering a queue using the "next available queue"
* method
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Get the next 3 available queues */
Queue queue1 = manager.getUniqueQueue();
Queue queue2 = manager.getUniqueQueue();
Queue queue3 = manager.getUniqueQueue();
/* The queues should have tags [0, 1, 2] respectively */
assert(queue1.getID() == 0);
assert(queue2.getID() == 1);
assert(queue3.getID() == 2);
}
// TODO: Add testing for queue existence (internal method)

View File

@ -1,15 +0,0 @@
/**
* Interface which manages a provided socket
* and enqueuing and dequeuing of queues
*/
module tristanable.manager;
/**
* The management facilities
*/
public import tristanable.manager.manager : Manager;
/**
* Configuration for the manager
*/
public import tristanable.manager.config : Config, defaultConfig;

View File

@ -1,297 +0,0 @@
/**
* Facilitates the reading of messages from the socket,
* decoding thereof and final enqueuing thereof into their
* respective queus
*/
module tristanable.manager.watcher;
import core.thread : Thread;
import tristanable.manager.manager : Manager;
import std.socket;
import bformat;
import tristanable.encoding;
import tristanable.exceptions;
import tristanable.queue.queue;
import bformat.client;
/**
* Watches the socket on a thread of its own,
* performs the decoding of the incoming messages
* and places them into the correct queues via
* the associated Manager instance
*/
public class Watcher : Thread
{
/**
* The associated manager to use
* such that we can place new mail
* into their respective inboxes (queues)
*/
private Manager manager;
/**
* The BClient to read from
*/
private BClient bClient;
/**
* Creates a new `Watcher` that is associated
* with the provided `Manager` such that it can
* add to its registered queues. The provided `Socket`
* is such that it can be read from and managed.
*
* Params:
* manager = the `Manager` to associate with
* bclient = the underlying `BClient` to read data from
*/
package this(Manager manager, BClient bClient)
{
this.manager = manager;
this.bClient = bClient;
super(&watch);
}
/**
* Starts the underlying thread
*/
package void startWatcher()
{
/* Start the watch method on a new thread */
start();
}
/**
* Watches the socket for incoming messages
* and decodes them on the fly, placing
* the final message in the respective queue
*/
private void watch()
{
import std.stdio;
while(true)
{
/* Do a bformat read-and-decode */
byte[] wireTristan;
version(unittest) { writeln("Before bformat recv()"); }
bool recvStatus = bClient.receiveMessage(wireTristan); // TODO: Add a check for the status of read
version(unittest) { writeln("After bformat recv()"); }
version(unittest) { writeln("bformat recv() status: ", recvStatus); }
if(recvStatus)
{
/* Decode the received bytes into a tagged message */
TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan);
version(unittest) { writeln("Watcher received: ", decodedMessage); }
/* Search for the queue with the id provided */
ulong messageTag = decodedMessage.getTag();
Queue potentialQueue = manager.getQueue_nothrow(messageTag);
/* If a queue can be found */
if(potentialQueue !is null)
{
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
/* If the queue if not found */
else
{
/**
* Look for a default queue, and if one is found
* then enqueue the message there. Otherwise, drop
* it by simply doing nothing.
*/
try
{
potentialQueue = manager.getDefaultQueue();
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
catch(TristanableException e) {}
}
version(unittest) { writeln("drip"); }
}
/**
* If there was an error receiving on the socket.
*
* This can be either because we have shut the socket down
* or the remote end has closed the connection.
*
* In any case, exit the loop therefore ending this thread.
*/
else
{
break;
}
}
}
/**
* Shuts down the watcher, unblocks the blocking read in the loop
* resulting in the watcher thread ending
*/
package void shutdown()
{
/* Closes the bformat reader */
bClient.close();
}
}
/**
* Set up a server which will send some tagged messages to us (the client),
* where we have setup a `Manager` to watch the queues with tags `42` and `69`,
* we then dequeue some messages from both queus. Finally, we shut down the manager.
*/
unittest
{
import std.socket;
import std.stdio;
import core.thread;
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);
class ServerThread : Thread
{
this()
{
super(&worker);
}
private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);
Thread.sleep(dur!("seconds")(7));
writeln("Server start");
/**
* Create a tagged message to send
*
* tag 42 payload Cucumber 😳
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Hello
*/
message = new TaggedMessage(69, cast(byte[])"Hello");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Bye
*/
message = new TaggedMessage(69, cast(byte[])"Bye");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 100 payload Bye
*/
message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 200 payload Bye
*/
message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
}
}
ServerThread serverThread = new ServerThread();
serverThread.start();
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
writeln(server.localAddress);
Manager manager = new Manager(client);
Queue sixtyNine = new Queue(69);
Queue fortyTwo = new Queue(42);
manager.registerQueue(sixtyNine);
manager.registerQueue(fortyTwo);
// Register a default queue (tag ignored)
Queue defaultQueue = new Queue(2332);
manager.setDefaultQueue(defaultQueue);
/* Connect our socket to the server */
client.connect(server.localAddress);
/* Start the manager and let it manage the socket */
manager.start();
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
TaggedMessage dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Hello");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Bye");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = fortyTwo.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 42);
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
/* Dequeue two messages from the default queue */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 100);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1");
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 200);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2");
/* Stop the manager */
manager.stop();
}

View File

@ -1,25 +1,7 @@
/**
* Tristanable network message queuing framework
*/
module tristanable;
/**
* Interface which manages a provided socket
* and enqueuing and dequeuing of queues
*/
public import tristanable.encoding;
public import tristanable.manager;
/**
* A queue of queue items all of the same tag
*/
public import tristanable.queue.queue : Queue;
/**
* Error handling type definitions
*/
public import tristanable.exceptions : TristanableException, ErrorType;
/**
* Encoding/decoding of the tristanable format
*/
public import tristanable.encoding : TaggedMessage;
public import tristanable.queue;
public import tristanable.queueitem;
public import tristanable.watcher;

172
source/tristanable/queue.d Normal file
View File

@ -0,0 +1,172 @@
/**
* Queue
*
* Represents a queue with a tag.
*
* Any messages that are received with
* the matching tag (to this queue) are
* then enqueued to this queue
*/
module tristanable.queue;
import tristanable.queueitem : QueueItem;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
import core.thread : Thread;
import std.container.dlist;
import std.range : walkLength;
public enum QueuePolicy : ubyte
{
LENGTH_CAP = 1
}
public final class Queue
{
/* This queue's tag */
private ulong tag;
/* The queue */
private DList!(QueueItem) queue;
/* The queue mutex */
private Mutex queueLock;
/**
* Construct a new queue with the given
* tag
*/
this(ulong tag, QueuePolicy flags = cast(QueuePolicy)0)
{
this.tag = tag;
/* Initialize the mutex */
queueLock = new Mutex();
this.flags = flags;
}
public void setLengthCap(ulong lengthCap)
{
this.lengthCap = lengthCap;
}
public ulong getLengthCap(ulong lengthCap)
{
return lengthCap;
}
/**
* Queue policy settings
*/
private ulong lengthCap = 1;
private QueuePolicy flags;
public void enqueue(QueueItem item)
{
/* Lock the queue */
queueLock.lock();
/**
* Check to see if the queue has a length cap
*
* If so then determine whether to drop or
* keep dependent on current capacity
*/
if(flags & QueuePolicy.LENGTH_CAP)
{
if(walkLength(queue[]) == lengthCap)
{
goto unlock;
}
}
/* Add it to the queue */
queue ~= item;
unlock:
/* Unlock the queue */
queueLock.unlock();
}
/**
* Returns true if this queue has items ready
* to be dequeued, false otherwise
*/
public bool poll()
{
/* Status */
bool status;
/* Lock the queue */
queueLock.lock();
status = !queue.empty();
/* Unlock the queue */
queueLock.unlock();
return status;
}
/**
* Attempts to coninuously dequeue the
* head of the queue
*
* TODO: Add a timeout capability
* TODO: Add tryLock, yield on failure (with loop for recheck ofc)
* TODO: Possible multiple dequeue feature? Like .receive
*/
public QueueItem dequeue()
{
/* The head of the queue */
QueueItem queueHead;
while(!queueHead)
{
/* Lock the queue */
queueLock.lock();
/* Check if we can dequeue anything */
if(!queue.empty())
{
/* If we can then dequeue */
queueHead = queue.front();
queue.removeFront();
/* Chop off the head */
// offWithTheHead();
}
/* Unlock the queue */
queueLock.unlock();
/**
* Move away from this thread, let
* the watcher (presumably) try
* access our queue (successfully)
* by getting a lock on it
*
* Prevents us possibly racing back
* and locking queue again hence
* starving the system
*/
Thread.getThis().yield();
}
return queueHead;
}
/**
* Returns the tag for this queue
*/
public ulong getTag()
{
return tag;
}
}

View File

@ -1,240 +0,0 @@
/**
* A queue of queue items all of the same tag
*/
module tristanable.queue.queue;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import core.sync.exception : SyncError;
import std.container.slist : SList;
import tristanable.encoding;
import core.time : Duration, dur;
import tristanable.exceptions;
version(unittest)
{
import std.stdio;
import std.conv : to;
}
/**
* Represents a queue whereby messages of a certain tag/id
* can be enqueued to (by the `Watcher`) and dequeued from
* (by the user application)
*/
public class Queue
{
/**
* This queue's unique ID
*/
private ulong queueID;
/**
* The libsnooze event used to sleep/wake
* on queue events
* Mutex for the condition variable
*/
private Mutex mutex;
/**
* The condition variable used to sleep/wake
* on queue of events
*/
private Condition signal;
/**
* The queue of messages
*/
private SList!(TaggedMessage) queue;
/**
* The lock for the message queue
*/
private Mutex queueLock;
/**
* If a message is enqueued prior
* to us sleeping then we won't
* wake up and return for it.
*
* Therefore a periodic wakeup
* is required.
*/
private Duration wakeInterval;
/**
* Constructs a new Queue and immediately sets up the notification
* sub-system for the calling thread (the thread constructing this
* object) which ensures that a call to dequeue will immediately
* unblock on the first message received under this tag
*
* Params:
* queueID = the id to use for this queue
*/
this(ulong queueID)
{
/* Initialize the queue lock */
this.queueLock = new Mutex();
/* Initialize the condition variable */
this.mutex = new Mutex();
this.signal = new Condition(this.mutex);
/* Set the queue id */
this.queueID = queueID;
/* Set the slumber interval */
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
}
/**
* Returns the current wake interval
* for the queue checker
*
* Returns: the `Duration`
*/
public Duration getWakeInterval()
{
return this.wakeInterval;
}
/**
* Sets the wake up interval
*
* Params:
* interval = the new interval
*/
public void setWakeInterval(Duration interval)
{
this.wakeInterval = interval;
}
/**
* Enqueues the provided tagged message onto this queue
* and then wakes up any thread that has called dequeue
* on this queue as well
*
* On error enqueueing a `TristanableException` will be
* thrown.
*
* Params:
* message = the TaggedMessage to enqueue
*/
public void enqueue(TaggedMessage message)
{
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Enqueuing '"~to!(string)(message)~"'...");
}
scope(exit)
{
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Enqueued '"~to!(string)(message)~"'!");
}
/* Unlock the item queue */
queueLock.unlock();
}
/* Lock the item queue */
queueLock.lock();
/* Add the item to the queue */
queue.insertAfter(queue[], message);
/* Wake up anyone wanting to dequeue from us */
try
{
// TODO: Make us wait on the event (optional with a time-out)
signal.notifyAll();
}
catch(SyncError snozErr)
{
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.ENQUEUE_FAILED);
}
}
// TODO: Make a version of this which can time out
/**
* Blocks till a message can be dequeued from this queue
*
* On error dequeueing a `TristanableException` will be
* thrown.
*
* Returns: the dequeued TaggedMessage
*/
public TaggedMessage dequeue()
{
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Dequeueing...");
}
/* The dequeued message */
TaggedMessage dequeuedMessage;
scope(exit)
{
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Dequeued '"~to!(string)(dequeuedMessage)~"'!");
}
}
/* Block till we dequeue a message successfully */
while(dequeuedMessage is null)
{
scope(exit)
{
// Unlock the mutex
this.mutex.unlock();
}
// Lock the mutex
this.mutex.lock();
try
{
this.signal.wait(this.wakeInterval);
}
catch(SyncError e)
{
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}
/* Lock the item queue */
queueLock.lock();
/* Consume the front of the queue (if non-empty) */
if(!queue.empty())
{
/* Pop the front item off */
dequeuedMessage = queue.front();
/* Remove the front item from the queue */
queue.linearRemoveElement(dequeuedMessage);
}
/* Unlock the item queue */
queueLock.unlock();
}
return dequeuedMessage;
}
/**
* Get the id/tag of this queue
*
* Returns: the queue's id
*/
public ulong getID()
{
return queueID;
}
}

View File

@ -0,0 +1,18 @@
module tristanable.queueitem;
public final class QueueItem
{
/* This item's data */
private byte[] data;
/* TODO: */
this(byte[] data)
{
this.data = data;
}
public byte[] getData()
{
return data;
}
}

View File

@ -0,0 +1,212 @@
module tristanable.watcher;
import std.socket : Socket, SocketSet;
import core.sync.mutex : Mutex;
import bmessage : receiveMessage;
import tristanable.queue : Queue;
import tristanable.queueitem : QueueItem;
import tristanable.manager : Manager;
import core.thread : Thread, Duration, dur;
import tristanable.encoding;
import tristanable.exceptions;
public final class Watcher : Thread
{
/* The manager */
private Manager manager;
/* The socket to read from */
private Socket endpoint;
private bool running;
private bool newSys;
private SocketSet socketSetR, socketSetW, socketSetE;
/**
* Timeout for select()
*/
private Duration timeOut;
this(Manager manager, Socket endpoint, Duration timeOut = dur!("msecs")(100), bool newSys = false)
{
super(&run);
this.manager = manager;
this.endpoint = endpoint;
this.newSys = newSys;
if(newSys)
{
initSelect();
}
this.timeOut = timeOut;
running = true;
start();
}
/**
* Initializes the SocketSet which is needed for the use
* of the select() method0
*/
private void initSelect()
{
socketSetR = new SocketSet();
socketSetW = new SocketSet();
socketSetE = new SocketSet();
}
public void shutdown()
{
running=false;
/* Close the socket, causing an error, breaking the event loop */
endpoint.close();
}
private void run()
{
/* Continuously dequeue tristanable packets from socket */
while(true)
{
/* Receive payload (tag+data) */
byte[] receivedPayload;
if(newSys)
{
/**
* We want to check the readable status of the `endpoint` socket, we use
* the `select()` function for this. However, after selecting it will need
* to be re-added if you want to check again. Example, if you add it to
* the `socketSetR` (the readable-socketset) then if we time out or it
* is not readable from it will be removed from said set.
*
* Therefore we will need to add it back again for our next check (via
* calling `select()`)
*/
socketSetR.add(endpoint);
int status = Socket.select(socketSetR, null, null, timeOut);
import std.stdio : writeln;
/* If we timed out on the select() */
if(status == 0)
{
/* Check if we need to exit */
writeln("We got 0");
continue;
}
/* Interrupt */
else if (status == -1)
{
/* TODO: Not sure what we should do here */
writeln("We got -1");
import core.stdc.errno;
writeln(errno);
continue;
}
/* Either data is available or a network occurred */
else
{
writeln("Info: ", endpoint.isAlive);
writeln("info: ", endpoint.handle);
writeln("We got ", status);
/* If the socket is still connected */
if(endpoint.isAlive())
{
/* If we have data */
if(socketSetR.isSet(endpoint))
{
/* Do nothing (fall through) */
writeln("We got ready socket");
/* I don't want to do mulitple additions, so let's clear the socket read set */
socketSetR.reset();
}
/* There is no else as the only socket set checked for IS read */
}
/* If the socket is not connected (network error) */
else
{
/* TODO: Maybe handle? */
writeln("We have socket error");
// throw new TristanableException(manager, "Network error with endpoint socket");
break;
}
}
}
/* Block for socket response */
bool recvStatus = receiveMessage(endpoint, receivedPayload);
/* If the receive was successful */
if(recvStatus)
{
/* Decode the ttag-encoded message */
DataMessage message = DataMessage.decode(receivedPayload);
/* TODO: Remove isTag, improve later, oneshot */
/* The matching queue (if any) */
Queue queue = manager.getQueue(message.getTag());
/* If the tag belongs to a queue */
if(queue)
{
/* Add an item to this queue */
queue.enqueue(new QueueItem(message.getData()));
}
/* If the tag is unknwon */
else
{
/* TODO: Add to dropped queue? */
/* Do nothing */
}
}
/* If the receive failed */
else
{
/* TODO: depending on `running`, different error */
/* TODO: Stop everything */
break;
}
/**
* Like in `dequeue` we don't want the possibility
* of racing back to the top of the loop and locking
* the mutex again right before a thread switch,
* so we make sure that a switch occurs to a different
* thread
*/
Thread.getThis().yield();
}
/* Check if we had an error */
if(running)
{
throw new TristanableException(manager, "bformat socket error");
}
else
{
/* Actual shut down, do nothing */
}
}
}

10
todo Normal file
View File

@ -0,0 +1,10 @@
- [x] Use queues
- [x] Immediate head chop after dequeue
- [x] Thread.yields
- [ ] Sleep option
- [x] Use linked list for queues (increase performance)
- [ ] shutdown option
- [x] Queue policies
- [x] Length cap
- [ ] Exceptions
- [x] Queue deletion

1
tristanable Submodule

@ -0,0 +1 @@
Subproject commit cb68e9f673dd7b2f490ee4ff2262e45b41a90a0f

BIN
tristanable-test-library Executable file

Binary file not shown.