1 module osc.server;
2 
3 import std.socket;
4 import std.container;
5 import core.thread;
6 import core.sync.mutex;
7 import osc.message;
8 import osc.packet;
9 import osc.bundle;
10 import std.datetime : msecs;
11 
12 
/++
    Polling ("pull") OSC server.

    Owns a UDP socket with a 16 ms receive timeout; the caller drives it by
    calling `receive` repeatedly. Call `close` when done with the server.
+/
class PullServer {
private:
    UdpSocket _socket;   // null once the server has been closed
    ubyte[] recvBuffer;  // reused for every datagram

public:
    /// Releases the socket if `close` has not been called yet.
    ~this() {
        close();
    }

    /// Listens on `port` on all interfaces ("0.0.0.0").
    this(ushort port) {
        this(new InternetAddress ("0.0.0.0", port));
    }

    /++
        Listens on the given address.

        If configuring or binding the socket throws, the socket is closed
        before the exception propagates, so the descriptor is not leaked.
    +/
    this(InternetAddress internetAddress) {
        _socket = new UdpSocket();
        scope(failure) close();
        _socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 16.msecs);
        _socket.bind(internetAddress);
        recvBuffer = new ubyte[ushort.max];
    }

    /**
        Attempts to receive one datagram from the socket.

        Returns: the messages decoded from the datagram, or `null` when
        nothing usable was received: the receive call reported an error
        (which includes the 16 ms timeout expiring), the datagram was empty,
        or the server has already been closed.
    */
    const(Message)[] receive() {
        if (_socket is null) {
            return null;
        }
        immutable received = _socket.receive(recvBuffer);
        // Socket.ERROR is negative; 0 is an empty datagram, which cannot
        // hold an OSC packet, so neither is handed to the parser.
        if (received <= 0) {
            return null;
        }
        return Packet(recvBuffer[0 .. received]).messages;
    }

    /// Closes the socket. Safe to call more than once.
    void close() {
        if (_socket !is null) {
            _socket.close();
            _socket = null;
        }
    }
}
54 
/++
    Threaded OSC server.

    A background thread receives datagrams (16 ms receive timeout per
    attempt) and queues the decoded messages; the owner collects them with
    `popMessages`.

    NOTE(review): call `close` explicitly. The worker thread's delegate
    references this object, so relying on the destructor to stop the thread
    is presumably not sufficient -- confirm against druntime's thread/GC
    behaviour.
+/
class Server {
private:
    shared bool shouldRun;   // written by the owner, polled by the worker thread
    Messages _messages;
    Thread _thread;
    Socket socket;
    ubyte[] recvBuffer;      // reused for every datagram, worker thread only

    /// Worker loop: receives and queues messages until `shouldRun` is cleared.
    void receive(Socket socket) {
        import core.atomic : atomicLoad;

        while (atomicLoad(shouldRun)) {
            immutable received = socket.receive(recvBuffer);
            // Socket.ERROR is negative (this includes the receive timeout);
            // 0 is an empty datagram. Neither is handed to the parser.
            if (received > 0) {
                _messages.pushMessages(Packet(recvBuffer[0 .. received]).messages);
            }
        }
    }

public:

    /// Construct a server listening on `port` on all interfaces ("0.0.0.0").
    this(ushort port) {
        this(new InternetAddress ("0.0.0.0", port));
    }

    /++
        Construct a server bound to the given address and start the
        receiving thread.

        If configuring or binding the socket (or starting the thread) throws,
        the socket is closed before the exception propagates.
    +/
    this(InternetAddress internetAddress) {
        import core.atomic : atomicStore;

        _messages = new Messages;
        recvBuffer = new ubyte[ushort.max];
        socket = new UdpSocket();
        scope(failure) {
            socket.close();
            socket = null;
        }
        socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 16.msecs);
        socket.bind(internetAddress);

        atomicStore(shouldRun, true);
        _thread = new Thread(() => receive(socket));
        _thread.start();
    }

    /// Stops the worker and closes the socket if `close` was not called.
    ~this() {
        close();
    }

    /// Returns every message queued since the previous call and empties the queue.
    const(Message)[] popMessages() {
        return _messages.popMessages;
    }

    /++
        Stops the receiving thread, waits for it to finish, and closes the
        socket. Safe to call more than once.

        The socket is closed even if joining the thread throws (for example
        because the worker ended with an uncaught exception, which
        `Thread.join` rethrows by default).
    +/
    void close() {
        import core.atomic : atomicStore;

        scope(exit) {
            if (socket !is null) {
                socket.close();
                socket = null;
            }
        }
        if (_thread !is null) {
            // Forget the thread before joining so a later close() does not
            // join (and rethrow) a second time.
            auto worker = _thread;
            _thread = null;
            atomicStore(shouldRun, false);
            worker.join();
        }
    }
}
114 
/++
    Mutex-guarded queue of received messages, filled by `pushMessages` and
    drained by `popMessages`.
+/
private class Messages {
private:
    const(Message)[] _contents;
    Mutex mtx;   // guards _contents in popMessages / pushMessages

public:
    this() {
        mtx = new Mutex();
    }

    /// Returns all queued messages and leaves the queue empty.
    const(Message)[] popMessages() {
        mtx.lock();
        scope(exit) mtx.unlock();
        auto drained = _contents;
        // Start from a fresh (null) slice so later appends cannot touch the
        // array that was just handed to the caller.
        _contents = null;
        return drained;
    }

    /// Appends `messages` to the queue.
    void pushMessages(const(Message)[] messages) {
        mtx.lock();
        // Release the lock even if the append throws.
        scope(exit) mtx.unlock();
        _contents ~= messages;
    }

    /// Number of messages currently queued.
    /// NOTE(review): reads `_contents` without taking `mtx`, so the value is
    /// only a snapshot when other threads are pushing or popping.
    size_t length() const {
        return _contents.length;
    }
}
144 
private{
    /// Collects every message carried by `packet`: its own message, if it
    /// has one, followed by all messages found in its bundle, if it has one.
    const(Message)[] messages(in Packet packet) {
        const(Message)[] list;
        if(packet.hasMessage) {
            list ~= packet.message;
        }
        if(packet.hasBundle) {
            // Append rather than assign, so a message collected above is
            // not discarded.
            list ~= messagesRecur(packet.bundle);
        }
        return list;
    }

    /// Flattens `bundle` depth-first, in element order: each element
    /// contributes its message and/or the messages of its nested bundle.
    const(Message)[] messagesRecur(in Bundle bundle) {
        const(Message)[] list;
        foreach (ref element; bundle.elements) {
            if(element.hasMessage) {
                list ~= element.message;
            }
            if(element.hasBundle) {
                list ~= messagesRecur(element.bundle);
            }
        }
        return list;
    }
}