2020-09-29 17:13:36 +00:00
|
|
|
module tristanable.watcher;
|
2020-09-29 09:57:25 +00:00
|
|
|
|
2022-05-10 13:21:51 +00:00
|
|
|
import std.socket : Socket, SocketSet;
|
2020-09-29 17:18:53 +00:00
|
|
|
import core.sync.mutex : Mutex;
|
|
|
|
import bmessage : receiveMessage;
|
|
|
|
import tristanable.queue : Queue;
|
|
|
|
import tristanable.queueitem : QueueItem;
|
|
|
|
import tristanable.manager : Manager;
|
2022-05-10 13:22:54 +00:00
|
|
|
import core.thread : Thread, Duration, dur;
|
2020-09-29 17:18:53 +00:00
|
|
|
import tristanable.encoding;
|
2021-09-08 18:24:45 +00:00
|
|
|
import tristanable.exceptions;
|
2020-09-29 17:18:53 +00:00
|
|
|
|
2020-09-29 09:57:25 +00:00
|
|
|
/**
 * Background thread that reads messages from a socket and hands each
 * one to the `Queue` registered with the `Manager` for the message's tag.
 *
 * The thread is started by the constructor and runs until either a
 * receive fails or `shutdown()` is called; in both cases the manager
 * is invalidated before the thread exits.
 */
public final class Watcher : Thread
{
    /* The manager whose queues receive the decoded messages */
    private Manager manager;

    /* The socket to read from */
    private Socket endpoint;

    /**
     * `true` while the watcher is expected to keep running; cleared by
     * `shutdown()`.
     *
     * NOTE(review): this flag is written by the caller of `shutdown()`
     * and read by the watcher thread without any synchronization -
     * consider `core.atomic` if stricter guarantees are required.
     */
    private bool running;

    /* Whether to wait for readability with select() before receiving */
    private bool newSys;

    /* Socket sets for select(); only allocated when `newSys` is set */
    private SocketSet socketSetR, socketSetW, socketSetE;

    /**
     * Timeout for select()
     */
    private Duration timeOut;

    /**
     * Creates the watcher and immediately starts its thread.
     *
     * Params:
     *   manager  = the manager used to look up queues by tag and which
     *              is invalidated when the watcher stops
     *   endpoint = the socket to read messages from
     *   timeOut  = how long a single select() call may wait (only used
     *              when `newSys` is `true`)
     *   newSys   = when `true`, wait for the socket to become readable
     *              via select() before each receive
     */
    this(Manager manager, Socket endpoint, Duration timeOut = dur!("msecs")(100), bool newSys = false)
    {
        super(&run);
        this.manager = manager;
        this.endpoint = endpoint;
        this.timeOut = timeOut;

        this.newSys = newSys;
        if(newSys)
        {
            initSelect();
        }

        /* All state must be in place before the thread starts */
        running = true;
        start();
    }

    /**
     * Initializes the SocketSets which are needed for the use
     * of the select() method
     */
    private void initSelect()
    {
        socketSetR = new SocketSet();
        socketSetW = new SocketSet();
        socketSetE = new SocketSet();
    }

    /**
     * Requests the watcher to stop.
     *
     * Clears `running` and then closes the socket, which makes a
     * pending receive fail and thereby breaks the event loop.
     */
    public void shutdown()
    {
        running = false;

        /* Close the socket, causing an error, breaking the event loop */
        endpoint.close();
    }

    /**
     * Waits (in `timeOut`-sized slices) until `endpoint` reports a
     * status change on its read side.
     *
     * Returns: `true` if select() reported activity and the socket is
     *          still alive (a receive should be attempted), `false` if
     *          the watcher was asked to shut down or the socket is no
     *          longer alive
     */
    private bool awaitReadable()
    {
        /**
         * `running` is re-checked before every select() so that a
         * `shutdown()` is noticed after at most one timeout, and so
         * that we avoid handing the socket that `shutdown()` closed
         * back to select().
         */
        while(running)
        {
            /**
             * select() may alter the set it is given, so the endpoint
             * has to be (re-)added before every call. Resetting first
             * guarantees the endpoint is in the set exactly once, no
             * matter how the previous call left it.
             */
            socketSetR.reset();
            socketSetR.add(endpoint);

            int status = Socket.select(socketSetR, null, null, timeOut);

            /* 0 is a timeout, -1 an interruption: in both cases retry */
            if(status <= 0)
            {
                continue;
            }

            /**
             * Either data is available or a network error occurred;
             * the read set is the only set checked, so a live socket
             * here means a receive should be attempted.
             */
            return endpoint.isAlive();
        }

        return false;
    }

    /**
     * Thread entry point: receives messages until a receive fails or
     * the watcher is shut down, then invalidates the manager.
     */
    private void run()
    {
        /* Continuously dequeue tristanable packets from socket */
        while(true)
        {
            /* Receive payload (tag+data) */
            byte[] receivedPayload;

            /* Optionally wait for readability first */
            if(newSys && !awaitReadable())
            {
                /* Shut down requested or socket no longer alive */
                break;
            }

            /* Block for socket response */
            bool recvStatus = receiveMessage(endpoint, receivedPayload);

            /* If the receive failed, stop watching */
            if(!recvStatus)
            {
                /* TODO: depending on `running`, different error */
                break;
            }

            /* Decode the tag-encoded message */
            DataMessage message = DataMessage.decode(receivedPayload);

            /* The matching queue (if any) */
            Queue queue = manager.getQueue(message.getTag());

            /* If the tag belongs to a queue, add an item to it */
            if(queue)
            {
                queue.enqueue(new QueueItem(message.getData()));
            }
            /* If the tag is unknown the message is dropped */
            /* TODO: Add to dropped queue? */

            /**
             * Like in `dequeue` we don't want the possibility
             * of racing back to the top of the loop and locking
             * the mutex again right before a thread switch,
             * so we make sure that a switch occurs to a different
             * thread
             */
            Thread.getThis().yield();
        }

        /**
         * Whether we got here through an error (`running` still set)
         * or through a requested shut down, the reaction is the same:
         * unblock all current Queue operations and prevent future ones
         */
        manager.invalidate();
    }
}
|