1 module osc.server; 2 3 import std.socket; 4 import std.container; 5 import core.thread; 6 import core.sync.mutex; 7 import osc.message; 8 import osc.packet; 9 import osc.bundle; 10 import std.datetime : msecs; 11 12 13 /++ 14 +/ 15 class PullServer { 16 private: 17 UdpSocket _socket; 18 ubyte[] recvBuffer; 19 20 public: 21 ~this() { 22 close(); 23 } 24 25 this(ushort port) { 26 this(new InternetAddress ("0.0.0.0", port)); 27 } 28 29 /// 30 this(InternetAddress internetAddress) { 31 import std.socket; 32 _socket = new UdpSocket(); 33 _socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 16.msecs); 34 _socket.bind (internetAddress); 35 recvBuffer = new ubyte[ushort.max]; 36 } 37 38 /** 39 Attempts to recieve data from the socket 40 If no data is recieved for 16 milliseconds this function returns empty-handed. 41 */ 42 const(Message)[] receive() { 43 ptrdiff_t l = _socket.receive(recvBuffer); 44 if( l != UdpSocket.ERROR ) { 45 return Packet(recvBuffer[0..l]).messages; 46 } 47 return null; 48 } 49 50 void close() { 51 _socket.close(); 52 } 53 } 54 55 /++ 56 +/ 57 class Server { 58 private: 59 bool shouldRun; 60 Messages _messages; 61 Thread _thread; 62 Socket socket; 63 ubyte[] recvBuffer; 64 65 void receive(Socket socket) { 66 while(shouldRun) { 67 ptrdiff_t l = socket.receive(recvBuffer); 68 if (l != UdpSocket.ERROR) { 69 _messages.pushMessages(Packet(recvBuffer[0..l]).messages); 70 } 71 } 72 } 73 74 public: 75 76 /// Construct a server 77 this(ushort port) { 78 this(new InternetAddress ("0.0.0.0", port)); 79 } 80 81 /// 82 this(InternetAddress internetAddress) { 83 import std.socket; 84 _messages = new Messages; 85 socket = new UdpSocket(); 86 recvBuffer = new ubyte[ushort.max]; 87 socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 16.msecs); 88 socket.bind (internetAddress); 89 90 shouldRun = true; 91 _thread = new Thread(() => receive(socket)); 92 _thread.start(); 93 } 94 95 /// 96 ~this() { 97 close(); 98 } 99 100 const(Message)[] popMessages() { 101 return _messages.popMessages; 102 } 103 104 void close() { 105 if(_thread) { 106 shouldRun = false; 107 _thread.join; 108 } 109 if (socket) { 110 socket.close(); 111 } 112 } 113 } 114 115 /++ 116 +/ 117 private class Messages { 118 private: 119 const(Message)[] _contents; 120 Mutex mtx; 121 122 public: 123 this() { 124 mtx = new Mutex(); 125 } 126 127 const(Message)[] popMessages() { 128 mtx.lock; scope(exit)mtx.unlock; 129 const(Message)[] result = cast(const(Message)[])(_contents); 130 _contents = []; 131 return result; 132 } 133 134 void pushMessages(const(Message)[] messages) { 135 mtx.lock; 136 _contents ~= cast(const(Message)[])messages; 137 mtx.unlock; 138 } 139 140 size_t length() const { 141 return _contents.length; 142 } 143 } 144 145 private{ 146 const(Message)[] messages(in Packet packet) { 147 const(Message)[] list; 148 if(packet.hasMessage) { 149 list ~= packet.message; 150 } 151 if(packet.hasBundle) { 152 list = messagesRecur(packet.bundle); 153 } 154 return list; 155 156 } 157 158 const(Message)[] messagesRecur(in Bundle bundle) { 159 const(Message)[] list; 160 foreach (ref element; bundle.elements) { 161 if(element.hasMessage) { 162 list ~= element.message; 163 } 164 if(element.hasBundle) { 165 list ~= element.bundle.messagesRecur; 166 } 167 } 168 return list; 169 } 170 }