Compare commits

...

16 Commits

Author SHA1 Message Date
Tristan B. Velloza Kildaire bfee652a94 Merge branch 'master' into nextgen_listener 2023-10-02 15:52:19 +02:00
Tristan B. Velloza Kildaire b24c3368b3
Nextgen: Tristanable v2 (#4)
* - Added new logo to `README.md`
- Fixed typos in `README.md`
- Added new logo (source included)

* - Added `bformat` version `3.1.13` as dependency

* - Removed executable

* - Updated `.gitignore`

* - Use `https` link rather to `bformat` homepage

* Package (tristanable)

- Added public imports along with comments per each

Encoding

- Added a stub class, `TaggedMessage`, for encoding and decoding the tristanable byte payload

Exceptions

- Added `TristanableException` exception type along with the `Error` enum sub-type

Manager

- Added stub code for `Manager` to manage the queues and socket

Queue

- Added stub class representing a queue with a tag (`Queue`)

QueueItem

- Added stub class `QueueItem` which represents an item that is enqueued/dequeued onto a `Queue`

Watcher

- Added stub class `Watcher` which will manage the socket reading-wise

* Manager

- Added field `watcher` of type `Watcher`

* Watcher

- Added constructor which takes in an instance of `Manager` and an instance of `Socket`

* Manager

- Added unit test TODO

* - Moved `Watcher` and `Manager` modules to their own package
- Ensured `Watcher`'s constructor is package-level accessible only

Manager

- The constructor now creates an instance of `Watcher`
- Added a `start()` method which calls `watcher.start()`

* Manager

- Added stub `sendMessage(TaggedMessage)` which will encode into the tristanable format, then wrap into bformat and send over the socket
- Added import for `TaggedMessage` from `tristanable.encoding` module

* Package (tristanable)

- Added an import for `TaggedMessage` from module `tristanable.encoding`

* Encoding

- Added stub class `TaggedMessage`
- Added constructor, static decoder (unimplemented), `encoder (implemented), getters and setters
- Added module `tristanable.encoding`

* - Attempt merge

* Encoding

- Added parameter-less (default) constructor marked as `private` to `TaggedMessage`
- Added decoding support in `decode(byte[])` which will return a new instance of `TaggedMessage`
- Added a unit test to test encoding and decoding

* TaggedMessage

- Added documentation for fields `tag` and `data`
- Added documentation for both constructors
- Added documentation for `getPayload()`, `getTag()`, `setPayload(byte[])` and `setTag(ulong)`

* - Fixed formatting in `README.md`

* Watcher

- Added import for `bformat` and `encoding` module
- Documented `watch()`
- Added `bformat` read-and-decode `receiveMessage(Socket, ref byte[])` call followed by a `TaggedMessage.decode(byte[])` call

* Watcher

- Moved TODO below already completed code

Exceptions

- Renamed `Error` to `ErrorType`
- Constructing a new `TristanableException` will now store the passed in `ErrorType`
- Added `getError()` to `TristanableException` which returns the stored `ErrorType`
- Added two new memebrs to enum `ErrorType`, namely `QUEUE_NOT_FOUND` and `QUEUE_ALREADY_EXISTS`

* Manager

- Changed from using D's dynamic arrays for the array of `Queue` objects to using an `SList!(T)` where `T` is the `Queue` type
- Implemented `getQueue(ulong)` which returns the `Queue` object with the matching id/tag, else throws an instance of `TristanabaleException`
- Implemented `registerQueue(Queue)` which will attempt to add the provided `Queue` given that a queue does not already exist with the provided queue's id; if that is the case then an instance of `TristanableException` is thrown

Queue

- Made the constructor take in the `ulong` queue ID
- Made the constructor publically accessible
- Implemented `getID()` which returns the `Queue`'s id as a `ulong`
- Removed the static method `newQueue(ulong)`

Unit test

- Added a unit test to test `getQueue(ulong)` when the queue cannot be found
- Added a unit test to test adding a queue and successfully retrieving it

* Package (tristanable)

- Fixed up the `exceptions` module import

* Manager

- Removed now-completed TODO in `registerQueue(QUeue)`

* Manager

- Documented `registerQueue(Queue)`

* Exceptions

- Added new member `NO_DEFAULT_QUEUE` to enum `ErrorType`

* Queue

- The actual queue is now an `SList!(TaggedMessage)`
- Added a stub method `enqueue(TaggedMessage)`
- Updated the stub method `dequeue()` which returns a `TaggedMessage` now

* - Removed `QueueItem`

* Encoding

- Implemented `toString()` in `TaggedMessage`

* Manager

- Added a default queue
- `getQueue(ulong)` now calls `getQueue_nothrow(ulong)` with the same id
- Implemented `getQueue_nothrow(ulong)` which returns the `Queue` if found, `null` otherwise
- Added `getDefaultQueue()` which gets the default queue by calling `getDefaultQueue_nothrow(ulong)` with the same id
- Added `getDefaultQueue_nothrow(ulong)` which returns the default queue as a `Queue` object if it exists, else `null`
- Added `setDefaultQueue(Queue)` which sets the provided queue as the default queue (i.e. the queue where messages tagged with a tag of a queue not registered will be dumped into - if the default queue is set)

Watcher

- Set the worker thread, `watch`, in the constructor
- Added a TODO relating to checking if the socket read succeeded or not
- Added a debug print for the received `TaggedMessage` post-decode
- Extract the tag of the message and find the matching queue (potentially, if it exists)
- If the queue exists then add the `TaggedMessage` to said `Queue`
- If the queue doesn't exist then, get the so-called "Default queue", if it doesn't exist don't do anything, if it does then enqueue the message (the `TaggedMessage`) to said `Queue`

Unit test

- Added a unit test (WIP) for testing the `Manager` and `Watcher` mechanism
- Updated unittest to test the `getQueue_nothrow(ulong)` method
- Added a unit test to test adding a `Queue` with a tag that already exists in a `Queue` registered prior

* Queue

- Added imports for `std.stdio` and `to` from `std.conv` to be imported when compiling in `unittest` mode
- Added documentation to `enqueue(TaggedMessage)`
- Implemented `enqueue(TaggedMessage)` using libsnooze
- Added documentation for `dequeue()`
- Implemented `dequeue()` using libsnooze

* Queue

- Replaced now-completed TODO with an actual comment in `dequeue()`

* Queue

- Added entrance and exit debugs for `dequeue()`

* Manager

- Clean up I guess

* Unit test (Watcher)

- Unit test for watcher works

* Watcher

- Deleted old module that was unused

* Package

- Removed completed TODO

* Exceptions

- Removed unused enum member `QueueExists` of enum `ErrorType`

* Encoding

- Added module-level documentation

Exceptions

- Added module-level documentation

Queue

- Added module-level documentation

Package (`tristanable.manager`)

- Added module-level documentation

* Manager

- Implemented `getUniqueQueue()` which finds an unused tag, makes a `Queue` with said tag, registers it and then returns it
- WIP: `shutdown()` method

* Manager

- Added unittest for `getUniqueQueue()`
- Typo fix

* Manager

- Removed empty unittest

* Config

- Added new module `config`
- Added new type `Config` which is used for configuring an instance of `Manager`
- Added `defaultConfig()` which returns the default `Config` instance used

* Config

- Make the default configuration generated more explicit in `defaultConfig`
- Remove initialization in `Config` (would be `false` in any case)

* Manager

- Added support for configuring the `Manager`, the constructor now uses the default configuration
- Implemented `registerQueue_nothrow(Queue)` which returns `true` on success, `false` otherwise
- `registerQueue(Queue)` now makes a sub-call to `registerQueue_nothrow(Queue)`
- Implemented `sendMessage(TaggedMessage)`
- Added comment for assertions in unittest

* Package (`manager`)

- Import the `Config` type and the `defaultConfig()` function

* Watcher

- Added stub `shutdown()` method that is intended to be called by `Manager` (package-level accessible)

Unit tests

- Sleep for 4 seconds instead of 2 before the server sends the two tagged messages
- Send a messae tagged with tag `42` before that of the one tagged with `69`
- Register a queue with tag `42`
- Remove `WaitingThread`, we now receive both tagged messages (`42` and `69`) on the unittest thread
- Added assertion to ensure the tagged message of `69` is indeed received correctly (tag AND payload)

* Queue

- Added documentation for the constructor `this(ulong)`
- Fixed issue #5

Unit tests

- The `==` operator on strings does some normalization stuff which results in differing byte sequences and therefore inequality (see the `"Cucumber 😳️"` case), therefore casting to `byte[]`
- Send another message tagged with `69`
- Fixed comment for server code sending tagged `42` message
- Call `manager.stop()` right at the end of the unit test

* Watcher

- Added package-level accessible `startWatcher()` method which calls `start()` for us
- Added some debugging prints which will now only be compiled-in during unittest builds
- If the bformat `receiveMessage(Socket, ref byte[])` method fails (returns `false`) then exit the loop, only continue decoding if it is `true`
- Implemented package-level accesible `shutdown()` method

Manager

- `start()` now calls `watcher.startWatcher()` instead of `watcher.start()`

* Unit test

- Sleep a little longer for profiling tests

* Watcher

- Documented unittest as it is a great example of how to sue tristanable

* Exceptions

- Documented `ErrorType` and all its members
- Documented `TristanableException`

* Exceptions

- Added missing documentation
- Fixed the message generation of the exception's message

* Manager

- Added documentation for `start()` and `stop()`

* Manager

- Removed now-completed TODO
- Added documentation for `queuesLock`

* Watcher

- Added documentation to the constructor for `Watcher`

* Watcher

- Documented module `tristanable.manager.watcher`

* Queue

- Docuemneted `getId()`
- Documented the `Queue` class
- Documented fields `event`, `queue` and `queueLock`

* Package

- Removed whitespace

* Watcher

- Documented method `shutdown()`

* - Upgraded to new `bformat` version `4.1.0` and migrated to using `BClient` (unit tests seem to pass)

* Manager

- Added a TODO for the future `removeQueue(Queue)` and `removeQueu_nothrow(Queue)`

* Manager

- Implemented `releaseQueue(Queue)`
- Implemented `releaseQueue_nothrow(Queue)`

Unit tests

- Added unit test for `releaseQueue(Queue)`

* Unit tests

- Added a TODO

* Dub

- Now requires a minimum version of `libsnooze` of at least `
1.0.0-beta`

* Queue

- Be specific, catch `FatalException` in `enqueue(TaggedMessage)`
- Be specific, catch `InterruptedException` and `FatalException` seperately

* Exceptions

- Added enum members `DEQUEUE_FAILED` and `ENQUEUE_FAILED` to `ErrorType` enum

* Queue

- `enqueue(TaggedMessage)` can now throw a `TristanableException` if a `FataException` with `libsnooze` occurrs
- `dequeue()` can now throw a `TristanableException` if a `FatalException` occurs during the call to `wait()` on `libsnooze`

* Dub

- Required minimum version of `bformat`, `4.1.0`

* Dub

- Upgraded `libsnooze` to version `1.3.0-beta`

* Migrate from libsnooze (#8)

* Dub

- Removed `libsnooze` dependency

* Queue

- Removed `libsnooze` imports

* Queue

- Added mutex+condition variable

* Queue

- Removed old `ensure()` call

* Queue

- Switched one thing over to mutex+condvar

* Queue

- Switched to using condition variable
- Added configurable slumber interval

* Queue

- Removed TODOs which are irrevevant for now

* Queue

- Removed `TListener` references

Everything else

- Removed reference to old/duplicate `queue.d` module

* Hotfix/niknaks (#9)

* Dub

- Added `niknaks` package with a minimum version of `v0.3.0`

* Encoding

- Switched to niknaks for `decode()`

* Encoding

- `encode()` now uses niknaks

* Watcher (unit tests)

- Added testing for default queue
2023-10-02 15:49:54 +02:00
Tristan B. Velloza Kildaire d22a4fa6ad Merge branch 'nextgen' into nextgen_listener 2023-10-02 14:46:51 +02:00
Tristan B. Velloza Kildaire 5cafbb8130 Queue
- Removed `TListener` references

Everything else

- Removed reference to old/duplicate `queue.d` module
2023-10-02 14:43:43 +02:00
Tristan B. Velloza Kildaire be91f5ffc5 Queue
- Removed TODOs which are irrevevant for now
2023-10-02 14:40:44 +02:00
Tristan B. Velloza Kildaire c5c01c32e1 Merge branch 'nextgen' into nextgen_listener 2023-10-02 14:38:51 +02:00
Tristan B. Velloza Kildaire 42c6b111c8
Migrate from libsnooze (#8)
* Dub

- Removed `libsnooze` dependency

* Queue

- Removed `libsnooze` imports

* Queue

- Added mutex+condition variable

* Queue

- Removed old `ensure()` call

* Queue

- Switched one thing over to mutex+condvar

* Queue

- Switched to using condition variable
- Added configurable slumber interval
2023-10-01 20:51:08 +02:00
Tristan B. Velloza Kildaire 761ddb2d1e Merge branch 'master' into nextgen 2023-10-01 19:14:57 +02:00
Tristan B. Velloza Kildaire a23017a747 README
- Added badges
2023-10-01 19:14:16 +02:00
Tristan B. Velloza Kildaire 459f4a8709 Pipelines
- Added code coverage
2023-10-01 19:10:57 +02:00
Tristan B. Velloza Kildaire 7ca6489b58 Dub
- Upgraded `libsnooze` to version `1.3.0-beta`
2023-07-05 13:42:46 +02:00
Tristan B. Velloza Kildaire 4611da0d1d Dub
- Required minimum version of `bformat`, `4.1.0`
2023-06-18 13:40:36 +02:00
Tristan B. Velloza Kildaire 628d8444b7 Queue
- `enqueue(TaggedMessage)` can now throw a `TristanableException` if a `FataException` with `libsnooze` occurrs
- `dequeue()` can now throw a `TristanableException` if a `FatalException` occurs during the call to `wait()` on `libsnooze`
2023-06-14 15:23:18 +02:00
Tristan B. Velloza Kildaire 6dd832f807 Exceptions
- Added enum members `DEQUEUE_FAILED` and `ENQUEUE_FAILED` to `ErrorType` enum
2023-06-13 17:50:25 +02:00
Tristan B. Velloza Kildaire ddcad89d00 Queue
- Be specific, catch `FatalException` in `enqueue(TaggedMessage)`
- Be specific, catch `InterruptedException` and `FatalException` seperately
2023-06-13 17:43:39 +02:00
Tristan B. Velloza Kildaire 3ff4519d99 Dub
- Now requires a minimum version of `libsnooze` of at least `
1.0.0-beta`
2023-06-13 17:39:05 +02:00
9 changed files with 147 additions and 87 deletions

View File

@ -6,9 +6,9 @@ name: D
on:
push:
branches: [ "master" ]
branches: [ "**" ]
pull_request:
branches: [ "master", "nextgen" ]
branches: [ "**" ]
permissions:
contents: read
@ -22,6 +22,11 @@ jobs:
- uses: actions/checkout@v3
- uses: dlang-community/setup-dlang@4c99aa991ce7d19dd3064de0a4f2f6b2f152e2d7
- name: Install Doveralls (code coverage tool)
run: |
dub fetch doveralls
sudo apt install libcurl4-openssl-dev
- name: 'Build & Test'
run: |
# Build the project, with its main file included, without unittests
@ -29,4 +34,8 @@ jobs:
# Build and run tests, as defined by `unittest` configuration
# In this mode, `mainSourceFile` is excluded and `version (unittest)` are included
# See https://dub.pm/package-format-json.html#configurations
dub test --compiler=$DC
dub test --compiler=$DC --coverage
- name: Coverage upload
run: |
dub run doveralls -- -t ${{secrets.COVERALLS_REPO_TOKEN}}

View File

@ -3,7 +3,8 @@
tristanable
===========
[![D](https://github.com/deavmi/tristanable/actions/workflows/d.yml/badge.svg)](https://github.com/deavmi/tristanable/actions/workflows/d.yml)
[![D](https://github.com/deavmi/tristanable/actions/workflows/d.yml/badge.svg)](https://github.com/deavmi/tristanable/actions/workflows/d.yml) ![DUB](https://img.shields.io/dub/v/tristanable?color=%23c10000ff%20&style=flat-square) ![DUB](https://img.shields.io/dub/dt/tristanable?style=flat-square) ![DUB](https://img.shields.io/dub/l/tristanable?style=flat-square) [![Coverage Status](https://coveralls.io/repos/github/deavmi/tristanable/badge.svg?branch=master)](https://coveralls.io/github/deavmi/tristanable?branch=master)
**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own respectively tagged queues indicated by their _"type"_ or `id`.

View File

@ -4,8 +4,8 @@
],
"copyright": "Copyright © 2023, Tristan B. Kildaire",
"dependencies": {
"bformat": "4.1.0",
"libsnooze": "0.3.3"
"bformat": ">=4.1.1",
"niknaks": ">=0.3.0"
},
"description": "Tristanable network message queuing framework",
"homepage": "https://deavmi.assigned.network/projects/tristanable",

View File

@ -4,6 +4,7 @@
module tristanable.encoding;
import std.conv : to;
import niknaks.bits : bytesToIntegral, Order, order, toBytes;
/**
* Represents a tagged message that has been decoded
@ -60,31 +61,9 @@ public final class TaggedMessage
/* The decoded tag */
ulong decodedTag;
/* If on little endian then dump direct */
version(LittleEndian)
{
decodedTag = *cast(ulong*)encodedMessage.ptr;
}
/* If on big endian then reverse received 8 bytes */
else version(BigEndian)
{
/* Base of our tag */
byte* tagHighPtr = cast(byte*)decodedTag.ptr;
*(tagHighPtr+0) = encodedMessage[7];
*(tagHighPtr+1) = encodedMessage[6];
*(tagHighPtr+2) = encodedMessage[5];
*(tagHighPtr+3) = encodedMessage[4];
*(tagHighPtr+4) = encodedMessage[3];
*(tagHighPtr+5) = encodedMessage[2];
*(tagHighPtr+6) = encodedMessage[1];
*(tagHighPtr+7) = encodedMessage[0];
}
/* Blessed is the fruit of thy womb Jesus, hail Mary, mother of God, pray for our sinners - now and at the hour of our death - Amen */
else
{
pragma(msg, "Not too sure about tha 'ey 😳️");
}
/* Take ulong-many bytes and only flip them to LE if not on LE host */
decodedTag = order(bytesToIntegral!(ushort)(cast(ubyte[])encodedMessage), Order.LE);
/* Set the tag */
decodedMessage.setTag(decodedTag);
@ -106,41 +85,9 @@ public final class TaggedMessage
/* The encoded bytes */
byte[] encodedMessage;
/* If on little endian, then dump 64 bit as is - little endian */
version(LittleEndian)
{
/* Base (little first) of tag */
byte* basePtr = cast(byte*)&tag;
/* If on little endian then no re-order, if host is BE flip (the tag) */
encodedMessage ~= toBytes(order(tag, Order.LE));
encodedMessage ~= *(basePtr+0);
encodedMessage ~= *(basePtr+1);
encodedMessage ~= *(basePtr+2);
encodedMessage ~= *(basePtr+3);
encodedMessage ~= *(basePtr+4);
encodedMessage ~= *(basePtr+5);
encodedMessage ~= *(basePtr+6);
encodedMessage ~= *(basePtr+7);
}
/* If on big endian, then traverse 64-bit number in reverse - and tack on */
else version(BigEndian)
{
/* Base (biggest first) of tag */
byte* highPtr = cast(byte*)&tag;
encodedMessage ~= *(highPtr+7);
encodedMessage ~= *(highPtr+6);
encodedMessage ~= *(highPtr+5);
encodedMessage ~= *(highPtr+4);
encodedMessage ~= *(highPtr+3);
encodedMessage ~= *(highPtr+2);
encodedMessage ~= *(highPtr+1);
encodedMessage ~= *(highPtr+0);
}
/* Hail marry, mother of God, pray for our sinners, now and at the our of our death Amen */
else
{
pragma(msg, "Not feeling scrumptious homeslice 😎️");
}
/* Tack on the data */
encodedMessage ~= data;

View File

@ -24,7 +24,17 @@ public enum ErrorType
/**
* If no default queue is configured
*/
NO_DEFAULT_QUEUE
NO_DEFAULT_QUEUE,
/**
* The blocking call to `dequeue()`, somehow, failed
*/
DEQUEUE_FAILED,
/**
* The call to `enqueue()`, somehow, failed
*/
ENQUEUE_FAILED
}
/**

View File

@ -4,7 +4,7 @@
module tristanable.manager.manager;
import std.socket;
import tristanable.queue : Queue;
import tristanable.queue.queue : Queue;
import core.sync.mutex : Mutex;
import tristanable.manager.watcher : Watcher;
import tristanable.encoding : TaggedMessage;

View File

@ -11,7 +11,7 @@ import std.socket;
import bformat;
import tristanable.encoding;
import tristanable.exceptions;
import tristanable.queue;
import tristanable.queue.queue;
import bformat.client;
/**
@ -204,6 +204,28 @@ unittest
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 100 payload Bye
*/
message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 200 payload Bye
*/
message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
}
}
@ -223,6 +245,10 @@ unittest
manager.registerQueue(sixtyNine);
manager.registerQueue(fortyTwo);
// Register a default queue (tag ignored)
Queue defaultQueue = new Queue(2332);
manager.setDefaultQueue(defaultQueue);
/* Connect our socket to the server */
client.connect(server.localAddress);
@ -252,6 +278,19 @@ unittest
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
/* Dequeue two messages from the default queue */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 100);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1");
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 200);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2");
/* Stop the manager */
manager.stop();

View File

@ -11,9 +11,8 @@ public import tristanable.manager;
/**
* A queue of queue items all of the same tag
* and queue-related facilities
*/
public import tristanable.queue;
public import tristanable.queue.queue : Queue;
/**
* Error handling type definitions

View File

@ -5,14 +5,13 @@ module tristanable.queue.queue;
import tristanable.queue.listener : TListener;
// TODO: Examine the below import which seemingly fixes stuff for libsnooze
import libsnooze.clib;
import libsnooze;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import core.sync.exception : SyncError;
import std.container.slist : SList;
import tristanable.encoding;
import core.thread : dur;
import core.time : Duration, dur;
import tristanable.exceptions;
version(unittest)
{
@ -35,8 +34,15 @@ public class Queue
/**
* The libsnooze event used to sleep/wake
* on queue events
* Mutex for the condition variable
*/
private Event event;
private Mutex mutex;
/**
* The condition variable used to sleep/wake
* on queue of events
*/
private Condition signal;
/**
* The queue of messages
@ -60,6 +66,15 @@ public class Queue
// TODO: Add listener add/remove methods
// TODO: On queue actions add a notificaiton call to the listeners
/**
* If a message is enqueued prior
* to us sleeping then we won't
* wake up and return for it.
*
* Therefore a periodic wakeup
* is required.
*/
private Duration wakeInterval;
/**
* Constructs a new Queue and immediately sets up the notification
@ -75,14 +90,37 @@ public class Queue
/* Initialize the queue lock */
this.queueLock = new Mutex();
/* Initialize the event */
this.event = new Event();
/* Initialize the condition variable */
this.mutex = new Mutex();
this.signal = new Condition(this.mutex);
/* Set the queue id */
this.queueID = queueID;
/* Ensure pipe existence (see https://deavmi.assigned.network/git/deavmi/tristanable/issues/5) */
event.wait(dur!("seconds")(0));
/* Set the slumber interval */
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
}
/**
* Returns the current wake interval
* for the queue checker
*
* Returns: the `Duration`
*/
public Duration getWakeInterval()
{
return this.wakeInterval;
}
/**
* Sets the wake up interval
*
* Params:
* interval = the new interval
*/
public void setWakeInterval(Duration interval)
{
this.wakeInterval = interval;
}
/**
@ -90,6 +128,9 @@ public class Queue
* and then wakes up any thread that has called dequeue
* on this queue as well
*
* On error enqueueing a `TristanableException` will be
* thrown.
*
* Params:
* message = the TaggedMessage to enqueue
*/
@ -121,11 +162,12 @@ public class Queue
try
{
// TODO: Make us wait on the event (optional with a time-out)
event.notifyAll();
signal.notifyAll();
}
catch(SnoozeError snozErr)
catch(SyncError snozErr)
{
// TODO: Add error handling for libsnooze exceptions here
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.ENQUEUE_FAILED);
}
}
@ -134,6 +176,9 @@ public class Queue
/**
* Blocks till a message can be dequeued from this queue
*
* On error dequeueing a `TristanableException` will be
* thrown.
*
* Returns: the dequeued TaggedMessage
*/
public TaggedMessage dequeue()
@ -157,16 +202,26 @@ public class Queue
/* Block till we dequeue a message successfully */
while(dequeuedMessage is null)
{
scope(exit)
{
// Unlock the mutex
this.mutex.unlock();
}
// Lock the mutex
this.mutex.lock();
try
{
// TODO: Make us wait on the event (optional with a time-out)
event.wait();
this.signal.wait(this.wakeInterval);
}
catch(SnoozeError snozErr)
catch(SyncError e)
{
// TODO: Add error handling for libsnooze exceptions here
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}
/* Lock the item queue */
queueLock.lock();