Compare commits
76 Commits
Author | SHA1 | Date |
---|---|---|
Tristan B. Velloza Kildaire | 4bee62e4e2 | |
Tristan B. Velloza Kildaire | b24c3368b3 | |
Tristan B. Velloza Kildaire | a23017a747 | |
Tristan B. Velloza Kildaire | 459f4a8709 | |
Tristan B. Velloza Kildaire | 79432cef6c | |
Tristan B. Velloza Kildaire | c43ed31ca9 | |
Tristan B. Velloza Kildaire | fb36497bda | |
Tristan B. Velloza Kildaire | 153512e9ab | |
Tristan B. Velloza Kildaire | 01a5d779c8 | |
Tristan B. Velloza Kildaire | f8aa3b92cf | |
Tristan B. Velloza Kildaire | 8828e3ffdd | |
Tristan B. Velloza Kildaire | e7f93cd78a | |
Tristan B. Velloza Kildaire | cf431bdac9 | |
Tristan B. Velloza Kildaire | 17da826d07 | |
Tristan B. Velloza Kildaire | 586835a627 | |
Tristan B. Velloza Kildaire | e59dca0c4d | |
Tristan B. Velloza Kildaire | ad99ef63d9 | |
Tristan B. Velloza Kildaire | b58fb718a7 | |
Tristan B. Velloza Kildaire | ddd46f3388 | |
Tristan B. Velloza Kildaire | df4d479e54 | |
Tristan B. Velloza Kildaire | 0c43e80cf6 | |
Tristan B. Velloza Kildaire | 26bdcf7d83 | |
Tristan B. Velloza Kildaire | c241d06ea9 | |
Tristan B. Velloza Kildaire | 9c7d15dc89 | |
Tristan B. Velloza Kildaire | e1c28d9c11 | |
Tristan B. Velloza Kildaire | b4bf4d5af5 | |
Tristan B. Velloza Kildaire | e5a0a280bc | |
Tristan B. Velloza Kildaire | b060b30c44 | |
Tristan B. Velloza Kildaire | ce3772c66c | |
Tristan B. Velloza Kildaire | 256c6154cc | |
Tristan B. Velloza Kildaire | 372d4cdc78 | |
Tristan B. Velloza Kildaire | 1b88d1f8bf | |
Tristan B. Velloza Kildaire | 6382343916 | |
Tristan B. Velloza Kildaire | 4316d4817f | |
Tristan B. Velloza Kildaire | 88be3d08e3 | |
Tristan B. Velloza Kildaire | d9e5e54477 | |
Tristan B. Velloza Kildaire | 4f83991baf | |
Tristan B. Velloza Kildaire | 6d133a15ba | |
Tristan B. Velloza Kildaire | 6a00c620ec | |
Tristan B. Velloza Kildaire | a3ca66db2b | |
Tristan B. Velloza Kildaire | 3ce8bda7de | |
Tristan B. Velloza Kildaire | 7ba3c9f1f7 | |
Tristan B. Velloza Kildaire | 445e008603 | |
Tristan B. Velloza Kildaire | d48cc4267e | |
Tristan B. Velloza Kildaire | 3103adcb0f | |
Tristan B. Velloza Kildaire | e8d4e0ae20 | |
Tristan B. Velloza Kildaire | ea32c7eef4 | |
Tristan B. Velloza Kildaire | 384f286f83 | |
Tristan B. Velloza Kildaire | 7a60a31e0a | |
Tristan B. Velloza Kildaire | a193993c0d | |
Tristan B. Velloza Kildaire | 53f478dde2 | |
Tristan B. Velloza Kildaire | bfa4364a99 | |
Tristan B. Velloza Kildaire | 5f16e8d5b0 | |
Tristan B. Velloza Kildaire | 916bfec075 | |
Tristan B. Velloza Kildaire | 08a0327a5a | |
Tristan B. Velloza Kildaire | 7769ee5c6d | |
Tristan B. Velloza Kildaire | 27facbeaf5 | |
Tristan B. Kildaire | ca63a218c8 | |
Tristan B. Kildaire | e7e82c5491 | |
Tristan B. Kildaire | 7c8ab531ba | |
Tristan B. Kildaire | 9738f504eb | |
Tristan B. Kildaire | 77b4393085 | |
Tristan B. Kildaire | e529e526a7 | |
Tristan B. Kildaire | 848d1b7a63 | |
Tristan B. Kildaire | 984261a394 | |
Tristan B. Kildaire | 2c5d871378 | |
Tristan B. Kildaire | 73322667fa | |
Tristan B. Kildaire | 98d672434a | |
Tristan B. Kildaire | 41199e96fc | |
Tristan B. Kildaire | 96e755d3cc | |
Tristan B. Kildaire | 24d56c93a9 | |
Tristan B. Kildaire | 176310761c | |
Tristan B. Kildaire | 4fab6a3727 | |
Tristan B. Kildaire | 1f07b06316 | |
Tristan B. Kildaire | 02a29b5293 | |
Tristan B. Kildaire | 90835f3c97 |
|
@ -0,0 +1,41 @@
|
|||
# This workflow uses actions that are not certified by GitHub.
|
||||
# They are provided by a third-party and are governed by
|
||||
# separate terms of service, privacy policy, and support
|
||||
# documentation.
|
||||
name: D
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ "**" ]
|
||||
pull_request:
|
||||
branches: [ "**" ]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: dlang-community/setup-dlang@4c99aa991ce7d19dd3064de0a4f2f6b2f152e2d7
|
||||
|
||||
- name: Install Doveralls (code coverage tool)
|
||||
run: |
|
||||
dub fetch doveralls
|
||||
sudo apt install libcurl4-openssl-dev
|
||||
|
||||
- name: 'Build & Test'
|
||||
run: |
|
||||
# Build the project, with its main file included, without unittests
|
||||
dub build --compiler=$DC
|
||||
# Build and run tests, as defined by `unittest` configuration
|
||||
# In this mode, `mainSourceFile` is excluded and `version (unittest)` are included
|
||||
# See https://dub.pm/package-format-json.html#configurations
|
||||
dub test --compiler=$DC --coverage
|
||||
|
||||
- name: Coverage upload
|
||||
run: |
|
||||
dub run doveralls -- -t ${{secrets.COVERALLS_REPO_TOKEN}}
|
|
@ -22,3 +22,6 @@ docs/
|
|||
|
||||
# Code coverage
|
||||
*.lst
|
||||
source/tristanable/queue.d
|
||||
dub.selections.json
|
||||
tristanable-test-library
|
||||
|
|
195
README.md
195
README.md
|
@ -1,38 +1,175 @@
|
|||
![](https://code.dlang.org/packages/tristanable/logo?s=5ef1c9f1250f57dd4c37efbf)
|
||||
![](branding/logo_small.png)
|
||||
|
||||
tristanable
|
||||
===========
|
||||
|
||||
Tag-based asynchronous messaging framework
|
||||
[![D](https://github.com/deavmi/tristanable/actions/workflows/d.yml/badge.svg)](https://github.com/deavmi/tristanable/actions/workflows/d.yml) ![DUB](https://img.shields.io/dub/v/tristanable?color=%23c10000ff%20&style=flat-square) ![DUB](https://img.shields.io/dub/dt/tristanable?style=flat-square) ![DUB](https://img.shields.io/dub/l/tristanable?style=flat-square) [![Coverage Status](https://coveralls.io/repos/github/deavmi/tristanable/badge.svg?branch=master)](https://coveralls.io/github/deavmi/tristanable?branch=master)
|
||||
|
||||
## Usage
|
||||
|
||||
The entry point is via the `Manager` type, so first create an instance as follows (passing the endpoint `Socket` in as `socket` in this example):
|
||||
**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own respectively tagged queues indicated by their _"type"_ or `id`.
|
||||
|
||||
## What problems does it solve?
|
||||
|
||||
### Human example
|
||||
|
||||
Say now you made a request to a server with a tag `1` and expect a reply with that same tag `1`. Now, for a moment, think about what would happen in a tagless system. You would be expecting a reply, say now the weather report for your area, but what if the server has another thread that writes an instant messenger notification to the server's socket before the weather message is sent? Now you will interpret those bytes as if they were a weather message.
|
||||
|
||||
Tristanable provides a way for you to receive the "IM notification first" but block and dequeue (when it arrives in the queue) for the "weather report". Irrespective of wether (no pun intended) the weather report arrives before the "IM notification" or after.
|
||||
|
||||
### Code example
|
||||
|
||||
Below is a fully-fledged example of the types of places (networking) where tristanable can be of help. The code is all explained in the comments:
|
||||
|
||||
```d
|
||||
Manager manager = new Manager(socket);
|
||||
import std.socket;
|
||||
import std.stdio;
|
||||
import core.thread;
|
||||
|
||||
Address serverAddress = parseAddress("::1", 0);
|
||||
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
server.bind(serverAddress);
|
||||
server.listen(0);
|
||||
|
||||
class ServerThread : Thread
|
||||
{
|
||||
this()
|
||||
{
|
||||
super(&worker);
|
||||
}
|
||||
|
||||
private void worker()
|
||||
{
|
||||
Socket clientSocket = server.accept();
|
||||
BClient bClient = new BClient(clientSocket);
|
||||
|
||||
Thread.sleep(dur!("seconds")(7));
|
||||
writeln("Server start");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 42 payload Cucumber 😳️
|
||||
*/
|
||||
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
|
||||
byte[] tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 69 payload Hello
|
||||
*/
|
||||
message = new TaggedMessage(69, cast(byte[])"Hello");
|
||||
tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 69 payload Bye
|
||||
*/
|
||||
message = new TaggedMessage(69, cast(byte[])"Bye");
|
||||
tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 100 payload Bye
|
||||
*/
|
||||
message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1");
|
||||
tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 200 payload Bye
|
||||
*/
|
||||
message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2");
|
||||
tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
}
|
||||
}
|
||||
|
||||
ServerThread serverThread = new ServerThread();
|
||||
serverThread.start();
|
||||
|
||||
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
|
||||
writeln(server.localAddress);
|
||||
|
||||
|
||||
Manager manager = new Manager(client);
|
||||
|
||||
Queue sixtyNine = new Queue(69);
|
||||
Queue fortyTwo = new Queue(42);
|
||||
|
||||
manager.registerQueue(sixtyNine);
|
||||
manager.registerQueue(fortyTwo);
|
||||
|
||||
// Register a default queue (tag ignored)
|
||||
Queue defaultQueue = new Queue(2332);
|
||||
manager.setDefaultQueue(defaultQueue);
|
||||
|
||||
|
||||
/* Connect our socket to the server */
|
||||
client.connect(server.localAddress);
|
||||
|
||||
/* Start the manager and let it manage the socket */
|
||||
manager.start();
|
||||
|
||||
/* Block on the unittest thread for a received message */
|
||||
writeln("unittest thread: Dequeue() blocking...");
|
||||
TaggedMessage dequeuedMessage = sixtyNine.dequeue();
|
||||
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
|
||||
assert(dequeuedMessage.getTag() == 69);
|
||||
assert(dequeuedMessage.getPayload() == cast(byte[])"Hello");
|
||||
|
||||
/* Block on the unittest thread for a received message */
|
||||
writeln("unittest thread: Dequeue() blocking...");
|
||||
dequeuedMessage = sixtyNine.dequeue();
|
||||
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
|
||||
assert(dequeuedMessage.getTag() == 69);
|
||||
assert(dequeuedMessage.getPayload() == cast(byte[])"Bye");
|
||||
|
||||
/* Block on the unittest thread for a received message */
|
||||
writeln("unittest thread: Dequeue() blocking...");
|
||||
dequeuedMessage = fortyTwo.dequeue();
|
||||
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
|
||||
assert(dequeuedMessage.getTag() == 42);
|
||||
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
|
||||
|
||||
|
||||
/* Dequeue two messages from the default queue */
|
||||
writeln("unittest thread: Dequeue() blocking...");
|
||||
dequeuedMessage = defaultQueue.dequeue();
|
||||
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
|
||||
assert(dequeuedMessage.getTag() == 100);
|
||||
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1");
|
||||
|
||||
writeln("unittest thread: Dequeue() blocking...");
|
||||
dequeuedMessage = defaultQueue.dequeue();
|
||||
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
|
||||
assert(dequeuedMessage.getTag() == 200);
|
||||
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2");
|
||||
|
||||
|
||||
/* Stop the manager */
|
||||
manager.stop();
|
||||
```
|
||||
|
||||
Now the event loop would have started, now we are ready to send out some tagged messages and blocking receive for them!
|
||||
|
||||
Let's send out two messages with tags `1` and `2`:
|
||||
|
||||
```d
|
||||
manager.sendMessage(1, [1,2,3,4,5]);
|
||||
manager.sendMessage(2, [6,7,8,9,0]);
|
||||
```
|
||||
|
||||
Now we can start two seperate threads and wait on them both:
|
||||
|
||||
```d
|
||||
byte[] receivedData = manager.receiveMessage(1);
|
||||
```
|
||||
|
||||
```d
|
||||
byte[] receivedData = manager.receiveMessage(2);
|
||||
```
|
||||
|
||||
**TODO**
|
||||
And let tristanable handle it! We even handle the message lengths and everything using another great project [bformat](https://deavmi.assigned.network/projects/bformat).
|
||||
|
||||
## Format
|
||||
|
||||
|
@ -40,6 +177,10 @@ byte[] receivedData = manager.receiveMessage(2);
|
|||
[4 bytes (size-2, little endian)][8 bytes - tag][(2-size) bytes - data]
|
||||
```
|
||||
|
||||
## Acknowledgements
|
||||
## Using tristanable in your D project
|
||||
|
||||
Thansk to Gabby Smuts for the name suggestion 😉️
|
||||
You can easily add the library (source-based) to your project by running the following command in your project's root:
|
||||
|
||||
```bash
|
||||
dub add tristanable
|
||||
```
|
||||
|
|
Binary file not shown.
Binary file not shown.
After Width: | Height: | Size: 2.5 KiB |
Binary file not shown.
13
dub.json
13
dub.json
|
@ -1,14 +1,15 @@
|
|||
{
|
||||
"authors": [
|
||||
"Tristan B. Kildaire"
|
||||
"Tristan B. Velloza Kildaire"
|
||||
],
|
||||
"copyright": "Copyright © 2020, Tristan B. Kildaire",
|
||||
"copyright": "Copyright © 2023, Tristan B. Kildaire",
|
||||
"dependencies": {
|
||||
"bformat": "~>1.0.8"
|
||||
"bformat": ">=4.1.1",
|
||||
"niknaks": ">=0.3.0"
|
||||
},
|
||||
"description": "Tag-based asynchronous messaging framework",
|
||||
"description": "Tristanable network message queuing framework",
|
||||
"homepage": "https://deavmi.assigned.network/projects/tristanable",
|
||||
"license": "LGPL-3.0",
|
||||
"name": "tristanable",
|
||||
"targetType": "library",
|
||||
"sourcePaths": ["source/tristanable"]
|
||||
"targetType": "library"
|
||||
}
|
|
@ -1,6 +0,0 @@
|
|||
{
|
||||
"fileVersion": 1,
|
||||
"versions": {
|
||||
"bformat": "1.0.8"
|
||||
}
|
||||
}
|
|
@ -1,15 +0,0 @@
|
|||
.dub
|
||||
docs.json
|
||||
__dummy.html
|
||||
docs/
|
||||
/example
|
||||
example.so
|
||||
example.dylib
|
||||
example.dll
|
||||
example.a
|
||||
example.lib
|
||||
example-test-*
|
||||
*.exe
|
||||
*.o
|
||||
*.obj
|
||||
*.lst
|
|
@ -1,12 +0,0 @@
|
|||
{
|
||||
"authors": [
|
||||
"Tristan B. Kildaire"
|
||||
],
|
||||
"copyright": "Copyright © 2020, Tristan B. Kildaire",
|
||||
"dependencies": {
|
||||
"tristanable": "~>0.0.10"
|
||||
},
|
||||
"description": "A minimal D application.",
|
||||
"license": "proprietary",
|
||||
"name": "example"
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
{
|
||||
"fileVersion": 1,
|
||||
"versions": {
|
||||
"bformat": "1.0.8",
|
||||
"tristanable": "0.0.26"
|
||||
}
|
||||
}
|
|
@ -1,59 +0,0 @@
|
|||
import std.stdio;
|
||||
import tristanable.manager : Manager;
|
||||
import std.socket;
|
||||
import core.thread;
|
||||
|
||||
void main()
|
||||
{
|
||||
writeln("Edit source/app.d to start your project.");
|
||||
Socket socket = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
|
||||
socket.connect(parseAddress("127.0.0.1",7777));
|
||||
Manager manager = new Manager(socket);
|
||||
|
||||
class bruh : Thread
|
||||
{
|
||||
this()
|
||||
{
|
||||
super(&run);
|
||||
}
|
||||
|
||||
private void run()
|
||||
{
|
||||
while(true)
|
||||
{
|
||||
manager.lockQueue();
|
||||
writeln(manager.getQueue());
|
||||
manager.unlockQueue();
|
||||
import core.thread;
|
||||
|
||||
Thread.sleep(dur!("seconds")(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
new bruh().start();
|
||||
|
||||
manager.sendMessage(69, [77]);
|
||||
manager.sendMessage(70, [78]);
|
||||
|
||||
|
||||
byte[] receivedKaka = manager.receiveMessage(69);
|
||||
writeln(receivedKaka);
|
||||
|
||||
receivedKaka = manager.receiveMessage(70);
|
||||
writeln(receivedKaka);
|
||||
|
||||
manager.sendMessage(70, [78]);
|
||||
|
||||
receivedKaka = manager.receiveMessage(70);
|
||||
writeln(receivedKaka);
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -1,15 +0,0 @@
|
|||
.dub
|
||||
docs.json
|
||||
__dummy.html
|
||||
docs/
|
||||
/server
|
||||
server.so
|
||||
server.dylib
|
||||
server.dll
|
||||
server.a
|
||||
server.lib
|
||||
server-test-*
|
||||
*.exe
|
||||
*.o
|
||||
*.obj
|
||||
*.lst
|
|
@ -1,13 +0,0 @@
|
|||
{
|
||||
"authors": [
|
||||
"Tristan B. Kildaire"
|
||||
],
|
||||
"copyright": "Copyright © 2020, Tristan B. Kildaire",
|
||||
"dependencies": {
|
||||
"bformat": "~>1.0.8",
|
||||
"tristanable": "~>0.0.13"
|
||||
},
|
||||
"description": "A minimal D application.",
|
||||
"license": "proprietary",
|
||||
"name": "server"
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
{
|
||||
"fileVersion": 1,
|
||||
"versions": {
|
||||
"bformat": "1.0.8",
|
||||
"tristanable": "0.0.26"
|
||||
}
|
||||
}
|
|
@ -1,37 +0,0 @@
|
|||
import std.stdio;
|
||||
import std.socket;
|
||||
import tristanable.encoding : DataMessage;
|
||||
import bmessage;
|
||||
import core.thread;
|
||||
|
||||
void main()
|
||||
{
|
||||
writeln("Edit source/app.d to start your project.");
|
||||
Socket socket = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
|
||||
socket.bind(parseAddress("127.0.0.1",7777));
|
||||
socket.listen(1);
|
||||
|
||||
Socket conn = socket.accept();
|
||||
byte[] receivedData;
|
||||
|
||||
while(true)
|
||||
{
|
||||
receiveMessage(conn, receivedData);
|
||||
|
||||
DataMessage message = DataMessage.decode(receivedData);
|
||||
|
||||
writeln("Tag: ", message.tag);
|
||||
writeln("Data: ", message.data);
|
||||
|
||||
DataMessage d = new DataMessage(70, [2]);
|
||||
sendMessage(conn, d.encode());
|
||||
|
||||
d = new DataMessage(69, [1]);
|
||||
Thread.sleep(dur!("seconds")(5));
|
||||
sendMessage(conn, d.encode());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -1,46 +1,168 @@
|
|||
/**
|
||||
* Encoding/decoding of the tristanable format
|
||||
*/
|
||||
module tristanable.encoding;
|
||||
|
||||
public final class DataMessage
|
||||
import std.conv : to;
|
||||
import niknaks.bits : bytesToIntegral, Order, order, toBytes;
|
||||
|
||||
/**
|
||||
* Represents a tagged message that has been decoded
|
||||
* from its raw byte encoding, this is a tuple of
|
||||
* a numeric tag and a byte array of payload data
|
||||
*
|
||||
* Also provides a static method to decode from such
|
||||
* raw encoding and an instance method to do the reverse
|
||||
*/
|
||||
public final class TaggedMessage
|
||||
{
|
||||
/**
|
||||
* This message's tag
|
||||
*/
|
||||
private ulong tag;
|
||||
|
||||
public ulong tag;
|
||||
public byte[] data;
|
||||
|
||||
public static DataMessage decode(byte[] bytes)
|
||||
{
|
||||
/* Fetch the `tag` */
|
||||
ulong receivedTag = *(cast(ulong*)bytes.ptr);
|
||||
|
||||
/* Fetch the `data` */
|
||||
byte[] receivedData = bytes[8..bytes.length];
|
||||
|
||||
return new DataMessage(receivedTag, receivedData);
|
||||
}
|
||||
/**
|
||||
* The payload
|
||||
*/
|
||||
private byte[] data;
|
||||
|
||||
/**
|
||||
* Constructs a new TaggedMessage with the given tag and payload
|
||||
*
|
||||
* Params:
|
||||
* tag = the tag to use
|
||||
* data = the payload
|
||||
*/
|
||||
this(ulong tag, byte[] data)
|
||||
{
|
||||
this.tag = tag;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parameterless constructor used for decoder
|
||||
*/
|
||||
private this() {}
|
||||
|
||||
/**
|
||||
* Decodes the wire-formatted tristanable bytes into an instance
|
||||
* of TaggedMessage whereby the tag and data can be seperately
|
||||
* accessed and manipulated
|
||||
*
|
||||
* Params:
|
||||
* encodedMessage = the wire-format encoded bytes
|
||||
* Returns: an instance of TaggedMessage
|
||||
*/
|
||||
public static TaggedMessage decode(byte[] encodedMessage)
|
||||
{
|
||||
/* The decoded message */
|
||||
TaggedMessage decodedMessage = new TaggedMessage();
|
||||
|
||||
/* The decoded tag */
|
||||
ulong decodedTag;
|
||||
|
||||
/* Take ulong-many bytes and only flip them to LE if not on LE host */
|
||||
decodedTag = order(bytesToIntegral!(ushort)(cast(ubyte[])encodedMessage), Order.LE);
|
||||
|
||||
|
||||
/* Set the tag */
|
||||
decodedMessage.setTag(decodedTag);
|
||||
|
||||
/* Set the data *(9-th byte onwards) */
|
||||
decodedMessage.setPayload(encodedMessage[8..$]);
|
||||
|
||||
return decodedMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Encodes the tagged message into the tristanable
|
||||
* wire format ready for transmission
|
||||
*
|
||||
* Returns: the encoded bytes
|
||||
*/
|
||||
public byte[] encode()
|
||||
{
|
||||
/* Construct the message array */
|
||||
byte[] messageData;
|
||||
/* The encoded bytes */
|
||||
byte[] encodedMessage;
|
||||
|
||||
/* Add the `tag` bytes */
|
||||
messageData ~= *(cast(byte*)&tag);
|
||||
messageData ~= *(cast(byte*)&tag+1);
|
||||
messageData ~= *(cast(byte*)&tag+2);
|
||||
messageData ~= *(cast(byte*)&tag+3);
|
||||
messageData ~= *(cast(byte*)&tag+4);
|
||||
messageData ~= *(cast(byte*)&tag+5);
|
||||
messageData ~= *(cast(byte*)&tag+6);
|
||||
messageData ~= *(cast(byte*)&tag+7);
|
||||
|
||||
/* Add the `data` bytes (the actual message) */
|
||||
messageData ~= data;
|
||||
/* If on little endian then no re-order, if host is BE flip (the tag) */
|
||||
encodedMessage ~= toBytes(order(tag, Order.LE));
|
||||
|
||||
return messageData;
|
||||
|
||||
/* Tack on the data */
|
||||
encodedMessage ~= data;
|
||||
|
||||
return encodedMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the message's payload
|
||||
*
|
||||
* Returns: the payload
|
||||
*/
|
||||
public byte[] getPayload()
|
||||
{
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the message's tag
|
||||
*
|
||||
* Returns: the tag
|
||||
*/
|
||||
public ulong getTag()
|
||||
{
|
||||
return tag;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the message's payload
|
||||
*
|
||||
* Params:
|
||||
* newPayload = the payload to use
|
||||
*/
|
||||
public void setPayload(byte[] newPayload)
|
||||
{
|
||||
this.data = newPayload;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the message's tag
|
||||
*
|
||||
* Params:
|
||||
* newTag = the tag to use
|
||||
*/
|
||||
public void setTag(ulong newTag)
|
||||
{
|
||||
this.tag = newTag;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a string representation of the TaggedMessage
|
||||
*
|
||||
* Returns: the string represenation
|
||||
*/
|
||||
public override string toString()
|
||||
{
|
||||
return "TMessage [Tag: "~to!(string)(tag)~", Payload: "~to!(string)(data)~"]";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test encoding and decoding
|
||||
*/
|
||||
unittest
|
||||
{
|
||||
/* Setup testing data */
|
||||
TaggedMessage testData = new TaggedMessage(420, [1,2,3]);
|
||||
|
||||
/* Encode */
|
||||
byte[] encoded = testData.encode();
|
||||
|
||||
/* Decode */
|
||||
TaggedMessage decoded = TaggedMessage.decode(encoded);
|
||||
|
||||
/* Now ensure that `decoded` == original `testData` */
|
||||
assert(decoded.getTag() == testData.getTag);
|
||||
assert(decoded.getPayload() == testData.getPayload());
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/**
|
||||
* Error handling type definitions
|
||||
*/
|
||||
module tristanable.exceptions;
|
||||
|
||||
import std.conv : to;
|
||||
|
||||
/**
|
||||
* The type of sub-error of the `TristanableException`
|
||||
*/
|
||||
public enum ErrorType
|
||||
{
|
||||
/**
|
||||
* If the requested queue could not be found
|
||||
*/
|
||||
QUEUE_NOT_FOUND,
|
||||
|
||||
/**
|
||||
* If the queue wanting to be registered has already
|
||||
* been registered under the same tag
|
||||
*/
|
||||
QUEUE_ALREADY_EXISTS,
|
||||
|
||||
/**
|
||||
* If no default queue is configured
|
||||
*/
|
||||
NO_DEFAULT_QUEUE,
|
||||
|
||||
/**
|
||||
* The blocking call to `dequeue()`, somehow, failed
|
||||
*/
|
||||
DEQUEUE_FAILED,
|
||||
|
||||
/**
|
||||
* The call to `enqueue()`, somehow, failed
|
||||
*/
|
||||
ENQUEUE_FAILED
|
||||
}
|
||||
|
||||
/**
|
||||
* Any sort of error that occurs during runtime of the tristanable
|
||||
* engine
|
||||
*/
|
||||
public class TristanableException : Exception
|
||||
{
|
||||
/**
|
||||
* The sub-error type
|
||||
*/
|
||||
private ErrorType err;
|
||||
|
||||
/**
|
||||
* Constructs a new `TristanableException` with the provided
|
||||
* sub-error type
|
||||
*
|
||||
* Params:
|
||||
* err = the `ErrorType`
|
||||
*/
|
||||
this(ErrorType err)
|
||||
{
|
||||
super(this.classinfo.name~": "~to!(string)(err));
|
||||
this.err = err;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the sub-error type
|
||||
*
|
||||
* Returns: the sub-error type as a `ErrorType`
|
||||
*/
|
||||
public ErrorType getError()
|
||||
{
|
||||
return err;
|
||||
}
|
||||
}
|
|
@ -1,77 +0,0 @@
|
|||
module tristanable.garbage;
|
||||
|
||||
import tristanable.manager : Manager;
|
||||
import tristanable.request : Request;
|
||||
import std.socket : Socket;
|
||||
import core.thread : Thread, Duration, dur;
|
||||
import bmessage : receiveMessage;
|
||||
|
||||
public final class GarbageCollector : Thread
|
||||
{
|
||||
|
||||
/**
|
||||
* The associated manager
|
||||
*/
|
||||
private Manager manager;
|
||||
|
||||
/**
|
||||
* The queue variable pointer
|
||||
*/
|
||||
private Request[]* requestQueueVariable;
|
||||
|
||||
/**
|
||||
* Whether or not the watcher is active
|
||||
*/
|
||||
private bool isActive;
|
||||
|
||||
this(Manager manager)
|
||||
{
|
||||
/* Set the worker function */
|
||||
super(&cleaner);
|
||||
|
||||
/* Set the manager */
|
||||
this.manager = manager;
|
||||
|
||||
/* Set the pointer */
|
||||
requestQueueVariable = cast(Request[]*)manager.getQueueVariable();
|
||||
|
||||
isActive = true;
|
||||
}
|
||||
|
||||
public void stopGC()
|
||||
{
|
||||
isActive = false;
|
||||
}
|
||||
|
||||
/* TODO: Add timeout ability */
|
||||
private void cleaner()
|
||||
{
|
||||
while(isActive)
|
||||
{
|
||||
/* Lock the queue */
|
||||
manager.lockQueue();
|
||||
|
||||
/* Construct a new list */
|
||||
Request[] newList;
|
||||
|
||||
/* Only add to this list undead requests */
|
||||
foreach(Request request; *requestQueueVariable)
|
||||
{
|
||||
if(!request.isDead)
|
||||
{
|
||||
newList ~= request;
|
||||
}
|
||||
}
|
||||
|
||||
/* Update the queue to the new queue */
|
||||
*requestQueueVariable = newList;
|
||||
|
||||
/* Unlock the queue */
|
||||
manager.unlockQueue();
|
||||
|
||||
/* Sleep for 60 seconds after cleaning up */
|
||||
sleep(dur!("seconds")(60));
|
||||
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,295 +0,0 @@
|
|||
module tristanable.manager;
|
||||
|
||||
import tristanable.watcher : Watcher;
|
||||
import tristanable.request : Request;
|
||||
import tristanable.garbage : GarbageCollector;
|
||||
import tristanable.encoding : DataMessage;
|
||||
import tristanable.notifications : NotificationReply;
|
||||
import std.socket : Socket;
|
||||
import core.sync.mutex : Mutex;
|
||||
import bmessage : bSendMessage = sendMessage;
|
||||
|
||||
/* TODO: Watcher class to watch for stuff, and add to manager's queues */
|
||||
/* TODO: maneger class to use commands on, enqueue and wait for dequeue */
|
||||
public final class Manager
|
||||
{
|
||||
/* TODO: Insert queues here */
|
||||
|
||||
/**
|
||||
* The queue of outstanding requests
|
||||
*/
|
||||
private Request[] requestQueue;
|
||||
|
||||
/**
|
||||
* Reserved tags
|
||||
*/
|
||||
private ulong[] reservedTags;
|
||||
|
||||
/**
|
||||
* The queue of received notifications
|
||||
*/
|
||||
private NotificationReply[] notificationQueue;
|
||||
|
||||
/**
|
||||
* The associated Watcher object for this manager.
|
||||
*/
|
||||
private Watcher watcher;
|
||||
|
||||
/**
|
||||
* The list mutex
|
||||
*/
|
||||
private Mutex queueMutex;
|
||||
|
||||
/**
|
||||
* The notification queue mutex
|
||||
*/
|
||||
private Mutex notificationMutex;
|
||||
|
||||
/**
|
||||
* The remote host
|
||||
*/
|
||||
private Socket socket;
|
||||
|
||||
/**
|
||||
* The garbage collector
|
||||
*/
|
||||
private GarbageCollector gc;
|
||||
|
||||
this(Socket endpoint)
|
||||
{
|
||||
/* Set the socket */
|
||||
socket = endpoint;
|
||||
|
||||
/* Create the watcher */
|
||||
watcher = new Watcher(this, endpoint);
|
||||
|
||||
/* Create the garbage collector */
|
||||
gc = new GarbageCollector(this);
|
||||
|
||||
/* Initialize the `requestQueue` mutex */
|
||||
queueMutex = new Mutex();
|
||||
|
||||
/* Initialize the `notificationQueue` mutex */
|
||||
notificationMutex = new Mutex();
|
||||
|
||||
/* Start the watcher */
|
||||
watcher.start();
|
||||
|
||||
/* Start the garbage collector */
|
||||
gc.start();
|
||||
}
|
||||
|
||||
public void stopManager()
|
||||
{
|
||||
/* Will caue watcher to not block */
|
||||
socket.close();
|
||||
|
||||
/* Stop watcher */
|
||||
watcher.stopWatcher();
|
||||
|
||||
/* Stop gc */
|
||||
gc.stopGC();
|
||||
|
||||
/* Wait for watcher thread to stop */
|
||||
watcher.join();
|
||||
|
||||
/* Wait for garbage collector thread to stop */
|
||||
gc.join();
|
||||
}
|
||||
|
||||
public void sendMessage(ulong tag, byte[] data)
|
||||
{
|
||||
/* Encode the message */
|
||||
DataMessage dataMessage = new DataMessage(tag, data);
|
||||
|
||||
/* Construct the message array */
|
||||
byte[] messageData = dataMessage.encode();
|
||||
|
||||
/* Create a new Request */
|
||||
Request newRequest = new Request(tag);
|
||||
|
||||
/* Lock the queue for reading */
|
||||
lockQueue();
|
||||
|
||||
/* Add the request to the request queue */
|
||||
requestQueue ~= newRequest;
|
||||
|
||||
/* Unlock the queue */
|
||||
unlockQueue();
|
||||
|
||||
/* Send the message */
|
||||
bSendMessage(socket, messageData);
|
||||
}
|
||||
|
||||
public bool isValidTag(ulong tag)
|
||||
{
|
||||
for(ulong i = 0; i < requestQueue.length; i++)
|
||||
{
|
||||
/* Get the request */
|
||||
Request request = requestQueue[i];
|
||||
|
||||
/**
|
||||
* Only if the tag is found then return true
|
||||
* and if it is the fresh tagged request (not
|
||||
* ones that are dead using the) same tag.
|
||||
*/
|
||||
if(request.isDead == false && request.tag == tag)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public ulong getTagPosition(ulong tag)
|
||||
{
|
||||
for(ulong i = 0; i < requestQueue.length; i++)
|
||||
{
|
||||
/* Get the request */
|
||||
Request request = requestQueue[i];
|
||||
|
||||
/**
|
||||
* Only if the tag is found then return its
|
||||
* posistion and if it is the fresh tagged
|
||||
* request (not ones that are dead using the)
|
||||
* same tag.
|
||||
*/
|
||||
if(request.isDead == false && request.tag == tag)
|
||||
{
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public byte[] receiveMessage(ulong tag)
|
||||
{
|
||||
/* The received data */
|
||||
byte[] receivedData;
|
||||
|
||||
bool active = true;
|
||||
|
||||
/* Loop till fulfilled */
|
||||
while(active)
|
||||
{
|
||||
/* Lock the queue for reading */
|
||||
lockQueue();
|
||||
|
||||
/* Throw an exception if it doesn't exist */
|
||||
if(!isValidTag(tag))
|
||||
{
|
||||
/* Unlock the queue */
|
||||
unlockQueue();
|
||||
|
||||
/* Throw exception here */
|
||||
throw new TristanFokop("Invalid tag");
|
||||
}
|
||||
|
||||
/* Get the request */
|
||||
Request request = requestQueue[getTagPosition(tag)];
|
||||
|
||||
/* Check if the request has been fulfilled */
|
||||
if(request.isFulfilled())
|
||||
{
|
||||
receivedData = request.pullData();
|
||||
|
||||
|
||||
active = false;
|
||||
}
|
||||
|
||||
/* Unlock the queue */
|
||||
unlockQueue();
|
||||
}
|
||||
|
||||
return receivedData;
|
||||
}
|
||||
|
||||
public Request[] getQueue()
|
||||
{
|
||||
return requestQueue;
|
||||
}
|
||||
|
||||
public Request[]* getQueueVariable()
|
||||
{
|
||||
return &requestQueue;
|
||||
}
|
||||
|
||||
public void lockQueue()
|
||||
{
|
||||
queueMutex.lock();
|
||||
}
|
||||
|
||||
public void unlockQueue()
|
||||
{
|
||||
queueMutex.unlock();
|
||||
}
|
||||
|
||||
public void lockNotificationQueue()
|
||||
{
|
||||
notificationMutex.lock();
|
||||
}
|
||||
|
||||
public void unlockNotificationQueue()
|
||||
{
|
||||
notificationMutex.unlock();
|
||||
}
|
||||
|
||||
public NotificationReply[] popNotifications()
|
||||
{
|
||||
/* The notifications at this moment */
|
||||
NotificationReply[] currentNotificationSet;
|
||||
|
||||
/* Lock the notification queue */
|
||||
lockNotificationQueue();
|
||||
|
||||
/* Copy the current notifications */
|
||||
currentNotificationSet = notificationQueue;
|
||||
|
||||
/* Empty the notification list */
|
||||
notificationQueue.length = 0;
|
||||
|
||||
/* Unlock the notification queue */
|
||||
unlockNotificationQueue();
|
||||
|
||||
return currentNotificationSet;
|
||||
}
|
||||
|
||||
public void reserveTag(ulong tag)
|
||||
{
|
||||
reservedTags ~= tag;
|
||||
}
|
||||
|
||||
public bool isReservedTag(ulong tag)
|
||||
{
|
||||
foreach(ulong currentTag; reservedTags)
|
||||
{
|
||||
if(currentTag == tag)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public void addNotification(NotificationReply notificationReply)
|
||||
{
|
||||
/* Lock the notification queue */
|
||||
lockNotificationQueue();
|
||||
|
||||
/* Append the notification */
|
||||
notificationQueue ~= notificationReply;
|
||||
|
||||
/* Unlock the notification queue */
|
||||
unlockNotificationQueue();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public final class TristanFokop : Exception
|
||||
{
|
||||
this(string message)
|
||||
{
|
||||
super(message);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* Configuration for the manager
|
||||
*/
|
||||
module tristanable.manager.config;
|
||||
|
||||
/**
|
||||
* Manager parameters
|
||||
*/
|
||||
public struct Config
|
||||
{
|
||||
/**
|
||||
* If set to true then when one uses
|
||||
* `sendMessage(TaggedMessage)` the
|
||||
* manager will check if a queue with
|
||||
* said tag has not been registered
|
||||
* and if so, then register it for
|
||||
* us before encoding-and-sending
|
||||
*/
|
||||
public bool registerOnSend;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates the default configuration to use for
|
||||
* the manager
|
||||
*
|
||||
* Returns: the Config
|
||||
*/
|
||||
public Config defaultConfig()
|
||||
{
|
||||
Config config;
|
||||
config.registerOnSend = false;
|
||||
|
||||
return config;
|
||||
}
|
|
@ -0,0 +1,549 @@
|
|||
/**
|
||||
* Management of a tristanable instance
|
||||
*/
|
||||
module tristanable.manager.manager;
|
||||
|
||||
import std.socket;
|
||||
import tristanable.queue.queue : Queue;
|
||||
import core.sync.mutex : Mutex;
|
||||
import tristanable.manager.watcher : Watcher;
|
||||
import tristanable.encoding : TaggedMessage;
|
||||
import tristanable.exceptions;
|
||||
import std.container.slist : SList;
|
||||
import tristanable.manager.config;
|
||||
import river.core;
|
||||
import river.impls.sock : SockStream;
|
||||
import bformat.client;
|
||||
|
||||
/**
|
||||
* Manages a provided socket by spawning
|
||||
* a watcher thread to read from it and file
|
||||
* mail into the corresponding queues.
|
||||
*
|
||||
* Queues are managed via this an instance
|
||||
* of a manager.
|
||||
*/
|
||||
public class Manager
|
||||
{
|
||||
/**
|
||||
* Configuration
|
||||
*/
|
||||
private Config config;
|
||||
|
||||
/**
|
||||
* The bformat client to read and write from
|
||||
*/
|
||||
private BClient bClient;
|
||||
|
||||
/**
|
||||
* Currently registered queues
|
||||
*/
|
||||
private SList!(Queue) queues;
|
||||
|
||||
/**
|
||||
* Lock for currently registered queues
|
||||
*/
|
||||
private Mutex queuesLock;
|
||||
|
||||
/**
|
||||
* Default queue
|
||||
*/
|
||||
private Queue defaultQueue;
|
||||
|
||||
/**
|
||||
* Watcher which manages the socket and
|
||||
* enqueues new messages into the respective
|
||||
* quueue for us
|
||||
*/
|
||||
private Watcher watcher;
|
||||
|
||||
/**
|
||||
* Constructs a new manager which will read from
|
||||
* this socket and file mail for us
|
||||
*
|
||||
* Params:
|
||||
* stream = the underlying stream to use
|
||||
*/
|
||||
this(Stream stream, Config config = defaultConfig())
|
||||
{
|
||||
this.bClient = new BClient(stream);
|
||||
this.queuesLock = new Mutex();
|
||||
this.config = config;
|
||||
this.watcher = new Watcher(this, bClient);
|
||||
}
|
||||
|
||||
// TODO: Comment this
|
||||
// This is for backwards compatibility (whereby a `Socket` was taken in)
|
||||
this(Socket socket, Config config = defaultConfig())
|
||||
{
|
||||
this(new SockStream(socket), config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the management of the socket,
|
||||
* resulting in queues being updated upon
|
||||
* reciving messages tagged for them
|
||||
*/
|
||||
public void start()
|
||||
{
|
||||
watcher.startWatcher();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the management of the socket, resulting
|
||||
* in ending the updating of queues and closing
|
||||
* the underlying connection
|
||||
*/
|
||||
public void stop()
|
||||
{
|
||||
watcher.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the queue mathcing the provided id
|
||||
*
|
||||
* Params:
|
||||
* id = the id to lookup by
|
||||
* Returns: the Queue
|
||||
* Throws: TristanableException if the queue is not found
|
||||
*/
|
||||
public Queue getQueue(ulong id)
|
||||
{
|
||||
/* The found queue */
|
||||
Queue queue = getQueue_nothrow(id);
|
||||
|
||||
/* If no queue is found then throw an error */
|
||||
if(queue is null)
|
||||
{
|
||||
throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
|
||||
}
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the queue mathcing the provided id
|
||||
*
|
||||
* This is the nothrow version
|
||||
*
|
||||
* Params:
|
||||
* id = the id to lookup by
|
||||
* Returns: the Queue if found, null otherwise
|
||||
*/
|
||||
public Queue getQueue_nothrow(ulong id)
|
||||
{
|
||||
/* The found queue */
|
||||
Queue queue;
|
||||
|
||||
/* Lock the queue of queues */
|
||||
queuesLock.lock();
|
||||
|
||||
/* On return or error */
|
||||
scope(exit)
|
||||
{
|
||||
/* Unlock the queue of queues */
|
||||
queuesLock.unlock();
|
||||
}
|
||||
|
||||
/* Search for the queue */
|
||||
foreach(Queue curQueue; queues)
|
||||
{
|
||||
if(curQueue.getID() == id)
|
||||
{
|
||||
queue = curQueue;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a new queue thatis unique in its tag
|
||||
* (unused/not regustered yet), register it
|
||||
* and then return it
|
||||
*
|
||||
* Returns: the newly registered Queue
|
||||
*/
|
||||
public Queue getUniqueQueue()
|
||||
{
|
||||
/* The newly created queue */
|
||||
Queue uniqueQueue;
|
||||
|
||||
/* Lock the queue of queues */
|
||||
queuesLock.lock();
|
||||
|
||||
/* On return or error */
|
||||
scope(exit)
|
||||
{
|
||||
/* Unlock the queue of queues */
|
||||
queuesLock.unlock();
|
||||
}
|
||||
|
||||
// TODO: Throw exception if all tags used
|
||||
/* The unused tag */
|
||||
ulong unusedTag = 0;
|
||||
|
||||
/* Try the current tag and ensure no queue uses it */
|
||||
tagLoop: for(ulong curPotentialTag = 0; true; curPotentialTag++)
|
||||
{
|
||||
foreach(Queue curQueue; queues)
|
||||
{
|
||||
if(curQueue.getID() == curPotentialTag)
|
||||
{
|
||||
continue tagLoop;
|
||||
}
|
||||
}
|
||||
|
||||
/* Then we have found a unique tag */
|
||||
unusedTag = curPotentialTag;
|
||||
break;
|
||||
}
|
||||
|
||||
/* Create the queue */
|
||||
uniqueQueue = new Queue(unusedTag);
|
||||
|
||||
/* Register it */
|
||||
registerQueue(uniqueQueue);
|
||||
|
||||
return uniqueQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the given queue with the manager
|
||||
*
|
||||
* Params:
|
||||
* queue = the queue to register
|
||||
* Throws:
|
||||
* TristanableException if a queue with the provided id already exists
|
||||
*/
|
||||
public void registerQueue(Queue queue)
|
||||
{
|
||||
/* Try to register the queue */
|
||||
bool status = registerQueue_nothrow(queue);
|
||||
|
||||
/* If registration was not successful */
|
||||
if(!status)
|
||||
{
|
||||
throw new TristanableException(ErrorType.QUEUE_ALREADY_EXISTS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the given queue with the manager
|
||||
*
|
||||
* Params:
|
||||
* queue = the queue to register
|
||||
* Returns: true if registration was successful, false otherwise
|
||||
*/
|
||||
public bool registerQueue_nothrow(Queue queue)
|
||||
{
|
||||
/* Lock the queue of queues */
|
||||
queuesLock.lock();
|
||||
|
||||
/* On return or error */
|
||||
scope(exit)
|
||||
{
|
||||
/* Unlock the queue of queues */
|
||||
queuesLock.unlock();
|
||||
}
|
||||
|
||||
/* Search for the queue, throw an exception if it exists */
|
||||
foreach(Queue curQueue; queues)
|
||||
{
|
||||
if(curQueue.getID() == queue.getID())
|
||||
{
|
||||
/* Registration failed */
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/* Insert the queue as it does not exist */
|
||||
queues.insertAfter(queues[], queue);
|
||||
|
||||
/* Registration was a success */
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* De-registers the given queue from the manager
|
||||
*
|
||||
* Params:
|
||||
* queue = the queue to de-register
|
||||
* Throws:
|
||||
* TristanableException if a queue with the provided id cannot be found
|
||||
*/
|
||||
public void releaseQueue(Queue queue)
|
||||
{
|
||||
/* Try to de-register the queue */
|
||||
bool status = releaseQueue_nothrow(queue);
|
||||
|
||||
/* If de-registration was not successful */
|
||||
if(!status)
|
||||
{
|
||||
throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* De-registers the given queue from the manager
|
||||
*
|
||||
* Params:
|
||||
* queue = the queue to de-register
|
||||
* Returns: true if de-registration was successful, false otherwise
|
||||
*/
|
||||
public bool releaseQueue_nothrow(Queue queue)
|
||||
{
|
||||
/* Lock the queue of queues */
|
||||
queuesLock.lock();
|
||||
|
||||
/* On return or error */
|
||||
scope(exit)
|
||||
{
|
||||
/* Unlock the queue of queues */
|
||||
queuesLock.unlock();
|
||||
}
|
||||
|
||||
/* Search for the queue, return false if it does NOT exist */
|
||||
foreach(Queue curQueue; queues)
|
||||
{
|
||||
if(curQueue.getID() == queue.getID())
|
||||
{
|
||||
/* Remove the queue */
|
||||
queues.linearRemoveElement(queue);
|
||||
|
||||
/* De-registration succeeded */
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/* De-registration failed */
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the default queue
|
||||
*
|
||||
* The default queue, when set/enabled, is the queue that will
|
||||
* be used to enqueue messages that have a tag which doesn't
|
||||
* match any of the normally registered queues.
|
||||
*
|
||||
* Please note that the ID of the queue passed in here does not
|
||||
* mean anything in this context; only the queuing facilities
|
||||
* of the Queue object are used
|
||||
*
|
||||
* Params:
|
||||
* queue = the default queue to use
|
||||
*/
|
||||
public void setDefaultQueue(Queue queue)
|
||||
{
|
||||
this.defaultQueue = queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default queue
|
||||
*
|
||||
* Returns: the default queue
|
||||
* Throws:
|
||||
* TristanableException if there is no default queue
|
||||
*/
|
||||
public Queue getDefaultQueue()
|
||||
{
|
||||
/* The potential default queue */
|
||||
Queue potentialDefaultQueue = getDefaultQueue_nothrow();
|
||||
|
||||
if(potentialDefaultQueue is null)
|
||||
{
|
||||
throw new TristanableException(ErrorType.NO_DEFAULT_QUEUE);
|
||||
}
|
||||
|
||||
return potentialDefaultQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default queue
|
||||
*
|
||||
* This is the nothrow version
|
||||
*
|
||||
* Returns: the default queue if found, null otherwise
|
||||
*/
|
||||
public Queue getDefaultQueue_nothrow()
|
||||
{
|
||||
return defaultQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the provided message over the socket
|
||||
*
|
||||
* Params:
|
||||
* message = the TaggedMessage to send
|
||||
*/
|
||||
public void sendMessage(TaggedMessage message)
|
||||
{
|
||||
/**
|
||||
* If a queue with the tag of the message does
|
||||
* not exist, then register it if the config
|
||||
* option was enabled
|
||||
*/
|
||||
if(config.registerOnSend)
|
||||
{
|
||||
/* Create a Queue with the tag */
|
||||
Queue createdQueue = new Queue(message.getTag());
|
||||
|
||||
/* Attempt to register the queue */
|
||||
registerQueue_nothrow(createdQueue);
|
||||
}
|
||||
|
||||
/* Encode the message */
|
||||
byte[] encodedMessage = message.encode();
|
||||
|
||||
/* Send it using bformat (encode-and-send) */
|
||||
bClient.sendMessage(encodedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// TODO: Fix this, write it in a nicer way
|
||||
// ... or make a private constructor here that
|
||||
// ... does not take it in
|
||||
version(unittest)
|
||||
{
|
||||
Socket nullSock = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test retrieving a queue which does not
|
||||
* exist
|
||||
*/
|
||||
unittest
|
||||
{
|
||||
/* Create a manager */
|
||||
Manager manager = new Manager(nullSock);
|
||||
|
||||
/* Shouldn't be found */
|
||||
try
|
||||
{
|
||||
manager.getQueue(69);
|
||||
assert(false);
|
||||
}
|
||||
catch(TristanableException e)
|
||||
{
|
||||
assert(e.getError() == ErrorType.QUEUE_NOT_FOUND);
|
||||
}
|
||||
|
||||
/* Shouldn't be found */
|
||||
assert(manager.getQueue_nothrow(69) is null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test registering a queue and then fetching it
|
||||
*/
|
||||
unittest
|
||||
{
|
||||
/* Create a manager */
|
||||
Manager manager = new Manager(nullSock);
|
||||
|
||||
/* Create a new queue with tag 69 */
|
||||
Queue queue = new Queue(69);
|
||||
|
||||
try
|
||||
{
|
||||
/* Register the queue */
|
||||
manager.registerQueue(queue);
|
||||
|
||||
/* Fetch the queue */
|
||||
Queue fetchedQueue = manager.getQueue(69);
|
||||
|
||||
/* Ensure the queue we fetched is the one we stored (the references would be equal) */
|
||||
assert(fetchedQueue == queue);
|
||||
}
|
||||
catch(TristanableException e)
|
||||
{
|
||||
assert(false);
|
||||
}
|
||||
|
||||
/* Should be found */
|
||||
assert(manager.getQueue_nothrow(69) !is null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests registering a queue and then registering
|
||||
* another queue with the same id
|
||||
*/
|
||||
unittest
|
||||
{
|
||||
/* Create a manager */
|
||||
Manager manager = new Manager(nullSock);
|
||||
|
||||
/* Create a new queue with tag 69 */
|
||||
Queue queue = new Queue(69);
|
||||
|
||||
/* Register the queue */
|
||||
manager.registerQueue(queue);
|
||||
|
||||
try
|
||||
{
|
||||
/* Register the queue (try again) */
|
||||
manager.registerQueue(queue);
|
||||
|
||||
assert(false);
|
||||
}
|
||||
catch(TristanableException e)
|
||||
{
|
||||
assert(e.getError() == ErrorType.QUEUE_ALREADY_EXISTS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests registering a queue, de-registering it and
|
||||
* then registering it again
|
||||
*/
|
||||
unittest
|
||||
{
|
||||
/* Create a manager */
|
||||
Manager manager = new Manager(nullSock);
|
||||
|
||||
/* Create a new queue with tag 69 */
|
||||
Queue queue = new Queue(69);
|
||||
|
||||
/* Register the queue */
|
||||
manager.registerQueue(queue);
|
||||
|
||||
/* Ensure it is registered */
|
||||
assert(queue == manager.getQueue(69));
|
||||
|
||||
/* De-register the queue */
|
||||
manager.releaseQueue(queue);
|
||||
|
||||
/* Ensure it is de-registered */
|
||||
assert(manager.getQueue_nothrow(69) is null);
|
||||
|
||||
/* Register the queue (again) */
|
||||
manager.registerQueue(queue);
|
||||
|
||||
/* Ensure it is registered (again) */
|
||||
assert(queue == manager.getQueue(69));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests registering a queue using the "next available queue"
|
||||
* method
|
||||
*/
|
||||
unittest
|
||||
{
|
||||
/* Create a manager */
|
||||
Manager manager = new Manager(nullSock);
|
||||
|
||||
/* Get the next 3 available queues */
|
||||
Queue queue1 = manager.getUniqueQueue();
|
||||
Queue queue2 = manager.getUniqueQueue();
|
||||
Queue queue3 = manager.getUniqueQueue();
|
||||
|
||||
/* The queues should have tags [0, 1, 2] respectively */
|
||||
assert(queue1.getID() == 0);
|
||||
assert(queue2.getID() == 1);
|
||||
assert(queue3.getID() == 2);
|
||||
}
|
||||
|
||||
// TODO: Add testing for queue existence (internal method)
|
|
@ -0,0 +1,15 @@
|
|||
/**
|
||||
* Interface which manages a provided socket
|
||||
* and enqueuing and dequeuing of queues
|
||||
*/
|
||||
module tristanable.manager;
|
||||
|
||||
/**
|
||||
* The management facilities
|
||||
*/
|
||||
public import tristanable.manager.manager : Manager;
|
||||
|
||||
/**
|
||||
* Configuration for the manager
|
||||
*/
|
||||
public import tristanable.manager.config : Config, defaultConfig;
|
|
@ -0,0 +1,297 @@
|
|||
/**
|
||||
* Facilitates the reading of messages from the socket,
|
||||
* decoding thereof and final enqueuing thereof into their
|
||||
* respective queus
|
||||
*/
|
||||
module tristanable.manager.watcher;
|
||||
|
||||
import core.thread : Thread;
|
||||
import tristanable.manager.manager : Manager;
|
||||
import std.socket;
|
||||
import bformat;
|
||||
import tristanable.encoding;
|
||||
import tristanable.exceptions;
|
||||
import tristanable.queue.queue;
|
||||
import bformat.client;
|
||||
|
||||
/**
|
||||
* Watches the socket on a thread of its own,
|
||||
* performs the decoding of the incoming messages
|
||||
* and places them into the correct queues via
|
||||
* the associated Manager instance
|
||||
*/
|
||||
public class Watcher : Thread
|
||||
{
|
||||
/**
|
||||
* The associated manager to use
|
||||
* such that we can place new mail
|
||||
* into their respective inboxes (queues)
|
||||
*/
|
||||
private Manager manager;
|
||||
|
||||
/**
|
||||
* The BClient to read from
|
||||
*/
|
||||
private BClient bClient;
|
||||
|
||||
/**
|
||||
* Creates a new `Watcher` that is associated
|
||||
* with the provided `Manager` such that it can
|
||||
* add to its registered queues. The provided `Socket`
|
||||
* is such that it can be read from and managed.
|
||||
*
|
||||
* Params:
|
||||
* manager = the `Manager` to associate with
|
||||
* bclient = the underlying `BClient` to read data from
|
||||
*/
|
||||
package this(Manager manager, BClient bClient)
|
||||
{
|
||||
this.manager = manager;
|
||||
this.bClient = bClient;
|
||||
|
||||
super(&watch);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the underlying thread
|
||||
*/
|
||||
package void startWatcher()
|
||||
{
|
||||
/* Start the watch method on a new thread */
|
||||
start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Watches the socket for incoming messages
|
||||
* and decodes them on the fly, placing
|
||||
* the final message in the respective queue
|
||||
*/
|
||||
private void watch()
|
||||
{
|
||||
import std.stdio;
|
||||
|
||||
while(true)
|
||||
{
|
||||
/* Do a bformat read-and-decode */
|
||||
byte[] wireTristan;
|
||||
version(unittest) { writeln("Before bformat recv()"); }
|
||||
bool recvStatus = bClient.receiveMessage(wireTristan); // TODO: Add a check for the status of read
|
||||
version(unittest) { writeln("After bformat recv()"); }
|
||||
version(unittest) { writeln("bformat recv() status: ", recvStatus); }
|
||||
|
||||
if(recvStatus)
|
||||
{
|
||||
/* Decode the received bytes into a tagged message */
|
||||
TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan);
|
||||
version(unittest) { writeln("Watcher received: ", decodedMessage); }
|
||||
|
||||
/* Search for the queue with the id provided */
|
||||
ulong messageTag = decodedMessage.getTag();
|
||||
Queue potentialQueue = manager.getQueue_nothrow(messageTag);
|
||||
|
||||
/* If a queue can be found */
|
||||
if(potentialQueue !is null)
|
||||
{
|
||||
/* Enqueue the message */
|
||||
potentialQueue.enqueue(decodedMessage);
|
||||
}
|
||||
/* If the queue if not found */
|
||||
else
|
||||
{
|
||||
/**
|
||||
* Look for a default queue, and if one is found
|
||||
* then enqueue the message there. Otherwise, drop
|
||||
* it by simply doing nothing.
|
||||
*/
|
||||
try
|
||||
{
|
||||
potentialQueue = manager.getDefaultQueue();
|
||||
|
||||
/* Enqueue the message */
|
||||
potentialQueue.enqueue(decodedMessage);
|
||||
}
|
||||
catch(TristanableException e) {}
|
||||
}
|
||||
|
||||
version(unittest) { writeln("drip"); }
|
||||
}
|
||||
/**
|
||||
* If there was an error receiving on the socket.
|
||||
*
|
||||
* This can be either because we have shut the socket down
|
||||
* or the remote end has closed the connection.
|
||||
*
|
||||
* In any case, exit the loop therefore ending this thread.
|
||||
*/
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down the watcher, unblocks the blocking read in the loop
|
||||
* resulting in the watcher thread ending
|
||||
*/
|
||||
package void shutdown()
|
||||
{
|
||||
/* Closes the bformat reader */
|
||||
bClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up a server which will send some tagged messages to us (the client),
|
||||
* where we have setup a `Manager` to watch the queues with tags `42` and `69`,
|
||||
* we then dequeue some messages from both queus. Finally, we shut down the manager.
|
||||
*/
|
||||
unittest
|
||||
{
|
||||
import std.socket;
|
||||
import std.stdio;
|
||||
import core.thread;
|
||||
|
||||
Address serverAddress = parseAddress("::1", 0);
|
||||
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
server.bind(serverAddress);
|
||||
server.listen(0);
|
||||
|
||||
class ServerThread : Thread
|
||||
{
|
||||
this()
|
||||
{
|
||||
super(&worker);
|
||||
}
|
||||
|
||||
private void worker()
|
||||
{
|
||||
Socket clientSocket = server.accept();
|
||||
BClient bClient = new BClient(clientSocket);
|
||||
|
||||
Thread.sleep(dur!("seconds")(7));
|
||||
writeln("Server start");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 42 payload Cucumber 😳️
|
||||
*/
|
||||
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
|
||||
byte[] tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 69 payload Hello
|
||||
*/
|
||||
message = new TaggedMessage(69, cast(byte[])"Hello");
|
||||
tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 69 payload Bye
|
||||
*/
|
||||
message = new TaggedMessage(69, cast(byte[])"Bye");
|
||||
tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 100 payload Bye
|
||||
*/
|
||||
message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1");
|
||||
tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 200 payload Bye
|
||||
*/
|
||||
message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2");
|
||||
tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
}
|
||||
}
|
||||
|
||||
ServerThread serverThread = new ServerThread();
|
||||
serverThread.start();
|
||||
|
||||
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
|
||||
writeln(server.localAddress);
|
||||
|
||||
|
||||
Manager manager = new Manager(client);
|
||||
|
||||
Queue sixtyNine = new Queue(69);
|
||||
Queue fortyTwo = new Queue(42);
|
||||
|
||||
manager.registerQueue(sixtyNine);
|
||||
manager.registerQueue(fortyTwo);
|
||||
|
||||
// Register a default queue (tag ignored)
|
||||
Queue defaultQueue = new Queue(2332);
|
||||
manager.setDefaultQueue(defaultQueue);
|
||||
|
||||
|
||||
/* Connect our socket to the server */
|
||||
client.connect(server.localAddress);
|
||||
|
||||
/* Start the manager and let it manage the socket */
|
||||
manager.start();
|
||||
|
||||
/* Block on the unittest thread for a received message */
|
||||
writeln("unittest thread: Dequeue() blocking...");
|
||||
TaggedMessage dequeuedMessage = sixtyNine.dequeue();
|
||||
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
|
||||
assert(dequeuedMessage.getTag() == 69);
|
||||
assert(dequeuedMessage.getPayload() == cast(byte[])"Hello");
|
||||
|
||||
/* Block on the unittest thread for a received message */
|
||||
writeln("unittest thread: Dequeue() blocking...");
|
||||
dequeuedMessage = sixtyNine.dequeue();
|
||||
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
|
||||
assert(dequeuedMessage.getTag() == 69);
|
||||
assert(dequeuedMessage.getPayload() == cast(byte[])"Bye");
|
||||
|
||||
/* Block on the unittest thread for a received message */
|
||||
writeln("unittest thread: Dequeue() blocking...");
|
||||
dequeuedMessage = fortyTwo.dequeue();
|
||||
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
|
||||
assert(dequeuedMessage.getTag() == 42);
|
||||
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
|
||||
|
||||
|
||||
/* Dequeue two messages from the default queue */
|
||||
writeln("unittest thread: Dequeue() blocking...");
|
||||
dequeuedMessage = defaultQueue.dequeue();
|
||||
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
|
||||
assert(dequeuedMessage.getTag() == 100);
|
||||
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1");
|
||||
|
||||
writeln("unittest thread: Dequeue() blocking...");
|
||||
dequeuedMessage = defaultQueue.dequeue();
|
||||
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
|
||||
assert(dequeuedMessage.getTag() == 200);
|
||||
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2");
|
||||
|
||||
|
||||
/* Stop the manager */
|
||||
manager.stop();
|
||||
}
|
|
@ -1,39 +0,0 @@
|
|||
/**
|
||||
* NotificationReply
|
||||
*
|
||||
* When a tag is reserved and a message is received
|
||||
* with such a tag then one of these is generated
|
||||
* and added to the queue of notification replies.
|
||||
*
|
||||
* Multiple of these will be made and enqueued even
|
||||
* if they have the same tag (duplicates allowed).
|
||||
*
|
||||
* This facilitates a notification system if one
|
||||
* wants to use tristanable for that purpose (this
|
||||
* is because notifications _just happen_ and have
|
||||
* no prior request)
|
||||
*/
|
||||
|
||||
module tristanable.notifications;
|
||||
|
||||
public class NotificationReply
|
||||
{
|
||||
private ulong tag;
|
||||
private byte[] data;
|
||||
|
||||
this(ulong tag, byte[] data)
|
||||
{
|
||||
this.tag = tag;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public byte[] getData()
|
||||
{
|
||||
return data;
|
||||
}
|
||||
|
||||
public ulong getTag()
|
||||
{
|
||||
return tag;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/**
|
||||
* Tristanable network message queuing framework
|
||||
*/
|
||||
module tristanable;
|
||||
|
||||
/**
|
||||
* Interface which manages a provided socket
|
||||
* and enqueuing and dequeuing of queues
|
||||
*/
|
||||
public import tristanable.manager;
|
||||
|
||||
/**
|
||||
* A queue of queue items all of the same tag
|
||||
*/
|
||||
public import tristanable.queue.queue : Queue;
|
||||
|
||||
/**
|
||||
* Error handling type definitions
|
||||
*/
|
||||
public import tristanable.exceptions : TristanableException, ErrorType;
|
||||
|
||||
/**
|
||||
* Encoding/decoding of the tristanable format
|
||||
*/
|
||||
public import tristanable.encoding : TaggedMessage;
|
|
@ -0,0 +1,240 @@
|
|||
/**
|
||||
* A queue of queue items all of the same tag
|
||||
*/
|
||||
module tristanable.queue.queue;
|
||||
|
||||
import core.sync.mutex : Mutex;
|
||||
import core.sync.condition : Condition;
|
||||
import core.sync.exception : SyncError;
|
||||
import std.container.slist : SList;
|
||||
import tristanable.encoding;
|
||||
import core.time : Duration, dur;
|
||||
import tristanable.exceptions;
|
||||
|
||||
version(unittest)
|
||||
{
|
||||
import std.stdio;
|
||||
import std.conv : to;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a queue whereby messages of a certain tag/id
|
||||
* can be enqueued to (by the `Watcher`) and dequeued from
|
||||
* (by the user application)
|
||||
*/
|
||||
public class Queue
|
||||
{
|
||||
/**
|
||||
* This queue's unique ID
|
||||
*/
|
||||
private ulong queueID;
|
||||
|
||||
/**
|
||||
* The libsnooze event used to sleep/wake
|
||||
* on queue events
|
||||
* Mutex for the condition variable
|
||||
*/
|
||||
private Mutex mutex;
|
||||
|
||||
/**
|
||||
* The condition variable used to sleep/wake
|
||||
* on queue of events
|
||||
*/
|
||||
private Condition signal;
|
||||
|
||||
/**
|
||||
* The queue of messages
|
||||
*/
|
||||
private SList!(TaggedMessage) queue;
|
||||
|
||||
/**
|
||||
* The lock for the message queue
|
||||
*/
|
||||
private Mutex queueLock;
|
||||
|
||||
/**
|
||||
* If a message is enqueued prior
|
||||
* to us sleeping then we won't
|
||||
* wake up and return for it.
|
||||
*
|
||||
* Therefore a periodic wakeup
|
||||
* is required.
|
||||
*/
|
||||
private Duration wakeInterval;
|
||||
|
||||
/**
|
||||
* Constructs a new Queue and immediately sets up the notification
|
||||
* sub-system for the calling thread (the thread constructing this
|
||||
* object) which ensures that a call to dequeue will immediately
|
||||
* unblock on the first message received under this tag
|
||||
*
|
||||
* Params:
|
||||
* queueID = the id to use for this queue
|
||||
*/
|
||||
this(ulong queueID)
|
||||
{
|
||||
/* Initialize the queue lock */
|
||||
this.queueLock = new Mutex();
|
||||
|
||||
/* Initialize the condition variable */
|
||||
this.mutex = new Mutex();
|
||||
this.signal = new Condition(this.mutex);
|
||||
|
||||
/* Set the queue id */
|
||||
this.queueID = queueID;
|
||||
|
||||
/* Set the slumber interval */
|
||||
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current wake interval
|
||||
* for the queue checker
|
||||
*
|
||||
* Returns: the `Duration`
|
||||
*/
|
||||
public Duration getWakeInterval()
|
||||
{
|
||||
return this.wakeInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the wake up interval
|
||||
*
|
||||
* Params:
|
||||
* interval = the new interval
|
||||
*/
|
||||
public void setWakeInterval(Duration interval)
|
||||
{
|
||||
this.wakeInterval = interval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueues the provided tagged message onto this queue
|
||||
* and then wakes up any thread that has called dequeue
|
||||
* on this queue as well
|
||||
*
|
||||
* On error enqueueing a `TristanableException` will be
|
||||
* thrown.
|
||||
*
|
||||
* Params:
|
||||
* message = the TaggedMessage to enqueue
|
||||
*/
|
||||
public void enqueue(TaggedMessage message)
|
||||
{
|
||||
version(unittest)
|
||||
{
|
||||
writeln("queue["~to!(string)(queueID)~"]: Enqueuing '"~to!(string)(message)~"'...");
|
||||
}
|
||||
|
||||
scope(exit)
|
||||
{
|
||||
version(unittest)
|
||||
{
|
||||
writeln("queue["~to!(string)(queueID)~"]: Enqueued '"~to!(string)(message)~"'!");
|
||||
}
|
||||
|
||||
/* Unlock the item queue */
|
||||
queueLock.unlock();
|
||||
}
|
||||
|
||||
/* Lock the item queue */
|
||||
queueLock.lock();
|
||||
|
||||
/* Add the item to the queue */
|
||||
queue.insertAfter(queue[], message);
|
||||
|
||||
/* Wake up anyone wanting to dequeue from us */
|
||||
try
|
||||
{
|
||||
// TODO: Make us wait on the event (optional with a time-out)
|
||||
signal.notifyAll();
|
||||
}
|
||||
catch(SyncError snozErr)
|
||||
{
|
||||
// Throw an exception on a fatal exception
|
||||
throw new TristanableException(ErrorType.ENQUEUE_FAILED);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Make a version of this which can time out
|
||||
|
||||
/**
|
||||
* Blocks till a message can be dequeued from this queue
|
||||
*
|
||||
* On error dequeueing a `TristanableException` will be
|
||||
* thrown.
|
||||
*
|
||||
* Returns: the dequeued TaggedMessage
|
||||
*/
|
||||
public TaggedMessage dequeue()
|
||||
{
|
||||
version(unittest)
|
||||
{
|
||||
writeln("queue["~to!(string)(queueID)~"]: Dequeueing...");
|
||||
}
|
||||
|
||||
/* The dequeued message */
|
||||
TaggedMessage dequeuedMessage;
|
||||
|
||||
scope(exit)
|
||||
{
|
||||
version(unittest)
|
||||
{
|
||||
writeln("queue["~to!(string)(queueID)~"]: Dequeued '"~to!(string)(dequeuedMessage)~"'!");
|
||||
}
|
||||
}
|
||||
|
||||
/* Block till we dequeue a message successfully */
|
||||
while(dequeuedMessage is null)
|
||||
{
|
||||
scope(exit)
|
||||
{
|
||||
// Unlock the mutex
|
||||
this.mutex.unlock();
|
||||
}
|
||||
|
||||
// Lock the mutex
|
||||
this.mutex.lock();
|
||||
|
||||
try
|
||||
{
|
||||
this.signal.wait(this.wakeInterval);
|
||||
}
|
||||
catch(SyncError e)
|
||||
{
|
||||
// Throw an exception on a fatal exception
|
||||
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
|
||||
}
|
||||
|
||||
|
||||
/* Lock the item queue */
|
||||
queueLock.lock();
|
||||
|
||||
/* Consume the front of the queue (if non-empty) */
|
||||
if(!queue.empty())
|
||||
{
|
||||
/* Pop the front item off */
|
||||
dequeuedMessage = queue.front();
|
||||
|
||||
/* Remove the front item from the queue */
|
||||
queue.linearRemoveElement(dequeuedMessage);
|
||||
}
|
||||
|
||||
/* Unlock the item queue */
|
||||
queueLock.unlock();
|
||||
}
|
||||
|
||||
return dequeuedMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the id/tag of this queue
|
||||
*
|
||||
* Returns: the queue's id
|
||||
*/
|
||||
public ulong getID()
|
||||
{
|
||||
return queueID;
|
||||
}
|
||||
}
|
|
@ -1,77 +0,0 @@
|
|||
module tristanable.request;
|
||||
|
||||
import std.conv : to;
|
||||
|
||||
/**
|
||||
* Request
|
||||
*
|
||||
* This type represents a placeholder for an
|
||||
* expected response caused by the sending of
|
||||
* an original message with a matching tag.
|
||||
*/
|
||||
public final class Request
|
||||
{
|
||||
/**
|
||||
* The data received
|
||||
*/
|
||||
public byte[] dataReceived;
|
||||
|
||||
/**
|
||||
* Whether or not this request has been
|
||||
* fulfilled or not.
|
||||
*/
|
||||
private bool fulfilled;
|
||||
|
||||
/**
|
||||
* Whether the request has been depleted
|
||||
*/
|
||||
public bool isDead;
|
||||
|
||||
/**
|
||||
* The tag for this request
|
||||
*/
|
||||
public ulong tag;
|
||||
|
||||
/**
|
||||
* Make a new Request with the provided tag
|
||||
* `tag`.
|
||||
*/
|
||||
this(ulong tag)
|
||||
{
|
||||
this.tag = tag;
|
||||
}
|
||||
|
||||
public void fulfill(byte[] data)
|
||||
{
|
||||
dataReceived = data;
|
||||
fulfilled = true;
|
||||
}
|
||||
|
||||
public bool isFulfilled()
|
||||
{
|
||||
return fulfilled;
|
||||
}
|
||||
|
||||
public byte[] pullData()
|
||||
{
|
||||
isDead = true;
|
||||
return dataReceived;
|
||||
}
|
||||
|
||||
override public string toString()
|
||||
{
|
||||
/* the toString string */
|
||||
string toStringString;
|
||||
|
||||
/* Add the Request tag info */
|
||||
toStringString ~= "Request (Tag: " ~ to!(string)(tag);
|
||||
|
||||
/* Add the Request arrival status */
|
||||
toStringString ~= ", Arrived: " ~ to!(string)(fulfilled);
|
||||
|
||||
/* Add the IsDead tag info */
|
||||
toStringString ~= ", Used: " ~ to!(string)(isDead) ~ ")";
|
||||
|
||||
return toStringString;
|
||||
}
|
||||
}
|
|
@ -1,110 +0,0 @@
|
|||
module tristanable.watcher;
|
||||
|
||||
import tristanable.manager : Manager;
|
||||
import tristanable.request : Request;
|
||||
import tristanable.notifications : NotificationReply;
|
||||
import std.socket : Socket;
|
||||
import core.thread : Thread;
|
||||
import bmessage : receiveMessage;
|
||||
|
||||
/* TODO: Watcher class to watch for stuff, and add to manager's queues */
|
||||
/* TODO: maneger class to use commands on, enqueue and wait for dequeue */
|
||||
public final class Watcher : Thread
|
||||
{
|
||||
/**
|
||||
* The associated Manager
|
||||
*
|
||||
* Used to access the queues.
|
||||
*/
|
||||
private Manager manager;
|
||||
|
||||
/**
|
||||
* The endpoint host we are connected to
|
||||
*/
|
||||
private Socket endpoint;
|
||||
|
||||
/**
|
||||
* Whether or not the watcher is active
|
||||
*/
|
||||
private bool isActive;
|
||||
|
||||
this(Manager manager, Socket endpoint)
|
||||
{
|
||||
super(&watchLoop);
|
||||
this.manager = manager;
|
||||
this.endpoint = endpoint;
|
||||
isActive = true;
|
||||
}
|
||||
|
||||
public void stopWatcher()
|
||||
{
|
||||
isActive = false;
|
||||
}
|
||||
|
||||
private void watchLoop()
|
||||
{
|
||||
while(isActive)
|
||||
{
|
||||
/* The received message (tag+data) */
|
||||
byte[] receivedPayload;
|
||||
|
||||
/* The message's tag */
|
||||
ulong receivedTag;
|
||||
|
||||
/* The message's data */
|
||||
byte[] receivedMessage;
|
||||
|
||||
|
||||
/* Receive a message */
|
||||
bool recvStatus = receiveMessage(endpoint, receivedPayload);
|
||||
|
||||
/* TODO: Status check */
|
||||
|
||||
/* Fetch the `tag` */
|
||||
receivedTag = *(cast(ulong*)receivedPayload.ptr);
|
||||
|
||||
/* Fetch the `data` */
|
||||
receivedMessage = receivedPayload[8..receivedPayload.length];
|
||||
|
||||
/* Lock the queue for reading */
|
||||
manager.lockQueue();
|
||||
|
||||
/* Get the queue */
|
||||
Request[] currentQueue = manager.getQueue();
|
||||
|
||||
/* Check to see if this is a tag we are awaiting */
|
||||
bool foundTag = manager.isValidTag(receivedTag);
|
||||
ulong requestPosition = manager.getTagPosition(receivedTag);
|
||||
|
||||
|
||||
/**
|
||||
* Check if the tag was found
|
||||
*
|
||||
* This only accounts for tags requested
|
||||
*/
|
||||
if(foundTag)
|
||||
{
|
||||
/* Fulfill the request */
|
||||
currentQueue[requestPosition].fulfill(receivedMessage);
|
||||
}
|
||||
/**
|
||||
* Check if the tag was reservd
|
||||
*/
|
||||
else if(manager.isReservedTag(receivedTag))
|
||||
{
|
||||
/* Create the NotificationReply */
|
||||
NotificationReply notifyReply = new NotificationReply(receivedTag, receivedMessage);
|
||||
|
||||
/* Add the notification */
|
||||
manager.addNotification(notifyReply);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* TODO: */
|
||||
}
|
||||
|
||||
/* Unlock the queue */
|
||||
manager.unlockQueue();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue