Compare commits

..

76 Commits

Author SHA1 Message Date
Tristan B. Velloza Kildaire 7ca6489b58 Dub
- Upgraded `libsnooze` to version `1.3.0-beta`
2023-07-05 13:42:46 +02:00
Tristan B. Velloza Kildaire 4611da0d1d Dub
- Required minimum version of `bformat`, `4.1.0`
2023-06-18 13:40:36 +02:00
Tristan B. Velloza Kildaire 628d8444b7 Queue
- `enqueue(TaggedMessage)` can now throw a `TristanableException` if a `FataException` with `libsnooze` occurrs
- `dequeue()` can now throw a `TristanableException` if a `FatalException` occurs during the call to `wait()` on `libsnooze`
2023-06-14 15:23:18 +02:00
Tristan B. Velloza Kildaire 6dd832f807 Exceptions
- Added enum members `DEQUEUE_FAILED` and `ENQUEUE_FAILED` to `ErrorType` enum
2023-06-13 17:50:25 +02:00
Tristan B. Velloza Kildaire ddcad89d00 Queue
- Be specific, catch `FatalException` in `enqueue(TaggedMessage)`
- Be specific, catch `InterruptedException` and `FatalException` seperately
2023-06-13 17:43:39 +02:00
Tristan B. Velloza Kildaire 3ff4519d99 Dub
- Now requires a minimum version of `libsnooze` of at least `
1.0.0-beta`
2023-06-13 17:39:05 +02:00
Tristan B. Velloza Kildaire 25775414e1
Merge pull request #7 from deavmi/nextgen_queue_remove
Ability to de-register a queue
2023-05-04 09:51:45 +02:00
Tristan B. Velloza Kildaire bbe7b344ec Merge branch 'nextgen' into nextgen_queue_remove 2023-05-04 09:47:51 +02:00
Tristan B. Velloza Kildaire e0977837af Merge branch 'master' into nextgen 2023-05-04 09:47:39 +02:00
Tristan B. Velloza Kildaire b283ebcdfc Merge branch 'nextgen' into nextgen_queue_remove 2023-05-04 09:46:15 +02:00
Tristan B. Velloza Kildaire 89fce3bae9 Unit tests
- Added a TODO
2023-05-04 09:45:26 +02:00
Tristan B. Velloza Kildaire 798acba4aa Manager
- Implemented `releaseQueue(Queue)`
- Implemented `releaseQueue_nothrow(Queue)`

Unit tests

- Added unit test for `releaseQueue(Queue)`
2023-05-04 09:45:06 +02:00
Tristan B. Velloza Kildaire abe64f7701 Manager
- Added a TODO for the future `removeQueue(Queue)` and `removeQueu_nothrow(Queue)`
2023-05-03 23:10:45 +02:00
Tristan B. Velloza Kildaire 1437895669
Merge pull request #6 from deavmi/nextgen_river
Bformat+River upgrade
2023-04-30 19:42:09 +02:00
Tristan B. Velloza Kildaire 10e230ea2f Merge branch 'nextgen' into nextgen_river 2023-04-30 19:36:50 +02:00
Tristan B. Velloza Kildaire 8d3534b216 Merge branch 'master' into nextgen 2023-04-30 19:34:27 +02:00
Tristan B. Velloza Kildaire 64163aed0a - Upgraded to new `bformat` version `4.1.0` and migrated to using `BClient` (unit tests seem to pass) 2023-04-30 19:30:11 +02:00
Tristan B. Velloza Kildaire 97ecbd86bb Watcher
- Documented method `shutdown()`
2023-04-07 12:27:14 +02:00
Tristan B. Velloza Kildaire b10807e279 Package
- Removed whitespace
2023-04-07 12:23:35 +02:00
Tristan B. Velloza Kildaire 19742d9276 Queue
- Docuemneted `getId()`
- Documented the `Queue` class
- Documented fields `event`, `queue` and `queueLock`
2023-04-07 12:22:51 +02:00
Tristan B. Velloza Kildaire 6e972b6cc5 Watcher
- Documented module `tristanable.manager.watcher`
2023-04-07 12:19:19 +02:00
Tristan B. Velloza Kildaire 97e72e09f0 Watcher
- Added documentation to the constructor for `Watcher`
2023-04-07 12:18:20 +02:00
Tristan B. Velloza Kildaire 1b6dd5d746 Manager
- Removed now-completed TODO
- Added documentation for `queuesLock`
2023-04-07 12:16:52 +02:00
Tristan B. Velloza Kildaire b7bb3df7c9 Manager
- Added documentation for `start()` and `stop()`
2023-04-07 12:16:13 +02:00
Tristan B. Velloza Kildaire 3bda88267b Exceptions
- Added missing documentation
- Fixed the message generation of the exception's message
2023-04-07 12:14:02 +02:00
Tristan B. Velloza Kildaire 07d47551ae Exceptions
- Documented `ErrorType` and all its members
- Documented `TristanableException`
2023-04-06 15:46:34 +02:00
Tristan B. Velloza Kildaire 79fee5bd7e Watcher
- Documented unittest as it is a great example of how to sue tristanable
2023-04-06 13:40:38 +02:00
Tristan B. Velloza Kildaire f811273818 Unit test
- Sleep a little longer for profiling tests
2023-04-06 13:18:20 +02:00
Tristan B. Velloza Kildaire 375a611a82 Watcher
- Added package-level accessible `startWatcher()` method which calls `start()` for us
- Added some debugging prints which will now only be compiled-in during unittest builds
- If the bformat `receiveMessage(Socket, ref byte[])` method fails (returns `false`) then exit the loop, only continue decoding if it is `true`
- Implemented package-level accesible `shutdown()` method

Manager

- `start()` now calls `watcher.startWatcher()` instead of `watcher.start()`
2023-04-06 13:13:38 +02:00
Tristan B. Velloza Kildaire 2fa77e639f Queue
- Added documentation for the constructor `this(ulong)`
- Fixed issue #5

Unit tests

- The `==` operator on strings does some normalization stuff which results in differing byte sequences and therefore inequality (see the `"Cucumber 😳️"` case), therefore casting to `byte[]`
- Send another message tagged with `69`
- Fixed comment for server code sending tagged `42` message
- Call `manager.stop()` right at the end of the unit test
2023-04-06 12:28:54 +02:00
Tristan B. Velloza Kildaire 62b16de596 Watcher
- Added stub `shutdown()` method that is intended to be called by `Manager` (package-level accessible)

Unit tests

- Sleep for 4 seconds instead of 2 before the server sends the two tagged messages
- Send a messae tagged with tag `42` before that of the one tagged with `69`
- Register a queue with tag `42`
- Remove `WaitingThread`, we now receive both tagged messages (`42` and `69`) on the unittest thread
- Added assertion to ensure the tagged message of `69` is indeed received correctly (tag AND payload)
2023-04-06 08:57:52 +02:00
Tristan B. Velloza Kildaire 9f07c06e15 Package (`manager`)
- Import the `Config` type and the `defaultConfig()` function
2023-04-06 08:51:47 +02:00
Tristan B. Velloza Kildaire 26e856b7a1 Manager
- Added support for configuring the `Manager`, the constructor now uses the default configuration
- Implemented `registerQueue_nothrow(Queue)` which returns `true` on success, `false` otherwise
- `registerQueue(Queue)` now makes a sub-call to `registerQueue_nothrow(Queue)`
- Implemented `sendMessage(TaggedMessage)`
- Added comment for assertions in unittest
2023-04-06 08:51:32 +02:00
Tristan B. Velloza Kildaire 62cc615be3 Config
- Make the default configuration generated more explicit in `defaultConfig`
- Remove initialization in `Config` (would be `false` in any case)
2023-04-06 08:49:44 +02:00
Tristan B. Velloza Kildaire af4eed748f Config
- Added new module `config`
- Added new type `Config` which is used for configuring an instance of `Manager`
- Added `defaultConfig()` which returns the default `Config` instance used
2023-04-06 08:49:02 +02:00
Tristan B. Velloza Kildaire 4632929123 Manager
- Removed empty unittest
2023-04-06 08:29:30 +02:00
Tristan B. Velloza Kildaire 6b13303c9d Manager
- Added unittest for `getUniqueQueue()`
- Typo fix
2023-04-06 08:26:01 +02:00
Tristan B. Velloza Kildaire 63698c0f87 Manager
- Implemented `getUniqueQueue()` which finds an unused tag, makes a `Queue` with said tag, registers it and then returns it
- WIP: `shutdown()` method
2023-04-05 15:35:55 +02:00
Tristan B. Velloza Kildaire cf62054eb8 Encoding
- Added module-level documentation

Exceptions

- Added module-level documentation

Queue

- Added module-level documentation

Package (`tristanable.manager`)

- Added module-level documentation
2023-04-05 08:47:51 +02:00
Tristan B. Velloza Kildaire 41ebb30754 Exceptions
- Removed unused enum member `QueueExists` of enum `ErrorType`
2023-04-05 08:46:17 +02:00
Tristan B. Velloza Kildaire c6568a566c Package
- Removed completed TODO
2023-04-05 08:45:48 +02:00
Tristan B. Velloza Kildaire c6611fc26b Watcher
- Deleted old module that was unused
2023-04-05 08:44:10 +02:00
Tristan B. Velloza Kildaire 1336a37d13 Unit test (Watcher)
- Unit test for watcher works
2023-04-05 08:40:09 +02:00
Tristan B. Velloza Kildaire f7565b6de2 Manager
- Clean up I guess
2023-04-05 08:39:50 +02:00
Tristan B. Velloza Kildaire 883949e555 Queue
- Added entrance and exit debugs for `dequeue()`
2023-04-05 08:39:35 +02:00
Tristan B. Velloza Kildaire c2e034715d Queue
- Replaced now-completed TODO with an actual comment in `dequeue()`
2023-04-05 08:33:48 +02:00
Tristan B. Velloza Kildaire eecb9fef02 Queue
- Added imports for `std.stdio` and `to` from `std.conv` to be imported when compiling in `unittest` mode
- Added documentation to `enqueue(TaggedMessage)`
- Implemented `enqueue(TaggedMessage)` using libsnooze
- Added documentation for `dequeue()`
- Implemented `dequeue()` using libsnooze
2023-04-05 08:33:08 +02:00
Tristan B. Velloza Kildaire 16bbeeece4 Manager
- Added a default queue
- `getQueue(ulong)` now calls `getQueue_nothrow(ulong)` with the same id
- Implemented `getQueue_nothrow(ulong)` which returns the `Queue` if found, `null` otherwise
- Added `getDefaultQueue()` which gets the default queue by calling `getDefaultQueue_nothrow(ulong)` with the same id
- Added `getDefaultQueue_nothrow(ulong)` which returns the default queue as a `Queue` object if it exists, else `null`
- Added `setDefaultQueue(Queue)` which sets the provided queue as the default queue (i.e. the queue where messages tagged with a tag of a queue not registered will be dumped into - if the default queue is set)

Watcher

- Set the worker thread, `watch`, in the constructor
- Added a TODO relating to checking if the socket read succeeded or not
- Added a debug print for the received `TaggedMessage` post-decode
- Extract the tag of the message and find the matching queue (potentially, if it exists)
- If the queue exists then add the `TaggedMessage` to said `Queue`
- If the queue doesn't exist then, get the so-called "Default queue", if it doesn't exist don't do anything, if it does then enqueue the message (the `TaggedMessage`) to said `Queue`

Unit test

- Added a unit test (WIP) for testing the `Manager` and `Watcher` mechanism
- Updated unittest to test the `getQueue_nothrow(ulong)` method
- Added a unit test to test adding a `Queue` with a tag that already exists in a `Queue` registered prior
2023-03-31 11:31:56 +02:00
Tristan B. Velloza Kildaire 8cf731089e Encoding
- Implemented `toString()` in `TaggedMessage`
2023-03-30 18:26:34 +02:00
Tristan B. Velloza Kildaire 4c530dd7cd - Removed `QueueItem` 2023-03-30 13:36:54 +02:00
Tristan B. Velloza Kildaire 409dacd4da Queue
- The actual queue is now an `SList!(TaggedMessage)`
- Added a stub method `enqueue(TaggedMessage)`
- Updated the stub method `dequeue()` which returns a `TaggedMessage` now
2023-03-30 13:35:09 +02:00
Tristan B. Velloza Kildaire 0d94ed0c83 Exceptions
- Added new member `NO_DEFAULT_QUEUE` to enum `ErrorType`
2023-03-30 13:33:17 +02:00
Tristan B. Velloza Kildaire 06e80eec84 Manager
- Documented `registerQueue(Queue)`
2023-03-30 12:36:52 +02:00
Tristan B. Velloza Kildaire ab07df80ea Manager
- Removed now-completed TODO in `registerQueue(QUeue)`
2023-03-30 12:35:41 +02:00
Tristan B. Velloza Kildaire a13ff17c0d Package (tristanable)
- Fixed up the `exceptions` module import
2023-03-29 16:04:51 +02:00
Tristan B. Velloza Kildaire 99c14bc699 Manager
- Changed from using D's dynamic arrays for the array of `Queue` objects to using an `SList!(T)` where `T` is the `Queue` type
- Implemented `getQueue(ulong)` which returns the `Queue` object with the matching id/tag, else throws an instance of `TristanabaleException`
- Implemented `registerQueue(Queue)` which will attempt to add the provided `Queue` given that a queue does not already exist with the provided queue's id; if that is the case then an instance of `TristanableException` is thrown

Queue

- Made the constructor take in the `ulong` queue ID
- Made the constructor publically accessible
- Implemented `getID()` which returns the `Queue`'s id as a `ulong`
- Removed the static method `newQueue(ulong)`

Unit test

- Added a unit test to test `getQueue(ulong)` when the queue cannot be found
- Added a unit test to test adding a queue and successfully retrieving it
2023-03-29 16:01:34 +02:00
Tristan B. Velloza Kildaire 454e7dd18e Watcher
- Moved TODO below already completed code

Exceptions

- Renamed `Error` to `ErrorType`
- Constructing a new `TristanableException` will now store the passed in `ErrorType`
- Added `getError()` to `TristanableException` which returns the stored `ErrorType`
- Added two new memebrs to enum `ErrorType`, namely `QUEUE_NOT_FOUND` and `QUEUE_ALREADY_EXISTS`
2023-03-29 15:57:51 +02:00
Tristan B. Velloza Kildaire 8942bd7f85 Watcher
- Added import for `bformat` and `encoding` module
- Documented `watch()`
- Added `bformat` read-and-decode `receiveMessage(Socket, ref byte[])` call followed by a `TaggedMessage.decode(byte[])` call
2023-03-27 21:58:04 +02:00
Tristan B. Velloza Kildaire e1676e2acc - Fixed formatting in `README.md` 2023-03-27 16:21:37 +02:00
Tristan B. Velloza Kildaire 6b04a0325a TaggedMessage
- Added documentation for fields `tag` and `data`
- Added documentation for both constructors
- Added documentation for `getPayload()`, `getTag()`, `setPayload(byte[])` and `setTag(ulong)`
2023-03-27 16:20:26 +02:00
Tristan B. Velloza Kildaire cd0eed6dda Encoding
- Added parameter-less (default) constructor marked as `private` to `TaggedMessage`
- Added decoding support in `decode(byte[])` which will return a new instance of `TaggedMessage`
- Added a unit test to test encoding and decoding
2023-03-27 16:02:39 +02:00
Tristan B. Velloza Kildaire ef95b15a2d Merge branch 'nextgen' of github.com:deavmi/tristanable into nextgen 2023-03-27 15:43:10 +02:00
Tristan B. Velloza Kildaire f75604eca9 - Attempt merge 2023-03-27 15:41:50 +02:00
Tristan B. Velloza Kildaire 83b2a11c80 Encoding
- Added stub class `TaggedMessage`
- Added constructor, static decoder (unimplemented), `encoder (implemented), getters and setters
- Added module `tristanable.encoding`
2023-03-27 15:40:20 +02:00
Tristan B. Velloza Kildaire f36a6ba454 Package (tristanable)
- Added an import for `TaggedMessage` from module `tristanable.encoding`
2023-03-26 18:37:22 +02:00
Tristan B. Velloza Kildaire a05bc3e2fd Manager
- Added stub `sendMessage(TaggedMessage)` which will encode into the tristanable format, then wrap into bformat and send over the socket
- Added import for `TaggedMessage` from `tristanable.encoding` module
2023-03-26 18:35:19 +02:00
Tristan B. Velloza Kildaire ed68bf7cd6 - Moved `Watcher` and `Manager` modules to their own package
- Ensured `Watcher`'s constructor is package-level accessible only

Manager

- The constructor now creates an instance of `Watcher`
- Added a `start()` method which calls `watcher.start()`
2023-03-26 18:31:52 +02:00
Tristan B. Velloza Kildaire 88432ab8d5 Manager
- Added unit test TODO
2023-03-26 18:27:24 +02:00
Tristan B. Velloza Kildaire 0d740d6231 Watcher
- Added constructor which takes in an instance of `Manager` and an instance of `Socket`
2023-03-26 18:26:07 +02:00
Tristan B. Velloza Kildaire 80d870e41a Manager
- Added field `watcher` of type `Watcher`
2023-03-26 18:24:15 +02:00
Tristan B. Velloza Kildaire de44080c6b Package (tristanable)
- Added public imports along with comments per each

Encoding

- Added a stub class, `TaggedMessage`, for encoding and decoding the tristanable byte payload

Exceptions

- Added `TristanableException` exception type along with the `Error` enum sub-type

Manager

- Added stub code for `Manager` to manage the queues and socket

Queue

- Added stub class representing a queue with a tag (`Queue`)

QueueItem

- Added stub class `QueueItem` which represents an item that is enqueued/dequeued onto a `Queue`

Watcher

- Added stub class `Watcher` which will manage the socket reading-wise
2023-03-26 18:22:15 +02:00
Tristan B. Velloza Kildaire e8454d61df - Use `https` link rather to `bformat` homepage 2023-03-26 16:49:41 +02:00
Tristan B. Velloza Kildaire a3c8b9dd9d - Updated `.gitignore` 2023-03-26 16:21:04 +02:00
Tristan B. Velloza Kildaire 4dd4199f20 - Removed executable 2023-03-26 16:20:39 +02:00
Tristan B. Velloza Kildaire a935aa65dd - Added `bformat` version `3.1.13` as dependency 2023-03-26 12:20:21 +02:00
Tristan B. Velloza Kildaire 6ea030301c - Added new logo to `README.md`
- Fixed typos in `README.md`
- Added new logo (source included)
2023-03-26 12:19:56 +02:00
8 changed files with 146 additions and 270 deletions

View File

@ -6,9 +6,9 @@ name: D
on:
push:
branches: [ "**" ]
branches: [ "master" ]
pull_request:
branches: [ "**" ]
branches: [ "master", "nextgen" ]
permissions:
contents: read
@ -22,11 +22,6 @@ jobs:
- uses: actions/checkout@v3
- uses: dlang-community/setup-dlang@4c99aa991ce7d19dd3064de0a4f2f6b2f152e2d7
- name: Install Doveralls (code coverage tool)
run: |
dub fetch doveralls
sudo apt install libcurl4-openssl-dev
- name: 'Build & Test'
run: |
# Build the project, with its main file included, without unittests
@ -34,8 +29,4 @@ jobs:
# Build and run tests, as defined by `unittest` configuration
# In this mode, `mainSourceFile` is excluded and `version (unittest)` are included
# See https://dub.pm/package-format-json.html#configurations
dub test --compiler=$DC --coverage
- name: Coverage upload
run: |
dub run doveralls -- -t ${{secrets.COVERALLS_REPO_TOKEN}}
dub test --compiler=$DC

166
README.md
View File

@ -3,8 +3,7 @@
tristanable
===========
[![D](https://github.com/deavmi/tristanable/actions/workflows/d.yml/badge.svg)](https://github.com/deavmi/tristanable/actions/workflows/d.yml) ![DUB](https://img.shields.io/dub/v/tristanable?color=%23c10000ff%20&style=flat-square) ![DUB](https://img.shields.io/dub/dt/tristanable?style=flat-square) ![DUB](https://img.shields.io/dub/l/tristanable?style=flat-square) [![Coverage Status](https://coveralls.io/repos/github/deavmi/tristanable/badge.svg?branch=master)](https://coveralls.io/github/deavmi/tristanable?branch=master)
[![D](https://github.com/deavmi/tristanable/actions/workflows/d.yml/badge.svg)](https://github.com/deavmi/tristanable/actions/workflows/d.yml)
**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own respectively tagged queues indicated by their _"type"_ or `id`.
@ -18,155 +17,44 @@ Tristanable provides a way for you to receive the "IM notification first" but bl
### Code example
Below is a fully-fledged example of the types of places (networking) where tristanable can be of help. The code is all explained in the comments:
If we wanted to implement the following we would do the following. One note is that instead of waiting on messages of a specific _"type"_ (or rather **tag**), tristanable provides not just a one-message length buffer per tag but in fact a full queue per tag, meaning any received message with tag `1` will be enqueued and not dropped after the first message of type `1` is buffered.
```d
import std.socket;
import std.stdio;
import core.thread;
import tristanable.manager;
import tristanable.queue;
import tristanable.queueitem;
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);
/* Create a manager to manage the socket for us */
Manager manager = new Manager(socket);
class ServerThread : Thread
{
this()
{
super(&worker);
}
/* Create a Queue for all "weather messages" */
Queue weatherQueue = new Queue(1);
private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);
/* Create a Queue for all "IM notifications" */
Queue instantNotification = new Queue(2);
Thread.sleep(dur!("seconds")(7));
writeln("Server start");
/* Tell the manager to look out for tagged messages `1` and `2` */
manager.addQueue(weatherQueue);
manager.addQueue(instantNotification);
/**
* Create a tagged message to send
*
* tag 42 payload Cucumber 😳️
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
/* Now we can block on this queue and return with its head */
QueueItem message = weatherQueue.dequeue();
```
writeln("server send [done]");
Surely, there must be some sort of encoding mechanism too? The messages after all need to be encoded. **No problem!**, we have that sorted:
/**
* Create a tagged message to send
*
* tag 69 payload Hello
*/
message = new TaggedMessage(69, cast(byte[])"Hello");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
```d
import tristanable.encoding;
writeln("server send [done]");
/* Let's send it with tag 1 and data "Hello" */
ulong tag = 1;
byte[] data = cast(byte[])"Hello";
/**
* Create a tagged message to send
*
* tag 69 payload Bye
*/
message = new TaggedMessage(69, cast(byte[])"Bye");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
/* When sending a message */
DataMessage tristanEncoded = new DataMessage(tag, data);
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 100 payload Bye
*/
message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 200 payload Bye
*/
message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
}
}
ServerThread serverThread = new ServerThread();
serverThread.start();
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
writeln(server.localAddress);
Manager manager = new Manager(client);
Queue sixtyNine = new Queue(69);
Queue fortyTwo = new Queue(42);
manager.registerQueue(sixtyNine);
manager.registerQueue(fortyTwo);
// Register a default queue (tag ignored)
Queue defaultQueue = new Queue(2332);
manager.setDefaultQueue(defaultQueue);
/* Connect our socket to the server */
client.connect(server.localAddress);
/* Start the manager and let it manage the socket */
manager.start();
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
TaggedMessage dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Hello");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Bye");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = fortyTwo.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 42);
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
/* Dequeue two messages from the default queue */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 100);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1");
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 200);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2");
/* Stop the manager */
manager.stop();
/* Then send it */
socket.send(encodeForSend(tristanEncoded));
```
And let tristanable handle it! We even handle the message lengths and everything using another great project [bformat](https://deavmi.assigned.network/projects/bformat).

View File

@ -5,7 +5,7 @@
"copyright": "Copyright © 2023, Tristan B. Kildaire",
"dependencies": {
"bformat": ">=4.1.1",
"niknaks": ">=0.3.0"
"libsnooze": ">=1.3.0-beta"
},
"description": "Tristanable network message queuing framework",
"homepage": "https://deavmi.assigned.network/projects/tristanable",

View File

@ -4,7 +4,6 @@
module tristanable.encoding;
import std.conv : to;
import niknaks.bits : bytesToIntegral, Order, order, toBytes;
/**
* Represents a tagged message that has been decoded
@ -61,9 +60,31 @@ public final class TaggedMessage
/* The decoded tag */
ulong decodedTag;
/* Take ulong-many bytes and only flip them to LE if not on LE host */
decodedTag = order(bytesToIntegral!(ushort)(cast(ubyte[])encodedMessage), Order.LE);
/* If on little endian then dump direct */
version(LittleEndian)
{
decodedTag = *cast(ulong*)encodedMessage.ptr;
}
/* If on big endian then reverse received 8 bytes */
else version(BigEndian)
{
/* Base of our tag */
byte* tagHighPtr = cast(byte*)decodedTag.ptr;
*(tagHighPtr+0) = encodedMessage[7];
*(tagHighPtr+1) = encodedMessage[6];
*(tagHighPtr+2) = encodedMessage[5];
*(tagHighPtr+3) = encodedMessage[4];
*(tagHighPtr+4) = encodedMessage[3];
*(tagHighPtr+5) = encodedMessage[2];
*(tagHighPtr+6) = encodedMessage[1];
*(tagHighPtr+7) = encodedMessage[0];
}
/* Blessed is the fruit of thy womb Jesus, hail Mary, mother of God, pray for our sinners - now and at the hour of our death - Amen */
else
{
pragma(msg, "Not too sure about tha 'ey 😳️");
}
/* Set the tag */
decodedMessage.setTag(decodedTag);
@ -85,9 +106,41 @@ public final class TaggedMessage
/* The encoded bytes */
byte[] encodedMessage;
/* If on little endian then no re-order, if host is BE flip (the tag) */
encodedMessage ~= toBytes(order(tag, Order.LE));
/* If on little endian, then dump 64 bit as is - little endian */
version(LittleEndian)
{
/* Base (little first) of tag */
byte* basePtr = cast(byte*)&tag;
encodedMessage ~= *(basePtr+0);
encodedMessage ~= *(basePtr+1);
encodedMessage ~= *(basePtr+2);
encodedMessage ~= *(basePtr+3);
encodedMessage ~= *(basePtr+4);
encodedMessage ~= *(basePtr+5);
encodedMessage ~= *(basePtr+6);
encodedMessage ~= *(basePtr+7);
}
/* If on big endian, then traverse 64-bit number in reverse - and tack on */
else version(BigEndian)
{
/* Base (biggest first) of tag */
byte* highPtr = cast(byte*)&tag;
encodedMessage ~= *(highPtr+7);
encodedMessage ~= *(highPtr+6);
encodedMessage ~= *(highPtr+5);
encodedMessage ~= *(highPtr+4);
encodedMessage ~= *(highPtr+3);
encodedMessage ~= *(highPtr+2);
encodedMessage ~= *(highPtr+1);
encodedMessage ~= *(highPtr+0);
}
/* Hail marry, mother of God, pray for our sinners, now and at the our of our death Amen */
else
{
pragma(msg, "Not feeling scrumptious homeslice 😎️");
}
/* Tack on the data */
encodedMessage ~= data;

View File

@ -4,7 +4,7 @@
module tristanable.manager.manager;
import std.socket;
import tristanable.queue.queue : Queue;
import tristanable.queue : Queue;
import core.sync.mutex : Mutex;
import tristanable.manager.watcher : Watcher;
import tristanable.encoding : TaggedMessage;

View File

@ -11,7 +11,7 @@ import std.socket;
import bformat;
import tristanable.encoding;
import tristanable.exceptions;
import tristanable.queue.queue;
import tristanable.queue;
import bformat.client;
/**
@ -204,28 +204,6 @@ unittest
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 100 payload Bye
*/
message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 200 payload Bye
*/
message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
}
}
@ -245,10 +223,6 @@ unittest
manager.registerQueue(sixtyNine);
manager.registerQueue(fortyTwo);
// Register a default queue (tag ignored)
Queue defaultQueue = new Queue(2332);
manager.setDefaultQueue(defaultQueue);
/* Connect our socket to the server */
client.connect(server.localAddress);
@ -278,19 +252,6 @@ unittest
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
/* Dequeue two messages from the default queue */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 100);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1");
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 200);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2");
/* Stop the manager */
manager.stop();

View File

@ -12,7 +12,7 @@ public import tristanable.manager;
/**
* A queue of queue items all of the same tag
*/
public import tristanable.queue.queue : Queue;
public import tristanable.queue : Queue;
/**
* Error handling type definitions

View File

@ -1,14 +1,16 @@
/**
* A queue of queue items all of the same tag
*/
module tristanable.queue.queue;
module tristanable.queue;
// TODO: Examine the below import which seemingly fixes stuff for libsnooze
import libsnooze.clib;
import libsnooze;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import core.sync.exception : SyncError;
import std.container.slist : SList;
import tristanable.encoding;
import core.time : Duration, dur;
import core.thread : dur;
import tristanable.exceptions;
version(unittest)
@ -24,23 +26,11 @@ version(unittest)
*/
public class Queue
{
/**
* This queue's unique ID
*/
private ulong queueID;
/**
* The libsnooze event used to sleep/wake
* on queue events
* Mutex for the condition variable
*/
private Mutex mutex;
/**
* The condition variable used to sleep/wake
* on queue of events
*/
private Condition signal;
private Event event;
/**
* The queue of messages
@ -51,16 +41,12 @@ public class Queue
* The lock for the message queue
*/
private Mutex queueLock;
/**
* If a message is enqueued prior
* to us sleeping then we won't
* wake up and return for it.
*
* Therefore a periodic wakeup
* is required.
/**
* This queue's unique ID
*/
private Duration wakeInterval;
private ulong queueID;
/**
* Constructs a new Queue and immediately sets up the notification
@ -76,37 +62,14 @@ public class Queue
/* Initialize the queue lock */
this.queueLock = new Mutex();
/* Initialize the condition variable */
this.mutex = new Mutex();
this.signal = new Condition(this.mutex);
/* Initialize the event */
this.event = new Event();
/* Set the queue id */
this.queueID = queueID;
/* Set the slumber interval */
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
}
/**
* Returns the current wake interval
* for the queue checker
*
* Returns: the `Duration`
*/
public Duration getWakeInterval()
{
return this.wakeInterval;
}
/**
* Sets the wake up interval
*
* Params:
* interval = the new interval
*/
public void setWakeInterval(Duration interval)
{
this.wakeInterval = interval;
/* Ensure pipe existence (see https://deavmi.assigned.network/git/deavmi/tristanable/issues/5) */
event.wait(dur!("seconds")(0));
}
/**
@ -148,9 +111,9 @@ public class Queue
try
{
// TODO: Make us wait on the event (optional with a time-out)
signal.notifyAll();
event.notifyAll();
}
catch(SyncError snozErr)
catch(FatalException snozErr)
{
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.ENQUEUE_FAILED);
@ -188,25 +151,45 @@ public class Queue
/* Block till we dequeue a message successfully */
while(dequeuedMessage is null)
{
scope(exit)
/**
* Call `wait()` and catch any interrupts
* in which case loop back and call `wait()`
* again
*/
while(true)
{
// Unlock the mutex
this.mutex.unlock();
}
try
{
// TODO: Make us wait on the event (optional with a time-out)
event.wait();
}
catch(InterruptedException e)
{
version(unittest)
{
import std.stdio;
writeln("dequeue() had libsnooze wait() get interrupted!");
}
// Lock the mutex
this.mutex.lock();
// Retry the wait()
continue;
}
catch(FatalException fatalErr)
{
version(unittest)
{
import std.stdio;
writeln("dequeue() had libsnooze wait() get FATALLY fail! Exception will now throw...");
}
try
{
this.signal.wait(this.wakeInterval);
}
catch(SyncError e)
{
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}
// On successful wait() wake-up exit this wait()-retry loop
break;
}
/* Lock the item queue */
queueLock.lock();