Compare commits

...

76 Commits

Author SHA1 Message Date
Tristan B. Velloza Kildaire 7ca6489b58 Dub
- Upgraded `libsnooze` to version `1.3.0-beta`
2023-07-05 13:42:46 +02:00
Tristan B. Velloza Kildaire 4611da0d1d Dub
- Required minimum version of `bformat`, `4.1.0`
2023-06-18 13:40:36 +02:00
Tristan B. Velloza Kildaire 628d8444b7 Queue
- `enqueue(TaggedMessage)` can now throw a `TristanableException` if a `FataException` with `libsnooze` occurrs
- `dequeue()` can now throw a `TristanableException` if a `FatalException` occurs during the call to `wait()` on `libsnooze`
2023-06-14 15:23:18 +02:00
Tristan B. Velloza Kildaire 6dd832f807 Exceptions
- Added enum members `DEQUEUE_FAILED` and `ENQUEUE_FAILED` to `ErrorType` enum
2023-06-13 17:50:25 +02:00
Tristan B. Velloza Kildaire ddcad89d00 Queue
- Be specific, catch `FatalException` in `enqueue(TaggedMessage)`
- Be specific, catch `InterruptedException` and `FatalException` seperately
2023-06-13 17:43:39 +02:00
Tristan B. Velloza Kildaire 3ff4519d99 Dub
- Now requires a minimum version of `libsnooze` of at least `
1.0.0-beta`
2023-06-13 17:39:05 +02:00
Tristan B. Velloza Kildaire 25775414e1
Merge pull request #7 from deavmi/nextgen_queue_remove
Ability to de-register a queue
2023-05-04 09:51:45 +02:00
Tristan B. Velloza Kildaire bbe7b344ec Merge branch 'nextgen' into nextgen_queue_remove 2023-05-04 09:47:51 +02:00
Tristan B. Velloza Kildaire e0977837af Merge branch 'master' into nextgen 2023-05-04 09:47:39 +02:00
Tristan B. Velloza Kildaire b283ebcdfc Merge branch 'nextgen' into nextgen_queue_remove 2023-05-04 09:46:15 +02:00
Tristan B. Velloza Kildaire 89fce3bae9 Unit tests
- Added a TODO
2023-05-04 09:45:26 +02:00
Tristan B. Velloza Kildaire 798acba4aa Manager
- Implemented `releaseQueue(Queue)`
- Implemented `releaseQueue_nothrow(Queue)`

Unit tests

- Added unit test for `releaseQueue(Queue)`
2023-05-04 09:45:06 +02:00
Tristan B. Velloza Kildaire abe64f7701 Manager
- Added a TODO for the future `removeQueue(Queue)` and `removeQueu_nothrow(Queue)`
2023-05-03 23:10:45 +02:00
Tristan B. Velloza Kildaire 1437895669
Merge pull request #6 from deavmi/nextgen_river
Bformat+River upgrade
2023-04-30 19:42:09 +02:00
Tristan B. Velloza Kildaire 10e230ea2f Merge branch 'nextgen' into nextgen_river 2023-04-30 19:36:50 +02:00
Tristan B. Velloza Kildaire 8d3534b216 Merge branch 'master' into nextgen 2023-04-30 19:34:27 +02:00
Tristan B. Velloza Kildaire 64163aed0a - Upgraded to new `bformat` version `4.1.0` and migrated to using `BClient` (unit tests seem to pass) 2023-04-30 19:30:11 +02:00
Tristan B. Velloza Kildaire 97ecbd86bb Watcher
- Documented method `shutdown()`
2023-04-07 12:27:14 +02:00
Tristan B. Velloza Kildaire b10807e279 Package
- Removed whitespace
2023-04-07 12:23:35 +02:00
Tristan B. Velloza Kildaire 19742d9276 Queue
- Docuemneted `getId()`
- Documented the `Queue` class
- Documented fields `event`, `queue` and `queueLock`
2023-04-07 12:22:51 +02:00
Tristan B. Velloza Kildaire 6e972b6cc5 Watcher
- Documented module `tristanable.manager.watcher`
2023-04-07 12:19:19 +02:00
Tristan B. Velloza Kildaire 97e72e09f0 Watcher
- Added documentation to the constructor for `Watcher`
2023-04-07 12:18:20 +02:00
Tristan B. Velloza Kildaire 1b6dd5d746 Manager
- Removed now-completed TODO
- Added documentation for `queuesLock`
2023-04-07 12:16:52 +02:00
Tristan B. Velloza Kildaire b7bb3df7c9 Manager
- Added documentation for `start()` and `stop()`
2023-04-07 12:16:13 +02:00
Tristan B. Velloza Kildaire 3bda88267b Exceptions
- Added missing documentation
- Fixed the message generation of the exception's message
2023-04-07 12:14:02 +02:00
Tristan B. Velloza Kildaire 07d47551ae Exceptions
- Documented `ErrorType` and all its members
- Documented `TristanableException`
2023-04-06 15:46:34 +02:00
Tristan B. Velloza Kildaire 79fee5bd7e Watcher
- Documented unittest as it is a great example of how to sue tristanable
2023-04-06 13:40:38 +02:00
Tristan B. Velloza Kildaire f811273818 Unit test
- Sleep a little longer for profiling tests
2023-04-06 13:18:20 +02:00
Tristan B. Velloza Kildaire 375a611a82 Watcher
- Added package-level accessible `startWatcher()` method which calls `start()` for us
- Added some debugging prints which will now only be compiled-in during unittest builds
- If the bformat `receiveMessage(Socket, ref byte[])` method fails (returns `false`) then exit the loop, only continue decoding if it is `true`
- Implemented package-level accesible `shutdown()` method

Manager

- `start()` now calls `watcher.startWatcher()` instead of `watcher.start()`
2023-04-06 13:13:38 +02:00
Tristan B. Velloza Kildaire 2fa77e639f Queue
- Added documentation for the constructor `this(ulong)`
- Fixed issue #5

Unit tests

- The `==` operator on strings does some normalization stuff which results in differing byte sequences and therefore inequality (see the `"Cucumber 😳️"` case), therefore casting to `byte[]`
- Send another message tagged with `69`
- Fixed comment for server code sending tagged `42` message
- Call `manager.stop()` right at the end of the unit test
2023-04-06 12:28:54 +02:00
Tristan B. Velloza Kildaire 62b16de596 Watcher
- Added stub `shutdown()` method that is intended to be called by `Manager` (package-level accessible)

Unit tests

- Sleep for 4 seconds instead of 2 before the server sends the two tagged messages
- Send a messae tagged with tag `42` before that of the one tagged with `69`
- Register a queue with tag `42`
- Remove `WaitingThread`, we now receive both tagged messages (`42` and `69`) on the unittest thread
- Added assertion to ensure the tagged message of `69` is indeed received correctly (tag AND payload)
2023-04-06 08:57:52 +02:00
Tristan B. Velloza Kildaire 9f07c06e15 Package (`manager`)
- Import the `Config` type and the `defaultConfig()` function
2023-04-06 08:51:47 +02:00
Tristan B. Velloza Kildaire 26e856b7a1 Manager
- Added support for configuring the `Manager`, the constructor now uses the default configuration
- Implemented `registerQueue_nothrow(Queue)` which returns `true` on success, `false` otherwise
- `registerQueue(Queue)` now makes a sub-call to `registerQueue_nothrow(Queue)`
- Implemented `sendMessage(TaggedMessage)`
- Added comment for assertions in unittest
2023-04-06 08:51:32 +02:00
Tristan B. Velloza Kildaire 62cc615be3 Config
- Make the default configuration generated more explicit in `defaultConfig`
- Remove initialization in `Config` (would be `false` in any case)
2023-04-06 08:49:44 +02:00
Tristan B. Velloza Kildaire af4eed748f Config
- Added new module `config`
- Added new type `Config` which is used for configuring an instance of `Manager`
- Added `defaultConfig()` which returns the default `Config` instance used
2023-04-06 08:49:02 +02:00
Tristan B. Velloza Kildaire 4632929123 Manager
- Removed empty unittest
2023-04-06 08:29:30 +02:00
Tristan B. Velloza Kildaire 6b13303c9d Manager
- Added unittest for `getUniqueQueue()`
- Typo fix
2023-04-06 08:26:01 +02:00
Tristan B. Velloza Kildaire 63698c0f87 Manager
- Implemented `getUniqueQueue()` which finds an unused tag, makes a `Queue` with said tag, registers it and then returns it
- WIP: `shutdown()` method
2023-04-05 15:35:55 +02:00
Tristan B. Velloza Kildaire cf62054eb8 Encoding
- Added module-level documentation

Exceptions

- Added module-level documentation

Queue

- Added module-level documentation

Package (`tristanable.manager`)

- Added module-level documentation
2023-04-05 08:47:51 +02:00
Tristan B. Velloza Kildaire 41ebb30754 Exceptions
- Removed unused enum member `QueueExists` of enum `ErrorType`
2023-04-05 08:46:17 +02:00
Tristan B. Velloza Kildaire c6568a566c Package
- Removed completed TODO
2023-04-05 08:45:48 +02:00
Tristan B. Velloza Kildaire c6611fc26b Watcher
- Deleted old module that was unused
2023-04-05 08:44:10 +02:00
Tristan B. Velloza Kildaire 1336a37d13 Unit test (Watcher)
- Unit test for watcher works
2023-04-05 08:40:09 +02:00
Tristan B. Velloza Kildaire f7565b6de2 Manager
- Clean up I guess
2023-04-05 08:39:50 +02:00
Tristan B. Velloza Kildaire 883949e555 Queue
- Added entrance and exit debugs for `dequeue()`
2023-04-05 08:39:35 +02:00
Tristan B. Velloza Kildaire c2e034715d Queue
- Replaced now-completed TODO with an actual comment in `dequeue()`
2023-04-05 08:33:48 +02:00
Tristan B. Velloza Kildaire eecb9fef02 Queue
- Added imports for `std.stdio` and `to` from `std.conv` to be imported when compiling in `unittest` mode
- Added documentation to `enqueue(TaggedMessage)`
- Implemented `enqueue(TaggedMessage)` using libsnooze
- Added documentation for `dequeue()`
- Implemented `dequeue()` using libsnooze
2023-04-05 08:33:08 +02:00
Tristan B. Velloza Kildaire 16bbeeece4 Manager
- Added a default queue
- `getQueue(ulong)` now calls `getQueue_nothrow(ulong)` with the same id
- Implemented `getQueue_nothrow(ulong)` which returns the `Queue` if found, `null` otherwise
- Added `getDefaultQueue()` which gets the default queue by calling `getDefaultQueue_nothrow(ulong)` with the same id
- Added `getDefaultQueue_nothrow(ulong)` which returns the default queue as a `Queue` object if it exists, else `null`
- Added `setDefaultQueue(Queue)` which sets the provided queue as the default queue (i.e. the queue where messages tagged with a tag of a queue not registered will be dumped into - if the default queue is set)

Watcher

- Set the worker thread, `watch`, in the constructor
- Added a TODO relating to checking if the socket read succeeded or not
- Added a debug print for the received `TaggedMessage` post-decode
- Extract the tag of the message and find the matching queue (potentially, if it exists)
- If the queue exists then add the `TaggedMessage` to said `Queue`
- If the queue doesn't exist then, get the so-called "Default queue", if it doesn't exist don't do anything, if it does then enqueue the message (the `TaggedMessage`) to said `Queue`

Unit test

- Added a unit test (WIP) for testing the `Manager` and `Watcher` mechanism
- Updated unittest to test the `getQueue_nothrow(ulong)` method
- Added a unit test to test adding a `Queue` with a tag that already exists in a `Queue` registered prior
2023-03-31 11:31:56 +02:00
Tristan B. Velloza Kildaire 8cf731089e Encoding
- Implemented `toString()` in `TaggedMessage`
2023-03-30 18:26:34 +02:00
Tristan B. Velloza Kildaire 4c530dd7cd - Removed `QueueItem` 2023-03-30 13:36:54 +02:00
Tristan B. Velloza Kildaire 409dacd4da Queue
- The actual queue is now an `SList!(TaggedMessage)`
- Added a stub method `enqueue(TaggedMessage)`
- Updated the stub method `dequeue()` which returns a `TaggedMessage` now
2023-03-30 13:35:09 +02:00
Tristan B. Velloza Kildaire 0d94ed0c83 Exceptions
- Added new member `NO_DEFAULT_QUEUE` to enum `ErrorType`
2023-03-30 13:33:17 +02:00
Tristan B. Velloza Kildaire 06e80eec84 Manager
- Documented `registerQueue(Queue)`
2023-03-30 12:36:52 +02:00
Tristan B. Velloza Kildaire ab07df80ea Manager
- Removed now-completed TODO in `registerQueue(QUeue)`
2023-03-30 12:35:41 +02:00
Tristan B. Velloza Kildaire a13ff17c0d Package (tristanable)
- Fixed up the `exceptions` module import
2023-03-29 16:04:51 +02:00
Tristan B. Velloza Kildaire 99c14bc699 Manager
- Changed from using D's dynamic arrays for the array of `Queue` objects to using an `SList!(T)` where `T` is the `Queue` type
- Implemented `getQueue(ulong)` which returns the `Queue` object with the matching id/tag, else throws an instance of `TristanabaleException`
- Implemented `registerQueue(Queue)` which will attempt to add the provided `Queue` given that a queue does not already exist with the provided queue's id; if that is the case then an instance of `TristanableException` is thrown

Queue

- Made the constructor take in the `ulong` queue ID
- Made the constructor publically accessible
- Implemented `getID()` which returns the `Queue`'s id as a `ulong`
- Removed the static method `newQueue(ulong)`

Unit test

- Added a unit test to test `getQueue(ulong)` when the queue cannot be found
- Added a unit test to test adding a queue and successfully retrieving it
2023-03-29 16:01:34 +02:00
Tristan B. Velloza Kildaire 454e7dd18e Watcher
- Moved TODO below already completed code

Exceptions

- Renamed `Error` to `ErrorType`
- Constructing a new `TristanableException` will now store the passed in `ErrorType`
- Added `getError()` to `TristanableException` which returns the stored `ErrorType`
- Added two new memebrs to enum `ErrorType`, namely `QUEUE_NOT_FOUND` and `QUEUE_ALREADY_EXISTS`
2023-03-29 15:57:51 +02:00
Tristan B. Velloza Kildaire 8942bd7f85 Watcher
- Added import for `bformat` and `encoding` module
- Documented `watch()`
- Added `bformat` read-and-decode `receiveMessage(Socket, ref byte[])` call followed by a `TaggedMessage.decode(byte[])` call
2023-03-27 21:58:04 +02:00
Tristan B. Velloza Kildaire e1676e2acc - Fixed formatting in `README.md` 2023-03-27 16:21:37 +02:00
Tristan B. Velloza Kildaire 6b04a0325a TaggedMessage
- Added documentation for fields `tag` and `data`
- Added documentation for both constructors
- Added documentation for `getPayload()`, `getTag()`, `setPayload(byte[])` and `setTag(ulong)`
2023-03-27 16:20:26 +02:00
Tristan B. Velloza Kildaire cd0eed6dda Encoding
- Added parameter-less (default) constructor marked as `private` to `TaggedMessage`
- Added decoding support in `decode(byte[])` which will return a new instance of `TaggedMessage`
- Added a unit test to test encoding and decoding
2023-03-27 16:02:39 +02:00
Tristan B. Velloza Kildaire ef95b15a2d Merge branch 'nextgen' of github.com:deavmi/tristanable into nextgen 2023-03-27 15:43:10 +02:00
Tristan B. Velloza Kildaire f75604eca9 - Attempt merge 2023-03-27 15:41:50 +02:00
Tristan B. Velloza Kildaire 83b2a11c80 Encoding
- Added stub class `TaggedMessage`
- Added constructor, static decoder (unimplemented), `encoder (implemented), getters and setters
- Added module `tristanable.encoding`
2023-03-27 15:40:20 +02:00
Tristan B. Velloza Kildaire f36a6ba454 Package (tristanable)
- Added an import for `TaggedMessage` from module `tristanable.encoding`
2023-03-26 18:37:22 +02:00
Tristan B. Velloza Kildaire a05bc3e2fd Manager
- Added stub `sendMessage(TaggedMessage)` which will encode into the tristanable format, then wrap into bformat and send over the socket
- Added import for `TaggedMessage` from `tristanable.encoding` module
2023-03-26 18:35:19 +02:00
Tristan B. Velloza Kildaire ed68bf7cd6 - Moved `Watcher` and `Manager` modules to their own package
- Ensured `Watcher`'s constructor is package-level accessible only

Manager

- The constructor now creates an instance of `Watcher`
- Added a `start()` method which calls `watcher.start()`
2023-03-26 18:31:52 +02:00
Tristan B. Velloza Kildaire 88432ab8d5 Manager
- Added unit test TODO
2023-03-26 18:27:24 +02:00
Tristan B. Velloza Kildaire 0d740d6231 Watcher
- Added constructor which takes in an instance of `Manager` and an instance of `Socket`
2023-03-26 18:26:07 +02:00
Tristan B. Velloza Kildaire 80d870e41a Manager
- Added field `watcher` of type `Watcher`
2023-03-26 18:24:15 +02:00
Tristan B. Velloza Kildaire de44080c6b Package (tristanable)
- Added public imports along with comments per each

Encoding

- Added a stub class, `TaggedMessage`, for encoding and decoding the tristanable byte payload

Exceptions

- Added `TristanableException` exception type along with the `Error` enum sub-type

Manager

- Added stub code for `Manager` to manage the queues and socket

Queue

- Added stub class representing a queue with a tag (`Queue`)

QueueItem

- Added stub class `QueueItem` which represents an item that is enqueued/dequeued onto a `Queue`

Watcher

- Added stub class `Watcher` which will manage the socket reading-wise
2023-03-26 18:22:15 +02:00
Tristan B. Velloza Kildaire e8454d61df - Use `https` link rather to `bformat` homepage 2023-03-26 16:49:41 +02:00
Tristan B. Velloza Kildaire a3c8b9dd9d - Updated `.gitignore` 2023-03-26 16:21:04 +02:00
Tristan B. Velloza Kildaire 4dd4199f20 - Removed executable 2023-03-26 16:20:39 +02:00
Tristan B. Velloza Kildaire a935aa65dd - Added `bformat` version `3.1.13` as dependency 2023-03-26 12:20:21 +02:00
Tristan B. Velloza Kildaire 6ea030301c - Added new logo to `README.md`
- Fixed typos in `README.md`
- Added new logo (source included)
2023-03-26 12:19:56 +02:00
16 changed files with 1371 additions and 47 deletions

1
.gitignore vendored
View File

@ -24,3 +24,4 @@ docs/
*.lst
source/tristanable/queue.d
dub.selections.json
tristanable-test-library

View File

@ -1,23 +1,23 @@
![](https://code.dlang.org/packages/tristanable/logo?s=5ef1c9f1250f57dd4c37efbf)
![](branding/logo_small.png)
tristanable
===========
[![D](https://github.com/deavmi/tristanable/actions/workflows/d.yml/badge.svg)](https://github.com/deavmi/tristanable/actions/workflows/d.yml)
**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own resepctively tagged queues indicated by their _"type"_ or `id`.
**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own respectively tagged queues indicated by their _"type"_ or `id`.
## What problems does it solve?
### Human example
Say now you made a request to a server with a tag `1` and expect a reply with that same tag `1`. Now, for a moment, think about what would happen in a tagless system. You would be expecting a reply, say now the weather report for your area, but what if the server has another thread that writes an instant messenger notification to the server's socket before the weather message is sent? Now you will inetrpret those bytes as if they were a weather message.
Say now you made a request to a server with a tag `1` and expect a reply with that same tag `1`. Now, for a moment, think about what would happen in a tagless system. You would be expecting a reply, say now the weather report for your area, but what if the server has another thread that writes an instant messenger notification to the server's socket before the weather message is sent? Now you will interpret those bytes as if they were a weather message.
Tristanable provides a way for you to receive the "IM notification first" but block and dequeue (when it arrives in the queue) for the "weather report". Irresepctoive of wether (no pun intended) the weather report arrives before the "IM notification" or after.
Tristanable provides a way for you to receive the "IM notification first" but block and dequeue (when it arrives in the queue) for the "weather report". Irrespective of wether (no pun intended) the weather report arrives before the "IM notification" or after.
### Code example
If we wanted to implement the following we would do the following. One note is that instead of waiting on messages of a specific _"type"_ (or rather **tag**), tristanable provides not just a one-message lengthb uffer per tag but infact a full queue per tag, meaning any received message with tag `1` will be enqueued and not dropped after the first message of type `1` is buffered.
If we wanted to implement the following we would do the following. One note is that instead of waiting on messages of a specific _"type"_ (or rather **tag**), tristanable provides not just a one-message length buffer per tag but in fact a full queue per tag, meaning any received message with tag `1` will be enqueued and not dropped after the first message of type `1` is buffered.
```d
import tristanable.manager;
@ -41,7 +41,7 @@ manager.addQueue(instantNotification);
QueueItem message = weatherQueue.dequeue();
```
Surely, there must be some sort of encoding mechanism too? The messages afterall need to be encoded. **No problem!**, we have that sorted:
Surely, there must be some sort of encoding mechanism too? The messages after all need to be encoded. **No problem!**, we have that sorted:
```d
import tristanable.encoding;
@ -57,7 +57,7 @@ DataMessage tristanEncoded = new DataMessage(tag, data);
socket.send(encodeForSend(tristanEncoded));
```
And let tristanable handle it! We even handle the message lengths and everything using another great project [bformat](http://deavmi.assigned.network/projects/bformat).
And let tristanable handle it! We even handle the message lengths and everything using another great project [bformat](https://deavmi.assigned.network/projects/bformat).
## Format
@ -66,8 +66,8 @@ And let tristanable handle it! We even handle the message lengths and everything
```
## Using tristanable in your D project
You can easily add the library (source-based) to your project by running the following command in your
project's root:
You can easily add the library (source-based) to your project by running the following command in your project's root:
```bash
dub add tristanable

BIN
branding/logo_base.xcf Normal file

Binary file not shown.

BIN
branding/logo_small.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.5 KiB

BIN
branding/logo_small.xcf Normal file

Binary file not shown.

View File

@ -2,12 +2,13 @@
"authors": [
"Tristan B. Velloza Kildaire"
],
"homepage": "https://deavmi.assigned.network/projects/tristanable",
"copyright": "Copyright © 2023, Tristan B. Kildaire",
"dependencies": {
"libsnooze": "0.3.3"
"bformat": ">=4.1.1",
"libsnooze": ">=1.3.0-beta"
},
"description": "Tristanable network message queuing framework",
"homepage": "https://deavmi.assigned.network/projects/tristanable",
"license": "LGPL-3.0",
"name": "tristanable",
"targetType": "library"

View File

@ -0,0 +1,221 @@
/**
* Encoding/decoding of the tristanable format
*/
module tristanable.encoding;
import std.conv : to;
/**
* Represents a tagged message that has been decoded
* from its raw byte encoding, this is a tuple of
* a numeric tag and a byte array of payload data
*
* Also provides a static method to decode from such
* raw encoding and an instance method to do the reverse
*/
public final class TaggedMessage
{
/**
* This message's tag
*/
private ulong tag;
/**
* The payload
*/
private byte[] data;
/**
* Constructs a new TaggedMessage with the given tag and payload
*
* Params:
* tag = the tag to use
* data = the payload
*/
this(ulong tag, byte[] data)
{
this.tag = tag;
this.data = data;
}
/**
* Parameterless constructor used for decoder
*/
private this() {}
/**
* Decodes the wire-formatted tristanable bytes into an instance
* of TaggedMessage whereby the tag and data can be seperately
* accessed and manipulated
*
* Params:
* encodedMessage = the wire-format encoded bytes
* Returns: an instance of TaggedMessage
*/
public static TaggedMessage decode(byte[] encodedMessage)
{
/* The decoded message */
TaggedMessage decodedMessage = new TaggedMessage();
/* The decoded tag */
ulong decodedTag;
/* If on little endian then dump direct */
version(LittleEndian)
{
decodedTag = *cast(ulong*)encodedMessage.ptr;
}
/* If on big endian then reverse received 8 bytes */
else version(BigEndian)
{
/* Base of our tag */
byte* tagHighPtr = cast(byte*)decodedTag.ptr;
*(tagHighPtr+0) = encodedMessage[7];
*(tagHighPtr+1) = encodedMessage[6];
*(tagHighPtr+2) = encodedMessage[5];
*(tagHighPtr+3) = encodedMessage[4];
*(tagHighPtr+4) = encodedMessage[3];
*(tagHighPtr+5) = encodedMessage[2];
*(tagHighPtr+6) = encodedMessage[1];
*(tagHighPtr+7) = encodedMessage[0];
}
/* Blessed is the fruit of thy womb Jesus, hail Mary, mother of God, pray for our sinners - now and at the hour of our death - Amen */
else
{
pragma(msg, "Not too sure about tha 'ey 😳️");
}
/* Set the tag */
decodedMessage.setTag(decodedTag);
/* Set the data *(9-th byte onwards) */
decodedMessage.setPayload(encodedMessage[8..$]);
return decodedMessage;
}
/**
* Encodes the tagged message into the tristanable
* wire format ready for transmission
*
* Returns: the encoded bytes
*/
public byte[] encode()
{
/* The encoded bytes */
byte[] encodedMessage;
/* If on little endian, then dump 64 bit as is - little endian */
version(LittleEndian)
{
/* Base (little first) of tag */
byte* basePtr = cast(byte*)&tag;
encodedMessage ~= *(basePtr+0);
encodedMessage ~= *(basePtr+1);
encodedMessage ~= *(basePtr+2);
encodedMessage ~= *(basePtr+3);
encodedMessage ~= *(basePtr+4);
encodedMessage ~= *(basePtr+5);
encodedMessage ~= *(basePtr+6);
encodedMessage ~= *(basePtr+7);
}
/* If on big endian, then traverse 64-bit number in reverse - and tack on */
else version(BigEndian)
{
/* Base (biggest first) of tag */
byte* highPtr = cast(byte*)&tag;
encodedMessage ~= *(highPtr+7);
encodedMessage ~= *(highPtr+6);
encodedMessage ~= *(highPtr+5);
encodedMessage ~= *(highPtr+4);
encodedMessage ~= *(highPtr+3);
encodedMessage ~= *(highPtr+2);
encodedMessage ~= *(highPtr+1);
encodedMessage ~= *(highPtr+0);
}
/* Hail marry, mother of God, pray for our sinners, now and at the our of our death Amen */
else
{
pragma(msg, "Not feeling scrumptious homeslice 😎️");
}
/* Tack on the data */
encodedMessage ~= data;
return encodedMessage;
}
/**
* Get the message's payload
*
* Returns: the payload
*/
public byte[] getPayload()
{
return data;
}
/**
* Get the message's tag
*
* Returns: the tag
*/
public ulong getTag()
{
return tag;
}
/**
* Set the message's payload
*
* Params:
* newPayload = the payload to use
*/
public void setPayload(byte[] newPayload)
{
this.data = newPayload;
}
/**
* Set the message's tag
*
* Params:
* newTag = the tag to use
*/
public void setTag(ulong newTag)
{
this.tag = newTag;
}
/**
* Returns a string representation of the TaggedMessage
*
* Returns: the string represenation
*/
public override string toString()
{
return "TMessage [Tag: "~to!(string)(tag)~", Payload: "~to!(string)(data)~"]";
}
}
/**
* Test encoding and decoding
*/
unittest
{
/* Setup testing data */
TaggedMessage testData = new TaggedMessage(420, [1,2,3]);
/* Encode */
byte[] encoded = testData.encode();
/* Decode */
TaggedMessage decoded = TaggedMessage.decode(encoded);
/* Now ensure that `decoded` == original `testData` */
assert(decoded.getTag() == testData.getTag);
assert(decoded.getPayload() == testData.getPayload());
}

View File

@ -0,0 +1,73 @@
/**
* Error handling type definitions
*/
module tristanable.exceptions;
import std.conv : to;
/**
* The type of sub-error of the `TristanableException`
*/
public enum ErrorType
{
/**
* If the requested queue could not be found
*/
QUEUE_NOT_FOUND,
/**
* If the queue wanting to be registered has already
* been registered under the same tag
*/
QUEUE_ALREADY_EXISTS,
/**
* If no default queue is configured
*/
NO_DEFAULT_QUEUE,
/**
* The blocking call to `dequeue()`, somehow, failed
*/
DEQUEUE_FAILED,
/**
* The call to `enqueue()`, somehow, failed
*/
ENQUEUE_FAILED
}
/**
* Any sort of error that occurs during runtime of the tristanable
* engine
*/
public class TristanableException : Exception
{
/**
* The sub-error type
*/
private ErrorType err;
/**
* Constructs a new `TristanableException` with the provided
* sub-error type
*
* Params:
* err = the `ErrorType`
*/
this(ErrorType err)
{
super(this.classinfo.name~": "~to!(string)(err));
this.err = err;
}
/**
* Retrieve the sub-error type
*
* Returns: the sub-error type as a `ErrorType`
*/
public ErrorType getError()
{
return err;
}
}

View File

@ -1,15 +0,0 @@
module tristanable.manager;
import tristanable.queue : Queue;
/**
* Allows one to add new queues, control
* existing ones by waiting on them etc
*/
public class Manager
{
/* Queues */
private Queue[] queues;
}

View File

@ -0,0 +1,34 @@
/**
* Configuration for the manager
*/
module tristanable.manager.config;
/**
* Manager parameters
*/
public struct Config
{
/**
* If set to true then when one uses
* `sendMessage(TaggedMessage)` the
* manager will check if a queue with
* said tag has not been registered
* and if so, then register it for
* us before encoding-and-sending
*/
public bool registerOnSend;
}
/**
* Generates the default configuration to use for
* the manager
*
* Returns: the Config
*/
public Config defaultConfig()
{
Config config;
config.registerOnSend = false;
return config;
}

View File

@ -0,0 +1,549 @@
/**
* Management of a tristanable instance
*/
module tristanable.manager.manager;
import std.socket;
import tristanable.queue : Queue;
import core.sync.mutex : Mutex;
import tristanable.manager.watcher : Watcher;
import tristanable.encoding : TaggedMessage;
import tristanable.exceptions;
import std.container.slist : SList;
import tristanable.manager.config;
import river.core;
import river.impls.sock : SockStream;
import bformat.client;
/**
* Manages a provided socket by spawning
* a watcher thread to read from it and file
* mail into the corresponding queues.
*
* Queues are managed via this an instance
* of a manager.
*/
public class Manager
{
/**
* Configuration
*/
private Config config;
/**
* The bformat client to read and write from
*/
private BClient bClient;
/**
* Currently registered queues
*/
private SList!(Queue) queues;
/**
* Lock for currently registered queues
*/
private Mutex queuesLock;
/**
* Default queue
*/
private Queue defaultQueue;
/**
* Watcher which manages the socket and
* enqueues new messages into the respective
* quueue for us
*/
private Watcher watcher;
/**
* Constructs a new manager which will read from
* this socket and file mail for us
*
* Params:
* stream = the underlying stream to use
*/
this(Stream stream, Config config = defaultConfig())
{
this.bClient = new BClient(stream);
this.queuesLock = new Mutex();
this.config = config;
this.watcher = new Watcher(this, bClient);
}
// TODO: Comment this
// This is for backwards compatibility (whereby a `Socket` was taken in)
this(Socket socket, Config config = defaultConfig())
{
this(new SockStream(socket), config);
}
/**
* Starts the management of the socket,
* resulting in queues being updated upon
* reciving messages tagged for them
*/
public void start()
{
watcher.startWatcher();
}
/**
* Stops the management of the socket, resulting
* in ending the updating of queues and closing
* the underlying connection
*/
public void stop()
{
watcher.shutdown();
}
/**
* Retrieves the queue mathcing the provided id
*
* Params:
* id = the id to lookup by
* Returns: the Queue
* Throws: TristanableException if the queue is not found
*/
public Queue getQueue(ulong id)
{
/* The found queue */
Queue queue = getQueue_nothrow(id);
/* If no queue is found then throw an error */
if(queue is null)
{
throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
}
return queue;
}
/**
* Retrieves the queue mathcing the provided id
*
* This is the nothrow version
*
* Params:
* id = the id to lookup by
* Returns: the Queue if found, null otherwise
*/
public Queue getQueue_nothrow(ulong id)
{
/* The found queue */
Queue queue;
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
/* Search for the queue */
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == id)
{
queue = curQueue;
break;
}
}
return queue;
}
/**
* Get a new queue thatis unique in its tag
* (unused/not regustered yet), register it
* and then return it
*
* Returns: the newly registered Queue
*/
public Queue getUniqueQueue()
{
/* The newly created queue */
Queue uniqueQueue;
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
// TODO: Throw exception if all tags used
/* The unused tag */
ulong unusedTag = 0;
/* Try the current tag and ensure no queue uses it */
tagLoop: for(ulong curPotentialTag = 0; true; curPotentialTag++)
{
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == curPotentialTag)
{
continue tagLoop;
}
}
/* Then we have found a unique tag */
unusedTag = curPotentialTag;
break;
}
/* Create the queue */
uniqueQueue = new Queue(unusedTag);
/* Register it */
registerQueue(uniqueQueue);
return uniqueQueue;
}
/**
* Registers the given queue with the manager
*
* Params:
* queue = the queue to register
* Throws:
* TristanableException if a queue with the provided id already exists
*/
public void registerQueue(Queue queue)
{
/* Try to register the queue */
bool status = registerQueue_nothrow(queue);
/* If registration was not successful */
if(!status)
{
throw new TristanableException(ErrorType.QUEUE_ALREADY_EXISTS);
}
}
/**
* Registers the given queue with the manager
*
* Params:
* queue = the queue to register
* Returns: true if registration was successful, false otherwise
*/
public bool registerQueue_nothrow(Queue queue)
{
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
/* Search for the queue, throw an exception if it exists */
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == queue.getID())
{
/* Registration failed */
return false;
}
}
/* Insert the queue as it does not exist */
queues.insertAfter(queues[], queue);
/* Registration was a success */
return true;
}
/**
* De-registers the given queue from the manager
*
* Params:
* queue = the queue to de-register
* Throws:
* TristanableException if a queue with the provided id cannot be found
*/
public void releaseQueue(Queue queue)
{
/* Try to de-register the queue */
bool status = releaseQueue_nothrow(queue);
/* If de-registration was not successful */
if(!status)
{
throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
}
}
/**
* De-registers the given queue from the manager
*
* Params:
* queue = the queue to de-register
* Returns: true if de-registration was successful, false otherwise
*/
public bool releaseQueue_nothrow(Queue queue)
{
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
/* Search for the queue, return false if it does NOT exist */
foreach(Queue curQueue; queues)
{
if(curQueue.getID() == queue.getID())
{
/* Remove the queue */
queues.linearRemoveElement(queue);
/* De-registration succeeded */
return true;
}
}
/* De-registration failed */
return false;
}
/**
* Sets the default queue
*
* The default queue, when set/enabled, is the queue that will
* be used to enqueue messages that have a tag which doesn't
* match any of the normally registered queues.
*
* Please note that the ID of the queue passed in here does not
* mean anything in this context; only the queuing facilities
* of the Queue object are used
*
* Params:
* queue = the default queue to use
*/
public void setDefaultQueue(Queue queue)
{
this.defaultQueue = queue;
}
/**
* Returns the default queue
*
* Returns: the default queue
* Throws:
* TristanableException if there is no default queue
*/
public Queue getDefaultQueue()
{
/* The potential default queue */
Queue potentialDefaultQueue = getDefaultQueue_nothrow();
if(potentialDefaultQueue is null)
{
throw new TristanableException(ErrorType.NO_DEFAULT_QUEUE);
}
return potentialDefaultQueue;
}
/**
* Returns the default queue
*
* This is the nothrow version
*
* Returns: the default queue if found, null otherwise
*/
public Queue getDefaultQueue_nothrow()
{
return defaultQueue;
}
/**
* Sends the provided message over the socket
*
* Params:
* message = the TaggedMessage to send
*/
public void sendMessage(TaggedMessage message)
{
/**
* If a queue with the tag of the message does
* not exist, then register it if the config
* option was enabled
*/
if(config.registerOnSend)
{
/* Create a Queue with the tag */
Queue createdQueue = new Queue(message.getTag());
/* Attempt to register the queue */
registerQueue_nothrow(createdQueue);
}
/* Encode the message */
byte[] encodedMessage = message.encode();
/* Send it using bformat (encode-and-send) */
bClient.sendMessage(encodedMessage);
}
}
// TODO: Fix this, write it in a nicer way
// ... or make a private constructor here that
// ... does not take it in
version(unittest)
{
Socket nullSock = null;
}
/**
* Test retrieving a queue which does not
* exist
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Shouldn't be found */
try
{
manager.getQueue(69);
assert(false);
}
catch(TristanableException e)
{
assert(e.getError() == ErrorType.QUEUE_NOT_FOUND);
}
/* Shouldn't be found */
assert(manager.getQueue_nothrow(69) is null);
}
/**
* Test registering a queue and then fetching it
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */
Queue queue = new Queue(69);
try
{
/* Register the queue */
manager.registerQueue(queue);
/* Fetch the queue */
Queue fetchedQueue = manager.getQueue(69);
/* Ensure the queue we fetched is the one we stored (the references would be equal) */
assert(fetchedQueue == queue);
}
catch(TristanableException e)
{
assert(false);
}
/* Should be found */
assert(manager.getQueue_nothrow(69) !is null);
}
/**
* Tests registering a queue and then registering
* another queue with the same id
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */
Queue queue = new Queue(69);
/* Register the queue */
manager.registerQueue(queue);
try
{
/* Register the queue (try again) */
manager.registerQueue(queue);
assert(false);
}
catch(TristanableException e)
{
assert(e.getError() == ErrorType.QUEUE_ALREADY_EXISTS);
}
}
/**
* Tests registering a queue, de-registering it and
* then registering it again
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */
Queue queue = new Queue(69);
/* Register the queue */
manager.registerQueue(queue);
/* Ensure it is registered */
assert(queue == manager.getQueue(69));
/* De-register the queue */
manager.releaseQueue(queue);
/* Ensure it is de-registered */
assert(manager.getQueue_nothrow(69) is null);
/* Register the queue (again) */
manager.registerQueue(queue);
/* Ensure it is registered (again) */
assert(queue == manager.getQueue(69));
}
/**
* Tests registering a queue using the "next available queue"
* method
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(nullSock);
/* Get the next 3 available queues */
Queue queue1 = manager.getUniqueQueue();
Queue queue2 = manager.getUniqueQueue();
Queue queue3 = manager.getUniqueQueue();
/* The queues should have tags [0, 1, 2] respectively */
assert(queue1.getID() == 0);
assert(queue2.getID() == 1);
assert(queue3.getID() == 2);
}
// TODO: Add testing for queue existence (internal method)

View File

@ -0,0 +1,15 @@
/**
* Interface which manages a provided socket
* and enqueuing and dequeuing of queues
*/
module tristanable.manager;
/**
* The management facilities
*/
public import tristanable.manager.manager : Manager;
/**
* Configuration for the manager
*/
public import tristanable.manager.config : Config, defaultConfig;

View File

@ -0,0 +1,258 @@
/**
* Facilitates the reading of messages from the socket,
* decoding thereof and final enqueuing thereof into their
* respective queus
*/
module tristanable.manager.watcher;
import core.thread : Thread;
import tristanable.manager.manager : Manager;
import std.socket;
import bformat;
import tristanable.encoding;
import tristanable.exceptions;
import tristanable.queue;
import bformat.client;
/**
* Watches the socket on a thread of its own,
* performs the decoding of the incoming messages
* and places them into the correct queues via
* the associated Manager instance
*/
public class Watcher : Thread
{
/**
* The associated manager to use
* such that we can place new mail
* into their respective inboxes (queues)
*/
private Manager manager;
/**
* The BClient to read from
*/
private BClient bClient;
/**
* Creates a new `Watcher` that is associated
* with the provided `Manager` such that it can
* add to its registered queues. The provided `Socket`
* is such that it can be read from and managed.
*
* Params:
* manager = the `Manager` to associate with
* bclient = the underlying `BClient` to read data from
*/
package this(Manager manager, BClient bClient)
{
this.manager = manager;
this.bClient = bClient;
super(&watch);
}
/**
* Starts the underlying thread
*/
package void startWatcher()
{
/* Start the watch method on a new thread */
start();
}
/**
* Watches the socket for incoming messages
* and decodes them on the fly, placing
* the final message in the respective queue
*/
private void watch()
{
import std.stdio;
while(true)
{
/* Do a bformat read-and-decode */
byte[] wireTristan;
version(unittest) { writeln("Before bformat recv()"); }
bool recvStatus = bClient.receiveMessage(wireTristan); // TODO: Add a check for the status of read
version(unittest) { writeln("After bformat recv()"); }
version(unittest) { writeln("bformat recv() status: ", recvStatus); }
if(recvStatus)
{
/* Decode the received bytes into a tagged message */
TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan);
version(unittest) { writeln("Watcher received: ", decodedMessage); }
/* Search for the queue with the id provided */
ulong messageTag = decodedMessage.getTag();
Queue potentialQueue = manager.getQueue_nothrow(messageTag);
/* If a queue can be found */
if(potentialQueue !is null)
{
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
/* If the queue if not found */
else
{
/**
* Look for a default queue, and if one is found
* then enqueue the message there. Otherwise, drop
* it by simply doing nothing.
*/
try
{
potentialQueue = manager.getDefaultQueue();
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
catch(TristanableException e) {}
}
version(unittest) { writeln("drip"); }
}
/**
* If there was an error receiving on the socket.
*
* This can be either because we have shut the socket down
* or the remote end has closed the connection.
*
* In any case, exit the loop therefore ending this thread.
*/
else
{
break;
}
}
}
/**
* Shuts down the watcher, unblocks the blocking read in the loop
* resulting in the watcher thread ending
*/
package void shutdown()
{
/* Closes the bformat reader */
bClient.close();
}
}
/**
* Set up a server which will send some tagged messages to us (the client),
* where we have setup a `Manager` to watch the queues with tags `42` and `69`,
* we then dequeue some messages from both queus. Finally, we shut down the manager.
*/
unittest
{
import std.socket;
import std.stdio;
import core.thread;
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);
class ServerThread : Thread
{
this()
{
super(&worker);
}
private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);
Thread.sleep(dur!("seconds")(7));
writeln("Server start");
/**
* Create a tagged message to send
*
* tag 42 payload Cucumber 😳
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Hello
*/
message = new TaggedMessage(69, cast(byte[])"Hello");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Bye
*/
message = new TaggedMessage(69, cast(byte[])"Bye");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
}
}
ServerThread serverThread = new ServerThread();
serverThread.start();
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
writeln(server.localAddress);
Manager manager = new Manager(client);
Queue sixtyNine = new Queue(69);
Queue fortyTwo = new Queue(42);
manager.registerQueue(sixtyNine);
manager.registerQueue(fortyTwo);
/* Connect our socket to the server */
client.connect(server.localAddress);
/* Start the manager and let it manage the socket */
manager.start();
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
TaggedMessage dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Hello");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Bye");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = fortyTwo.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 42);
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
/* Stop the manager */
manager.stop();
}

View File

@ -3,6 +3,23 @@
*/
module tristanable;
/**
* Interface which manages a provided socket
* and enqueuing and dequeuing of queues
*/
public import tristanable.manager;
public import tristanable.manager : Manager;
public import tristanable.queue : Queue, QueueItem;
/**
* A queue of queue items all of the same tag
*/
public import tristanable.queue : Queue;
/**
* Error handling type definitions
*/
public import tristanable.exceptions : TristanableException, ErrorType;
/**
* Encoding/decoding of the tristanable format
*/
public import tristanable.encoding : TaggedMessage;

View File

@ -1,53 +1,223 @@
/**
* A queue of queue items all of the same tag
*/
module tristanable.queue;
// TODO: Examine the below import which seemingly fixes stuff for libsnooze
import libsnooze.clib;
import libsnooze;
import core.sync.mutex : Mutex;
import core.sync.mutex : Mutex;
import std.container.slist : SList;
import tristanable.encoding;
import core.thread : dur;
import tristanable.exceptions;
version(unittest)
{
import std.stdio;
import std.conv : to;
}
/**
* Represents a queue whereby messages of a certain tag/id
* can be enqueued to (by the `Watcher`) and dequeued from
* (by the user application)
*/
public class Queue
{
/**
* Everytime a thread calls `.dequeue()` on this queue
*
* The libsnooze event used to sleep/wake
* on queue events
*/
private Event event;
private QueueItem queue;
/**
* The queue of messages
*/
private SList!(TaggedMessage) queue;
/**
* The lock for the message queue
*/
private Mutex queueLock;
/**
* This queue's unique ID
*/
private ulong queueID;
private this()
/**
* Constructs a new Queue and immediately sets up the notification
* sub-system for the calling thread (the thread constructing this
* object) which ensures that a call to dequeue will immediately
* unblock on the first message received under this tag
*
* Params:
* queueID = the id to use for this queue
*/
this(ulong queueID)
{
/* Initialize the queue lock */
this.queueLock = new Mutex();
/* Initialize the event */
this.event = new Event();
/* Set the queue id */
this.queueID = queueID;
/* Ensure pipe existence (see https://deavmi.assigned.network/git/deavmi/tristanable/issues/5) */
event.wait(dur!("seconds")(0));
}
public void dequeue()
/**
* Enqueues the provided tagged message onto this queue
* and then wakes up any thread that has called dequeue
* on this queue as well
*
* On error enqueueing a `TristanableException` will be
* thrown.
*
* Params:
* message = the TaggedMessage to enqueue
*/
public void enqueue(TaggedMessage message)
{
// TODO: Make us wait on the event (optional with a time-out)
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Enqueuing '"~to!(string)(message)~"'...");
}
// TODO: Lock queue
scope(exit)
{
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Enqueued '"~to!(string)(message)~"'!");
}
/* Unlock the item queue */
queueLock.unlock();
}
/* Lock the item queue */
queueLock.lock();
// TODO: Get item off queue
/* Add the item to the queue */
queue.insertAfter(queue[], message);
// TODO: Unlock queue
queueLock.unlock();
/* Wake up anyone wanting to dequeue from us */
try
{
// TODO: Make us wait on the event (optional with a time-out)
event.notifyAll();
}
catch(FatalException snozErr)
{
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.ENQUEUE_FAILED);
}
}
public static Queue newQueue(ulong queueID)
// TODO: Make a version of this which can time out
/**
* Blocks till a message can be dequeued from this queue
*
* On error dequeueing a `TristanableException` will be
* thrown.
*
* Returns: the dequeued TaggedMessage
*/
public TaggedMessage dequeue()
{
Queue queue;
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Dequeueing...");
}
// TODO: Implement me
/* The dequeued message */
TaggedMessage dequeuedMessage;
return queue;
scope(exit)
{
version(unittest)
{
writeln("queue["~to!(string)(queueID)~"]: Dequeued '"~to!(string)(dequeuedMessage)~"'!");
}
}
/* Block till we dequeue a message successfully */
while(dequeuedMessage is null)
{
/**
* Call `wait()` and catch any interrupts
* in which case loop back and call `wait()`
* again
*/
while(true)
{
try
{
// TODO: Make us wait on the event (optional with a time-out)
event.wait();
}
catch(InterruptedException e)
{
version(unittest)
{
import std.stdio;
writeln("dequeue() had libsnooze wait() get interrupted!");
}
// Retry the wait()
continue;
}
catch(FatalException fatalErr)
{
version(unittest)
{
import std.stdio;
writeln("dequeue() had libsnooze wait() get FATALLY fail! Exception will now throw...");
}
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}
// On successful wait() wake-up exit this wait()-retry loop
break;
}
/* Lock the item queue */
queueLock.lock();
/* Consume the front of the queue (if non-empty) */
if(!queue.empty())
{
/* Pop the front item off */
dequeuedMessage = queue.front();
/* Remove the front item from the queue */
queue.linearRemoveElement(dequeuedMessage);
}
/* Unlock the item queue */
queueLock.unlock();
}
return dequeuedMessage;
}
}
public class QueueItem
{
/**
* Get the id/tag of this queue
*
* Returns: the queue's id
*/
public ulong getID()
{
return queueID;
}
}

Binary file not shown.