Compare commits
6 Commits
Author | SHA1 | Date |
---|---|---|
|
7de5d00c60 | |
|
57e72899e3 | |
|
d97853cc6f | |
|
281b6acf65 | |
|
c24af8e17a | |
|
199886cd8f |
|
@ -1,12 +1,12 @@
|
|||
module tristanable.watcher;
|
||||
|
||||
import std.socket : Socket;
|
||||
import std.socket : Socket, SocketSet;
|
||||
import core.sync.mutex : Mutex;
|
||||
import bmessage : receiveMessage;
|
||||
import tristanable.queue : Queue;
|
||||
import tristanable.queueitem : QueueItem;
|
||||
import tristanable.manager : Manager;
|
||||
import core.thread : Thread;
|
||||
import core.thread : Thread, Duration, dur;
|
||||
import tristanable.encoding;
|
||||
import tristanable.exceptions;
|
||||
|
||||
|
@ -16,26 +16,57 @@ public final class Watcher : Thread
|
|||
private Manager manager;
|
||||
|
||||
/* The socket to read from */
|
||||
private Socket socket;
|
||||
private Socket endpoint;
|
||||
|
||||
private bool running;
|
||||
|
||||
this(Manager manager, Socket endpoint)
|
||||
|
||||
private bool newSys;
|
||||
private SocketSet socketSetR, socketSetW, socketSetE;
|
||||
|
||||
|
||||
/**
|
||||
* Timeout for select()
|
||||
*/
|
||||
private Duration timeOut;
|
||||
|
||||
this(Manager manager, Socket endpoint, Duration timeOut = dur!("msecs")(100), bool newSys = false)
|
||||
{
|
||||
super(&run);
|
||||
this.manager = manager;
|
||||
socket = endpoint;
|
||||
this.endpoint = endpoint;
|
||||
|
||||
|
||||
this.newSys = newSys;
|
||||
if(newSys)
|
||||
{
|
||||
initSelect();
|
||||
}
|
||||
|
||||
|
||||
this.timeOut = timeOut;
|
||||
|
||||
running = true;
|
||||
start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the SocketSet which is needed for the use
|
||||
* of the select() method0
|
||||
*/
|
||||
private void initSelect()
|
||||
{
|
||||
socketSetR = new SocketSet();
|
||||
socketSetW = new SocketSet();
|
||||
socketSetE = new SocketSet();
|
||||
}
|
||||
|
||||
public void shutdown()
|
||||
{
|
||||
running=false;
|
||||
|
||||
/* Close the socket, causing an error, breaking the event loop */
|
||||
socket.close();
|
||||
endpoint.close();
|
||||
|
||||
}
|
||||
|
||||
|
@ -47,8 +78,82 @@ public final class Watcher : Thread
|
|||
/* Receive payload (tag+data) */
|
||||
byte[] receivedPayload;
|
||||
|
||||
|
||||
if(newSys)
|
||||
{
|
||||
/**
|
||||
* We want to check the readable status of the `endpoint` socket, we use
|
||||
* the `select()` function for this. However, after selecting it will need
|
||||
* to be re-added if you want to check again. Example, if you add it to
|
||||
* the `socketSetR` (the readable-socketset) then if we time out or it
|
||||
* is not readable from it will be removed from said set.
|
||||
*
|
||||
* Therefore we will need to add it back again for our next check (via
|
||||
* calling `select()`)
|
||||
*/
|
||||
socketSetR.add(endpoint);
|
||||
int status = Socket.select(socketSetR, null, null, timeOut);
|
||||
|
||||
import std.stdio : writeln;
|
||||
|
||||
/* If we timed out on the select() */
|
||||
if(status == 0)
|
||||
{
|
||||
/* Check if we need to exit */
|
||||
writeln("We got 0");
|
||||
|
||||
continue;
|
||||
}
|
||||
/* Interrupt */
|
||||
else if (status == -1)
|
||||
{
|
||||
/* TODO: Not sure what we should do here */
|
||||
writeln("We got -1");
|
||||
|
||||
import core.stdc.errno;
|
||||
writeln(errno);
|
||||
|
||||
continue;
|
||||
}
|
||||
/* Either data is available or a network occurred */
|
||||
else
|
||||
{
|
||||
writeln("Info: ", endpoint.isAlive);
|
||||
writeln("info: ", endpoint.handle);
|
||||
|
||||
writeln("We got ", status);
|
||||
|
||||
|
||||
/* If the socket is still connected */
|
||||
if(endpoint.isAlive())
|
||||
{
|
||||
/* If we have data */
|
||||
if(socketSetR.isSet(endpoint))
|
||||
{
|
||||
/* Do nothing (fall through) */
|
||||
writeln("We got ready socket");
|
||||
|
||||
/* I don't want to do mulitple additions, so let's clear the socket read set */
|
||||
socketSetR.reset();
|
||||
}
|
||||
|
||||
/* There is no else as the only socket set checked for IS read */
|
||||
}
|
||||
/* If the socket is not connected (network error) */
|
||||
else
|
||||
{
|
||||
/* TODO: Maybe handle? */
|
||||
writeln("We have socket error");
|
||||
|
||||
// throw new TristanableException(manager, "Network error with endpoint socket");
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/* Block for socket response */
|
||||
bool recvStatus = receiveMessage(socket, receivedPayload);
|
||||
bool recvStatus = receiveMessage(endpoint, receivedPayload);
|
||||
|
||||
/* If the receive was successful */
|
||||
if(recvStatus)
|
||||
|
|
Loading…
Reference in New Issue