Compare commits

...

14 Commits

Author SHA1 Message Date
Tristan B. Velloza Kildaire 4bee62e4e2 README
- Updated example
2023-10-02 15:56:59 +02:00
Tristan B. Velloza Kildaire b24c3368b3
Nextgen: Tristanable v2 (#4)
* - Added new logo to `README.md`
- Fixed typos in `README.md`
- Added new logo (source included)

* - Added `bformat` version `3.1.13` as dependency

* - Removed executable

* - Updated `.gitignore`

* - Use `https` link rather to `bformat` homepage

* Package (tristanable)

- Added public imports along with comments per each

Encoding

- Added a stub class, `TaggedMessage`, for encoding and decoding the tristanable byte payload

Exceptions

- Added `TristanableException` exception type along with the `Error` enum sub-type

Manager

- Added stub code for `Manager` to manage the queues and socket

Queue

- Added stub class representing a queue with a tag (`Queue`)

QueueItem

- Added stub class `QueueItem` which represents an item that is enqueued/dequeued onto a `Queue`

Watcher

- Added stub class `Watcher` which will manage the socket reading-wise

* Manager

- Added field `watcher` of type `Watcher`

* Watcher

- Added constructor which takes in an instance of `Manager` and an instance of `Socket`

* Manager

- Added unit test TODO

* - Moved `Watcher` and `Manager` modules to their own package
- Ensured `Watcher`'s constructor is package-level accessible only

Manager

- The constructor now creates an instance of `Watcher`
- Added a `start()` method which calls `watcher.start()`

* Manager

- Added stub `sendMessage(TaggedMessage)` which will encode into the tristanable format, then wrap into bformat and send over the socket
- Added import for `TaggedMessage` from `tristanable.encoding` module

* Package (tristanable)

- Added an import for `TaggedMessage` from module `tristanable.encoding`

* Encoding

- Added stub class `TaggedMessage`
- Added constructor, static decoder (unimplemented), `encoder (implemented), getters and setters
- Added module `tristanable.encoding`

* - Attempt merge

* Encoding

- Added parameter-less (default) constructor marked as `private` to `TaggedMessage`
- Added decoding support in `decode(byte[])` which will return a new instance of `TaggedMessage`
- Added a unit test to test encoding and decoding

* TaggedMessage

- Added documentation for fields `tag` and `data`
- Added documentation for both constructors
- Added documentation for `getPayload()`, `getTag()`, `setPayload(byte[])` and `setTag(ulong)`

* - Fixed formatting in `README.md`

* Watcher

- Added import for `bformat` and `encoding` module
- Documented `watch()`
- Added `bformat` read-and-decode `receiveMessage(Socket, ref byte[])` call followed by a `TaggedMessage.decode(byte[])` call

* Watcher

- Moved TODO below already completed code

Exceptions

- Renamed `Error` to `ErrorType`
- Constructing a new `TristanableException` will now store the passed in `ErrorType`
- Added `getError()` to `TristanableException` which returns the stored `ErrorType`
- Added two new memebrs to enum `ErrorType`, namely `QUEUE_NOT_FOUND` and `QUEUE_ALREADY_EXISTS`

* Manager

- Changed from using D's dynamic arrays for the array of `Queue` objects to using an `SList!(T)` where `T` is the `Queue` type
- Implemented `getQueue(ulong)` which returns the `Queue` object with the matching id/tag, else throws an instance of `TristanabaleException`
- Implemented `registerQueue(Queue)` which will attempt to add the provided `Queue` given that a queue does not already exist with the provided queue's id; if that is the case then an instance of `TristanableException` is thrown

Queue

- Made the constructor take in the `ulong` queue ID
- Made the constructor publically accessible
- Implemented `getID()` which returns the `Queue`'s id as a `ulong`
- Removed the static method `newQueue(ulong)`

Unit test

- Added a unit test to test `getQueue(ulong)` when the queue cannot be found
- Added a unit test to test adding a queue and successfully retrieving it

* Package (tristanable)

- Fixed up the `exceptions` module import

* Manager

- Removed now-completed TODO in `registerQueue(QUeue)`

* Manager

- Documented `registerQueue(Queue)`

* Exceptions

- Added new member `NO_DEFAULT_QUEUE` to enum `ErrorType`

* Queue

- The actual queue is now an `SList!(TaggedMessage)`
- Added a stub method `enqueue(TaggedMessage)`
- Updated the stub method `dequeue()` which returns a `TaggedMessage` now

* - Removed `QueueItem`

* Encoding

- Implemented `toString()` in `TaggedMessage`

* Manager

- Added a default queue
- `getQueue(ulong)` now calls `getQueue_nothrow(ulong)` with the same id
- Implemented `getQueue_nothrow(ulong)` which returns the `Queue` if found, `null` otherwise
- Added `getDefaultQueue()` which gets the default queue by calling `getDefaultQueue_nothrow(ulong)` with the same id
- Added `getDefaultQueue_nothrow(ulong)` which returns the default queue as a `Queue` object if it exists, else `null`
- Added `setDefaultQueue(Queue)` which sets the provided queue as the default queue (i.e. the queue where messages tagged with a tag of a queue not registered will be dumped into - if the default queue is set)

Watcher

- Set the worker thread, `watch`, in the constructor
- Added a TODO relating to checking if the socket read succeeded or not
- Added a debug print for the received `TaggedMessage` post-decode
- Extract the tag of the message and find the matching queue (potentially, if it exists)
- If the queue exists then add the `TaggedMessage` to said `Queue`
- If the queue doesn't exist then, get the so-called "Default queue", if it doesn't exist don't do anything, if it does then enqueue the message (the `TaggedMessage`) to said `Queue`

Unit test

- Added a unit test (WIP) for testing the `Manager` and `Watcher` mechanism
- Updated unittest to test the `getQueue_nothrow(ulong)` method
- Added a unit test to test adding a `Queue` with a tag that already exists in a `Queue` registered prior

* Queue

- Added imports for `std.stdio` and `to` from `std.conv` to be imported when compiling in `unittest` mode
- Added documentation to `enqueue(TaggedMessage)`
- Implemented `enqueue(TaggedMessage)` using libsnooze
- Added documentation for `dequeue()`
- Implemented `dequeue()` using libsnooze

* Queue

- Replaced now-completed TODO with an actual comment in `dequeue()`

* Queue

- Added entrance and exit debugs for `dequeue()`

* Manager

- Clean up I guess

* Unit test (Watcher)

- Unit test for watcher works

* Watcher

- Deleted old module that was unused

* Package

- Removed completed TODO

* Exceptions

- Removed unused enum member `QueueExists` of enum `ErrorType`

* Encoding

- Added module-level documentation

Exceptions

- Added module-level documentation

Queue

- Added module-level documentation

Package (`tristanable.manager`)

- Added module-level documentation

* Manager

- Implemented `getUniqueQueue()` which finds an unused tag, makes a `Queue` with said tag, registers it and then returns it
- WIP: `shutdown()` method

* Manager

- Added unittest for `getUniqueQueue()`
- Typo fix

* Manager

- Removed empty unittest

* Config

- Added new module `config`
- Added new type `Config` which is used for configuring an instance of `Manager`
- Added `defaultConfig()` which returns the default `Config` instance used

* Config

- Make the default configuration generated more explicit in `defaultConfig`
- Remove initialization in `Config` (would be `false` in any case)

* Manager

- Added support for configuring the `Manager`, the constructor now uses the default configuration
- Implemented `registerQueue_nothrow(Queue)` which returns `true` on success, `false` otherwise
- `registerQueue(Queue)` now makes a sub-call to `registerQueue_nothrow(Queue)`
- Implemented `sendMessage(TaggedMessage)`
- Added comment for assertions in unittest

* Package (`manager`)

- Import the `Config` type and the `defaultConfig()` function

* Watcher

- Added stub `shutdown()` method that is intended to be called by `Manager` (package-level accessible)

Unit tests

- Sleep for 4 seconds instead of 2 before the server sends the two tagged messages
- Send a messae tagged with tag `42` before that of the one tagged with `69`
- Register a queue with tag `42`
- Remove `WaitingThread`, we now receive both tagged messages (`42` and `69`) on the unittest thread
- Added assertion to ensure the tagged message of `69` is indeed received correctly (tag AND payload)

* Queue

- Added documentation for the constructor `this(ulong)`
- Fixed issue #5

Unit tests

- The `==` operator on strings does some normalization stuff which results in differing byte sequences and therefore inequality (see the `"Cucumber 😳️"` case), therefore casting to `byte[]`
- Send another message tagged with `69`
- Fixed comment for server code sending tagged `42` message
- Call `manager.stop()` right at the end of the unit test

* Watcher

- Added package-level accessible `startWatcher()` method which calls `start()` for us
- Added some debugging prints which will now only be compiled-in during unittest builds
- If the bformat `receiveMessage(Socket, ref byte[])` method fails (returns `false`) then exit the loop, only continue decoding if it is `true`
- Implemented package-level accesible `shutdown()` method

Manager

- `start()` now calls `watcher.startWatcher()` instead of `watcher.start()`

* Unit test

- Sleep a little longer for profiling tests

* Watcher

- Documented unittest as it is a great example of how to sue tristanable

* Exceptions

- Documented `ErrorType` and all its members
- Documented `TristanableException`

* Exceptions

- Added missing documentation
- Fixed the message generation of the exception's message

* Manager

- Added documentation for `start()` and `stop()`

* Manager

- Removed now-completed TODO
- Added documentation for `queuesLock`

* Watcher

- Added documentation to the constructor for `Watcher`

* Watcher

- Documented module `tristanable.manager.watcher`

* Queue

- Docuemneted `getId()`
- Documented the `Queue` class
- Documented fields `event`, `queue` and `queueLock`

* Package

- Removed whitespace

* Watcher

- Documented method `shutdown()`

* - Upgraded to new `bformat` version `4.1.0` and migrated to using `BClient` (unit tests seem to pass)

* Manager

- Added a TODO for the future `removeQueue(Queue)` and `removeQueu_nothrow(Queue)`

* Manager

- Implemented `releaseQueue(Queue)`
- Implemented `releaseQueue_nothrow(Queue)`

Unit tests

- Added unit test for `releaseQueue(Queue)`

* Unit tests

- Added a TODO

* Dub

- Now requires a minimum version of `libsnooze` of at least `
1.0.0-beta`

* Queue

- Be specific, catch `FatalException` in `enqueue(TaggedMessage)`
- Be specific, catch `InterruptedException` and `FatalException` seperately

* Exceptions

- Added enum members `DEQUEUE_FAILED` and `ENQUEUE_FAILED` to `ErrorType` enum

* Queue

- `enqueue(TaggedMessage)` can now throw a `TristanableException` if a `FataException` with `libsnooze` occurrs
- `dequeue()` can now throw a `TristanableException` if a `FatalException` occurs during the call to `wait()` on `libsnooze`

* Dub

- Required minimum version of `bformat`, `4.1.0`

* Dub

- Upgraded `libsnooze` to version `1.3.0-beta`

* Migrate from libsnooze (#8)

* Dub

- Removed `libsnooze` dependency

* Queue

- Removed `libsnooze` imports

* Queue

- Added mutex+condition variable

* Queue

- Removed old `ensure()` call

* Queue

- Switched one thing over to mutex+condvar

* Queue

- Switched to using condition variable
- Added configurable slumber interval

* Queue

- Removed TODOs which are irrevevant for now

* Queue

- Removed `TListener` references

Everything else

- Removed reference to old/duplicate `queue.d` module

* Hotfix/niknaks (#9)

* Dub

- Added `niknaks` package with a minimum version of `v0.3.0`

* Encoding

- Switched to niknaks for `decode()`

* Encoding

- `encode()` now uses niknaks

* Watcher (unit tests)

- Added testing for default queue
2023-10-02 15:49:54 +02:00
Tristan B. Velloza Kildaire a23017a747 README
- Added badges
2023-10-01 19:14:16 +02:00
Tristan B. Velloza Kildaire 459f4a8709 Pipelines
- Added code coverage
2023-10-01 19:10:57 +02:00
Tristan B. Velloza Kildaire 79432cef6c
Enable CI for `nextgen` branch
- Run CI tests for any pull requests being made to the `nextgen` branch
2023-05-04 09:46:46 +02:00
Tristan B. Velloza Kildaire c43ed31ca9
Update README.md 2023-04-30 19:24:24 +02:00
Tristan B. Velloza Kildaire fb36497bda
Create d.yml 2023-04-30 19:23:22 +02:00
Tristan B. Velloza Kildaire 153512e9ab Dub
- Upgraded `libsnooze` to version `0.3.3`
2023-03-19 18:06:37 +02:00
Tristan B. Velloza Kildaire 01a5d779c8 Package
- Added a title to the documentation to better describe what this library does

Dub

- Updated package description
- Added project website link
2023-03-03 17:49:32 +02:00
Tristan B. Velloza Kildaire f8aa3b92cf Package
- Added imports that should be publically imported (exported into the user)
2023-03-03 17:47:23 +02:00
Tristan B. Velloza Kildaire 8828e3ffdd Manager
- Added an array of `Queue`(s) to the manager

Queue

- Added comments
- Initialize the libsnooze `Event` during construction of the `Queue` type

Dub

- Upgraded libsnooze from `0.2.7` to `0.2.9`

Repository

- Removed `dub.selections.json` file
- Added `dub.selections.json` to the `.gitignore`
2023-03-03 17:44:51 +02:00
Tristan B. Velloza Kildaire e7f93cd78a - Upgraded to working version of `libsnooze` (compilation-wise)
- Fixed imports and missing definitions in `queue` module
2023-02-26 22:24:49 +02:00
Tristan B. Velloza Kildaire cf431bdac9 - Updated author's name in package details 2023-02-26 21:55:38 +02:00
Tristan B. Velloza Kildaire 17da826d07 Restarting project 2023-02-26 21:55:13 +02:00
32 changed files with 1562 additions and 801 deletions

41
.github/workflows/d.yml vendored Normal file
View File

@ -0,0 +1,41 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.
name: D
on:
push:
branches: [ "**" ]
pull_request:
branches: [ "**" ]
permissions:
contents: read
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: dlang-community/setup-dlang@4c99aa991ce7d19dd3064de0a4f2f6b2f152e2d7
- name: Install Doveralls (code coverage tool)
run: |
dub fetch doveralls
sudo apt install libcurl4-openssl-dev
- name: 'Build & Test'
run: |
# Build the project, with its main file included, without unittests
dub build --compiler=$DC
# Build and run tests, as defined by `unittest` configuration
# In this mode, `mainSourceFile` is excluded and `version (unittest)` are included
# See https://dub.pm/package-format-json.html#configurations
dub test --compiler=$DC --coverage
- name: Coverage upload
run: |
dub run doveralls -- -t ${{secrets.COVERALLS_REPO_TOKEN}}

3
.gitignore vendored
View File

@ -22,3 +22,6 @@ docs/
# Code coverage
*.lst
source/tristanable/queue.d
dub.selections.json
tristanable-test-library

192
README.md
View File

@ -1,61 +1,175 @@
![](https://code.dlang.org/packages/tristanable/logo?s=5ef1c9f1250f57dd4c37efbf)
![](branding/logo_small.png)
tristanable
===========
**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own resepctively tagged queues indicated by their _"type"_ or `id`.
[![D](https://github.com/deavmi/tristanable/actions/workflows/d.yml/badge.svg)](https://github.com/deavmi/tristanable/actions/workflows/d.yml) ![DUB](https://img.shields.io/dub/v/tristanable?color=%23c10000ff%20&style=flat-square) ![DUB](https://img.shields.io/dub/dt/tristanable?style=flat-square) ![DUB](https://img.shields.io/dub/l/tristanable?style=flat-square) [![Coverage Status](https://coveralls.io/repos/github/deavmi/tristanable/badge.svg?branch=master)](https://coveralls.io/github/deavmi/tristanable?branch=master)
**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own respectively tagged queues indicated by their _"type"_ or `id`.
## What problems does it solve?
### Human example
Say now you made a request to a server with a tag `1` and expect a reply with that same tag `1`. Now, for a moment, think about what would happen in a tagless system. You would be expecting a reply, say now the weather report for your area, but what if the server has another thread that writes an instant messenger notification to the server's socket before the weather message is sent? Now you will inetrpret those bytes as if they were a weather message.
Say now you made a request to a server with a tag `1` and expect a reply with that same tag `1`. Now, for a moment, think about what would happen in a tagless system. You would be expecting a reply, say now the weather report for your area, but what if the server has another thread that writes an instant messenger notification to the server's socket before the weather message is sent? Now you will interpret those bytes as if they were a weather message.
Tristanable provides a way for you to receive the "IM notification first" but block and dequeue (when it arrives in the queue) for the "weather report". Irresepctoive of wether (no pun intended) the weather report arrives before the "IM notification" or after.
Tristanable provides a way for you to receive the "IM notification first" but block and dequeue (when it arrives in the queue) for the "weather report". Irrespective of wether (no pun intended) the weather report arrives before the "IM notification" or after.
### Code example
If we wanted to implement the following we would do the following. One note is that instead of waiting on messages of a specific _"type"_ (or rather **tag**), tristanable provides not just a one-message lengthb uffer per tag but infact a full queue per tag, meaning any received message with tag `1` will be enqueued and not dropped after the first message of type `1` is buffered.
Below is a fully-fledged example of the types of places (networking) where tristanable can be of help. The code is all explained in the comments:
```d
import tristanable.manager;
import tristanable.queue;
import tristanable.queueitem;
import std.socket;
import std.stdio;
import core.thread;
/* Create a manager to manage the socket for us */
Manager manager = new Manager(socket);
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);
/* Create a Queue for all "weather messages" */
Queue weatherQueue = new Queue(1);
class ServerThread : Thread
{
this()
{
super(&worker);
}
/* Create a Queue for all "IM notifications" */
Queue instantNotification = new Queue(2);
private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);
/* Tell the manager to look out for tagged messages `1` and `2` */
manager.addQueue(weatherQueue);
manager.addQueue(instantNotification);
Thread.sleep(dur!("seconds")(7));
writeln("Server start");
/* Now we can block on this queue and return with its head */
QueueItem message = weatherQueue.dequeue();
/**
* Create a tagged message to send
*
* tag 42 payload Cucumber 😳️
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Hello
*/
message = new TaggedMessage(69, cast(byte[])"Hello");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Bye
*/
message = new TaggedMessage(69, cast(byte[])"Bye");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 100 payload Bye
*/
message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 200 payload Bye
*/
message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
}
}
ServerThread serverThread = new ServerThread();
serverThread.start();
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
writeln(server.localAddress);
Manager manager = new Manager(client);
Queue sixtyNine = new Queue(69);
Queue fortyTwo = new Queue(42);
manager.registerQueue(sixtyNine);
manager.registerQueue(fortyTwo);
// Register a default queue (tag ignored)
Queue defaultQueue = new Queue(2332);
manager.setDefaultQueue(defaultQueue);
/* Connect our socket to the server */
client.connect(server.localAddress);
/* Start the manager and let it manage the socket */
manager.start();
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
TaggedMessage dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Hello");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Bye");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = fortyTwo.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 42);
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
/* Dequeue two messages from the default queue */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 100);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1");
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 200);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2");
/* Stop the manager */
manager.stop();
```
Surely, there must be some sort of encoding mechanism too? The messages afterall need to be encoded. **No problem!**, we have that sorted:
```d
import tristanable.encoding;
/* Let's send it with tag 1 and data "Hello" */
ulong tag = 1;
byte[] data = cast(byte[])"Hello";
/* When sending a message */
DataMessage tristanEncoded = new DataMessage(tag, data);
/* Then send it */
socket.send(encodeForSend(tristanEncoded));
```
And let tristanable handle it! We even handle the message lengths and everything using another great project [bformat](http://deavmi.assigned.network/projects/bformat).
And let tristanable handle it! We even handle the message lengths and everything using another great project [bformat](https://deavmi.assigned.network/projects/bformat).
## Format
@ -64,9 +178,9 @@ And let tristanable handle it! We even handle the message lengths and everything
```
## Using tristanable in your D project
You can easily add the library (source-based) to your project by running the following command in your
project's root:
You can easily add the library (source-based) to your project by running the following command in your project's root:
```bash
dub add tristanable
```
```

BIN
branding/logo_base.xcf Normal file

Binary file not shown.

BIN
branding/logo_small.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.5 KiB

BIN
branding/logo_small.xcf Normal file

Binary file not shown.

View File

@ -1,14 +1,15 @@
{
"authors": [
"Tristan B. Kildaire"
"Tristan B. Velloza Kildaire"
],
"copyright": "Copyright © 2020, Tristan B. Kildaire",
"copyright": "Copyright © 2023, Tristan B. Kildaire",
"dependencies": {
"bformat": "~>3.1.3"
"bformat": ">=4.1.1",
"niknaks": ">=0.3.0"
},
"description": "Tag-based asynchronous messaging framework",
"description": "Tristanable network message queuing framework",
"homepage": "https://deavmi.assigned.network/projects/tristanable",
"license": "LGPL-3.0",
"name": "tristanable",
"targetType": "library",
"sourcePaths": ["source/tristanable"]
"targetType": "library"
}

View File

@ -1,6 +0,0 @@
{
"fileVersion": 1,
"versions": {
"bformat": "3.1.3"
}
}

View File

@ -1,15 +0,0 @@
.dub
docs.json
__dummy.html
docs/
/example
example.so
example.dylib
example.dll
example.a
example.lib
example-test-*
*.exe
*.o
*.obj
*.lst

View File

@ -1,12 +0,0 @@
{
"authors": [
"Tristan B. Kildaire"
],
"copyright": "Copyright © 2020, Tristan B. Kildaire",
"dependencies": {
"tristanable": "~>2.2.1"
},
"description": "A minimal D application.",
"license": "proprietary",
"name": "example"
}

View File

@ -1,7 +0,0 @@
{
"fileVersion": 1,
"versions": {
"bformat": "1.0.8",
"tristanable": "2.2.1"
}
}

View File

@ -1,59 +0,0 @@
import std.stdio;
import tristanable.manager : Manager;
import std.socket;
import core.thread;
void main()
{
writeln("Edit source/app.d to start your project.");
Socket socket = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
socket.connect(parseAddress("127.0.0.1",7777));
Manager manager = new Manager(socket);
class bruh : Thread
{
this()
{
super(&run);
}
private void run()
{
while(true)
{
manager.lockQueue();
writeln(manager.getQueue());
manager.unlockQueue();
import core.thread;
Thread.sleep(dur!("seconds")(1));
}
}
}
new bruh().start();
manager.sendMessage(69, [77]);
manager.sendMessage(70, [78]);
byte[] receivedKaka = manager.receiveMessage(69);
writeln(receivedKaka);
receivedKaka = manager.receiveMessage(70);
writeln(receivedKaka);
manager.sendMessage(70, [78]);
receivedKaka = manager.receiveMessage(70);
writeln(receivedKaka);
}

View File

View File

@ -1,15 +0,0 @@
.dub
docs.json
__dummy.html
docs/
/server
server.so
server.dylib
server.dll
server.a
server.lib
server-test-*
*.exe
*.o
*.obj
*.lst

View File

@ -1,13 +0,0 @@
{
"authors": [
"Tristan B. Kildaire"
],
"copyright": "Copyright © 2020, Tristan B. Kildaire",
"dependencies": {
"bformat": "~>1.0.8",
"tristanable": "~>2.2.1"
},
"description": "A minimal D application.",
"license": "proprietary",
"name": "server"
}

View File

@ -1,7 +0,0 @@
{
"fileVersion": 1,
"versions": {
"bformat": "1.0.8",
"tristanable": "2.2.1"
}
}

View File

@ -1,37 +0,0 @@
import std.stdio;
import std.socket;
import tristanable.encoding : DataMessage;
import bmessage;
import core.thread;
void main()
{
writeln("Edit source/app.d to start your project.");
Socket socket = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
socket.bind(parseAddress("127.0.0.1",7777));
socket.listen(1);
Socket conn = socket.accept();
byte[] receivedData;
while(true)
{
receiveMessage(conn, receivedData);
DataMessage message = DataMessage.decode(receivedData);
writeln("Tag: ", message.tag);
writeln("Data: ", message.data);
DataMessage d = new DataMessage(70, [2]);
sendMessage(conn, d.encode());
d = new DataMessage(69, [1]);
Thread.sleep(dur!("seconds")(5));
sendMessage(conn, d.encode());
}
}

View File

@ -1,67 +1,168 @@
/**
* Encoding/decoding of the tristanable format
*/
module tristanable.encoding;
public final class DataMessage
{
import std.conv : to;
import niknaks.bits : bytesToIntegral, Order, order, toBytes;
/**
* Represents a tagged message that has been decoded
* from its raw byte encoding, this is a tuple of
* a numeric tag and a byte array of payload data
*
* Also provides a static method to decode from such
* raw encoding and an instance method to do the reverse
*/
public final class TaggedMessage
{
/**
* This message's tag
*/
private ulong tag;
/**
* The payload
*/
private byte[] data;
public static DataMessage decode(byte[] bytes)
{
/* Fetch the `tag` */
ulong receivedTag = *(cast(ulong*)bytes.ptr);
/* Fetch the `data` */
byte[] receivedData = bytes[8..bytes.length];
return new DataMessage(receivedTag, receivedData);
}
/**
* Constructs a new DataMessage with
* the give `tag` and bytes `data`
*/
/**
* Constructs a new TaggedMessage with the given tag and payload
*
* Params:
* tag = the tag to use
* data = the payload
*/
this(ulong tag, byte[] data)
{
this.tag = tag;
this.data = data;
}
public byte[] encode()
/**
* Parameterless constructor used for decoder
*/
private this() {}
/**
* Decodes the wire-formatted tristanable bytes into an instance
* of TaggedMessage whereby the tag and data can be seperately
* accessed and manipulated
*
* Params:
* encodedMessage = the wire-format encoded bytes
* Returns: an instance of TaggedMessage
*/
public static TaggedMessage decode(byte[] encodedMessage)
{
/* Construct the message array */
byte[] messageData;
/* The decoded message */
TaggedMessage decodedMessage = new TaggedMessage();
/* Add the `tag` bytes */
messageData ~= *(cast(byte*)&tag);
messageData ~= *(cast(byte*)&tag+1);
messageData ~= *(cast(byte*)&tag+2);
messageData ~= *(cast(byte*)&tag+3);
messageData ~= *(cast(byte*)&tag+4);
messageData ~= *(cast(byte*)&tag+5);
messageData ~= *(cast(byte*)&tag+6);
messageData ~= *(cast(byte*)&tag+7);
/* The decoded tag */
ulong decodedTag;
/* Take ulong-many bytes and only flip them to LE if not on LE host */
decodedTag = order(bytesToIntegral!(ushort)(cast(ubyte[])encodedMessage), Order.LE);
/* Add the `data` bytes (the actual message) */
messageData ~= data;
return messageData;
/* Set the tag */
decodedMessage.setTag(decodedTag);
/* Set the data *(9-th byte onwards) */
decodedMessage.setPayload(encodedMessage[8..$]);
return decodedMessage;
}
public byte[] getData()
/**
* Encodes the tagged message into the tristanable
* wire format ready for transmission
*
* Returns: the encoded bytes
*/
public byte[] encode()
{
/* The encoded bytes */
byte[] encodedMessage;
/* If on little endian then no re-order, if host is BE flip (the tag) */
encodedMessage ~= toBytes(order(tag, Order.LE));
/* Tack on the data */
encodedMessage ~= data;
return encodedMessage;
}
/**
* Get the message's payload
*
* Returns: the payload
*/
public byte[] getPayload()
{
return data;
}
/**
* Get the message's tag
*
* Returns: the tag
*/
public ulong getTag()
{
return tag;
}
/**
* Set the message's payload
*
* Params:
* newPayload = the payload to use
*/
public void setPayload(byte[] newPayload)
{
this.data = newPayload;
}
/**
* Set the message's tag
*
* Params:
* newTag = the tag to use
*/
public void setTag(ulong newTag)
{
this.tag = newTag;
}
/**
* Returns a string representation of the TaggedMessage
*
* Returns: the string represenation
*/
public override string toString()
{
return "TMessage [Tag: "~to!(string)(tag)~", Payload: "~to!(string)(data)~"]";
}
}
public static byte[] encodeForSend(DataMessage message)
/**
* Test encoding and decoding
*/
unittest
{
import bmessage;
return encodeBformat(message.encode());
/* Setup testing data */
TaggedMessage testData = new TaggedMessage(420, [1,2,3]);
/* Encode */
byte[] encoded = testData.encode();
/* Decode */
TaggedMessage decoded = TaggedMessage.decode(encoded);
/* Now ensure that `decoded` == original `testData` */
assert(decoded.getTag() == testData.getTag);
assert(decoded.getPayload() == testData.getPayload());
}

View File

@ -1,29 +1,73 @@
/**
* Error handling type definitions
*/
module tristanable.exceptions;
import tristanable.manager;
import tristanable.queue : Queue;
import std.conv : to;
public final class TristanableException : Exception
/**
* The type of sub-error of the `TristanableException`
*/
public enum ErrorType
{
this(Manager manager, string message)
/**
* If the requested queue could not be found
*/
QUEUE_NOT_FOUND,
/**
* If the queue wanting to be registered has already
* been registered under the same tag
*/
QUEUE_ALREADY_EXISTS,
/**
* If no default queue is configured
*/
NO_DEFAULT_QUEUE,
/**
* The blocking call to `dequeue()`, somehow, failed
*/
DEQUEUE_FAILED,
/**
* The call to `enqueue()`, somehow, failed
*/
ENQUEUE_FAILED
}
/**
* Any sort of error that occurs during runtime of the tristanable
* engine
*/
public class TristanableException : Exception
{
/**
* The sub-error type
*/
private ErrorType err;
/**
* Constructs a new `TristanableException` with the provided
* sub-error type
*
* Params:
* err = the `ErrorType`
*/
this(ErrorType err)
{
super(generateMessage(message));
super(this.classinfo.name~": "~to!(string)(err));
this.err = err;
}
private string generateMessage(string errMesg)
/**
* Retrieve the sub-error type
*
* Returns: the sub-error type as a `ErrorType`
*/
public ErrorType getError()
{
string msg;
// msg = "TRistanable failure: "~errMesg~"\n\n";
// msg ~= "Queue stats:\n\n"
// Queue[] queues = manager.getQueues();
// foreach(Queue queue; queues)
// {
// msg ~= "Queue["~to!(string)(queue.getTag())~"]: "~
// }
// msg ~= manager.getQueues()
return msg;
return err;
}
}

View File

@ -1,217 +0,0 @@
module tristanable.manager;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
import tristanable.queue : Queue;
import tristanable.watcher;
import std.container.dlist;
import tristanable.exceptions;
/**
* Manager
*
* This is the core class that is to be instantiated
* that represents an instance of the tristanable
* framework. It is passed a Socket from which it
* reads from (using a bformat block reader).
*
* It contains a Watcher which does the reading and
* appending to respective queues (the user need not
* worry about this factum).
*
* The functions provided allow users to wait in a
* tight loop to dequeue ("receive" in a blcoking mannger)
* from a specified queue.
*/
public final class Manager
{
/* All queues */
private DList!(Queue) queues;
private Mutex queuesLock;
/* TODO Add drop queue? */
/**
* The remote host
*/
private Socket socket;
private Watcher watcher;
/**
* Constructs a new Manager with the given
* endpoint Socket
*
*/
this(Socket socket)
{
/* TODO: Make sure the socket is in STREAM mode */
/* Set the socket */
this.socket = socket;
/* Initialize the queues mutex */
queuesLock = new Mutex();
/* Initialize the watcher */
watcher = new Watcher(this, socket);
}
public Queue getQueue(ulong tag)
{
Queue matchingQueue;
queuesLock.lock();
foreach(Queue queue; queues)
{
if(queue.getTag() == tag)
{
matchingQueue = queue;
break;
}
}
queuesLock.unlock();
return matchingQueue;
}
/* TODO: Probably remove this or keep it */
public bool isValidTag(ulong tag)
{
return !(getQueue(tag) is null);
}
/**
* Returns a new queue with a new ID,
* if all IDs are used then it returns
* null
*
* Use this if you don't care about reserving
* queues IDs and just want a throwaway queue
*
* FIXME: All tags in use, this won't handle it
*/
public Queue generateQueue()
{
/* Newly generated queue */
Queue newQueue;
queuesLock.lock();
ulong curGuess = 0;
bool bad = true;
reguess: while(bad)
{
if(isValidTag(curGuess))
{
curGuess++;
continue reguess;
}
bad = false;
}
/* Create the new queue with the free id found */
newQueue = new Queue(curGuess);
/* Add the queue (recursive mutex) */
addQueue(newQueue);
queuesLock.unlock();
return newQueue;
}
public Queue[] getQueues()
{
Queue[] queues;
queuesLock.lock();
foreach(Queue queue; this.queues)
{
queues ~= queue;
}
queuesLock.unlock();
return queues;
}
/**
* Removes the given Queue, `queue`, from the manager
*
* Throws a TristanableException if the id of the
* queue wanting to be removed is not in use by any
* queue already added
*/
public void removeQueue(Queue queue)
{
queuesLock.lock();
/* Make sure such a tag exists */
if(isValidTag(queue.getTag()))
{
queues.linearRemoveElement(queue);
}
else
{
/* Unlock queue before throwing an exception */
queuesLock.unlock();
throw new TristanableException(this, "Cannot remove a queue with an id not in use");
}
queuesLock.unlock();
}
/**
* Adds the given Queue, `queue`, to the manager
*
* Throws a TristanableException if the id of the
* queue wanting to be added is already in use by
* another already added queue
*/
public void addQueue(Queue queue)
{
queuesLock.lock();
/* Make sure such a tag does not exist already */
if(!isValidTag(queue.getTag()))
{
queues ~= queue;
}
else
{
/* Unlock queue before throwing an exception */
queuesLock.unlock();
throw new TristanableException(this, "Cannot add queue with id already in use");
}
queuesLock.unlock();
}
public Socket getSocket()
{
return socket;
}
/**
* TODO: Comment
* TODO: Testing
*/
public void shutdown()
{
/* TODO: Implement me */
/* Make the loop stop whenever it does */
watcher.shutdown();
/* Wait for the thread to end */
watcher.join();
}
}

View File

@ -0,0 +1,34 @@
/**
* Configuration for the manager
*/
module tristanable.manager.config;
/**
* Manager parameters
*/
public struct Config
{
/**
* If set to true then when one uses
* `sendMessage(TaggedMessage)` the
* manager will check if a queue with
* said tag has not been registered
* and if so, then register it for
* us before encoding-and-sending
*/
public bool registerOnSend;
}
/**
* Generates the default configuration to use for
* the manager
*
* Returns: the Config
*/
public Config defaultConfig()
{
Config config;
config.registerOnSend = false;
return config;
}

View File

@ -0,0 +1,549 @@
/**
* Management of a tristanable instance
*/
module tristanable.manager.manager;
import std.socket;
import tristanable.queue.queue : Queue;
import core.sync.mutex : Mutex;
import tristanable.manager.watcher : Watcher;
import tristanable.encoding : TaggedMessage;
import tristanable.exceptions;
import std.container.slist : SList;
import tristanable.manager.config;
import river.core;
import river.impls.sock : SockStream;
import bformat.client;
/**
* Manages a provided socket by spawning
* a watcher thread to read from it and file
* mail into the corresponding queues.
*
* Queues are managed via this an instance
* of a manager.
*/
public class Manager
{
/**
* Configuration
*/
private Config config;
/**
* The bformat client to read and write from
*/
private BClient bClient;
/**
* Currently registered queues
*/
private SList!(Queue) queues;
/**
* Lock for currently registered queues
*/
private Mutex queuesLock;
/**
* Default queue
*/
private Queue defaultQueue;
/**
* Watcher which manages the socket and
* enqueues new messages into the respective
* quueue for us
*/
private Watcher watcher;
/**
* Constructs a new manager which will read from
* this socket and file mail for us
*
* Params:
* stream = the underlying stream to use
*/
this(Stream stream, Config config = defaultConfig())
{
this.bClient = new BClient(stream);
this.queuesLock = new Mutex();
this.config = config;
this.watcher = new Watcher(this, bClient);
}
// TODO: Comment this
// This is for backwards compatibility (whereby a `Socket` was taken in)
this(Socket socket, Config config = defaultConfig())
{
this(new SockStream(socket), config);
}
/**
* Starts the management of the socket,
* resulting in queues being updated upon
* reciving messages tagged for them
*/
public void start()
{
watcher.startWatcher();
}
/**
* Stops the management of the socket, resulting
* in ending the updating of queues and closing
* the underlying connection
*/
public void stop()
{
watcher.shutdown();
}
/**
* Retrieves the queue mathcing the provided id
*
* Params:
* id = the id to lookup by
* Returns: the Queue
* Throws: TristanableException if the queue is not found
*/
public Queue getQueue(ulong id)
{
/* The found queue */
Queue queue = getQueue_nothrow(id);
/* If no queue is found then throw an error */
if(queue is null)
{
throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
}
return queue;
}
/**
* Retrieves the queue mathcing the provided id
*
* This is the nothrow version
*
* Params:
* id = the id to lookup by
* Returns: the Queue if found, null otherwise
*/
public Queue getQueue_nothrow(ulong id)
{
/* The found queue */
Queue queue;
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
/* Search for the queue */
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == id)
{
queue = curQueue;
break;
}
}
return queue;
}
/**
* Get a new queue thatis unique in its tag
* (unused/not regustered yet), register it
* and then return it
*
* Returns: the newly registered Queue
*/
public Queue getUniqueQueue()
{
/* The newly created queue */
Queue uniqueQueue;
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
// TODO: Throw exception if all tags used
/* The unused tag */
ulong unusedTag = 0;
/* Try the current tag and ensure no queue uses it */
tagLoop: for(ulong curPotentialTag = 0; true; curPotentialTag++)
{
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == curPotentialTag)
{
continue tagLoop;
}
}
/* Then we have found a unique tag */
unusedTag = curPotentialTag;
break;
}
/* Create the queue */
uniqueQueue = new Queue(unusedTag);
/* Register it */
registerQueue(uniqueQueue);
return uniqueQueue;
}
/**
* Registers the given queue with the manager
*
* Params:
* queue = the queue to register
* Throws:
* TristanableException if a queue with the provided id already exists
*/
public void registerQueue(Queue queue)
{
/* Try to register the queue */
bool status = registerQueue_nothrow(queue);
/* If registration was not successful */
if(!status)
{
throw new TristanableException(ErrorType.QUEUE_ALREADY_EXISTS);
}
}
/**
* Registers the given queue with the manager
*
* Params:
* queue = the queue to register
* Returns: true if registration was successful, false otherwise
*/
public bool registerQueue_nothrow(Queue queue)
{
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
/* Search for the queue, throw an exception if it exists */
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == queue.getID())
{
/* Registration failed */
return false;
}
}
/* Insert the queue as it does not exist */
queues.insertAfter(queues[], queue);
/* Registration was a success */
return true;
}
/**
* De-registers the given queue from the manager
*
* Params:
* queue = the queue to de-register
* Throws:
* TristanableException if a queue with the provided id cannot be found
*/
public void releaseQueue(Queue queue)
{
/* Try to de-register the queue */
bool status = releaseQueue_nothrow(queue);
/* If de-registration was not successful */
if(!status)
{
throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
}
}
/**
* De-registers the given queue from the manager
*
* Params:
* queue = the queue to de-register
* Returns: true if de-registration was successful, false otherwise
*/
public bool releaseQueue_nothrow(Queue queue)
{
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
/* Search for the queue, return false if it does NOT exist */
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == queue.getID())
{
/* Remove the queue */
queues.linearRemoveElement(queue);
/* De-registration succeeded */
return true;
}
}
/* De-registration failed */
return false;
}
/**
* Sets the default queue
*
* The default queue, when set/enabled, is the queue that will
* be used to enqueue messages that have a tag which doesn't
* match any of the normally registered queues.
*
* Please note that the ID of the queue passed in here does not
* mean anything in this context; only the queuing facilities
* of the Queue object are used
*
* Params:
* queue = the default queue to use
*/
public void setDefaultQueue(Queue queue)
{
this.defaultQueue = queue;
}
/**
* Returns the default queue
*
* Returns: the default queue
* Throws:
* TristanableException if there is no default queue
*/
public Queue getDefaultQueue()
{
/* The potential default queue */
Queue potentialDefaultQueue = getDefaultQueue_nothrow();
if(potentialDefaultQueue is null)
{
throw new TristanableException(ErrorType.NO_DEFAULT_QUEUE);
}
return potentialDefaultQueue;
}
/**
* Returns the default queue
*
* This is the nothrow version
*
* Returns: the default queue if found, null otherwise
*/
public Queue getDefaultQueue_nothrow()
{
return defaultQueue;
}
/**
* Sends the provided message over the socket
*
* Params:
* message = the TaggedMessage to send
*/
public void sendMessage(TaggedMessage message)
{
/**
* If a queue with the tag of the message does
* not exist, then register it if the config
* option was enabled
*/
if(config.registerOnSend)
{
/* Create a Queue with the tag */
Queue createdQueue = new Queue(message.getTag());
/* Attempt to register the queue */
registerQueue_nothrow(createdQueue);
}
/* Encode the message */
byte[] encodedMessage = message.encode();
/* Send it using bformat (encode-and-send) */
bClient.sendMessage(encodedMessage);
}
}
// TODO: Fix this, write it in a nicer way
// ... or make a private constructor here that
// ... does not take it in
version(unittest)
{
Socket nullSock = null;
}
/**
* Test retrieving a queue which does not
* exist
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Shouldn't be found */
try
{
manager.getQueue(69);
assert(false);
}
catch(TristanableException e)
{
assert(e.getError() == ErrorType.QUEUE_NOT_FOUND);
}
/* Shouldn't be found */
assert(manager.getQueue_nothrow(69) is null);
}
/**
* Test registering a queue and then fetching it
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */
Queue queue = new Queue(69);
try
{
/* Register the queue */
manager.registerQueue(queue);
/* Fetch the queue */
Queue fetchedQueue = manager.getQueue(69);
/* Ensure the queue we fetched is the one we stored (the references would be equal) */
assert(fetchedQueue == queue);
}
catch(TristanableException e)
{
assert(false);
}
/* Should be found */
assert(manager.getQueue_nothrow(69) !is null);
}
/**
* Tests registering a queue and then registering
* another queue with the same id
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */
Queue queue = new Queue(69);
/* Register the queue */
manager.registerQueue(queue);
try
{
/* Register the queue (try again) */
manager.registerQueue(queue);
assert(false);
}
catch(TristanableException e)
{
assert(e.getError() == ErrorType.QUEUE_ALREADY_EXISTS);
}
}
/**
* Tests registering a queue, de-registering it and
* then registering it again
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */
Queue queue = new Queue(69);
/* Register the queue */
manager.registerQueue(queue);
/* Ensure it is registered */
assert(queue == manager.getQueue(69));
/* De-register the queue */
manager.releaseQueue(queue);
/* Ensure it is de-registered */
assert(manager.getQueue_nothrow(69) is null);
/* Register the queue (again) */
manager.registerQueue(queue);
/* Ensure it is registered (again) */
assert(queue == manager.getQueue(69));
}
/**
* Tests registering a queue using the "next available queue"
* method
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Get the next 3 available queues */
Queue queue1 = manager.getUniqueQueue();
Queue queue2 = manager.getUniqueQueue();
Queue queue3 = manager.getUniqueQueue();
/* The queues should have tags [0, 1, 2] respectively */
assert(queue1.getID() == 0);
assert(queue2.getID() == 1);
assert(queue3.getID() == 2);
}
// TODO: Add testing for queue existence (internal method)

View File

@ -0,0 +1,15 @@
/**
* Interface which manages a provided socket
* and enqueuing and dequeuing of queues
*/
module tristanable.manager;
/**
* The management facilities
*/
public import tristanable.manager.manager : Manager;
/**
* Configuration for the manager
*/
public import tristanable.manager.config : Config, defaultConfig;

View File

@ -0,0 +1,297 @@
/**
* Facilitates the reading of messages from the socket,
* decoding thereof and final enqueuing thereof into their
* respective queus
*/
module tristanable.manager.watcher;
import core.thread : Thread;
import tristanable.manager.manager : Manager;
import std.socket;
import bformat;
import tristanable.encoding;
import tristanable.exceptions;
import tristanable.queue.queue;
import bformat.client;
/**
* Watches the socket on a thread of its own,
* performs the decoding of the incoming messages
* and places them into the correct queues via
* the associated Manager instance
*/
public class Watcher : Thread
{
/**
* The associated manager to use
* such that we can place new mail
* into their respective inboxes (queues)
*/
private Manager manager;
/**
* The BClient to read from
*/
private BClient bClient;
/**
* Creates a new `Watcher` that is associated
* with the provided `Manager` such that it can
* add to its registered queues. The provided `Socket`
* is such that it can be read from and managed.
*
* Params:
* manager = the `Manager` to associate with
* bclient = the underlying `BClient` to read data from
*/
package this(Manager manager, BClient bClient)
{
this.manager = manager;
this.bClient = bClient;
super(&watch);
}
/**
* Starts the underlying thread
*/
package void startWatcher()
{
/* Start the watch method on a new thread */
start();
}
/**
* Watches the socket for incoming messages
* and decodes them on the fly, placing
* the final message in the respective queue
*/
private void watch()
{
import std.stdio;
while(true)
{
/* Do a bformat read-and-decode */
byte[] wireTristan;
version(unittest) { writeln("Before bformat recv()"); }
bool recvStatus = bClient.receiveMessage(wireTristan); // TODO: Add a check for the status of read
version(unittest) { writeln("After bformat recv()"); }
version(unittest) { writeln("bformat recv() status: ", recvStatus); }
if(recvStatus)
{
/* Decode the received bytes into a tagged message */
TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan);
version(unittest) { writeln("Watcher received: ", decodedMessage); }
/* Search for the queue with the id provided */
ulong messageTag = decodedMessage.getTag();
Queue potentialQueue = manager.getQueue_nothrow(messageTag);
/* If a queue can be found */
if(potentialQueue !is null)
{
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
/* If the queue if not found */
else
{
/**
* Look for a default queue, and if one is found
* then enqueue the message there. Otherwise, drop
* it by simply doing nothing.
*/
try
{
potentialQueue = manager.getDefaultQueue();
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
catch(TristanableException e) {}
}
version(unittest) { writeln("drip"); }
}
/**
* If there was an error receiving on the socket.
*
* This can be either because we have shut the socket down
* or the remote end has closed the connection.
*
* In any case, exit the loop therefore ending this thread.
*/
else
{
break;
}
}
}
/**
* Shuts down the watcher, unblocks the blocking read in the loop
* resulting in the watcher thread ending
*/
package void shutdown()
{
/* Closes the bformat reader */
bClient.close();
}
}
/**
* Set up a server which will send some tagged messages to us (the client),
* where we have setup a `Manager` to watch the queues with tags `42` and `69`,
* we then dequeue some messages from both queus. Finally, we shut down the manager.
*/
unittest
{
import std.socket;
import std.stdio;
import core.thread;
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);
class ServerThread : Thread
{
this()
{
super(&worker);
}
private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);
Thread.sleep(dur!("seconds")(7));
writeln("Server start");
/**
* Create a tagged message to send
*
* tag 42 payload Cucumber 😳
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Hello
*/
message = new TaggedMessage(69, cast(byte[])"Hello");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Bye
*/
message = new TaggedMessage(69, cast(byte[])"Bye");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 100 payload Bye
*/
message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 200 payload Bye
*/
message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
}
}
ServerThread serverThread = new ServerThread();
serverThread.start();
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
writeln(server.localAddress);
Manager manager = new Manager(client);
Queue sixtyNine = new Queue(69);
Queue fortyTwo = new Queue(42);
manager.registerQueue(sixtyNine);
manager.registerQueue(fortyTwo);
// Register a default queue (tag ignored)
Queue defaultQueue = new Queue(2332);
manager.setDefaultQueue(defaultQueue);
/* Connect our socket to the server */
client.connect(server.localAddress);
/* Start the manager and let it manage the socket */
manager.start();
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
TaggedMessage dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Hello");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Bye");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = fortyTwo.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 42);
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
/* Dequeue two messages from the default queue */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 100);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1");
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 200);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2");
/* Stop the manager */
manager.stop();
}

View File

@ -1,7 +1,25 @@
/**
* Tristanable network message queuing framework
*/
module tristanable;
public import tristanable.encoding;
/**
* Interface which manages a provided socket
* and enqueuing and dequeuing of queues
*/
public import tristanable.manager;
public import tristanable.queue;
public import tristanable.queueitem;
public import tristanable.watcher;
/**
* A queue of queue items all of the same tag
*/
public import tristanable.queue.queue : Queue;
/**
* Error handling type definitions
*/
public import tristanable.exceptions : TristanableException, ErrorType;
/**
* Encoding/decoding of the tristanable format
*/
public import tristanable.encoding : TaggedMessage;

View File

@ -1,172 +0,0 @@
/**
* Queue
*
* Represents a queue with a tag.
*
* Any messages that are received with
* the matching tag (to this queue) are
* then enqueued to this queue
*/
module tristanable.queue;
import tristanable.queueitem : QueueItem;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
import core.thread : Thread;
import std.container.dlist;
import std.range : walkLength;
public enum QueuePolicy : ubyte
{
LENGTH_CAP = 1
}
public final class Queue
{
/* This queue's tag */
private ulong tag;
/* The queue */
private DList!(QueueItem) queue;
/* The queue mutex */
private Mutex queueLock;
/**
* Construct a new queue with the given
* tag
*/
this(ulong tag, QueuePolicy flags = cast(QueuePolicy)0)
{
this.tag = tag;
/* Initialize the mutex */
queueLock = new Mutex();
this.flags = flags;
}
public void setLengthCap(ulong lengthCap)
{
this.lengthCap = lengthCap;
}
public ulong getLengthCap(ulong lengthCap)
{
return lengthCap;
}
/**
* Queue policy settings
*/
private ulong lengthCap = 1;
private QueuePolicy flags;
public void enqueue(QueueItem item)
{
/* Lock the queue */
queueLock.lock();
/**
* Check to see if the queue has a length cap
*
* If so then determine whether to drop or
* keep dependent on current capacity
*/
if(flags & QueuePolicy.LENGTH_CAP)
{
if(walkLength(queue[]) == lengthCap)
{
goto unlock;
}
}
/* Add it to the queue */
queue ~= item;
unlock:
/* Unlock the queue */
queueLock.unlock();
}
/**
* Returns true if this queue has items ready
* to be dequeued, false otherwise
*/
public bool poll()
{
/* Status */
bool status;
/* Lock the queue */
queueLock.lock();
status = !queue.empty();
/* Unlock the queue */
queueLock.unlock();
return status;
}
/**
* Attempts to coninuously dequeue the
* head of the queue
*
* TODO: Add a timeout capability
* TODO: Add tryLock, yield on failure (with loop for recheck ofc)
* TODO: Possible multiple dequeue feature? Like .receive
*/
public QueueItem dequeue()
{
/* The head of the queue */
QueueItem queueHead;
while(!queueHead)
{
/* Lock the queue */
queueLock.lock();
/* Check if we can dequeue anything */
if(!queue.empty())
{
/* If we can then dequeue */
queueHead = queue.front();
queue.removeFront();
/* Chop off the head */
// offWithTheHead();
}
/* Unlock the queue */
queueLock.unlock();
/**
* Move away from this thread, let
* the watcher (presumably) try
* access our queue (successfully)
* by getting a lock on it
*
* Prevents us possibly racing back
* and locking queue again hence
* starving the system
*/
Thread.getThis().yield();
}
return queueHead;
}
/**
* Returns the tag for this queue
*/
public ulong getTag()
{
return tag;
}
}

View File

@ -0,0 +1,240 @@
/**
* A queue of queue items all of the same tag
*/
module tristanable.queue.queue;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import core.sync.exception : SyncError;
import std.container.slist : SList;
import tristanable.encoding;
import core.time : Duration, dur;
import tristanable.exceptions;
version(unittest)
{
import std.stdio;
import std.conv : to;
}
/**
* Represents a queue whereby messages of a certain tag/id
* can be enqueued to (by the `Watcher`) and dequeued from
* (by the user application)
*/
public class Queue
{
/**
* This queue's unique ID
*/
private ulong queueID;
/**
* The libsnooze event used to sleep/wake
* on queue events
* Mutex for the condition variable
*/
private Mutex mutex;
/**
* The condition variable used to sleep/wake
* on queue of events
*/
private Condition signal;
/**
* The queue of messages
*/
private SList!(TaggedMessage) queue;
/**
* The lock for the message queue
*/
private Mutex queueLock;
/**
* If a message is enqueued prior
* to us sleeping then we won't
* wake up and return for it.
*
* Therefore a periodic wakeup
* is required.
*/
private Duration wakeInterval;
/**
* Constructs a new Queue and immediately sets up the notification
* sub-system for the calling thread (the thread constructing this
* object) which ensures that a call to dequeue will immediately
* unblock on the first message received under this tag
*
* Params:
* queueID = the id to use for this queue
*/
this(ulong queueID)
{
/* Initialize the queue lock */
this.queueLock = new Mutex();
/* Initialize the condition variable */
this.mutex = new Mutex();
this.signal = new Condition(this.mutex);
/* Set the queue id */
this.queueID = queueID;
/* Set the slumber interval */
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
}
/**
* Returns the current wake interval
* for the queue checker
*
* Returns: the `Duration`
*/
public Duration getWakeInterval()
{
return this.wakeInterval;
}
/**
* Sets the wake up interval
*
* Params:
* interval = the new interval
*/
public void setWakeInterval(Duration interval)
{
this.wakeInterval = interval;
}
/**
* Enqueues the provided tagged message onto this queue
* and then wakes up any thread that has called dequeue
* on this queue as well
*
* On error enqueueing a `TristanableException` will be
* thrown.
*
* Params:
* message = the TaggedMessage to enqueue
*/
public void enqueue(TaggedMessage message)
{
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Enqueuing '"~to!(string)(message)~"'...");
}
scope(exit)
{
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Enqueued '"~to!(string)(message)~"'!");
}
/* Unlock the item queue */
queueLock.unlock();
}
/* Lock the item queue */
queueLock.lock();
/* Add the item to the queue */
queue.insertAfter(queue[], message);
/* Wake up anyone wanting to dequeue from us */
try
{
// TODO: Make us wait on the event (optional with a time-out)
signal.notifyAll();
}
catch(SyncError snozErr)
{
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.ENQUEUE_FAILED);
}
}
// TODO: Make a version of this which can time out
/**
* Blocks till a message can be dequeued from this queue
*
* On error dequeueing a `TristanableException` will be
* thrown.
*
* Returns: the dequeued TaggedMessage
*/
public TaggedMessage dequeue()
{
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Dequeueing...");
}
/* The dequeued message */
TaggedMessage dequeuedMessage;
scope(exit)
{
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Dequeued '"~to!(string)(dequeuedMessage)~"'!");
}
}
/* Block till we dequeue a message successfully */
while(dequeuedMessage is null)
{
scope(exit)
{
// Unlock the mutex
this.mutex.unlock();
}
// Lock the mutex
this.mutex.lock();
try
{
this.signal.wait(this.wakeInterval);
}
catch(SyncError e)
{
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}
/* Lock the item queue */
queueLock.lock();
/* Consume the front of the queue (if non-empty) */
if(!queue.empty())
{
/* Pop the front item off */
dequeuedMessage = queue.front();
/* Remove the front item from the queue */
queue.linearRemoveElement(dequeuedMessage);
}
/* Unlock the item queue */
queueLock.unlock();
}
return dequeuedMessage;
}
/**
* Get the id/tag of this queue
*
* Returns: the queue's id
*/
public ulong getID()
{
return queueID;
}
}

View File

@ -1,18 +0,0 @@
module tristanable.queueitem;
public final class QueueItem
{
/* This item's data */
private byte[] data;
/* TODO: */
this(byte[] data)
{
this.data = data;
}
public byte[] getData()
{
return data;
}
}

View File

@ -1,107 +0,0 @@
module tristanable.watcher;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : receiveMessage;
import tristanable.queue : Queue;
import tristanable.queueitem : QueueItem;
import tristanable.manager : Manager;
import core.thread : Thread;
import tristanable.encoding;
import tristanable.exceptions;
public final class Watcher : Thread
{
/* The manager */
private Manager manager;
/* The socket to read from */
private Socket socket;
private bool running;
this(Manager manager, Socket endpoint)
{
super(&run);
this.manager = manager;
socket = endpoint;
running = true;
start();
}
public void shutdown()
{
running=false;
/* Close the socket, causing an error, breaking the event loop */
socket.close();
}
private void run()
{
/* Continuously dequeue tristanable packets from socket */
while(true)
{
/* Receive payload (tag+data) */
byte[] receivedPayload;
/* Block for socket response */
bool recvStatus = receiveMessage(socket, receivedPayload);
/* If the receive was successful */
if(recvStatus)
{
/* Decode the ttag-encoded message */
DataMessage message = DataMessage.decode(receivedPayload);
/* TODO: Remove isTag, improve later, oneshot */
/* The matching queue (if any) */
Queue queue = manager.getQueue(message.getTag());
/* If the tag belongs to a queue */
if(queue)
{
/* Add an item to this queue */
queue.enqueue(new QueueItem(message.getData()));
}
/* If the tag is unknwon */
else
{
/* TODO: Add to dropped queue? */
/* Do nothing */
}
}
/* If the receive failed */
else
{
/* TODO: depending on `running`, different error */
/* TODO: Stop everything */
break;
}
/**
* Like in `dequeue` we don't want the possibility
* of racing back to the top of the loop and locking
* the mutex again right before a thread switch,
* so we make sure that a switch occurs to a different
* thread
*/
Thread.getThis().yield();
}
/* Check if we had an error */
if(running)
{
throw new TristanableException(manager, "bformat socket error");
}
else
{
/* Actual shut down, do nothing */
}
}
}

10
todo
View File

@ -1,10 +0,0 @@
- [x] Use queues
- [x] Immediate head chop after dequeue
- [x] Thread.yields
- [ ] Sleep option
- [x] Use linked list for queues (increase performance)
- [ ] shutdown option
- [x] Queue policies
- [x] Length cap
- [ ] Exceptions
- [x] Queue deletion

@ -1 +0,0 @@
Subproject commit cb68e9f673dd7b2f490ee4ff2262e45b41a90a0f

Binary file not shown.